Могу ли я обновить коллекцию, которую использует Parallel.For?

У меня есть ситуация, когда я запускаю некоторые задачи, каждая из которых занимает от нескольких секунд до минут. У меня также есть возможность добавления дополнительных данных, которые необходимо добавить в уже запущенный цикл Parallel. Можно ли обновить текущую коллекцию, которую использует Parallel.For, и продолжать повторять ее до тех пор, пока не останется объектов для извлечения? Вот пример кода, показывающий мою проблему:

[Test]
public void DoesParallelForGetNewEntriesInLoop()
{
    ConcurrentDictionary<int, string> dict = new ConcurrentDictionary<int, string>();
    ConcurrentBag<string> bag = new ConcurrentBag<string>();
    int i = 0;
    // write to dictionary every 10ms simulating new additions
    Timer t = new Timer(callback =>
    {
        dict.TryAdd(i++, "Value" + i);
    }, dict, 0, 10);
    // Add initial values
    dict.TryAdd(i++, "Value" + i);
    dict.TryAdd(i++, "Value" + i);
    dict.TryAdd(i++, "Value" + i);

    Parallel.For(0, dict.Count, (a, state) =>
    {
        string val = string.Empty;
        if (dict.TryGetValue(a, out val))
        {
            bag.Add(val + Environment.NewLine);
        }
        if (i++ == 50)
            state.Stop();
        Thread.Sleep(5000);

    });
    foreach (var item in bag)
    {
        File.AppendAllText("parallelWrite.txt", item);
    }            
}

Когда я запускаю это, я получаю просто результат:

Value2
Value1
Value3
Value4

Есть ли лучший подход к тому, что я пытаюсь сделать здесь?


person Kyle C    schedule 05.02.2015    source источник


Ответы (2)


Как насчет использования BlockingCollection и вызова GetConsumingEnumerable() в вашем Parallel.ForEach

BlockingCollection<string> collection = new BlockingCollection<string>();

Parallel.ForEach(collection.GetConsumingEnumerable(), (x) => Console.WriteLine(x));

Вы можете добавлять материалы в коллекцию, используя метод BlockingCollection Add().

Технически происходит «двойная блокировка», поскольку Parallel.ForEach блокирует коллекцию, когда она берет фрагменты элементов из перечислимого для обработки, а BlockingCollection был создан для поддержки нескольких потребителей, поэтому он также реализует блокировку. Если это становится проблемой производительности (это вполне может быть), вы можете реализовать свой собственный разделитель для своей BlockingCollection, поскольку Parallel.ForEach имеет перегрузки, которые принимают OrderablePartitioner и Partitioner. Существует очень хорошая статья, в которой описывается, как здесь: http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

person Michael Humelsine    schedule 05.02.2015
comment
Это имеет большой смысл, я попробую и отмечу это как ответ, если это сработает :) - person Kyle C; 06.02.2015
comment
В итоге я использовал немного другой подход, но это помогло мне встать на правильный путь. - person Kyle C; 06.03.2015

Параметры from и to в Parallel.Forgets вычисляются только один раз перед запуском цикла. Используйте Parallel.ForEach для перебора новых элементов. Я не уверен, чего вы пытаетесь достичь, но лучшим подходом может быть размещение новых данных в стеке/очереди и периодическое извлечение данных и их обработка.

person eladcon    schedule 05.02.2015
comment
Спасибо за информацию о Parallel.For, Parallel.ForEach кажется немного лучше, но кажется, что он выходит из цикла до того, как будут добавлены все 50 записей (он никогда не доходит до state.Stop()) - person Kyle C; 05.02.2015