Как я могу объединить две Pcollections (различных размеров / данных) с общим ключом (Street) через боковой ввод?

У меня есть две коллекции PCollections: одна извлекает информацию из Pub / Sub, а другая извлекает данные из файла CSV. После нескольких различных преобразований в каждом конвейере я хотел бы объединить эти два на общем ключе, который они оба имеют, «УЛИЦА». Я включаю вторую коллекцию PCollection как побочный ввод. Однако при попытке запуска я получаю сообщение об ошибке.

Я попытался использовать CoGroupByKey, но продолжал получать ошибки относительно разницы в типах данных в Pcollections. Я попытался провести рефакторинг выходных данных и установить атрибут PCollection с помощью __setattr__, чтобы заставить типы быть равными, но, тем не менее, он сообщил о «смешанных значениях». После дальнейших исследований выяснилось, что лучше использовать побочные входы, особенно когда существует несоответствие в размере данных между элементами. Даже с боковыми входами я все еще не могу обойти текущую ошибку:

from_runner_api raise ValueError('No producer for %s' % id)
ValueError: No producer for ref_PCollection_PCollection_6

Логика моего приложения следующая:

def merge_accidents(element, pcoll):
    print(element)
    print(pcoll)
    "some code that will append to existing data"

accident_pl = beam.Pipeline()
accident_data = (accident_pl |
                        'Read' >> beam.io.ReadFromText('/modified_Excel_Crashes_Chicago.csv')
                        | 'Map Accidents' >> beam.ParDo(AccidentstoDict())
                        | 'Count Accidents' >> Count.PerKey())

chi_traf_pl = beam.Pipeline(options=pipeline_options)
chi_traffic = (chi_traf_pl | 'ReadPubSub' >> beam.io.ReadFromPubSub(subscription=subscription_name, with_attributes=True)
                           | 'GeoEnrich&Trim' >> beam.Map(loc_trim_enhance)
                           | 'TimeDelayEnrich' >> beam.Map(timedelay)
                           | 'TrafficRatingEnrich' >> beam.Map(traffic_rating)
                           | 'MergeAccidents' >> beam.Map(merge_accidents, pcoll=AsDict(accident_data))
                           | 'Temp Write'>> beam.io.WriteToText('testtime', file_name_suffix='.txt'))

accident_pl.run()
chi_result = chi_traf_pl.run()
chi_result.wait_until_finish()```

**Pcoll 1:**
[{'segment_id': '1', 'street': 'Western Ave', 'direction': 'EB', 'length': '0.5', 'cur_traffic': '24', 'county': 'Cook County', 'neighborhood': 'West Elsdon', 'zip_code': '60629', 'evnt_timestamp': '2019-04-01 20:50:20.0', 'traffic_rating': 'Heavy', 'time_delay': '0.15'}]
**Pcoll 2:**
('MILWAUKEE AVE', 1)
('CENTRAL AVE', 2)
('WESTERN AVE', 6)

**Expected:**
[{'segment_id': '1', 'street': 'Western Ave', 'direction': 'EB', 'length': '0.5', 'cur_traffic': '24', 'county': 'Cook County', 'neighborhood': 'West Elsdon', 'zip_code': '60629', 'evnt_timestamp': '2019-04-01 20:50:20.0', 'traffic_rating': 'Heavy', 'time_delay': '0.15', 'accident_count': '6'}]

**Actual Results:**
"from_runner_api raise ValueError('No producer for %s' % id)ValueError: No producer for ref_PCollection_PCollection_6

person cloudpython    schedule 02.04.2019    source источник
comment
Это DirectRunner? Также просто чтобы проверить, передаете ли вы --streaming в качестве опции для конвейера?   -  person Reza Rokni    schedule 02.04.2019
comment
Спасибо за помощь! ! Я использую DirectRunner для тестирования прямо сейчас. Намерение состоит в том, чтобы использовать PubSub и Cloud Datastore в качестве источников, запускать через Cloud Dataflow и выгружать в BigQuery. Я не хотел продолжать использовать Dataflow для тестирования своего кода, поэтому я запускаю DirectRunner на измененных / меньших наборах данных, пока логика кода не станет твердой. Параметр потоковой передачи передается конвейеру chi_traffic из-за извлечения PubSub. В конечном итоге это будет комбинация потоковой / пакетной обработки.   -  person cloudpython    schedule 02.04.2019


Ответы (1)


Итак, я разобрался с проблемой. Посмотрев на pipeline.py и источник unittest для побочных входов, я понял, что есть проверка на созданный объект Pipeline.

Я новичок в этом, и поэтому я изначально полагал, что вам нужно создать два отдельных объекта Pipeline (потоковая передача или пакетная передача), чтобы я мог передавать разные параметры обоим; т.е. потоковая передача: Верно. При этом я не считаю, что это необходимо.

После объединения их в один объект, как показано ниже, ошибка исчезла, и я смог принять побочные входы в функцию:

'''

pipeline = beam.Pipeline(options=pipeline_options)
accident_data = (pipeline
                 | 'Read' >> beam.io.ReadFromText('modified_Excel_Crashes_Chicago.csv')
                 | 'Map Accidents' >> beam.ParDo(AccidentstoDict())
                 | 'Count Accidents' >> Count.PerKey())

chi_traffic = (pipeline
               | 'ReadPubSub' >> beam.io.ReadFromPubSub(subscription=subscription_name, with_attributes=True)
               | 'GeoEnrich&Trim' >> beam.Map(loc_trim_enhance)
               | 'TimeDelayEnrich' >> beam.Map(timedelay)
               | 'TrafficRatingEnrich' >> beam.Map(traffic_rating)
               | 'MergeAccidents' >> beam.Map(merge_accidents, pcoll=pvalue.AsDict(accident_data))
               | 'Temp Write' >> beam.io.WriteToText('testtime',
                                                     file_name_suffix='.txt'))

chi_result = pipeline.run()
chi_result.wait_until_finish()

'''

person cloudpython    schedule 02.04.2019