Изменить результат первого наблюдаемого во втором наблюдаемом и вернуть обновленный результат

У меня есть метод exampleMethod(), который возвращает результат двух последовательных вызовов службы (getData1() и getData2()), тогда как getData2() должен изменить данные, возвращенные getData1(), до того, как exampleMethod() вернется. В настоящее время вызов вложенной службы выполняется после того, как мы уже вернулись из exampleMethod(), и это слишком поздно. Я знаю, что вызов подписки внутри другого наблюдаемого объекта является плохой практикой и может быть причиной проблемы, поэтому я спрашиваю, как правильно это сделать. Пример кода ниже. Спасибо!

exampleMethod() {
    return this.myService.getData1()
      .pipe(
        map(res => {
          res.orders = someValue;
          return res;
        }),
        map(res => {
          res.orders.forEach((order: any) => {
            this.myService.getData2()
              .pipe(
                tap((orders: any[]) =>
                  order.status = orders.find(o=> o.number === order.number).status // <<====== this resolves after we've already returned from exampleMethod()
                )
              ).subscribe();
          });
          return res; // <<====== status value of res.orders not set
        })
      );
  }

person bigb055    schedule 16.01.2021    source источник
comment
Я полагаю, вы хотите карту переключения   -  person theMayer    schedule 16.01.2021
comment
да, взгляните на concatMap или mergeMap, а может быть, и на forkJoin   -  person martin    schedule 16.01.2021
comment
Если я заменю второй вызов map() на switchMap() и избавлюсь от subscribe(), я получаю: ERROR TypeError: Вы указали недопустимый объект там, где ожидался поток. Вы можете предоставить Observable, Promise, Array или Iterable   -  person bigb055    schedule 16.01.2021
comment
Эмпирическое правило: каждый раз, когда вы сталкиваетесь со встроенной подпиской, вы можете заменить ее обратным корреспондентом Observable и иметь 1 subscribe() в самом конце   -  person IgorK    schedule 16.01.2021


Ответы (2)


Вы правы, говоря, что внутренняя подписка не элегантна. Здесь вам понадобится оператор RxJS switchMap с функцией forkJoin для нескольких одновременных запросов.

Попробуйте следующее

exampleMethod(): Observable<any> {   // <-- define return type here
  return this.myService.getData1().pipe(
    map(res => ({...res, res.orders: someValue})),
    switchMap(res =>
      forkJoin(
        res.orders.map(order =>
          this.myService.getData2().pipe(map((orders: any[]) =>       // <-- pip the `map` operator here
            ({...res, res.status: orders.find(o => o.number === order.number).status})
          ))
        )
      )
    )
  );
}

Изменить: включить pipe для RxJS map

person Michael D    schedule 16.01.2021
comment
Борьба с TypeError: вы предоставили недопустимый объект там, где ожидался поток. Вы можете предоставить Observable, Promise, Array или Iterable. по вызову res.orders.map(). - person bigb055; 16.01.2021
comment
Я пропустил функцию pipe для оператора map в строке 7. Я обновил ответ. Пожалуйста, попробуйте решение еще раз. - person Michael D; 18.01.2021
comment
Используя ваш и следующий ответ stackoverflow.com/questions/60590846/ в качестве руководства я смог создать рабочее решение. Единственное, что отличается от вашей реализации, это то, что я помещаю внутренние наблюдаемые в массив, а затем вызываю для него forkJoin(), затем делаю некоторые модификации и использую mapTo() для сопоставления результата с наблюдаемым более высокого уровня перед возвратом. - person bigb055; 18.01.2021

Вы можете сделать это с помощью Observables более высокого порядка. Я подозреваю, что switchMap может быть не тем, что вы ищете, а скорее concatMap или concatAll.

return this.myService.getData1().pipe(
    map((res) => {
      res.orders = ["someValue"];
      return res;
    }),
    map((res) => 
      // <-- use switch map to internally subscribe to inner observable
      res.orders.forEach((order: any) => 
        this.myService.getData2().pipe(
          first(),
          tap(/* do something */)
        );
      )
    ),
    concatAll(), // <-- concat all inner-observables values in order
    last() // ensure that only the last value is emitted to outer observable
  );

Однако следует помнить одну вещь: наблюдаемый объект не будет выдавать значения синхронно. поэтому exampleMethod() всегда будет возвращаться до того, как наблюдаемое обработает какие-либо значения. Один из способов, хотя я бы не рекомендовал его, — дождаться завершения наблюдаемого в асинхронном методе:

async function exampleMethod(){
    return await this.dataService.getData()
       .pipe(/* whatever*/ )
       .toPromise(); // this will resolve when the observable is complete
}

person Omar Abu Saada    schedule 16.01.2021