RxJS: takeUntil () componente angular ngOnDestroy ()

tl; dr: Básicamente quiero casarme con ngOnDestroy de ngOnDestroy con el operador takeUntil() . — ¿es eso posible?

Tengo un componente angular que abre varias suscripciones Rxjs. Estos deben estar cerrados cuando se destruye el componente.

Una solución simple para esto sería:

 class myComponent { private subscriptionA; private subscriptionB; private subscriptionC; constructor( private serviceA: ServiceA, private serviceB: ServiceB, private serviceC: ServiceC) {} ngOnInit() { this.subscriptionA = this.serviceA.subscribe(...); this.subscriptionB = this.serviceB.subscribe(...); this.subscriptionC = this.serviceC.subscribe(...); } ngOnDestroy() { this.subscriptionA.unsubscribe(); this.subscriptionB.unsubscribe(); this.subscriptionC.unsubscribe(); } } 

Esto funciona, pero es un poco redundante. En especial, no me gusta eso: la unsubscribe() está en otro lugar, así que debes recordar que están vinculados. – El estado del componente está contaminado con la suscripción.

Preferiría usar el operador takeUntil() o algo similar, para que se vea así:

 class myComponent { constructor( private serviceA: ServiceA, private serviceB: ServiceB, private serviceC: ServiceC) {} ngOnInit() { const destroy = Observable.fromEvent(???).first(); this.subscriptionA = this.serviceA.subscribe(...).takeUntil(destroy); this.subscriptionB = this.serviceB.subscribe(...).takeUntil(destroy); this.subscriptionC = this.serviceC.subscribe(...).takeUntil(destroy); } } 

¿Hay algún evento de destrucción o algo similar que me permita usar takeUntil() u otra forma de simplificar la architecture de componentes de esa manera? Me doy cuenta de que podría crear un evento yo mismo en el constructor o algo que se ngOnDestroy() dentro de ngOnDestroy() pero que al final no haría las cosas mucho más simples de leer.

Podría aprovechar un ReplaySubject para eso:

 class myComponent { private destroyed$: ReplaySubject = new ReplaySubject(1); constructor( private serviceA: ServiceA, private serviceB: ServiceB, private serviceC: ServiceC) {} ngOnInit() { this.serviceA .takeUntil(this.destroyed$) .subscribe(...); this.serviceB .takeUntil(this.destroyed$) .subscribe(...); this.serviceC .takeUntil(this.destroyed$) .subscribe(...); } ngOnDestroy() { this.destroyed$.next(true); this.destroyed$.complete(); } } 

Bueno, esto se reduce a lo que quieres decir al cerrar una suscripción. Básicamente hay dos formas de hacer esto:

  1. Usar un operador que complete la cadena (como takeWhile() ).
  2. Darse de baja de la fuente Observable.

Es bueno saber que estos dos no son lo mismo.

Al usar, por ejemplo, takeWhile() haces que el operador envíe complete notificación complete que se propaga a tus observadores. Entonces si defines:

 ... .subscribe(..., ..., () => doWhatever()); 

Luego, cuando completes la cadena con, por ej. takeWhile() doWhatever() función doWhatever() .

Por ejemplo, podría verse así:

 const Observable = Rx.Observable; const Subject = Rx.Subject; let source = Observable.timer(0, 1000); let subject = new Subject(); source.takeUntil(subject).subscribe(null, null, () => console.log('complete 1')); source.takeUntil(subject).subscribe(null, null, () => console.log('complete 2')); source.takeUntil(subject).subscribe(null, null, () => console.log('complete 3')); setTimeout(() => { subject.next(); }, 3000); 

Después de 3s se llamarán todas las devoluciones de llamada completas.

Por otro lado, cuando cancelas la suscripción estás diciendo que ya no estás interesado en los artículos producidos por la fuente Observable. Sin embargo, esto no significa que la fuente deba completarse. Simplemente no te importa más.

Esto significa que puede recostackr todas las Subscription de las .subscribe(...) y .subscribe(...) suscripción de todas ellas a la vez:

 let subscriptions = new Rx.Subscription(); let source = Observable.timer(0, 1000); subscriptions.add(source.subscribe(null, null, () => console.log('complete 1'))); subscriptions.add(source.subscribe(null, null, () => console.log('complete 2'))); subscriptions.add(source.subscribe(null, null, () => console.log('complete 3'))); setTimeout(() => { subscriptions.unsubscribe(); }, 3000); 

Ahora, después de la demora de 3 segundos, no se imprimirá nada en la consola porque cancelamos la suscripción y no se invocó ninguna callback completa.

Entonces, lo que quiere usar depende de usted y de su caso de uso. Solo ten en cuenta que cancelar la suscripción no es lo mismo que completar, aunque supongo que en tu situación en realidad no importa.

El uso de la función componentDestroyed() del paquete npm ng2-rx-componentdestroyed es, de lejos, la forma más fácil de utilizar takeUntil:

 @Component({ selector: 'foo', templateUrl: './foo.component.html' }) export class FooComponent implements OnInit, OnDestroy { ngOnInit() { Observable.interval(1000) .takeUntil(componentDestroyed(this)) // <--- magic is here! .subscribe(console.log); } ngOnDestroy() {} } 

Aquí hay una versión de componentDestroyed() para incluir directamente en su código:

 // Based on https://www.npmjs.com/package/ng2-rx-componentdestroyed import { OnDestroy } from '@angular/core'; import { ReplaySubject } from 'rxjs/ReplaySubject'; export function componentDestroyed(component: OnDestroy) { const oldNgOnDestroy = component.ngOnDestroy; const destroyed$ = new ReplaySubject(1); component.ngOnDestroy = () => { oldNgOnDestroy.apply(component); destroyed$.next(undefined); destroyed$.complete(); }; return destroyed$; } 

Si su componente está directamente vinculado a una ruta, puede evitar agregar estado aprovechando los eventos del Router con takeUntil() . De esta forma, tan pronto como salga del componente, limpiará sus suscripciones automáticamente.

 import { Component, OnInit } from '@angular/core'; import { ActivatedRoute, Router } from '@angular/router'; import { MyService } from './my.service'; import { Observable } from 'rxjs/Observable'; import 'rxjs/add/operator/takeUntil'; @Component({ ... }) export class ExampleComopnent implements OnInit { constructor(private router: Router, private myService: MyService) { } ngOnInit() { this.myService.methodA() .takeUntil(this.router.events) .subscribe(dataA => { ... }); this.myService.methodB() .takeUntil(this.router.events) .subscribe(dataB => { ... }); } } 

Nota: Este simple ejemplo no tiene en cuenta las rutas guardadas ni la navegación de ruta cancelada. Si existe la posibilidad de que uno de los eventos del enrutador se active pero se cancele la navegación en ruta, deberá filtrar los eventos del enrutador para que se active en el punto apropiado, por ejemplo, después de la verificación de Route Guard o una vez que navegue Esta completo.

 this.myService.methodA() .takeUntil(this.router.events.filter(e => e instanceOf NavigationEnd)) .subscribe(dataA => { ... });