Библиотека hadoop bz2 в задании Spark дает сбой при работе на нескольких ядрах

В настоящее время у меня проблема со Spark и чтением файлов bz2. Я использую Spark 1.2.0 (предварительно созданный для Hadoop 2.4, но в настоящее время файлы читаются только локально). Для тестирования есть ~1500 файлов, каждый файл размером около 50Кб.

Следующий скрипт count_loglines.py иллюстрирует проблему:

 from pyspark import SparkConf, SparkContext
 spark_conf = SparkConf().setAppName("SparkTest")
 sc = SparkContext(conf=spark_conf)

 overall_log_lines = sc.textFile('/files/bzipped/*.log.bz2')
 line_count = overall_log_lines.count()
 print line_count

Запустив скрипт локально на одном ядре, он работает как положено.

spark/bin/spark-submit --master local[1] count_log_lines.py

Запуск скрипта на 2-х ядрах с помощью

spark/bin/spark-submit --master local[2] count_log_lines.py

заканчивается сообщениями об ошибках библиотеки hadoop bzip2, такими как

 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 60 in stage 0.0 failed 1 times, most recent failure: Lost task 60.0 in stage 0.0 (TID 60, localhost): java.io.IOException: unexpected end of stream
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.bsGetBit(CBZip2InputStream.java:626)

Когда я заранее распаковываю файлы и читаю несжатые файлы журналов вместо сжатых, т. Е. sc.textFile('/files/unzipped/*.log'), сценарий работает, как и ожидалось, также на нескольких ядрах.

Мой вопрос: что здесь не так? Почему задание Spark неправильно считывает файлы bz2, если работает на нескольких ядрах?

Спасибо за помощь!


person siggi_42    schedule 01.02.2015    source источник
comment
У меня точно такая же проблема. У кого-нибудь есть ответ?   -  person zbinsd    schedule 16.03.2015
comment
Какую версию хаупа вы использовали?   -  person Fabiano Francesconi    schedule 27.08.2015
comment
Вероятно, вы столкнулись с ошибкой в ​​Hadoop: issues.apache.org/jira/browse/HADOOP -10614   -  person    schedule 10.02.2016
comment
Привет, ты получил ответ на то же самое. Пожалуйста, дайте нам знать ваши комментарии   -  person dinesh028    schedule 27.06.2017


Ответы (1)


Я не совсем уверен, что любой текстовый файл поддерживает файлы bz2.

Вы можете взглянуть на pyspark newAPIHadoopFile или API-интерфейсы hadoopfile. Если разделенный файл bz2 содержит текст (например, журнал), вы можете использовать:

stdout = sc.newAPIHadoopFile(path="/HDFSpath/to/folder/containing/bz2/", inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat", keyClass="org.apache.hadoop.io.Text", valueClass="org.apache.hadoop.io.Text", keyConverter=None, valueConverter=None, conf=None, batchSize=5)

Источник: http://spark.apache.org/docs/1.2.0/api/python/pyspark.html

hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

Прочитайте «старый» Hadoop InputFormat с произвольным классом ключа и значения из HDFS, локальной файловой системы (доступной на всех узлах) или любого URI файловой системы, поддерживаемого Hadoop. Механизм тот же, что и для sc.sequenceFile.

Конфигурация Hadoop может быть передана в виде словаря Python. Это будет преобразовано в Конфигурацию на Java.

Параметры: путь — путь к файлу Hadoop. Text») valueClass — полное имя класса значения Доступный для записи класс (например, «org.apache.hadoop.io.LongWritable») keyConverter — (по умолчанию нет) valueConverter — (по умолчанию нет) conf — конфигурация Hadoop, переданная в виде dict (По умолчанию нет) batchSize — количество объектов Python, представленных в виде одного объекта Java. (по умолчанию 0, автоматически выбирается размер партии)

or

newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

Прочитайте «новый API» Hadoop InputFormat с произвольным классом ключа и значения из HDFS, локальной файловой системы (доступной на всех узлах) или любого URI файловой системы, поддерживаемого Hadoop. Механизм тот же, что и для sc.sequenceFile.

Конфигурация Hadoop может быть передана в виде словаря Python. Это будет преобразовано в конфигурацию в Java

Параметры: path — путь к файлу Hadoop. hadoop.io.Text») valueClass — полное имя класса значения Доступный для записи класс (например, «org.apache.hadoop.io.LongWritable») keyConverter — (по умолчанию нет) valueConverter — (по умолчанию нет) conf — конфигурация Hadoop, переданная in as a dict (по умолчанию нет). batchSize — количество объектов Python, представленных в виде одного объекта Java. (по умолчанию 0, автоматически выбирается размер партии)

ргс,

K

person KiiMo    schedule 27.03.2015