RxJava Obteniendo Observables en Paralelo

Necesito ayuda para implementar llamadas asíncronas paralelas en RxJava. He elegido un caso de uso simple en el que la PRIMERA llamada recupera (más bien busca) una lista de productos (Mosaico) para mostrar. Las llamadas subsiguientes salen y obtienen (A) REVISIONES y (B) IMÁGENES DEL PRODUCTO

Después de varios bashs llegué a este lugar.

1 Observable searchTile = searchServiceClient.getSearchResults(searchTerm); 2 List allTiles = new ArrayList(); 3 ClientResponse response = new ClientResponse(); 4 searchTile.parallel(oTile -> { 5 return oTile.flatMap(t -> { 6 Observable reviews = reviewsServiceClient.getSellerReviews(t.getSellerId()); 7 Observable imageUrl = reviewsServiceClient.getProductImage(t.getProductId()); 8 return Observable.zip(reviews, imageUrl, (r, u) -> { 9 t.setReviews(r); 10 t.setImageUrl(u); 11 return t; 12 }); 13 }); 14 }).subscribe(e -> { 15 allTiles.add((Tile) e); 16 }); 

Línea 1: sale y recupera el producto (mosaico) para mostrarse

Línea 4: tomamos la lista de Observable y la RECHAZAMOS para obtener revisiones e imageUrls

Mentira 6,7: Obtener la revisión Observable y la URL Observable

Línea 8: Finalmente, los 2 observables se comprimen para devolver un Observable actualizado

Línea 15: finalmente, la línea 15 recostack todos los productos individuales que se mostrarán en una colección que puede devolverse a la capa de llamada

Mientras que el Observable ha sido fragmentado y en nuestras pruebas ejecuta más de 4 hilos diferentes; La búsqueda de reseñas e imágenes parece ser una tras otra. Sospecho que el paso zip de la línea 8 básicamente está causando la invocación secuencial de los 2 observables (reseñas y url).

enter image description here

¿Este grupo tiene alguna sugerencia para buscar de forma paralela reviews y urls de imágenes? En esencia, el diagtwig de cascada adjunto arriba debe verse más verticalmente astackdo. Las llamadas a revisiones e imágenes deben estar en paralelo

gracias anand twign

El operador paralelo demostró ser un problema para casi todos los casos de uso y no hace lo que la mayoría espera de él, por lo que fue eliminado en la versión 1.0.0.rc.4: https://github.com/ReactiveX/RxJava/ tirar / 1716

Un buen ejemplo de cómo hacer este tipo de comportamiento y obtener la ejecución paralela se puede ver aquí .

En su código de ejemplo no está claro si searchServiceClient es síncrono o asíncrono. Afecta la forma de resolver el problema ligeramente, como si ya fuera asincrónico, no se requiere progtwigción adicional. Si se necesita una progtwigción síncrona adicional.

Primero aquí hay algunos ejemplos simples que muestran el comportamiento sincrónico y asincrónico:

 import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ParallelExecution { public static void main(String[] args) { System.out.println("------------ mergingAsync"); mergingAsync(); System.out.println("------------ mergingSync"); mergingSync(); System.out.println("------------ mergingSyncMadeAsync"); mergingSyncMadeAsync(); System.out.println("------------ flatMapExampleSync"); flatMapExampleSync(); System.out.println("------------ flatMapExampleAsync"); flatMapExampleAsync(); System.out.println("------------"); } private static void mergingAsync() { Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println); } private static void mergingSync() { // here you'll see the delay as each is executed synchronously Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println); } private static void mergingSyncMadeAsync() { // if you have something synchronous and want to make it async, you can schedule it like this // so here we see both executed concurrently Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println); } private static void flatMapExampleAsync() { Observable.range(0, 5).flatMap(i -> { return getDataAsync(i); }).toBlocking().forEach(System.out::println); } private static void flatMapExampleSync() { Observable.range(0, 5).flatMap(i -> { return getDataSync(i); }).toBlocking().forEach(System.out::println); } // artificial representations of IO work static Observable getDataAsync(int i) { return getDataSync(i).subscribeOn(Schedulers.io()); } static Observable getDataSync(int i) { return Observable.create((Subscriber s) -> { // simulate latency try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } s.onNext(i); s.onCompleted(); }); } } 

Lo que sigue es un bash de proporcionar un ejemplo que se asemeje más a su código:

 import java.util.List; import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ParallelExecutionExample { public static void main(String[] args) { final long startTime = System.currentTimeMillis(); Observable searchTile = getSearchResults("search term") .doOnSubscribe(() -> logTime("Search started ", startTime)) .doOnCompleted(() -> logTime("Search completed ", startTime)); Observable populatedTiles = searchTile.flatMap(t -> { Observable reviews = getSellerReviews(t.getSellerId()) .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime)); Observable imageUrl = getProductImage(t.getProductId()) .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime)); return Observable.zip(reviews, imageUrl, (r, u) -> { return new TileResponse(t, r, u); }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime)); }); List allTiles = populatedTiles.toList() .doOnCompleted(() -> logTime("All Tiles Completed ", startTime)) .toBlocking().single(); } private static Observable getSearchResults(String string) { return mockClient(new Tile(1), new Tile(2), new Tile(3)); } private static Observable getSellerReviews(int id) { return mockClient(new Reviews()); } private static Observable getProductImage(int id) { return mockClient("image_" + id); } private static void logTime(String message, long startTime) { System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); } private static  Observable mockClient(T... ts) { return Observable.create((Subscriber s) -> { // simulate latency try { Thread.sleep(1000); } catch (Exception e) { } for (T t : ts) { s.onNext(t); } s.onCompleted(); }).subscribeOn(Schedulers.io()); // note the use of subscribeOn to make an otherwise synchronous Observable async } public static class TileResponse { public TileResponse(Tile t, Reviews r, String u) { // store the values } } public static class Tile { private final int id; public Tile(int i) { this.id = i; } public int getSellerId() { return id; } public int getProductId() { return id; } } public static class Reviews { } } 

Esto produce:

 Search started => 65ms Search completed => 1094ms getProductImage[1] completed => 2095ms getSellerReviews[2] completed => 2095ms getProductImage[3] completed => 2095ms zip[1] completed => 2096ms zip[2] completed => 2096ms getProductImage[2] completed => 2096ms getSellerReviews[1] completed => 2096ms zip[3] completed => 2096ms All Tiles Completed => 2097ms getSellerReviews[3] completed => 2097ms 

He hecho que cada llamada IO se simule para tomar 1000 ms, por lo que es obvio dónde está la latencia y que está sucediendo en paralelo. Imprime el progreso que se hace en milisegundos transcurridos.

El truco aquí es que flatMap combina llamadas asincrónicas, por lo que siempre que los Observables que se combinen sean asincrónicos, todos se ejecutarán simultáneamente.

Si una llamada como getProductImage(t.getProductId()) fue sincrónica, puede hacerse asíncrona de esta manera: getProductImage (t.getProductId ()). SubscribeOn (Schedulers.io).

Aquí está la parte importante del ejemplo anterior sin todos los tipos de registro y repetitivo:

  Observable searchTile = getSearchResults("search term");; Observable populatedTiles = searchTile.flatMap(t -> { Observable reviews = getSellerReviews(t.getSellerId()); Observable imageUrl = getProductImage(t.getProductId()); return Observable.zip(reviews, imageUrl, (r, u) -> { return new TileResponse(t, r, u); }); }); List allTiles = populatedTiles.toList() .toBlocking().single(); 

Espero que esto ayude.

Las personas que todavía están en @ JDK 7, cuyo IDE aún no detecta automáticamente la fuente JDK 8 y qué probar con la shiny respuesta anterior (y la explicación) de @benjchristensen pueden usar este código JDK 7 desvergonzado y refractado. ¡Felicitaciones a @benjchristensen por una increíble explicación y ejemplo!

 import java.util.List; import rx.Observable; import rx.Subscriber; import rx.functions.Action0; import rx.functions.Func1; import rx.functions.Func2; import rx.schedulers.Schedulers; public class ParallelExecutionExample { public static void main(String[] args) { final long startTime = System.currentTimeMillis(); Observable searchTile = getSearchResults("search term") .doOnSubscribe(new Action0() { @Override public void call() { logTime("Search started ", startTime); } }) .doOnCompleted(new Action0() { @Override public void call() { logTime("Search completed ", startTime); } }); Observable populatedTiles = searchTile.flatMap(new Func1>() { @Override public Observable call(final Tile t) { Observable reviews = getSellerReviews(t.getSellerId()) .doOnCompleted(new Action0() { @Override public void call() { logTime("getSellerReviews[" + t.id + "] completed ", startTime); } }); Observable imageUrl = getProductImage(t.getProductId()) .doOnCompleted(new Action0() { @Override public void call() { logTime("getProductImage[" + t.id + "] completed ", startTime); } }); return Observable.zip(reviews, imageUrl, new Func2() { @Override public TileResponse call(Reviews r, String u) { return new TileResponse(t, r, u); } }) .doOnCompleted(new Action0() { @Override public void call() { logTime("zip[" + t.id + "] completed ", startTime); } }); } }); List allTiles = populatedTiles .toList() .doOnCompleted(new Action0() { @Override public void call() { logTime("All Tiles Completed ", startTime); } }) .toBlocking() .single(); } private static Observable getSearchResults(String string) { return mockClient(new Tile(1), new Tile(2), new Tile(3)); } private static Observable getSellerReviews(int id) { return mockClient(new Reviews()); } private static Observable getProductImage(int id) { return mockClient("image_" + id); } private static void logTime(String message, long startTime) { System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); } private static  Observable mockClient(final T... ts) { return Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber s) { try { Thread.sleep(1000); } catch (Exception e) { } for (T t : ts) { s.onNext(t); } s.onCompleted(); } }) .subscribeOn(Schedulers.io()); // note the use of subscribeOn to make an otherwise synchronous Observable async } public static class TileResponse { public TileResponse(Tile t, Reviews r, String u) { // store the values } } public static class Tile { private final int id; public Tile(int i) { this.id = i; } public int getSellerId() { return id; } public int getProductId() { return id; } } public static class Reviews { } }