Escriba un método de extensión Rx “RetryAfter”

En el libro IntroToRx, el autor sugiere escribir un rebash “inteligente” para E / S que reintenta una solicitud de E / S, como una solicitud de red, después de un período de tiempo.

Aquí está el párrafo exacto:

Un método de extensión útil para agregar a su propia biblioteca podría ser un método de “Retroceder y volver a intentar”. Los equipos con los que he trabajado han encontrado que esta característica es útil cuando se realizan E / S, especialmente las solicitudes de red. El concepto es intentar, y en el fracaso esperar por un período de tiempo dado y luego intentarlo nuevamente. Su versión de este método puede tener en cuenta el tipo de Excepción que desea volver a intentar, así como la cantidad máxima de veces para volver a intentar. Es posible que desee alargar el período de espera para que sea menos agresivo en cada bash posterior.

Lamentablemente, no puedo entender cómo escribir este método. 🙁

La clave para esta implementación de un rebash de retroceso es observables diferidos . Un observable diferido no ejecutará su fábrica hasta que alguien se suscriba. E invocará a la fábrica para cada suscripción, por lo que es ideal para nuestro escenario de rebash.

Supongamos que tenemos un método que desencadena una solicitud de red.

public IObservable SomeApiMethod() { ... } 

A los efectos de este pequeño fragmento, definamos el diferido como source

 var source = Observable.Defer(() => SomeApiMethod()); 

Cada vez que alguien se suscribe a la fuente, invocará SomeApiMethod y lanzará una nueva solicitud web. La forma ingenua de volver a intentarlo cada vez que falla utilizaría el operador Retry integrado.

 source.Retry(4) 

Sin embargo, eso no sería muy agradable para la API y no es lo que estás pidiendo. Necesitamos retrasar el lanzamiento de solicitudes entre cada bash. Una forma de hacerlo es con una suscripción retrasada .

 Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4) 

Eso no es ideal ya que agregará la demora incluso en la primera solicitud, arreglemos eso.

 int attempt = 0; Observable.Defer(() => { return ((++attempt == 1) ? source : source.DelaySubscription(TimeSpan.FromSeconds(1))) }) .Retry(4) .Select(response => ...) 

Sin embargo, hacer una pausa por un segundo no es un método de rebash muy bueno, así que cambiemos esa constante para que sea una función que reciba el conteo de rebashs y devuelva un retraso apropiado. El retroceso exponencial es bastante fácil de implementar.

 Func strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2)); ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) 

Ya casi hemos terminado, solo necesitamos agregar una forma de especificar para qué excepciones debemos volver a intentarlo. Agreguemos una función que, dada una excepción, nos indique si tiene sentido o no volver a intentarlo, lo llamaremos retryOnError.

Ahora tenemos que escribir un código de aspecto aterrador, pero tengan paciencia conmigo.

 Observable.Defer(() => { return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) .Select(item => new Tuple(true, item, null)) .Catch, Exception>(e => retryOnError(e) ? Observable.Throw>(e) : Observable.Return(new Tuple(false, null, e))); }) .Retry(retryCount) .SelectMany(t => t.Item1 ? Observable.Return(t.Item2) : Observable.Throw(t.Item3)) 

Todos estos corchetes angulares están ahí para ordenar una excepción para la cual no debemos volver a intentar más allá de .Retry() . Hemos hecho que el observable interno sea un IObservable> donde el primer bool indica si tenemos una respuesta o una excepción. Si retryOnError indica que debemos volver a intentar una excepción en particular, la observación interna arrojará y eso será recogido por el rebash. El SelectMany simplemente desenvuelve nuestro Tuple y hace que el observable resultante sea IObservable nuevo.

Ver mi esencia con fuente completa y pruebas para la versión final. Tener este operador nos permite escribir nuestro código de rebash de manera muy sucinta

 Observable.Defer(() => SomApiMethod()) .RetryWithBackoffStrategy( retryCount: 4, retryOnError: e => e is ApiRetryWebException ) 

Tal vez estoy simplificando demasiado la situación, pero si observamos la implementación de Reintentar, es simplemente un Observable. Catch sobre un número infinito de observables:

 private static IEnumerable RepeatInfinite(T value) { while (true) yield return value; } public virtual IObservable Retry(IObservable source) { return Observable.Catch(QueryLanguage.RepeatInfinite(source)); } 

Entonces, si tomamos este enfoque, podemos agregar un retraso después del primer rendimiento.

 private static IEnumerable> RepeateInfinite (IObservable source, TimeSpan dueTime) { // Don't delay the first time yield return source; while (true) yield return source.DelaySubscription(dueTime); } public static IObservable RetryAfterDelay(this IObservable source, TimeSpan dueTime) { return RepeateInfinite(source, dueTime).Catch(); } 

Una sobrecarga que capte una excepción específica con un recuento de rebashs puede ser aún más conciso:

 public static IObservable RetryAfterDelay(this IObservable source, TimeSpan dueTime, int count) where TException : Exception { return source.Catch(exception => { if (count <= 0) { return Observable.Throw(exception); } return source.DelaySubscription(dueTime).RetryAfterDelay(dueTime, --count); }); } 

Tenga en cuenta que la sobrecarga aquí está utilizando la recursión. En las primeras apariencias, parecería que es posible una StackOverflowException si el recuento era algo así como Int32.MaxValue. Sin embargo, DelaySubscription utiliza un progtwigdor para ejecutar la acción de suscripción, por lo que el desbordamiento de la stack no sería posible (es decir, mediante el uso de “trampolín”). Sin embargo, creo que esto no es realmente obvio al mirar el código. Podríamos forzar un desbordamiento de stack estableciendo explícitamente el planificador en la sobrecarga DelaySubscription en Scheduler.Immediate, y pasando en TimeSpan.Zero e Int32.MaxValue. Podríamos pasar un progtwigdor no inmediato para express nuestra intención un poco más explícitamente, por ejemplo:

 return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay(dueTime, --count); 

ACTUALIZACIÓN: Sobrecarga agregada para tomar en un progtwigdor específico.

 public static IObservable RetryAfterDelay( this IObservable source, TimeSpan retryDelay, int retryCount, IScheduler scheduler) where TException : Exception { return source.Catch( ex => { if (retryCount <= 0) { return Observable.Throw(ex); } return source.DelaySubscription(retryDelay, scheduler) .RetryAfterDelay(retryDelay, --retryCount, scheduler); }); } 

Aquí está el que estoy usando:

 public static IObservable DelayedRetry(this IObservable src, TimeSpan delay) { Contract.Requires(src != null); Contract.Ensures(Contract.Result>() != null); if (delay == TimeSpan.Zero) return src.Retry(); return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry()); } 

Basado en la respuesta de Markus, escribí lo siguiente:

 public static class ObservableExtensions { private static IObservable BackOffAndRetry( this IObservable source, Func strategy, Func retryOnError, int attempt) { return Observable .Defer(() => { var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt); var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay); return s .Catch(e => { if (retryOnError(attempt, e)) { return source.BackOffAndRetry(strategy, retryOnError, attempt + 1); } return Observable.Throw(e); }); }); } public static IObservable BackOffAndRetry( this IObservable source, Func strategy, Func retryOnError) { return source.BackOffAndRetry(strategy, retryOnError, 0); } } 

Me gusta más porque

  • no modifica los attempts pero usa recursividad.
  • No utiliza retries pero pasa el número de bashs de retryOnError

Aquí hay otra implementación ligeramente diferente que surgió mientras estudiaba cómo Rxx lo hace. Por lo tanto, es en gran medida una versión reducida del enfoque de Rxx.

La firma es ligeramente diferente de la versión de Markus. Especifica un tipo de Excepción para volver a intentarlo, y la estrategia de demora toma la excepción y el recuento de rebash, por lo que podría tener retrasos más largos para cada rebash sucesivo, etc.

No puedo garantizar que sea una prueba de errores, o el mejor enfoque, pero parece funcionar.

 public static IObservable RetryWithDelay(this IObservable source, Func delayFactory, IScheduler scheduler = null) where TException : Exception { return Observable.Create(observer => { scheduler = scheduler ?? Scheduler.CurrentThread; var disposable = new SerialDisposable(); int retryCount = 0; var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero, self => { var subscription = source.Subscribe( observer.OnNext, ex => { var typedException = ex as TException; if (typedException != null) { var retryDelay = delayFactory(typedException, ++retryCount); self(retryDelay); } else { observer.OnError(ex); } }, observer.OnCompleted); disposable.Disposable = subscription; }); return new CompositeDisposable(scheduleDisposable, disposable); }); } 

Aquí está el que se me ocurrió.

No quería concatenar los elementos de los rebashs individuales en una secuencia, sino que emitía la secuencia fuente como un todo en cada rebash, por lo que el operador devuelve un IObservable> . Si no se desea, simplemente se puede volver a Switch() a una secuencia.

(Antecedentes: en mi caso de uso, la fuente es una secuencia caliente y caliente, que GroupByUntil que aparece un elemento que cierra el grupo. Si este elemento se pierde entre dos bashs, el grupo nunca se cierra, lo que da como resultado una pérdida de memoria. de secuencias permite agrupar solamente en las secuencias internas (o manejo de excepciones o …))

 ///  /// Repeats  in individual windows, with  time in between. ///  public static IObservable> RetryAfter(this IObservable source, TimeSpan interval, IScheduler scheduler = null) { if (scheduler == null) scheduler = Scheduler.Default; return Observable.Create>(observer => { return scheduler.Schedule(self => { observer.OnNext(Observable.Create(innerObserver => { return source.Subscribe( innerObserver.OnNext, ex => { innerObserver.OnError(ex); scheduler.Schedule(interval, self); }, () => { innerObserver.OnCompleted(); scheduler.Schedule(interval, self); }); })); }); }); }