Лучшие примеры параллельной обработки в Python

Надеюсь, на этот раз меня не минусуют. Я некоторое время боролся с параллельной обработкой в ​​Python (точно 2 дня). Я проверяю эти ресурсы (неполный список показан здесь:

(a) http://eli.thegreenplace.net/2013/01/16/python-parallizing-cpu-bound-tasks-with-concurrent-futures

(б) https://pythonadventures.wordpress.com/tag/processpoolexecutor/

Я вышел из строя. Я хочу сделать следующее:


Break up the file into chunks(strings or numbers)
Broadcast a pattern to be searched to all the workers
Receive the offsets in the file where the pattern was found


Receive pattern and chunk of text from the master
Send back the offsets to the master.

Я попытался реализовать это с помощью MPI/concurrent.futures/multiprocessing и не застрял.

Моя наивная реализация с использованием многопроцессорного модуля

import multiprocessing

filename = "file1.txt"
pat = "afow"
N = 1000

""" This is the naive string search algorithm"""

def search(pat, txt):

    patLen = len(pat)
    txtLen = len(txt)
    offsets = []

    # A loop to slide pattern[] one by one
    # Range generates numbers up to but not including that number
    for i in range ((txtLen - patLen) + 1):

    # Can not use a for loop here
    # For loops in C with && statements must be
    # converted to while statements in python
        counter = 0
        while(counter < patLen) and pat[counter] == txt[counter + i]:
           counter += 1
           if counter >= patLen:
        return str(offsets).strip('[]')

       This is what I want 
if __name__ == "__main__":
     tasks = []
     pool_outputs = []
     pool = multiprocessing.Pool(processes=5)
     with open(filename, 'r') as infile:
           lines = []
           for line in infile:
                if len(lines) > N:
                     pool_output = pool.map(search, tasks)
                     lines = []
                if len(lines) > 0:
                     pool_output = pool.map(search, tasks)
     print('Pool:', pool_outputs)

with open(filename, 'r') as infile:
    for line in infile:
        print(search(pat, line))

Я был бы признателен за любое руководство, особенно с concurrent.futures. Спасибо за ваше время. Валерий помог мне с добавлением, за что ему большое спасибо.

Но если кто-то может просто побаловать меня на мгновение, это код, над которым я работал для concurrent.futures (работа над примером, который я где-то видел)

from concurrent.futures import ProcessPoolExecutor, as_completed
import math

def search(pat, txt):

    patLen = len(pat)
    txtLen = len(txt)
    offsets = []

# A loop to slide pattern[] one by one
# Range generates numbers up to but not including that number
    for i in range ((txtLen - patLen) + 1):

    # Can not use a for loop here
    # For loops in C with && statements must be
    # converted to while statements in python
        counter = 0
        while(counter < patLen) and pat[counter] == txt[counter + i]:
            counter += 1
            if counter >= patLen:
return str(offsets).strip('[]')

#Check a list of strings
def chunked_worker(lines):
    return {0: search("fmo", line) for line in lines}

def pool_bruteforce(filename, nprocs):
    lines = []
    with open(filename) as f:
        lines = [line.rstrip('\n') for line in f]
    chunksize = int(math.ceil(len(lines) / float(nprocs)))
    futures = []

    with ProcessPoolExecutor() as executor:
        for i in range(nprocs):
            chunk = lines[(chunksize * i): (chunksize * (i + 1))]
            futures.append(executor.submit(chunked_worker, chunk))

    resultdict = {}
    for f in as_completed(futures):
    return resultdict

filename = "file1.txt"
pool_bruteforce(filename, 5)

Еще раз спасибо Валерию и всем, кто попытается помочь мне разгадать мою загадку.

person corax    schedule 25.04.2016    source источник

Ответы (1)

Вы используете несколько аргументов, поэтому:

import multiprocessing
from functools import partial
filename = "file1.txt"
pat = "afow"
N = 1000

""" This is the naive string search algorithm"""

def search(pat, txt):
    patLen = len(pat)
    txtLen = len(txt)
    offsets = []

    # A loop to slide pattern[] one by one
    # Range generates numbers up to but not including that number
    for i in range ((txtLen - patLen) + 1):

    # Can not use a for loop here
    # For loops in C with && statements must be
    # converted to while statements in python
        counter = 0
        while(counter < patLen) and pat[counter] == txt[counter + i]:
           counter += 1
           if counter >= patLen:
        return str(offsets).strip('[]')

if __name__ == "__main__":
     tasks = []
     pool_outputs = []
     pool = multiprocessing.Pool(processes=5)
     lines = []
     with open(filename, 'r') as infile:
         for line in infile:
     tasks = lines
     func = partial(search, pat)
     if len(lines) > N:
        pool_output = pool.map(func, lines )
     elif len(lines) > 0:
        pool_output = pool.map(func, lines )
     print('Pool:', pool_outputs)
person Valeriy Solovyov    schedule 25.04.2016
Валерий: Спасибо. Что делает партиал в любом случае? Знаете ли вы какие-либо ресурсы, которые полностью посвящены параллельной обработке в python? Спасибо еще раз. - person corax; 25.04.2016
Валерий: Я это читал и не мог толком понять. Извините, но я имел в виду как правильный пример в функции. Спасибо. - person corax; 25.04.2016