¿Por qué no se recomiendan los sujetos en Extensiones reactivas .NET?

Actualmente me estoy familiarizando con el marco de Extensiones Reactivas para .NET y estoy trabajando en los diversos recursos de introducción que he encontrado (principalmente http://www.introtorx.com )

Nuestra aplicación implica una serie de interfaces de hardware que detectan marcos de red, estos serán mis IObservables, luego tengo una variedad de componentes que consumirán esos marcos o realizarán alguna forma de transformación en los datos y producirán un nuevo tipo de marco. También habrá otros componentes que necesitan mostrar cada enésimo marco, por ejemplo. Estoy convencido de que Rx va a ser útil para nuestra aplicación, sin embargo, estoy luchando con los detalles de implementación para la interfaz IObserver.

La mayoría (si no todos) de los recursos que he estado leyendo han dicho que no debería implementar la interfaz IObservable por mi cuenta, pero uso una de las funciones o clases proporcionadas. De mi investigación parece que crear un Subject me proporcionaría lo que necesito, tendría mi único hilo que lee datos de la interfaz de hardware y luego llama a la función OnNext de mi instancia Subject . Los diferentes componentes de IObserver recibirían sus notificaciones de ese Sujeto.

Mi confusión viene del consejo dado en el apéndice de este tutorial donde dice:

Evite el uso de los tipos de sujeto. Rx es efectivamente un paradigma de progtwigción funcional. Usar sujetos significa que ahora estamos gestionando el estado, que es potencialmente mutante. Lidiar con el estado de mutación y la progtwigción asincrónica al mismo tiempo es muy difícil de resolver. Además, muchos de los operadores (métodos de extensión) se han escrito cuidadosamente para garantizar que se mantenga una vida útil y correcta de las suscripciones y secuencias; cuando presentas sujetos, puedes romper esto. Las versiones futuras también pueden ver una degradación significativa del rendimiento si usa explícitamente sujetos.

Mi aplicación es bastante crítica para el rendimiento, obviamente voy a probar el rendimiento del uso de los patrones Rx antes de que ingrese al código de producción; sin embargo, me preocupa que esté haciendo algo que va en contra del espíritu del marco de Rx al usar la clase Asunto y que una versión futura del marco perjudicará el rendimiento.

¿Hay una mejor manera de hacer lo que quiero? El hilo de interrogación de hardware se ejecutará continuamente si hay observadores o no (de lo contrario, el buffer de HW realizará una copia de seguridad), por lo que esta es una secuencia muy candente. Necesito pasar los marcos recibidos a múltiples observadores.

Cualquier consejo sería muy apreciado.

Ok, si ignoramos mis formas dogmáticas e ignoro “los temas son buenos / malos”, todos juntos. Echemos un vistazo al espacio problemático.

Apuesto a que tienes 1 de 2 estilos de sistema al que debes ingratarte.

  1. El sistema genera un evento o una callback cuando llega un mensaje
  2. Necesita sondear el sistema para ver si hay algún mensaje para procesar

Para la opción 1, es fácil, simplemente lo envolvemos con el método FromEvent apropiado y hemos terminado. ¡Al bar!

Para la opción 2, ahora debemos considerar cómo sondeamos esto y cómo hacerlo de manera efciente. También cuando obtenemos el valor, ¿cómo lo publicamos?

Me imagino que querrías un hilo dedicado para las encuestas. No querrás que otro codificador martillee el ThreadPool / TaskPool y te deje en una situación de inanición en ThreadPool. Alternativamente, no quiere la molestia del cambio de contexto (supongo). Así que supongamos que tenemos nuestro propio hilo, probablemente tendremos algún tipo de ciclo While / Sleep en el que nos sentemos a sondear. Cuando el cheque encuentra algunos mensajes, los publicamos. Bueno, todo esto suena perfecto para Observable.Create. Ahora es probable que no podamos usar un bucle While ya que eso no nos permitirá devolver un desechable para permitir la cancelación. Afortunadamente, usted ha leído todo el libro, ¡así que es inteligente con la progtwigción recursiva!

Imagino que algo así podría funcionar. #No probado

 public class MessageListener { private readonly IObservable _messages; private readonly IScheduler _scheduler; public MessageListener() { _scheduler = new EventLoopScheduler(); var messages = ListenToMessages() .SubscribeOn(_scheduler) .Publish(); _messages = messages; messages.Connect(); } public IObservable Messages { get {return _messages;} } private IObservable ListenToMessages() { return Observable.Create(o=> { return _scheduler.Schedule(recurse=> { try { var messages = GetMessages(); foreach (var msg in messages) { o.OnNext(msg); } recurse(); } catch (Exception ex) { o.OnError(ex); } }); }); } private IEnumerable GetMessages() { //Do some work here that gets messages from a queue, // file system, database or other system that cant push // new data at us. // //This may return an empty result when no new data is found. } } 

La razón por la que no me gustan los temas, es que generalmente es un caso en el que el desarrollador no tiene un diseño claro del problema. Hackear un tema, meterlo aquí y en todas partes, y luego dejar que el pobre apoyo se adivine en la WTF. Cuando utiliza los métodos Crear / Generar etc. está localizando los efectos en la secuencia. Puedes verlo todo en un método y sabes que nadie más está generando un desagradable efecto secundario. Si veo un campo de temas, ahora tengo que ir a buscar todos los lugares de una clase que se está utilizando. Si algún MFer expone uno públicamente, entonces todas las apuestas están desactivadas, ¡quién sabe cómo se usa esta secuencia! Async / Concurrency / Rx es difícil. No es necesario que sea más difícil al permitir que los efectos secundarios y la progtwigción de causalidad giren aún más.

En general, debes evitar usar Subject , sin embargo, para lo que estás haciendo aquí, creo que funcionan bastante bien. Hice una pregunta similar cuando me encontré con el mensaje “evitar temas” en los tutoriales de Rx.

Para citar a Dave Sexton (de Rxx)

“Los sujetos son los componentes con estado de Rx. Son útiles para cuando necesita crear un evento observable como un campo o una variable local”.

Tiendo a usarlos como el punto de entrada a Rx. Entonces, si tengo algún código que necesita decir ‘algo pasó’ (como lo hizo), usaría un Subject y llamaría a OnNext . Luego exponga eso como un IObservable para que otros se suscriban (puede usar AsObservable() en su tema para asegurarse de que nadie pueda enviar contenido a un Sujeto y estropearlo).

También podría lograr esto con un evento .NET y usar FromEventPattern , pero si solo voy a convertir el evento en un IObservable todos modos, no veo el beneficio de tener un evento en lugar de un Subject (lo que podría significar que Me falta algo aquí)

Sin embargo, lo que debe evitar con mucha fuerza es suscribirse a un IObservable con un Subject , es decir, no pasar un Subject al método IObservable.Subscribe .

A menudo, cuando administra un Subject, en realidad solo está reimplementando funciones que ya están en Rx, y probablemente de una forma no tan robusta, simple y extensible.

Cuando intenta adaptar un flujo de datos asíncronos a Rx (o crea un flujo de datos asíncronos a partir de uno que no es actualmente asincrónico), los casos más comunes son por lo general:

  • La fuente de datos es un evento : como dice Lee, este es el caso más simple: use FromEvent y diríjase al pub.

  • La fuente de datos proviene de una operación síncrona y desea actualizaciones sondeadas (por ejemplo, un servicio web o una llamada a la base de datos): en este caso podría usar el enfoque sugerido de Lee, o para casos simples, podría usar algo como Observable.Interval.Select(_ => ) . Es posible que desee utilizar DistinctUntilChanged () para evitar la publicación de actualizaciones cuando nada ha cambiado en los datos de origen.

  • La fuente de datos es algún tipo de API asíncrona que llama a su callback : En este caso, use Observable.Create para conectar su callback para llamar a OnNext / OnError / OnComplete en el observador.

  • La fuente de datos es una llamada que bloquea hasta que haya nuevos datos disponibles (por ejemplo, algunas operaciones de lectura de socket síncronas): en este caso, puede usar Observable.Create para ajustar el código imperativo que se lee desde el socket y lo publica en Observer.OnNext cuando se leen los datos Esto puede ser similar a lo que estás haciendo con el Sujeto.

Usar Observable.Create vs crear una clase que administra un Subject es bastante equivalente a usar la palabra clave yield versus crear una clase completa que implemente IEnumerator. Por supuesto, puede escribir un IEnumerator para que sea tan limpio y tan bueno como el código de rendimiento, pero ¿cuál está mejor encapsulado y se siente más limpio? Lo mismo es cierto para Observable.Create vs managing Subjects.

Observable.Create le da un patrón limpio para la configuración perezosa y desassembly limpio. ¿Cómo se logra esto con una clase envolviendo a un sujeto? Necesitas algún tipo de método de inicio … ¿cómo sabes cuándo llamarlo? ¿O siempre lo comienzas, incluso cuando nadie está escuchando? Y cuando haya terminado, ¿cómo logra que deje de leer desde el socket / sondeo de la base de datos, etc.? Tienes que tener algún tipo de método Stop, y aún debes tener acceso no solo al IObservable al que estás suscrito, sino a la clase que creó el Subject en primer lugar.

Con Observable.Create, todo está envuelto en un solo lugar. El cuerpo de Observable.Create no se ejecuta hasta que alguien se suscriba, por lo que si nadie se suscribe, nunca utilizará su recurso. Y Observable.Create devuelve un Desechable que puede cerrar limpiamente sus recursos / devoluciones de llamada, etc. – esto se llama cuando el Observador cancela la suscripción. La vida útil de los recursos que está utilizando para generar el Observable está claramente relacionada con la vida útil del Observable.

El texto del bloque citado explica por qué no debería utilizar Subject , pero para simplificar, está combinando las funciones de observador y observable, mientras inyecta algún tipo de estado en el medio (ya sea que esté encapsulando o extensión).

Aquí es donde te encuentras en problemas; estas responsabilidades deben ser separadas y distintas entre sí.

Dicho esto, en su caso específico , recomendaría que rompa sus preocupaciones en partes más pequeñas.

En primer lugar, tiene el hilo que está caliente y siempre supervisa el hardware en busca de señales para enviar notificaciones. ¿Cómo harías esto normalmente? Eventos . Así que comencemos con eso.

Definamos los EventArgs que lanzará su evento.

 // The event args that has the information. public class BaseFrameEventArgs : EventArgs { public BaseFrameEventArgs(IBaseFrame baseFrame) { // Validate parameters. if (baseFrame == null) throw new ArgumentNullException("IBaseFrame"); // Set values. BaseFrame = baseFrame; } // Poor man's immutability. public IBaseFrame BaseFrame { get; private set; } } 

Ahora, la clase que disparará el evento. Tenga en cuenta que esto podría ser una clase estática (ya que siempre tiene un subproceso que ejecuta la supervisión del búfer de hardware), o algo a lo que llame a petición que se suscriba a eso . Tendrá que modificar esto según corresponda.

 public class BaseFrameMonitor { // You want to make this access thread safe public event EventHandler HardwareEvent; public BaseFrameMonitor() { // Create/subscribe to your thread that // drains hardware signals. } } 

Entonces ahora tienes una clase que expone un evento. Los observables funcionan bien con los eventos. Tanto que hay soporte de primera clase para convertir flujos de eventos (piense en un flujo de eventos como disparos múltiples de un evento) en IObservable si sigue el patrón de eventos estándar, a través del método estático FromEventPattern en la clase Observable .

Con el origen de sus eventos y el método FromEventPattern , podemos crear un IObservable> fácilmente (la EventPattern incorpora lo que vería en un evento .NET, en particular, una instancia derivada de EventArgs y un objeto que representa al remitente), así:

 // The event source. // Or you might not need this if your class is static and exposes // the event as a static event. var source = new BaseFrameMonitor(); // Create the observable. It's going to be hot // as the events are hot. IObservable> observable = Observable. FromEventPattern( h => source.HardwareEvent += h, h => source.HardwareEvent -= h); 

Por supuesto, usted quiere un IObservable , pero eso es fácil, usando el método de extensión Select en la clase Observable para crear una proyección (como lo haría en LINQ, y podemos envolver todo esto en un fácil-para- método de uso):

 public IObservable CreateHardwareObservable() { // The event source. // Or you might not need this if your class is static and exposes // the event as a static event. var source = new BaseFrameMonitor(); // Create the observable. It's going to be hot // as the events are hot. IObservable> observable = Observable. FromEventPattern( h => source.HardwareEvent += h, h => source.HardwareEvent -= h); // Return the observable, but projected. return observable.Select(i => i.EventArgs.BaseFrame); } 

Es malo generalizar que los sujetos no son buenos para usar en una interfaz pública. Si bien es cierto que esta no es la forma en que debería verse un enfoque de progtwigción reactiva, definitivamente es una buena opción de mejora / refactorización para su código clásico.

Si tiene una propiedad normal con un acceso público y desea notificar acerca de los cambios, no hay nada en contra de reemplazarlo con un BehaviorSubject. INPC u otros eventos adicionales simplemente no son tan limpios y personalmente me quita el conocimiento. Para este propósito, puede y debe usar BehaviorSubjects como propiedades públicas en lugar de propiedades normales y zanja INPC u otros eventos.

Además, la interfaz del sujeto hace que los usuarios de su interfaz sean más conscientes de la funcionalidad de sus propiedades y es más probable que se suscriban en lugar de solo obtener el valor.

Es lo mejor para usar si desea que otros escuchen / suscriban los cambios de una propiedad.