Observable individual con múltiples suscriptores

Tengo un Observable<<List> getFoo() que se crea a partir de un servicio de .getFoo() y después de llamar al método .getFoo() , necesito compartirlo con varios suscriptores. .share() embargo, al llamar al método .share() , hace que se vuelva a ejecutar la llamada de red. Replay Operator tampoco funciona. Sé que una posible solución podría ser .cache() , pero no sé por qué se produce este comportamiento.

 // Create an instance of our GitHub API interface. Retrofit retrofit = new Retrofit.Builder() .baseUrl(API_URL) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); // Create a call instance for looking up Retrofit contributors. Observable<List> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share(); Subscription subscription1 = testObservable .subscribe(new Subscriber<List>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List contributors) { System.out.println(contributors); } }); Subscription subscription2 = testObservable .subscribe(new Subscriber<List>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List contributors) { System.out.println(contributors + " -> 2"); } }); subscription1.unsubscribe(); subscription2.unsubscribe(); 

El código anterior puede reproducir el comportamiento mencionado anteriormente. Puede depurarlo y ver que las Listas recibidas pertenecen a una MemoryAddress diferente.

También he considerado ConnectableObservables como una posible solución, pero esto me exige llevar el observable original y llamar .connect() cada vez que deseo agregar un nuevo suscriptor.

Este tipo de comportamiento con .share() funcionaba bien hasta Retrofit 1.9. Dejó de funcionar en Retrofit 2 – beta. Todavía no lo he probado con la versión de lanzamiento de Retrofit 2, que se lanzó hace algunas horas.

EDITAR: 01/02/2017

Para futuros lectores, ¡he escrito un artículo aquí explicando más sobre el caso!

Parece que (implícitamente) estás .share() tu ConnectedObservable devuelto por .share() a un Observable normal. Es posible que desee leer sobre la diferencia entre observables calientes y fríos.

Tratar

 ConnectedObservable> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share(); Subscription subscription1 = testObservable .subscribe(new Subscriber>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List contributors) { System.out.println(contributors); } }); Subscription subscription2 = testObservable .subscribe(new Subscriber>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List contributors) { System.out.println(contributors + " -> 2"); } }); testObservable.connect(); subscription1.unsubscribe(); subscription2.unsubscribe(); 

Editar: No necesita llamar a connect() cada vez que quiere una nueva suscripción, solo la necesita para iniciar la observación. Supongo que podría usar replay() para asegurarse de que todos los subscriptores posteriores obtengan todos los artículos producidos

 ConnectedObservable> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share() .replay() 

Después de consultar con el desarrollador de RxJava, Dávid Karnok, me gustaría proponer una explicación completa de lo que estaba sucediendo aquí.

share() se define como publish().refCount() , es decir, la fuente Observable se transforma primero en un ConnectableObservable por publish() pero en lugar de tener que llamar a connect() “manualmente” esa parte es manejada por refCount() . En particular, refCount llamará a connect() en el ConnectableObservable cuando reciba la primera suscripción; luego, mientras haya al menos un suscriptor, permanecerá suscrito; y, finalmente, cuando la cantidad de suscriptores caiga a 0, cancelará la suscripción. Con Observables fríos , como los devueltos por Retrofit, esto detendrá cualquier cálculo en ejecución.

Si, después de uno de estos ciclos, aparece otro suscriptor, refCount volverá a llamar a connect y, por lo tanto, activará una nueva suscripción a la fuente Observable. En este caso, activará otra solicitud de red.

Ahora, esto generalmente no se hizo evidente con Retrofit 1 (y de hecho cualquier versión antes de este commit ), porque estas versiones anteriores de Retrofit movieron todas las solicitudes de red a otro thread por defecto. Esto usualmente significaba que todas sus llamadas a subscribe() se realizarían mientras la primera solicitud / Observable aún se estaba ejecutando y, por lo tanto, los nuevos Subscriber simplemente se agregarían a la refCount y, por lo tanto, no desencadenarían solicitudes / Observables adicionales.

Las versiones más recientes de Retrofit, sin embargo, ya no mueven el trabajo a otro subproceso de forma predeterminada; debe hacerlo explícitamente llamando, por ejemplo, subscribeOn(Schedulers.io()) . Si no lo hace, todo permanecerá en el hilo actual, lo que significa que la segunda subscribe() solo ocurrirá después de que el primer Observable haya llamado a onCompleted y, por lo tanto, después de que todos los Subscribers hayan cancelado la suscripción y todo se cierre. Ahora, como vimos en el primer párrafo, cuando se llama a la segunda subscribe() , share() no tiene otra opción que causar otra Subscription a la fuente Observable y desencadenar otra solicitud de red.

Por lo tanto, para volver al comportamiento que está acostumbrado de Retrofit 1, simplemente agregue subscribeOn(Schedulers.io()) .

Esto debería dar como resultado que solo se ejecute la solicitud de red, la mayoría de las veces. Sin embargo, en principio, aún podría recibir múltiples solicitudes (y siempre podría hacerlo con Retrofit 1), pero solo si sus solicitudes de red son extremadamente rápidas y / o las llamadas subscribe() suceden con un retraso considerable, de modo que, una vez más, la primera la solicitud finaliza cuando ocurre la segunda subscribe() .

Por lo tanto, Dávid sugiere usar cache() (pero tiene los inconvenientes que mencionó) o replay().autoConnect() . De acuerdo con estas notas de la versión , autoConnect funciona solo como la primera mitad de refCount , o más precisamente, es

comportamiento similar a refCount (), excepto que no se desconecta cuando se pierden los suscriptores.

Esto significa que la solicitud solo se activará cuando se produzca la primera subscribe() pero luego todos los Subscriber posteriores recibirán todos los elementos emitidos, independientemente de si hubo, en cualquier momento intermedio, 0 suscriptores.