Una forma de enviar eventos almacenados en intervalos regulares

Lo que estoy tratando de lograr es almacenar los eventos entrantes de algún IObservable (vienen en ráfagas) y liberarlos más, pero uno a uno, en intervalos pares. Me gusta esto:

-oo-ooo-oo------------------oooo-oo-o--------------> -o--o--o--o--o--o--o--------o--o--o--o--o--o--o----> 

Como soy bastante nuevo en Rx , no estoy seguro de si ya hay un Sujeto o un operador que haga esto. ¿Tal vez se puede hacer por composición?

actualizar:

Gracias a Richard Szalay por señalar al operador de Drain , encontré otro ejemplo del uso del operador de James Miles of Drain. Así es como logré que funcionara en una aplicación de WPF:

  .Drain(x => { Process(x); return Observable.Return(new Unit()) .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher ); }).Subscribe(); 

Me divertí un poco, porque al omitir el parámetro del planificador, la aplicación se bloquea en el modo de depuración sin que se muestre ninguna excepción (necesito aprender a manejar las excepciones en Rx). El método de proceso modifica el estado de la interfaz de usuario directamente, pero supongo que es bastante simple hacer un IObservable (utilizando un objeto IS?).

actualizar:

Mientras tanto, he estado experimentando con ISubject, la clase de abajo hace lo que quería: deja salir Ts de manera puntual:

 public class StepSubject : ISubject { IObserver subscriber; Queue queue = new Queue(); MutableDisposable cancel = new MutableDisposable(); TimeSpan interval; IScheduler scheduler; bool idle = true; public StepSubject(TimeSpan interval, IScheduler scheduler) { this.interval = interval; this.scheduler = scheduler; } void Step() { T next; lock (queue) { idle = queue.Count == 0; if (!idle) next = queue.Dequeue(); } if (!idle) { cancel.Disposable = scheduler.Schedule(Step, interval); subscriber.OnNext(next); } } public void OnNext(T value) { lock (queue) queue.Enqueue(value); if (idle) cancel.Disposable = scheduler.Schedule(Step); } public IDisposable Subscribe(IObserver observer) { subscriber = observer; return cancel; } } 

Esta implementación ingenua se elimina de OnCompleted y OnError para mayor claridad, también solo se permite la suscripción individual.

En realidad es más complicado de lo que parece.

El uso de Delay no funciona porque los valores seguirán ocurriendo a granel, solo un poco retrasado.

El uso de Interval con CombineLatest o Zip no funciona, ya que el primero causará que se CombineLatest valores de origen y el segundo almacenará los valores de intervalo.

Creo que el nuevo operador de Drain ( agregado en 1.0.2787.0 ), combinado con Delay debería hacer el truco:

 source.Drain(x => Observable.Empty().Delay(TimeSpan.FromSeconds(1)).StartWith(x)); 

El operador Drain funciona como SelectMany , pero espera hasta que la salida anterior se complete antes de llamar al selector con el siguiente valor. Todavía no es exactamente lo que buscas (el primer valor en un bloque también se retrasará), pero está cerca: el uso anterior coincide con tu diagtwig de mármol ahora.

Editar: Aparentemente, el Drain en el marco no funciona como SelectMany . Pediré algunos consejos en los foros oficiales. Mientras tanto, aquí hay una implementación de Drain que hace lo que buscas:

Editar 09/11: Se corrigieron los errores en la implementación y el uso actualizado para que coincida con el diagtwig de mármol solicitado.

 public static class ObservableDrainExtensions { public static IObservable Drain(this IObservable source, Func> selector) { return Observable.Defer(() => { BehaviorSubject queue = new BehaviorSubject(new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => selector(v) .Do(_ => { }, () => queue.OnNext(new Unit())) ); }); } } 

Para completar, aquí hay una versión alternativa (más compacta) del método Drain () sugerido por Richard:

 public static IObservable SelectManySequential( this IObservable source, Func> selector ) { return source .Select(x => Observable.Defer(() => selector(x))) .Concat(); } 

Ver el hilo Drain + SelectMany =? en el foro de Rx.

Actualización: me di cuenta de que la sobrecarga de Concat () que utilicé fue una de mis extensiones de Rx personales que (todavía) forman parte del framework. Lo siento por este error … Por supuesto, esto hace que mi solución sea menos elegante de lo que pensaba.

Sin embargo, para completar, publico aquí mi sobrecarga del método de extensión Conact ():

 public static IObservable Concat(this IObservable> source) { return Observable.CreateWithDisposable(o => { var lockCookie = new Object(); bool completed = false; bool subscribed = false; var waiting = new Queue>(); var pendingSubscription = new MutableDisposable(); Action errorHandler = e => { o.OnError(e); pendingSubscription.Dispose(); }; Func, IDisposable> subscribe = null; subscribe = (ob) => { subscribed = true; return ob.Subscribe( o.OnNext, errorHandler, () => { lock (lockCookie) { if (waiting.Count > 0) pendingSubscription.Disposable = subscribe(waiting.Dequeue()); else if (completed) o.OnCompleted(); else subscribed = false; } } ); }; return new CompositeDisposable(pendingSubscription, source.Subscribe( n => { lock (lockCookie) { if (!subscribed) pendingSubscription.Disposable = subscribe(n); else waiting.Enqueue(n); } }, errorHandler , () => { lock (lockCookie) { completed = true; if (!subscribed) o.OnCompleted(); } } ) ); }); } 

Y ahora golpeándome con mis propias armas: el mismo método Concat () podría escribirse mucho más elegante en la shiny manera de Richard Szalay:

 public static IObservable Concat(this IObservable> source) { return Observable.Defer(() => { BehaviorSubject queue = new BehaviorSubject(new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => v.Do(_ => { }, () => queue.OnNext(new Unit())) ); }); } 

Entonces el crédito le pertenece a Richard. 🙂

He aquí cómo lo hice, simplemente usando una cola explícita (ReactiveCollection es simplemente una versión elegante de ObservableCollection – ReactiveCollection.ItemsAdded OnNext de WPF para cada elemento agregado, como se puede imaginar):

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309

 public static ReactiveCollection CreateCollection(this IObservable FromObservable, TimeSpan? WithDelay = null) { var ret = new ReactiveCollection(); if (WithDelay == null) { FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add); return ret; } // On a timer, dequeue items from queue if they are available var queue = new Queue(); var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value) .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => { if (queue.Count > 0) { ret.Add(queue.Dequeue()); } }); // When new items come in from the observable, stuff them in the queue. // Using the DeferredScheduler guarantees we'll always access the queue // from the same thread. FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue); // This is a bit clever - keep a running count of the items actually // added and compare them to the final count of items provided by the // Observable. Combine the two values, and when they're equal, // disconnect the timer ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose()); return ret; }