Как установить кодировщик для конвейера потока данных Google в Python?

Я создаю пользовательское задание Dataflow на Python для загрузки данных из PubSub в BigQuery. В таблице много вложенных полей.

Где я могу установить Coder в этом конвейере?

avail_schema = parse_table_schema_from_json(bg_out_schema)
coder = TableRowJsonCoder(table_schema=avail_schema)

with beam.Pipeline(options=options) as p:
    # Read the text from PubSub messages.
    lines = (p | beam.io.ReadFromPubSub(subscription="projects/project_name/subscriptions/subscription_name")
              | 'Map' >> beam.Map(coder))
    # transformed = lines| 'Parse JSON to Dict' >> beam.Map(json.loads)
    transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery("Project:DataSet.Table", schema=avail_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)


Error: Map can be used only with callable objects. Received TableRowJsonCoder instead.

person BVSKanth    schedule 22.08.2019    source источник


Ответы (1)


В приведенном выше коде кодировщик применяется к сообщению, прочитанному из PubSub, которое является текстом.

WriteToBigQuery работает как со словарем, так и с TableRow. json.load генерирует dict, поэтому вы можете просто использовать его вывод для записи в BigQuery без применения какого-либо кодера. Обратите внимание, что поле в словаре должно соответствовать схеме таблицы.

Чтобы избежать проблем с кодером, я бы предложил использовать следующий код.

avail_schema = parse_table_schema_from_json(bg_out_schema)

with beam.Pipeline(options=options) as p:
    # Read the text from PubSub messages.
    lines = (p | beam.io.ReadFromPubSub(subscription="projects/project_name/subscriptions/subscription_name"))
    transformed = lines| 'Parse JSON to Dict' >> beam.Map(json.loads)
    transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery("Project:DataSet.Table", schema=avail_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
person Ankur    schedule 22.08.2019
comment
Он работает для простого JSON, но не вложенного. Таблица имеет поля типа RECORD с вложенными данными. Вот где это терпит неудачу. - person BVSKanth; 23.08.2019
comment
Я бы предложил попробовать проанализировать вложенный json отдельно, чтобы отладить проблему. Поскольку я подозреваю, что синтаксис и библиотека json могут быть неправильными. - person Ankur; 24.08.2019
comment
Я пытался вставить нуль в столбцы «ПОВТОРЯЮЩИЕСЯ». Изменение ввода с nulll на [] устранило проблему. - person BVSKanth; 27.08.2019