Лучшие примеры параллельной обработки в Python

Надеюсь, на этот раз меня не минусуют. Я некоторое время боролся с параллельной обработкой в ​​Python (точно 2 дня). Я проверяю эти ресурсы (неполный список показан здесь:

(a) http://eli.thegreenplace.net/2013/01/16/python-parallizing-cpu-bound-tasks-with-concurrent-futures

(б) https://pythonadventures.wordpress.com/tag/processpoolexecutor/

Я вышел из строя. Я хочу сделать следующее:

Мастер:

Break up the file into chunks(strings or numbers)
Broadcast a pattern to be searched to all the workers
Receive the offsets in the file where the pattern was found

Рабочие:

Receive pattern and chunk of text from the master
Compute()
Send back the offsets to the master.

Я попытался реализовать это с помощью MPI/concurrent.futures/multiprocessing и не застрял.

Моя наивная реализация с использованием многопроцессорного модуля

import multiprocessing

filename = "file1.txt"
pat = "afow"
N = 1000

""" This is the naive string search algorithm"""

def search(pat, txt):

    patLen = len(pat)
    txtLen = len(txt)
    offsets = []

    # A loop to slide pattern[] one by one
    # Range generates numbers up to but not including that number
    for i in range ((txtLen - patLen) + 1):

    # Can not use a for loop here
    # For loops in C with && statements must be
    # converted to while statements in python
        counter = 0
        while(counter < patLen) and pat[counter] == txt[counter + i]:
           counter += 1
           if counter >= patLen:
               offsets.append(i)
        return str(offsets).strip('[]')

       """"
       This is what I want 
if __name__ == "__main__":
     tasks = []
     pool_outputs = []
     pool = multiprocessing.Pool(processes=5)
     with open(filename, 'r') as infile:
           lines = []
           for line in infile:
                lines.append(line.rstrip())
                if len(lines) > N:
                     pool_output = pool.map(search, tasks)
                     pool_outputs.append(pool_output)
                     lines = []
                if len(lines) > 0:
                     pool_output = pool.map(search, tasks)
                     pool_outputs.append(pool_output)
     pool.close()
     pool.join()
     print('Pool:', pool_outputs)
         """""

with open(filename, 'r') as infile:
    for line in infile:
        print(search(pat, line))

Я был бы признателен за любое руководство, особенно с concurrent.futures. Спасибо за ваше время. Валерий помог мне с добавлением, за что ему большое спасибо.

Но если кто-то может просто побаловать меня на мгновение, это код, над которым я работал для concurrent.futures (работа над примером, который я где-то видел)

from concurrent.futures import ProcessPoolExecutor, as_completed
import math

def search(pat, txt):

    patLen = len(pat)
    txtLen = len(txt)
    offsets = []

# A loop to slide pattern[] one by one
# Range generates numbers up to but not including that number
    for i in range ((txtLen - patLen) + 1):

    # Can not use a for loop here
    # For loops in C with && statements must be
    # converted to while statements in python
        counter = 0
        while(counter < patLen) and pat[counter] == txt[counter + i]:
            counter += 1
            if counter >= patLen:
                offsets.append(i)
return str(offsets).strip('[]')

#Check a list of strings
def chunked_worker(lines):
    return {0: search("fmo", line) for line in lines}


def pool_bruteforce(filename, nprocs):
    lines = []
    with open(filename) as f:
        lines = [line.rstrip('\n') for line in f]
    chunksize = int(math.ceil(len(lines) / float(nprocs)))
    futures = []

    with ProcessPoolExecutor() as executor:
        for i in range(nprocs):
            chunk = lines[(chunksize * i): (chunksize * (i + 1))]
            futures.append(executor.submit(chunked_worker, chunk))

    resultdict = {}
    for f in as_completed(futures):
        resultdict.update(f.result())
    return resultdict


filename = "file1.txt"
pool_bruteforce(filename, 5)

Еще раз спасибо Валерию и всем, кто попытается помочь мне разгадать мою загадку.


person corax    schedule 25.04.2016    source источник


Ответы (1)


Вы используете несколько аргументов, поэтому:

import multiprocessing
from functools import partial
filename = "file1.txt"
pat = "afow"
N = 1000

""" This is the naive string search algorithm"""

def search(pat, txt):
    patLen = len(pat)
    txtLen = len(txt)
    offsets = []

    # A loop to slide pattern[] one by one
    # Range generates numbers up to but not including that number
    for i in range ((txtLen - patLen) + 1):

    # Can not use a for loop here
    # For loops in C with && statements must be
    # converted to while statements in python
        counter = 0
        while(counter < patLen) and pat[counter] == txt[counter + i]:
           counter += 1
           if counter >= patLen:
               offsets.append(i)
        return str(offsets).strip('[]')


if __name__ == "__main__":
     tasks = []
     pool_outputs = []
     pool = multiprocessing.Pool(processes=5)
     lines = []
     with open(filename, 'r') as infile:
         for line in infile:
             lines.append(line.rstrip())                 
     tasks = lines
     func = partial(search, pat)
     if len(lines) > N:
        pool_output = pool.map(func, lines )
        pool_outputs.append(pool_output)     
     elif len(lines) > 0:
        pool_output = pool.map(func, lines )
        pool_outputs.append(pool_output)
     pool.close()
     pool.join()
     print('Pool:', pool_outputs)
person Valeriy Solovyov    schedule 25.04.2016
comment
Валерий: Спасибо. Что делает партиал в любом случае? Знаете ли вы какие-либо ресурсы, которые полностью посвящены параллельной обработке в python? Спасибо еще раз. - person corax; 25.04.2016
comment
Валерий: Я это читал и не мог толком понять. Извините, но я имел в виду как правильный пример в функции. Спасибо. - person corax; 25.04.2016