Запуск пользовательского класса Java в PySpark на EMR

Я пытаюсь использовать пакет Cerner Bunsen для обработки FHIR в PySpark на AWS EMR, в частности класс Bundles и его методы. Я создаю сеанс Spark с помощью API Apache Livy,

def create_spark_session(master_dns, kind, jars):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind, 'jars': jars}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers

Где kind = pyspark3, а jars - это локация S3, в которой находится банка (bunsen-shaded-1.4.7.jar)

Преобразование данных пытается импортировать банку и вызывать методы через:

# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()

# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen.Bundles")
func = sc._gateway.jvm.Bundles()

Я получаю сообщение об ошибке

«py4j.protocol.Py4JError: ошибка при вызове None.com.cerner.bunsen.Bundles. Трассировка: \ npy4j.Py4JException: конструктор com.cerner.bunsen.Bundles ([]) не существует»

Я впервые попытался использовать java_import, поэтому я буду благодарен за любую помощь.

РЕДАКТИРОВАТЬ: Я немного изменил сценарий преобразования и теперь вижу другую ошибку. Я вижу, что jar добавляется в журналы, поэтому я уверен, что он там, и что функциональность jars: jars работает должным образом. Новое преобразование:

# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()

# Manage logging
#sc.setLogLevel("INFO")

# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen")
func_main = sc._gateway.jvm.Bundles
func_deep = sc._gateway.jvm.Bundles.BundleContainer

fhir_data_frame = func_deep.loadFromDirectory(spark,"s3://<bucket>/source_database/Patient",1)
fhir_data_frame_fromJson = func_deep.fromJson(fhir_data_frame)
fhir_data_frame_clean = func_main.extract_entry(spark,fhir_data_frame_fromJson,'patient')
fhir_data_frame_clean.show(20, False)

и новая ошибка:

Объект JavaPackage не вызывается

Поиск этой ошибки был немного бесполезным, но, опять же, если у кого-то есть идеи, я с радостью воспользуюсь ими.


person user1983682    schedule 22.01.2020    source источник


Ответы (1)


Если вы хотите использовать функцию Scala / Java в Pyspark, вам также необходимо добавить пакет jar в путь к классам. Вы можете сделать это двумя способами:

Вариант1: в Spark отправить с флагом --jars

 spark-submit example.py --jars /path/to/bunsen-shaded-1.4.7.jar

Вариант 2: добавьте его в spark-defaults.conf файл в свойстве:

Добавьте следующий код в: path/to/spark/conf/spark-defaults.conf

# Comma-separated list of jars include on the driver and executor classpaths. 
spark.jars /path/to/bunsen-shaded-1.4.7.jar
person ggeop    schedule 22.01.2020
comment
Я считаю, что первый вариант перенесен в вызов api livy rest через data = {'kind': kind, 'jars': jars}, где jars = bunsen-shaded-1.4.7.jar - Разве это не кейс? Ссылка: livy.incubator.apache.org/docs/latest/rest- api.html, вызов POST / sessions - person user1983682; 22.01.2020
comment
@ user1983682 Я не использовал livy api, но похоже, что он выполняет ту же работу. Ваша ОШИБКА очень ясна, вы не добавили банку в путь к классам. Может у вас опечатка, неправильный путь, ошибка в упаковке что-то вроде этого. У вас есть доступ в пользовательском интерфейсе искры? Если у вас есть, вы можете перейти на вкладку среды и посмотреть, правильно ли вы добавили - person ggeop; 22.01.2020
comment
Это определенно есть, я вижу, как это добавляется в журналы. Я немного изменил преобразование данных и теперь получаю другую ошибку. Я добавлю правку выше. - person user1983682; 24.01.2020
comment
@ user1983682, у вашей проблемы есть простое решение, посмотрите эту документацию: engineering. cerner.com/bunsen/0.4.6/introduction.html - person ggeop; 24.01.2020
comment
Теперь банка выглядит как место записи, но искра не может ее найти. На мой взгляд, путь импорта выглядит подозрительно. - person ggeop; 24.01.2020
comment
Да, я пробовал их простое решение, но, к сожалению, оно не казалось совместимым с более широкой картиной (Apache Airflow запускает EMR и использует Ливи для вызова Spark). Я внес дополнительные изменения (удалил параметр jars из Livy и вместо этого использовал загрузку для получения файла, затем EMR Configurations, чтобы установить значения по умолчанию для искры. Ошибка в том, что Constructor не существует, поэтому я думаю, что вы правы насчет импорта path или что-то еще о вызове класса. Работаем над этим и сообщим об этом. Еще раз спасибо, @ggeop! - person user1983682; 25.01.2020
comment
Да, попробуйте исследовать путь импорта, и вы его найдете, я думаю, что теперь вы близки с новой ОШИБКОЙ! Не пропадай! Сообщи мне, когда найдешь - person ggeop; 25.01.2020