Rxjava Android cómo usar el operador Zip

Estoy teniendo problemas para entender el operador zip en RxJava para mi proyecto de Android. Problema Necesito poder enviar una solicitud de red para subir un video Luego necesito enviar una solicitud de red para subir una imagen para ir con ella Finalmente, tengo que agregar una descripción y usar las respuestas de las dos solicitudes anteriores para subir la URL de ubicación del video y la imagen junto con la descripción de mi servidor.

Supuse que el operador zip sería perfecto para esta tarea, ya que entendía que podíamos tomar la respuesta de dos observables (solicitudes de video e imágenes) y usarlas para mi tarea final. Pero no puedo hacer que esto ocurra como lo visualizo.

Estoy buscando a alguien que responda cómo esto se puede hacer conceptualmente con un poco de código psuedo. Gracias

El operador zip empareja estrictamente los elementos emitidos de los observables. Espera que lleguen ambos (o más) elementos y luego los fusiona. Entonces sí, esto sería adecuado para sus necesidades.

Func2 para encadenar el resultado de los dos primeros observables. Tenga en cuenta que este enfoque sería más simple si usa Retrofit ya que su interfaz api puede devolver un valor observable. De lo contrario, necesitarías crear tu propio observable.

 // assuming each observable returns response in the form of String Observable movOb = Observable.create(...); // if you use Retrofit Observable picOb = RetrofitApiManager.getService().uploadPic(...), Observable.zip(movOb, picOb, new Func2() { @Override public MyResult call(String movieUploadResponse, String picUploadResponse) { // analyze both responses, upload them to another server // and return this method with a MyResult type return myResult; } } ) // continue chaining this observable with subscriber // or use it for something else 

Paso a paso: -> Primero definamos nuestro objeto Retrofit para acceder a la API de Github, luego configuremos dos observables para las dos solicitudes de red anteriores:

 Retrofit repo = new Retrofit.Builder() .baseUrl("https://api.github.com") .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); Observable userObservable = repo .create(GitHubUser.class) .getUser(loginName) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()); Observable eventsObservable = repo .create(GitHubEvents.class) .listEvents(loginName) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()); 

Las interfaces de Retrofit son bastante simples:

 public interface GitHubUser { @GET("users/{user}") Observable getUser(@Path("user") String user); } public interface GitHubEvents { @GET("users/{user}/events") Observable listEvents(@Path("user") String user); } 

Últimamente usamos el método zip de RxJava para combinar nuestros dos Observables y esperar a que se completen antes de crear un nuevo Observable.

 Observable combined = Observable.zip(userObservable, eventsObservable, new Func2() { @Override public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) { return new UserAndEvents(jsonObject, jsonElements); } }); 

¿Cuál es el UserAndEvents? Es solo un POJO simple para combinar los dos objetos:

 public class UserAndEvents { public UserAndEvents(JsonObject user, JsonArray events) { this.events = events; this.user = user; } public JsonArray events; public JsonObject user; } 

Finalmente, llamemos al método de suscripción en nuestro nuevo Observable combinado:

 combined.subscribe(new Subscriber() { ... @Override public void onNext(UserAndEvents o) { // You can access the results of the // two observabes via the POJO now } }); 

Un pequeño ejemplo :

 Observable stringObservable1 = Observable.just("Hello", "World"); Observable stringObservable2 = Observable.just("Bye", "Friends"); Observable.zip(stringObservable1, stringObservable2, new BiFunction() { @Override public String apply(@NonNull String s, @NonNull String s2) throws Exception { return s + " - " + s2; } }).subscribe(new Consumer() { @Override public void accept(String s) throws Exception { System.out.println(s); } }); 

Esto se imprimirá:

 Hello - Bye World - Friends 

Aquí tengo un ejemplo que hice usando Zip de manera asíncrona, por si acaso tienes curiosidad

  /** * Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel. * By default Rx is not async, only if you explicitly use subscribeOn. */ @Test public void testAsyncZip() { scheduler = Schedulers.newThread(); scheduler1 = Schedulers.newThread(); scheduler2 = Schedulers.newThread(); long start = System.currentTimeMillis(); Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) .concat(s3)) .subscribe(result -> showResult("Async in:", start, result)); } /** * In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline */ @Test public void testZip() { long start = System.currentTimeMillis(); Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2) .concat(s3)) .subscribe(result -> showResult("Sync in:", start, result)); } public void showResult(String transactionType, long start, String result) { System.out.println(result + " " + transactionType + String.valueOf(System.currentTimeMillis() - start)); } public Observable obString() { return Observable.just("") .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "Hello"); } public Observable obString1() { return Observable.just("") .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> " World"); } public Observable obString2() { return Observable.just("") .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "!"); } public Observable obAsyncString() { return Observable.just("") .observeOn(scheduler) .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "Hello"); } public Observable obAsyncString1() { return Observable.just("") .observeOn(scheduler1) .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> " World"); } public Observable obAsyncString2() { return Observable.just("") .observeOn(scheduler2) .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "!"); } 

Puedes ver más ejemplos aquí https://github.com/politrons/reactive

He estado buscando una respuesta simple sobre cómo usar el operador Zip, y qué hacer con los Observables que creo para pasarlos a él, me preguntaba si debería llamar a subscribe () para cada elemento observable o no, no de estos las respuestas fueron fáciles de encontrar, tuve que resolverlo solo, así que aquí hay un ejemplo simple para usar el operador Zip en 2 Observables:

 @Test public void zipOperator() throws Exception { List indexes = Arrays.asList(0, 1, 2, 3, 4); List letters = Arrays.asList("a", "b", "c", "d", "e"); Observable indexesObservable = Observable.fromIterable(indexes); Observable lettersObservable = Observable.fromIterable(letters); Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems()) .subscribe(printMergedItems()); } @NonNull private BiFunction mergeEmittedItems() { return new BiFunction() { @Override public String apply(Integer index, String letter) throws Exception { return "[" + index + "] " + letter; } }; } @NonNull private Consumer printMergedItems() { return new Consumer() { @Override public void accept(String s) throws Exception { System.out.println(s); } }; } 

el resultado impreso es:

 [0] a [1] b [2] c [3] d [4] e 

las respuestas finales a las preguntas que estaban en mi cabeza eran las siguientes

los Observables pasados ​​al método zip () solo necesitan ser creados, no necesitan tener ningún suscriptor, solo crearlos es suficiente … si quiere que cualquier observable se ejecute en un progtwigdor, puede especificar esto para ese Observable … también probé el operador zip () en Observables donde deberían esperar el resultado, y el Consumble del zip () se activó solo cuando ambos resultados estaban listos (que es el comportamiento esperado)

zip operador zip permite componer un resultado a partir de resultados de dos observaciones diferentes.

Tendrás que dar am lambda que creará un resultado a partir de los datos emitidos por cada observable.

 Observable movies = ... Observable picture = ... Observable response = movies.zipWith(picture, (movie, pic) -> { return new Response("description", movie.getName(), pic.getUrl()); }); 

Esta es mi implementación usando Single.zip y rxJava2

Traté de hacerlo tan fácil de entender como sea posible

 // // API Client Interface // @GET(ServicesConstants.API_PREFIX + "questions/{id}/") Single>>> getBaseQuestions(@Path("id") int personId); @GET(ServicesConstants.API_PREFIX + "physician/{id}/") Single>>> getPhysicianInfo(@Path("id") int personId); // // API middle layer - NOTE: I had feedback that the Single.create is not needed (but I haven't yet spent the time to improve it) // public Single> getPhysicianInfo(int personId) { return Single.create(subscriber -> { apiClient.getPhysicianInfo(appId) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe(response -> { ResponseGeneric> responseBody = response.body(); if(responseBody != null && responseBody.statusCode == 1) { if (!subscriber.isDisposed()) subscriber.onSuccess(responseBody.data); } else if(response.body() != null && response.body().status != null ){ if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status)); } else { if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message())); } }, throwable -> { throwable.printStackTrace(); if(!subscriber.isDisposed()) subscriber.onError(throwable); }); }); } public Single> getHealthQuestions(int personId){ return Single.create(subscriber -> { apiClient.getBaseQuestions(personId) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe(response -> { ResponseGeneric> responseBody = response.body(); if(responseBody != null && responseBody.data != null) { if (!subscriber.isDisposed()) subscriber.onSuccess(response.body().data); } else if(response.body() != null && response.body().status != null ){ if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status)); } else { if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message())); } }, throwable -> { throwable.printStackTrace(); if(!subscriber.isDisposed()) subscriber.onError(throwable); }); }); } //please note that ResponseGeneric is just an outer wrapper of the returned data - common to all API's in this project public class ResponseGeneric { @SerializedName("Status") public String status; @SerializedName("StatusCode") public float statusCode; @SerializedName("Data") public T data; } // // API end-use layer - this gets close to the UI so notice the oberver is set for main thread // private static class MergedResponse{// this is just a POJO to store all the responses in one object public List listQuestions; public List listPhysicians; public MergedResponse(List listQuestions, List listPhysicians){ this.listQuestions = listQuestions; this.listPhysicians = listPhysicians; } } // example of Single.zip() - calls getHealthQuestions() and getPhysicianInfo() from API Middle Layer private void downloadHealthQuestions(int personId) { addRxSubscription(Single .zip(getHealthQuestions(personId), getPhysicianInfo(personId), MergedResponse::new) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(response -> { if(response != null) { Timber.i(" - total health questions downloaded %d", response.listQuestions.size()); Timber.i(" - physicians downloaded %d", response.listPhysicians.size()); if (response.listPhysicians != null && response.listPhysicians.size()>0) { // do your stuff to process response data } if (response.listQuestions != null && response.listQuestions.size()>0) { // do your stuff to process response data } } else { // process error - show message } }, error -> { // process error - show network error message })); } 

rxjava el zip de rxjava con Java 8 :

 Observable movies = ... Observable picture = ... Observable response = Observable.zip(movies, picture, ZipResponse::new); class ZipResponse { private MovieResponse movieResponse; private PictureResponse pictureResponse; ZipResponse(MovieResponse movieResponse, PictureResponse pictureResponse) { this.movieResponse = movieResponse; this.pictureResponse = pictureResponse; } public MovieResponse getMovieResponse() { return movieResponse; } public void setMovieResponse(MovieResponse movieResponse) { this.movieResponse= movieResponse; } public PictureResponse getPictureResponse() { return pictureResponse; } public void setPictureResponse(PictureResponse pictureResponse) { this.pictureResponse= pictureResponse; } }