Каков эффективный способ обновления значения внутри RDD Spark?

Я пишу программу, связанную с графами, в Scala с Spark. Набор данных имеет 4 миллиона узлов и 4 миллиона ребер (вы можете рассматривать это как дерево), но каждый раз (Iteration) я редактирую только его часть, а именно поддерево, укорененное в данном узле, и узлы на пути между данным узлом и корнем.

Iteration имеет зависимость, что означает, что i+1 Iteration нужен результат, исходящий от i. Поэтому мне нужно сохранить результат каждого Iteration для следующего шага.

Я пытаюсь найти эффективный способ обновления RDD, но пока понятия не имею. Я обнаружил, что PairRDD имеет функцию lookup, которая может сократить время вычислений с O(N) до O(M), N обозначает общее количество объектов в RDD и M обозначают количество элементов в каждом разделе.

Итак, я думаю, могу ли я в любом случае обновить объект в RDD с помощью O(M)? Или, в идеале, O(1)? (Я вижу электронное письмо в списке рассылки Spark, в котором говорится, что lookup можно изменить для достижения O(1))

Другое дело, если бы я смог добиться O(M) для обновления RDD, мог бы я увеличить раздел до некоторого числа, большего, чем количество ядер, которое у меня есть, и добиться лучшей производительности?


person bxshi    schedule 10.06.2014    source источник
comment
RDD неизменяем, вы можете создать новый RDD только путем преобразования, но не можете его обновить.   -  person cloud    schedule 10.06.2014
comment
@cloud Спасибо за ваш комментарий, значит ли это, что мне нужно создать совершенно новый RDD, а не только раздел?   -  person bxshi    schedule 10.06.2014
comment
Вот и все. Я напишу ответ, чтобы объяснить это подробно.   -  person cloud    schedule 10.06.2014


Ответы (3)


RDD — это распределенный набор данных, раздел — это единица хранения RDD, единица для обработки, а RDD — это элемент.

Например, вы читаете большой файл из HDFS как RDD, затем элементом этого RDD является String (строки в этом файле), и spark сохраняет этот RDD в кластере по разделам. Вам, как пользователю spark, нужно заботиться только о том, как работать со строками этих файлов, точно так же, как вы пишете обычную программу и читаете файл из локальной файловой системы построчно. Это сила искры :)

В любом случае, вы понятия не имеете, какие элементы будут храниться в определенном разделе, поэтому нет смысла обновлять определенный раздел.

person cloud    schedule 10.06.2014
comment
Итак, основываясь на вашем ответе и ответе maasg, я должен рассматривать RDD как обычный объект и не пытаться настраивать производительность на более низком уровне, поскольку фреймворк сделает это за меня и создаст новый RDD с повторным использованием объекта (поэтому в основном создание просто итерация и замена какого-то объекта на новые) не так медленно, как я думал? - person bxshi; 10.06.2014
comment
Объект @bxshi RDD дешев, но данные внутри него дороги. Например, вы пишете приложение: data_source -> rdd1 -> rdd2 -> rdd3 -> get_result. Что на самом деле делает искра: запомните ваше преобразование t1, t2, t3 и примените это преобразование к источнику данных и получите результат. Spark не сохранит данные RDD, если вы не вызовете RDD.cache(). - person cloud; 11.06.2014
comment
@cloud: Означает ли это, что в любой момент времени будет существовать только один RDD? - person Shankar; 22.06.2015
comment
@cloud, почему я спрашиваю, если у нас несколько RDD, это займет больше места на диске / памяти .. верно? - person Shankar; 22.06.2015

Как функциональные структуры данных, RDD неизменяемы, и операция над RDD генерирует новый RDD.

Неизменяемость структуры не обязательно означает полную репликацию. Постоянные структуры данных — это распространенный функциональный шаблон, в котором операции с неизменяемыми структурами дают новую структуру, но предыдущие версии сохраняются и часто используются повторно.

GraphX ​​(«модуль» поверх Spark) — это API-интерфейс графа поверх Spark, который использует такую ​​концепцию: Из документов:

Изменения значений или структуры графика выполняются путем создания нового графика с желаемыми изменениями. Обратите внимание, что существенные части исходного графа (то есть неизмененная структура, атрибуты и индексы) повторно используются в новом графе, что снижает стоимость этой изначально функциональной структуры данных.

Это может быть решением проблемы: http://spark.apache.org/docs/1.0.0/graphx-programming-guide.html

person maasg    schedule 10.06.2014
comment
Да, они используются повторно, но вам все равно нужно перебрать все элементы, чтобы создать новый объект. - person bxshi; 10.06.2014
comment
Когда вы говорите, что я пытаюсь найти эффективный способ обновления RDD, я думал, что вы имеете в виду мутации на месте. Вы скорее говорите о поиске? - person maasg; 10.06.2014
comment
@massg Что ж, я хотел поговорить об обновлении RDD, но ошибся в определении iteration. Когда вы выполняете карту или другие операции манипуляции для создания нового RDD, у вас есть параллелизм в таких операциях, но вам все равно нужно получить доступ ко всем элементам внутри старого RDD. - person bxshi; 10.06.2014
comment
@bxshi Действительно невозможно. RDD — это просто последовательность преобразований, единственное, что вы можете изменить, — это исходные данные. (Или взломать кеш, но я бы не советовал) - person maasg; 10.06.2014

Модель программирования MapReduce (и FP) на самом деле не поддерживает обновление отдельных значений. Скорее предполагается определить последовательность преобразований.

Теперь, когда у вас есть взаимозависимые значения, то есть вы не можете выполнить преобразование с помощью простого map, но вам нужно агрегировать несколько значений и обновлять на основе этого значения, вам нужно подумать о способе группировки этих значений вместе, а затем преобразовать каждую группу - или определить моноидальную операцию, чтобы операцию можно было распределить и разбить на подшаги.

Группировать по подходу

Теперь я постараюсь быть немного более конкретным для вашего конкретного случая. Вы говорите, что у вас есть поддеревья, возможно ли сначала сопоставить каждый узел с ключом, указывающим соответствующее поддерево? Если это так, вы можете сделать что-то вроде этого:

nodes.map(n => (getSubTreeKey(n), n)).grouByKey().map ...

Моноид

(строго говоря, вам нужен коммутативный моноид) Лучше всего прочитать http://en.wikipedia.org/wiki/Monoid#Commutative_monoid

Например, + является моноидальной операцией, потому что, когда кто-то хочет вычислить сумму, скажем, RDD целых чисел, базовая структура может разбить данные на куски, выполнить суммирование для каждого фрагмента, а затем суммировать полученные суммы (возможно, также более чем в 2 шага). Если вы можете найти моноид, который в конечном итоге будет давать те же результаты, которые вам требуются от одиночных обновлений, то у вас есть способ распределить свою обработку. Например.

nodes.reduce(_ myMonoid _)

person samthebest    schedule 11.06.2014