В настоящее время у меня проблема со Spark и чтением файлов bz2. Я использую Spark 1.2.0 (предварительно созданный для Hadoop 2.4, но в настоящее время файлы читаются только локально). Для тестирования есть ~1500 файлов, каждый файл размером около 50Кб.
Следующий скрипт count_loglines.py иллюстрирует проблему:
from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
sc = SparkContext(conf=spark_conf)
overall_log_lines = sc.textFile('/files/bzipped/*.log.bz2')
line_count = overall_log_lines.count()
print line_count
Запустив скрипт локально на одном ядре, он работает как положено.
spark/bin/spark-submit --master local[1] count_log_lines.py
Запуск скрипта на 2-х ядрах с помощью
spark/bin/spark-submit --master local[2] count_log_lines.py
заканчивается сообщениями об ошибках библиотеки hadoop bzip2, такими как
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 60 in stage 0.0 failed 1 times, most recent failure: Lost task 60.0 in stage 0.0 (TID 60, localhost): java.io.IOException: unexpected end of stream
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.bsGetBit(CBZip2InputStream.java:626)
Когда я заранее распаковываю файлы и читаю несжатые файлы журналов вместо сжатых, т. Е. sc.textFile('/files/unzipped/*.log'), сценарий работает, как и ожидалось, также на нескольких ядрах.
Мой вопрос: что здесь не так? Почему задание Spark неправильно считывает файлы bz2, если работает на нескольких ядрах?
Спасибо за помощь!