Можно ли использовать concurrent.futures для выполнения функции/метода внутри класса tkinter после события? Если да, то как?

Я пытаюсь использовать пул рабочих, предоставленный concurrent.futures.ProcessPoolExecutor, чтобы ускорить работу метода внутри класса tkinter. Это связано с тем, что выполнение метода интенсивно использует процессор, а «распараллеливание» должно сократить время его выполнения. Я надеюсь сравнить его производительность с контролем - последовательное выполнение того же метода. Я написал тестовый код tkinter GUI для выполнения этого теста. Последовательное выполнение метода работает, но параллельная часть не работает. Ценю любую помощь, чтобы заставить параллельную часть моего кода работать.

Обновление: я убедился, что правильно реализовал concurrent.futures.ProcessPoolExecutor для решения моей проблемы вне Tk(), то есть из стандартного скрипта python3. Это объясняется в этом ответе. Теперь я хочу реализовать параллельный метод, описанный в этом ответе, для работы с кнопкой в ​​моем графическом интерфейсе tkinter.Tk().

Мой тестовый код приведен ниже. Когда вы запустите его, появится графический интерфейс. Когда вы нажимаете кнопку «НАЙТИ», функция _findmatch будет выполняться последовательно и параллельно, чтобы найти, сколько раз число 5 встречается в диапазоне чисел от 0 до 1E8. Последовательная часть работает, но параллельная часть жалуется (см. ниже). Кто-нибудь знает, как исправить эту ошибку Pickling?

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_tkinter.tkapp'>: attribute lookup tkapp on _tkinter failed

Тестовый код:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import tkinter as tk # Python 3 tkinter modules
import tkinter.ttk as ttk
import concurrent.futures as cf
from time import time, sleep
from itertools import repeat, chain 

class App(ttk.Frame):
    def __init__(self, parent):
        # Initialise App Frame
        ttk.Frame.__init__(self, parent, style='App.TFrame')
        self.parent=parent

        self.button = ttk.Button(self, style='start.TButton', text = 'FIND',
                                 command=self._check)
        self.label0 = ttk.Label(self, foreground='blue')
        self.label1 = ttk.Label(self, foreground='red')
        self.label2 = ttk.Label(self, foreground='green')
        self._labels()
        self.button.grid(row=0, column=1, rowspan=3, sticky='nsew')
        self.label0.grid(row=0, column=0, sticky='nsew')
        self.label1.grid(row=1, column=0, sticky='nsew')
        self.label2.grid(row=2, column=0, sticky='nsew')

    def _labels(self):
        self.label0.configure(text='Click "FIND" to see how many times the number 5 appears.')
        self.label1.configure(text='Serial Method:')
        self.label2.configure(text='Concurrent Method:')

    def _check(self):
        # Initialisation
        self._labels()
        nmax = int(1E7)
        smatch=[]
        cmatch=[]
        number = '5'
        self.label0.configure(
            text='Finding the number of times {0} appears in 0 to {1}'.format(
                number, nmax))
        self.parent.update_idletasks()

        # Run serial code
        start = time()
        smatch = self._findmatch(0, nmax, number)
        end = time() - start
        self.label1.configure(
            text='Serial: Found {0} occurances,  Time to Find: {1:.6f}sec'.format(
                len(smatch), end))

        # Run serial code concurrently with concurrent.futures
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
        num_of_chunks = chunks_vs_workers * workers
        start = time()
        cmatch = self._concurrent_map(nmax, number, workers, num_of_chunks)
        end = time() - start
        self.label2.configure(
            text='Concurrent: Found {0} occurances,  Time to Find: {1:.6f}sec'.format(
                len(cmatch), end))

    def _findmatch(self, nmin, nmax, number):
        '''Function to find the occurence of number in range nmin to nmax and return
           the found occurences in a list.'''
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n): match.append(n)
        end = time() - start
        #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
        #      format(nmin, nmax, number, len(match),end))
        return match

    def _concurrent_map(self, nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            cstart = (chunksize * i for i in range(num_of_chunks))
            cstop = (chunksize * i if i != num_of_chunks else nmax
                     for i in range(1, num_of_chunks + 1))
            futures = executor.map(self._findmatch, cstart, cstop, repeat(number))
        end = time() - start
        print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        return list(chain.from_iterable(futures))


if __name__ == '__main__':
    root = tk.Tk()
    root.title('App'), root.geometry('550x60')
    app = App(root)
    app.grid(row=0, column=0, sticky='nsew')

    root.rowconfigure(0, weight=1)
    root.columnconfigure(0, weight=1)
    app.columnconfigure(0, weight=1)

    app.mainloop()

person Sun Bear    schedule 01.02.2017    source источник
comment
под производительностью виджета вы на самом деле подразумеваете производительность блока кода, который запускается при нажатии на виджет?   -  person Bryan Oakley    schedule 02.02.2017
comment
@BryanOakley Мне нравится сравнивать время, затраченное self._serial и self._concurrent на поиск совпадения. self._serial служит контролем. Я думаю, что хорошо написанный код сначала попытается реализовать в self._check 2 отдельных потока, чтобы запустить оба этих метода параллельно. Во-вторых, self._concurrent должен содержать cmds, чтобы использовать ProcessPoolExecutor для поиска совпадений и вывода результатов в label2 графического интерфейса. Я пытаюсь добиться этого, но пока не нашел способа сделать это. Я читал из другого поста, что параллельные cmds должны выполняться в основном, поэтому я сослался на self.parent .   -  person Sun Bear    schedule 02.02.2017
comment
@BryanOakley Мой ответ на ваш вопрос - да. Возможно ли то, что я хочу сделать? Если да, то как мне это сделать? Ценю вашу помощь.   -  person Sun Bear    schedule 03.02.2017
comment
Я уверен, что это возможно, но я никогда не использовал одновременный доступ, поэтому я не могу помочь.   -  person Bryan Oakley    schedule 03.02.2017
comment
@BryanOakley У вас есть примеры или ссылки, показывающие, как multiprocessing используется для моей цели? Я спрашиваю, потому что читал, что concurrent.futures — это реализация API более высокого уровня для multiprocessing и threading. Если это так, я думаю, что способ их использования для моего сценария должен применяться к concurrent.futures.   -  person Sun Bear    schedule 03.02.2017


Ответы (1)


Наконец-то я нашел способ ответить на свой вопрос.

В книге Марка Саммерфилдса «Практика Python» (2014) упоминается, что модуль multiprocessing, вызываемый concurrent.futures.ProcessPoolExecutor, может вызывать только функции, которые можно импортировать, и использовать данные модулей (вызываемые функциями), которые можно выбрать. Таким образом, необходимо, чтобы concurrent.futures.ProcessPoolExecutor и функции (с его аргументом), которые он вызывал, находились в отдельном модуле, чем модуль графического интерфейса tkinter, иначе он не работал бы.

Таким образом, я создал отдельный класс для размещения всех кодов, связанных с concurrent.futures.ProcessPoolExecutor, а также функций и данных, которые он вызывает, вместо того, чтобы помещать их в приложение класса, мой класс графического интерфейса tkinter.Tk(), как я делал ранее. Это сработало!

Мне также удалось использовать threading.Threads для одновременного выполнения моих последовательных и параллельных задач.

Я делюсь своим исправленным тестовым кодом ниже, чтобы продемонстрировать, как я это сделал, и надеюсь, что это поможет любому, кто пытается использовать concurrent.futures с tkinter.

Очень приятно видеть, как все процессоры работают с графическим интерфейсом Tk. :)

Пересмотренный тестовый код:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
''' Code to demonstrate how to use concurrent.futures.Executor object with tkinter.'''

import tkinter as tk # Python 3 tkinter modules
import tkinter.ttk as ttk
import concurrent.futures as cf
import threading
from time import time, sleep
from itertools import chain 


class App(ttk.Frame):
    def __init__(self, parent):
        # Initialise App Frame
        ttk.Frame.__init__(self, parent)
        self.parent=parent

        self.button = ttk.Button(self, text = 'FIND', command=self._check)
        self.label0 = ttk.Label(self, foreground='blue')
        self.label1 = ttk.Label(self, foreground='red')
        self.label2 = ttk.Label(self, foreground='green')
        self._labels()
        self.button.grid(row=0, column=1, rowspan=3, sticky='nsew')
        self.label0.grid(row=0, column=0, sticky='nsew')
        self.label1.grid(row=1, column=0, sticky='nsew')
        self.label2.grid(row=2, column=0, sticky='nsew')

    def _labels(self):
        self.label0.configure(text='Click "FIND" to see how many times the number 5 appears.')
        self.label1.configure(text='Serial Method:')
        self.label2.configure(text='Concurrent Method:')

    def _check(self):
        # Initialisation
        self._labels()
        nmax = int(1E8)
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
        num_of_chunks = chunks_vs_workers * workers
        number = '5'
        self.label0.configure(
            text='Finding the number of times {0} appears in 0 to {1}'.format(
                number, nmax))
        self.parent.update_idletasks()
        # Concurrent management of serial and concurrent tasks using threading
        self.serworker = threading.Thread(target=self._serial,
                                          args=(0, nmax, number))
        self.subworker  = threading.Thread(target=self._concurrent,
                                           args=(nmax, number, workers,
                                                 num_of_chunks))
        self.serworker.start()         
        self.subworker.start()         

    def _serial(self, nmin, nmax, number):
        fm = Findmatch
        # Run serial code
        start = time()
        smatch = fm._findmatch(fm, 0, nmax, number)
        end = time() - start
        self.label1.configure(
            text='Serial Method: {0} occurrences, Compute Time: {1:.6f}sec'.format(
                len(smatch), end))
        self.parent.update_idletasks()
        #print('smatch = ', smatch) 

    def _concurrent(self, nmax, number, workers, num_of_chunks): 
        fm = Findmatch
        # Run serial code concurrently with concurrent.futures .submit()
        start = time()
        cmatch = fm._concurrent_submit(fm, nmax, number, workers,
                                        num_of_chunks)
        end = time() - start
        self.label2.configure(
            text='Concurrent Method: {0} occurrences, Compute Time: {1:.6f}sec'.format(
                len(cmatch), end))
        self.parent.update_idletasks()
        #print('cmatch = ', cmatch) 


class Findmatch:
    ''' A class specially created to host concurrent.futures.ProcessPoolExecutor
        so that the function(s) it calls can be accessible by multiprocessing
        module. Multiprocessing requirements: codes must be importable and code
        data must be pickerable. ref. Python in Practice, by Mark Summerfields,
        section 4.3.2, pg 173, 2014'''
    def __init__(self):
        self.__init__(self)

    def _findmatch(self, nmin, nmax, number):
        '''Function to find the occurence of number in range nmin to nmax and return
           the found occurences in a list.'''
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n): match.append(n)
        end = time() - start
        #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
        #      format(nmin, nmax, number, len(match),end))
        return match

    def _concurrent_submit(self, nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        self.futures = []
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            for i in range(num_of_chunks):
                cstart = chunksize * i
                cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
                self.futures.append(executor.submit(
                    self._findmatch, self, cstart, cstop, number))
        end = time() - start
        print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        return list(chain.from_iterable(f.result() for f in cf.as_completed(
            self.futures)))


if __name__ == '__main__':
    root = tk.Tk()
    root.title('App'), root.geometry('550x60')
    app = App(root)
    app.grid(row=0, column=0, sticky='nsew')

    root.rowconfigure(0, weight=1)
    root.columnconfigure(0, weight=1)
    app.columnconfigure(0, weight=1)

    app.mainloop()
person Sun Bear    schedule 14.02.2017
comment
В stackoverflow есть десятки ответов на вопросы о многопроцессорности и concurrent.futures. Это лучший ответ и единственный, который сработал для меня после проверки не менее 30 различных вопросов о стеке. Ты жжешь! Спасибо - person Damiox; 14.04.2020