Как сгруппировать разные асинхронные источники в единый с помощью RxJava2?

Скажем, у меня есть этот синхронный метод:

public FruitBowl getFruitBowl() {
    Apple apple = getApple(); // IO intensive
    Banana banana = getBanana(); // CPU intensive
    return new FruitBowl(apple, banana);
}

Я могу использовать API параллелизма Java, чтобы превратить его в асинхронный метод, который будет выглядеть примерно так:

public Future<FruitBowl> getFruitBowl() {
    Future<Apple> appleFuture = getAppleAsync(); // IO intensive
    Future<Banana> bananaFuture = getBananaAsync(); // CPU intensive
    return createFruitBowlAsync(appleFuture, bananaFuture); // Awaits appleFuture and bananaFuture and then returns a new FruitBowl
}

Каков идиоматический способ Rx сделать это, используя преимущества его планировщиков (io и вычисления) и возвращая Single?


person Gustav Karlsson    schedule 08.11.2017    source источник


Ответы (1)


Вы можете использовать оператор zip. И для каждой асинхронной операции определите отдельный поток. Если вы этого не сделаете, методы будут выполняться один за другим в одном потоке.

Я бы создал Observable версию обоих методов, чтобы вернуть соответственно Observable<Apple> и Observable<Banana> и использовать их таким образом:

Observalbe.zip(getAppleObservable().subscribeOn(Schedulers.newThread()), 
               getBananaObservable().subscribeOn(Schedulers.newThread()),
     (apple, banana) -> new FruitBowl(apple, banana)))
     .subscribe(/* do your work here with FruitBowl object */);

Здесь подробнее о том, как распараллеливать операции с помощью оператора zip.

person GVillani82    schedule 08.11.2017
comment
Несколько действительно важных ошибок в этой статье! - person Gustav Karlsson; 08.11.2017