С++ ThreadPool не работает параллельно

Я пытался реализовать ThreadPool, но, к сожалению, столкнулся с некоторыми проблемами.

Это то, что у меня уже есть.

//includes ...

void call()
{
    std::cout << "Hi i'm thread no " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "ready " << std::this_thread::get_id() << std::endl;
};

//Implementation is not shown here to reduce code
class WorkQueue {
    public:
        bool push(std::function<void()> const& value);
        void pop();
        bool empty();
};

std::condition_variable g_queuecheck;
std::mutex              g_lockqueue;
std::atomic<bool>       g_notified;

class ThreadPool
{
    public:
        ThreadPool(int iNoThread) :
            m_noThread(iNoThread)
        {
            g_notified.store(false);
            m_threads.resize(iNoThread);
            bIsReady.store(false);
            for (int i = 0; i < iNoThread; ++i)
                m_threads[i] = std::thread(&ThreadPool::run, this);
        }

        void run()
        {
            while (!bIsReady || !m_workQueue.empty())
            {
                std::unique_lock<std::mutex> locker(g_lockqueue);
                if (m_workQueue.empty())
                {
                    while (!g_notified) // Used to avoid spurious wakeups
                    {
                        g_queuecheck.wait(locker);
                    }
                    if(!bIsReady)
                        g_notified.store(false);
                }

                m_workQueue.pop();
            }
        };

        void addWork(std::function<void()> func)
        {
            m_workQueue.push(func);
            g_notified.store(true);
            g_queuecheck.notify_one();
        }

        void join()
        {
            bIsReady.store(true);
            g_notified.store(true);
            g_queuecheck.notify_all();

            for (int i = 0; i < m_noThread; ++i)
                m_threads[i].join();
        }

        ~ThreadPool()
        {}


        WorkQueue m_workQueue;
        int m_noThread;
        std::vector<std::thread> m_threads;
        std::atomic<bool> bIsReady;
};

int _tmain(int argc, _TCHAR* argv[])
{
    {
        ThreadPool pool(4);

        for (int i = 0; i < 8; ++i)
            pool.addWork(call); //This work is done sequentially

        pool.join();
    }

    std::cin.ignore();
    return 0;
}

Моя проблема в том, что работа выполняется последовательно.

  1. Как я могу это исправить?
  2. Что-то еще не так с моим ThreadPool?
  3. Является ли ожидание лучшей практикой?

person user1235183    schedule 05.08.2015    source источник
comment
где ваша функция на самом деле вызывается в run(), все, что я вижу, это только всплывающая очередь работы.   -  person marcinj    schedule 05.08.2015
comment
Кажется, что фактическое выполнение обратного вызова выполняется в WorkQueue::pop (похоже, нет другого подходящего места). Если это так, обратите внимание, что это происходит, пока удерживается мьютекс g_lockqueue.   -  person Igor Tandetnik    schedule 05.08.2015
comment
Вы на самом деле не делаете никакой работы. Я предполагаю, что когда вы всплываете, вы должны запустить функцию. Убедитесь, что вы запускаете функцию НЕ под замком. Кроме того, ваше использование g_notified не является необходимым и, вероятно, является пессимизацией. Вы должны просто проверить наличие m_workQueue.empty(). Также я не уверен, что вы пытаетесь сделать с bIsReady.   -  person Mike Vine    schedule 05.08.2015
comment
Переменные-члены в классе ThreadPool являются общедоступными. Это объектно-ориентированный?   -  person Peter Mortensen    schedule 07.08.2015


Ответы (3)


Я использую boost::asio для реализации пула потоков. Надеюсь это поможет. Эта реализация была взята из пула потоков Asio. Для меня ключом к тому, чтобы пример работал, является область asio::io_service::work и наличие join_all за пределами этой области.

#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/scoped_ptr.hpp>
#include <iostream>


boost::mutex output_mutex;
void call(size_t job_number)
{
    {
       boost::mutex::scoped_lock print_lock(output_mutex);
       std::cout << "Hi i'm job << " << job_number <<" and thread: " << boost::this_thread::get_id() << std::endl;
    }

    boost::this_thread::sleep_for(boost::chrono::seconds(2));
    {
       boost::mutex::scoped_lock print_lock(output_mutex);
       std::cout << "job " << job_number << " finished. thread " << boost::this_thread::get_id() << " ready." << std::endl;
    }
};

int main(int argc, char **argv)
{
    size_t number_of_threads = boost::thread::hardware_concurrency();

    // the number of jobs does not have to equal the number of
    // threads.  they will be processed in turn.
    size_t number_of_jobs = 3 * number_of_threads;

    boost::asio::io_service io_service;
    boost::thread_group threads;
    {
        boost::scoped_ptr< boost::asio::io_service::work > work( new boost::asio::io_service::work(io_service) );
        for(size_t t = 0; t < number_of_threads; t++)
        {
            threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
        }

        // post the jobs for work
        // notice that the thread id is reused
        for( size_t t = 0; t < number_of_jobs; t++ )
        {
            io_service.post(boost::bind(call,t) );
        }
    }
    threads.join_all();

    return 0;
}
person DannyK    schedule 05.08.2015
comment
Я использовал этот код (шаблон) для потоковых функций, включая методы класса, требующие параметров. - person DannyK; 10.08.2015

Проблема в функции run: g_lockqueue слишком долго заблокирована. Он заблокирован, пока locker находится в области действия, поэтому g_lockqueue блокируется при вызове pop.
Но, поскольку pop и empty не должны выполняться одновременно, вам нужно вернуть "Работу", пока она заблокирована, а затем снять блокировку. , и, наконец, выполнить работу.

run может выглядеть так:

void run()
{
    while (!bIsReady || !m_workQueue.empty())
    {
        Work work;
        {
            std::unique_lock<std::mutex> locker(g_lockqueue);
            if (m_workQueue.empty())
            {

                while (!g_notified) // used to avoid spurious wakeups 
                {
                    g_queuecheck.wait(locker);
                }
                if(!bIsReady)
                    g_notified.store(false);
            }
            work = m_workQueue.pop();  // get the work to be done while locked
        } // g_lockqueue released here
        work.do();  // do the work
    }
};
person alain    schedule 05.08.2015

  1. Как я могу это исправить?

Вы не показываете фактическую работу, только pop вызывается. Предполагая, что это действительно выполняет функцию, вы должны отметить, что ваш std::unique_lock все еще находится в области видимости, когда вы это делаете, поэтому он синхронизирован. Вам нужно освободить блокировку, чтобы вызовы были одновременными, что означает, что блокировщик должен быть вне области действия.

Например,

    { // begin locker scope
        std::unique_lock<std::mutex> locker(g_lockqueue);
        if (m_workQueue.empty())
        {

            while (!g_notified) // used to avoid spurious wakeups 
            {
                g_queuecheck.wait(locker);
            }
            if(!bIsReady)
                g_notified.store(false);
        }
    } // end locker scope
    m_workQueue.pop();

Обратите внимание, что теперь ваш метод pop будет запускать функции параллельно, но он также будет параллельно изменять очередь. Это проблема. Вам нужно будет сделать что-то вроде: переместить функцию в локальную переменную, а затем вытолкнуть, удерживая мьютекс; затем вызовите функцию вне области шкафчика.

  1. Что-то еще не так с моим ThreadPool?

Вы используете глобальные переменные, что некрасиво, и ваши переменные плохо названы: совершенно не ясно, что означают g_notified и bIsReady.

  1. Является ли ожидание лучшей практикой?

Нет. Даже если не считать самой ошибки, ее намерения неясны.

person Useless    schedule 05.08.2015