modernización con rxjava manejando excepciones de red globalmente

Estoy tratando de manejar las excepciones en la aplicación a nivel global, por lo que la actualización arroja un error, la atrapo en alguna clase específica con lógica para manejar esos errores.

Tengo una interfaz

@POST("/token") AuthToken refreshToken(@Field("grant_type") String grantType, @Field("refresh_token") String refreshToken); 

y observables

 /** * Refreshes auth token * * @param refreshToken * @return */ public Observable refreshToken(String refreshToken) { return Observable.create((Subscriber subscriber) -> { try { subscriber.onNext(apiManager.refreshToken(REFRESH_TOKEN, refreshToken)); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } }).subscribeOn(Schedulers.io()); } 

Cuando obtengo 401 del servidor (token no válido u otro error relacionado con la red) quiero actualizar el token y repetir la llamada al rest. ¿Hay alguna manera de hacer esto con rxjava para todas las llamadas de descanso con algún tipo de observable que capture este error globalmente, lo maneje y repita la llamada que lo arrojó?

Por ahora estoy usando subject para detectar el error en .subscribe () como este

 private static BehaviorSubject errorEvent = BehaviorSubject.create(); public static BehaviorSubject getErrorEvent() { return errorEvent; } 

y en alguna llamada

 getCurrentUser = userApi.getCurrentUser().observeOn(AndroidSchedulers.mainThread()) .subscribe( (user) -> { this.user = user; }, errorEvent::onNext ); 

luego en mi actividad principal me suscribo a ese sujeto de comportamiento y analizo el error

 SomeApi.getErrorEvent().subscribe( (e) -> { //parse the error } ); 

pero no puedo repetir la llamada del observable que arroja el error.

Necesita utilizar el operador en onErrorResumeNext(Func1 resumeFunction) , mejor explicado en la wiki oficial :

El método onErrorResumeNext () devuelve un Observable que refleja el comportamiento de la fuente Observable, a menos que ese Observable invoca onError () en cuyo caso, en lugar de propagar ese error al Suscriptor, onErrorResumeNext () en su lugar comenzará a duplicar una copia de seguridad. Observable

En tu caso, pondría algo como esto:

 getCurrentUser = userApi.getCurrentUser() .onErrorResumeNext(refreshTokenAndRetry(userApi.getCurrentUser())) .observeOn(AndroidSchedulers.mainThread()) .subscribe(...) 

dónde:

  private  Func1> refreshTokenAndRetry(final Observable toBeResumed) { return new Func1>() { @Override public Observable call(Throwable throwable) { // Here check if the error thrown really is a 401 if (isHttp401Error(throwable)) { return refreshToken().flatMap(new Func1>() { @Override public Observable call(AuthToken token) { return toBeResumed; } }); } // re-throw this error because it's not recoverable from here return Observable.error(throwable); } }; } 

Tenga en cuenta también que esta función se puede utilizar fácilmente en otros casos, ya que no se tipea con los valores reales emitidos por el reanudable Observable.

 @Override public Observable> messages(String accountId, int messageType) { return mMessageService.getLikeMessages(messageType) .onErrorResumeNext(mTokenTrick. refreshTokenAndRetry(mMessageService.getLikeMessages(messageType))); }