¿Cuál es la forma correcta de compartir el resultado de una llamada de red de Hngp angular en RxJs 5?

Al usar Http, llamamos a un método que realiza una llamada de red y devuelve un http observable:

getCustomer() { return this.http.get('/someUrl').map(res => res.json()); } 

Si tomamos esto observable y agregamos varios suscriptores a él:

 let network$ = getCustomer(); let subscriber1 = network$.subscribe(...); let subscriber2 = network$.subscribe(...); 

Lo que queremos hacer es asegurarnos de que esto no cause múltiples solicitudes de red.

Esto podría parecer un escenario inusual, pero en realidad es bastante común: por ejemplo, si la persona que llama se suscribe a lo observable para mostrar un mensaje de error y lo pasa a la plantilla utilizando el canal asíncrono, ya tenemos dos suscriptores.

¿Cuál es la forma correcta de hacerlo en RxJs 5?

A saber, esto parece funcionar bien:

 getCustomer() { return this.http.get('/someUrl').map(res => res.json()).share(); } 

¿Pero es esta la manera idiomática de hacer esto en RxJs 5, o deberíamos hacer otra cosa en su lugar?

Nota: Según Angular 5 new HttpClient , la parte .map(res => res.json()) en todos los ejemplos ahora es inútil, ya que el resultado JSON ahora se asume de manera predeterminada.

    Guarde en caché los datos y, si están disponibles en caché, devuélvalos, de lo contrario, realice la solicitud HTTP.

     import {Injectable} from '@angular/core'; import {Http, Headers} from '@angular/http'; import {Observable} from 'rxjs/Observable'; import 'rxjs/add/observable/of'; //proper way to import the 'of' operator import 'rxjs/add/operator/share'; import 'rxjs/add/operator/map'; import {Data} from './data'; @Injectable() export class DataService { private url:string = 'https://cors-test.appspot.com/test'; private data: Data; private observable: Observable; constructor(private http:Http) {} getData() { if(this.data) { // if `data` is available just return it as `Observable` return Observable.of(this.data); } else if(this.observable) { // if `this.observable` is set then the request is in progress // return the `Observable` for the ongoing request return this.observable; } else { // example header (not necessary) let headers = new Headers(); headers.append('Content-Type', 'application/json'); // create the request, store the `Observable` for subsequent subscribers this.observable = this.http.get(this.url, { headers: headers }) .map(response => { // when the cached data is available we don't need the `Observable` reference anymore this.observable = null; if(response.status == 400) { return "FAILURE"; } else if(response.status == 200) { this.data = new Data(response.json()); return this.data; } // make it shared so more than one subscriber can get the result }) .share(); return this.observable; } } } 

    Ejemplo de Plunker

    Este artiláctico https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html es una gran explicación de cómo almacenar en caché con shareReplay .

    Por sugerencia de @Cristian, esta es una forma que funciona bien para los observables HTTP, que solo emiten una vez y luego completan:

     getCustomer() { return this.http.get('/someUrl') .map(res => res.json()).publishLast().refCount(); } 

    ACTUALIZACIÓN: Ben Lesh dice que en el próximo lanzamiento menor después de 5.2.0, podrás simplemente llamar a shareReplay () para que sea realmente caché.

    PREVIAMENTE…..

    En primer lugar, no use share () o publishReplay (1) .refCount (), son lo mismo y el problema es que solo comparte si las conexiones se realizan mientras el observable está activo, si se conecta después de que se completa , crea un nuevo observable de nuevo, traducción, no realmente almacenamiento en caché.

    Birowski dio la solución correcta arriba, que es usar ReplaySubject. ReplaySubject guardará en caché los valores que le da (bufferSize) en nuestro caso 1. No creará un nuevo elemento observable como share () una vez que refCount llegue a cero y establezca una nueva conexión, que es el comportamiento correcto para el almacenamiento en caché.

    Aquí hay una función reutilizable

     export function cacheable(o: Observable): Observable { let replay = new ReplaySubject(1); o.subscribe( x => replay.next(x), x => replay.error(x), () => replay.complete() ); return replay.asObservable(); } 

    He aquí cómo usarlo

     import { Injectable } from '@angular/core'; import { Http } from '@angular/http'; import { Observable } from 'rxjs/Observable'; import { cacheable } from '../utils/rxjs-functions'; @Injectable() export class SettingsService { _cache: Observable; constructor(private _http: Http, ) { } refresh = () => { if (this._cache) { return this._cache; } return this._cache = cacheable(this._http.get('YOUR URL')); } } 

    A continuación se muestra una versión más avanzada de la función de caché que permite tener su propia tabla de búsqueda + la capacidad de proporcionar una tabla de búsqueda personalizada. De esta manera, no es necesario que verifique this._cache como en el ejemplo anterior. También observe que en lugar de pasar lo observable como primer argumento, pasa una función que devuelve los observables, esto es porque Http de Angular se ejecuta de inmediato, por lo que al devolver una función ejecutada floja, podemos decidir no llamar si ya está en nuestro caché

     let cacheableCache: { [key: string]: Observable } = {}; export function cacheable(returnObservable: () => Observable, key?: string, customCache?: { [key: string]: Observable }): Observable { if (!!key && (customCache || cacheableCache)[key]) { return (customCache || cacheableCache)[key] as Observable; } let replay = new ReplaySubject(1); returnObservable().subscribe( x => replay.next(x), x => replay.error(x), () => replay.complete() ); let observable = replay.asObservable(); if (!!key) { if (!!customCache) { customCache[key] = observable; } else { cacheableCache[key] = observable; } } return observable; } 

    Uso:

     getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache") 

    según este artículo

    Resulta que podemos agregar fácilmente el almacenamiento en caché a lo observable al agregar publishReplay (1) y refCount.

    tan dentro si las declaraciones solo anexan

     .publishReplay(1) .refCount(); 

    a .map(...)

    rxjs 5.4.0 tiene un nuevo método shareReplay .

    • rx-book shareReplay ()
    • No hay documentos en reactivex.io/rxjs

    El autor dice explícitamente “ideal para manejar cosas como el almacenamiento en caché de los resultados de AJAX”

    rxjs PR # 2443 feat (shareReplay): agrega la variante de publishReplay de publishReplay

    shareReplay devuelve un observable que es la fuente multidifundida sobre un ReplaySubject. Ese tema de reproducción se recicla por error desde la fuente, pero no al completar la fuente. Esto hace que shareReplay sea ideal para manejar cosas como el almacenamiento en caché de resultados AJAX, ya que es recuperable. Sin embargo, su comportamiento repetitivo difiere de compartir en que no repetirá la fuente observable, sino que repetirá los valores observables de la fuente.

    Hice la pregunta, pero intentaré probarlo.

     //this will be the shared observable that //anyone can subscribe to, get the value, //but not cause an api request let customer$ = new Rx.ReplaySubject(1); getCustomer().subscribe(customer$); //here's the first subscriber customer$.subscribe(val => console.log('subscriber 1: ' + val)); //here's the second subscriber setTimeout(() => { customer$.subscribe(val => console.log('subscriber 2: ' + val)); }, 1000); function getCustomer() { return new Rx.Observable(observer => { console.log('api request'); setTimeout(() => { console.log('api response'); observer.next('customer object'); observer.complete(); }, 500); }); } 

    Aquí está la prueba 🙂

    Solo hay un punto getCustomer().subscribe(customer$) : getCustomer().subscribe(customer$)

    No estamos suscribiéndonos a la respuesta api de getCustomer() , nos estamos suscribiendo a un ReplaySubject que es observable que también puede suscribirse a un Observable diferente y (y esto es importante) mantener su último valor emitido y volver a publicarlo en cualquiera de son suscriptores de (ReplaySubject).

    La implementación que elija dependerá de si desea cancelar la suscripción () para cancelar su solicitud HTTP o no.

    En cualquier caso, los decoradores de TypeScript son una buena forma de estandarizar el comportamiento. Este es el que escribí:

      @CacheObservableArgsKey getMyThing(id: string): Observable { return this.http.get('things/'+id); } 

    Definición de decorador:

     /** * Decorator that replays and connects to the Observable returned from the function. * Caches the result using all arguments to form a key. * @param target * @param name * @param descriptor * @returns {PropertyDescriptor} */ export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) { const originalFunc = descriptor.value; const cacheMap = new Map(); descriptor.value = function(this: any, ...args: any[]): any { const key = args.join('::'); let returnValue = cacheMap.get(key); if (returnValue !== undefined) { console.log(`${name} cache-hit ${key}`, returnValue); return returnValue; } returnValue = originalFunc.apply(this, args); console.log(`${name} cache-miss ${key} new`, returnValue); if (returnValue instanceof Observable) { returnValue = returnValue.publishReplay(1); returnValue.connect(); } else { console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue); } cacheMap.set(key, returnValue); return returnValue; }; return descriptor; } 

    Encontré una forma de almacenar el resultado de obtención de http en sessionStorage y usarlo para la sesión, para que nunca vuelva a llamar al servidor.

    Lo usé para llamar a la API de Github para evitar el límite de uso.

     @Injectable() export class HttpCache { constructor(private http: Http) {} get(url: string): Observable { let cached: any; if (cached === sessionStorage.getItem(url)) { return Observable.of(JSON.parse(cached)); } else { return this.http.get(url) .map(resp => { sessionStorage.setItem(url, resp.text()); return resp.json(); }); } } } 

    FYI, el límite de sessionStorage es 5M (o 4.75M). Por lo tanto, no debe usarse así para un gran conjunto de datos.

    —— editar ————-
    Si desea tener datos actualizados con F5, que utiliza datos de memoria en lugar de sessionStorage;

     @Injectable() export class HttpCache { cached: any = {}; // this will store data constructor(private http: Http) {} get(url: string): Observable { if (this.cached[url]) { return Observable.of(this.cached[url])); } else { return this.http.get(url) .map(resp => { this.cached[url] = resp.text(); return resp.json(); }); } } } 

    Datos de respuesta HTTP en caché usando Rxjs Observer / Observable + Caching + Subscription

    Ver el código a continuación

    * descargo de responsabilidad: soy nuevo en rxjs, así que tenga en cuenta que puedo estar haciendo un uso indebido del enfoque observable / observador. Mi solución es puramente un conglomerado de otras soluciones que encontré, y es la consecuencia de no haber encontrado una solución simple y bien documentada. Por lo tanto, estoy brindando mi solución de código completa (como me hubiera gustado haber encontrado) con la esperanza de que ayude a otros.

    * tenga en cuenta que este enfoque se basa libremente en GoogleFirebaseObservables. Lamentablemente, carezco de la experiencia / el tiempo adecuados para replicar lo que hicieron bajo el capó. Pero la siguiente es una forma simplista de proporcionar acceso asincrónico a algunos datos almacenables en caché.

    Situación : un componente ‘lista de productos’ tiene la tarea de mostrar una lista de productos. El sitio es una aplicación web de una sola página con algunos botones de menú que “filtran” los productos que se muestran en la página.

    Solución : el componente “se suscribe” a un método de servicio. El método de servicio devuelve una matriz de objetos de producto, a la que accede el componente a través de la callback de suscripción. El método de servicio envuelve su actividad en un Observer recién creado y devuelve al observador. Dentro de este observador, busca los datos en caché y los vuelve a enviar al suscriptor (el componente) y los devuelve. De lo contrario, emite una llamada http para recuperar los datos, se suscribe a la respuesta, donde puede procesar esos datos (por ejemplo, asignar los datos a su propio modelo) y luego pasar los datos al suscriptor.

    El código

    product-list.component.ts

     import { Component, OnInit, Input } from '@angular/core'; import { ProductService } from '../../../services/product.service'; import { Product, ProductResponse } from '../../../models/Product'; @Component({ selector: 'app-product-list', templateUrl: './product-list.component.html', styleUrls: ['./product-list.component.scss'] }) export class ProductListComponent implements OnInit { products: Product[]; constructor( private productService: ProductService ) { } ngOnInit() { console.log('product-list init...'); this.productService.getProducts().subscribe(products => { console.log('product-list received updated products'); this.products = products; }); } } 

    product.service.ts

     import { Injectable } from '@angular/core'; import { Http, Headers } from '@angular/http'; import { Observable, Observer } from 'rxjs'; import 'rxjs/add/operator/map'; import { Product, ProductResponse } from '../models/Product'; @Injectable() export class ProductService { products: Product[]; constructor( private http:Http ) { console.log('product service init. calling http to get products...'); } getProducts():Observable{ //wrap getProducts around an Observable to make it async. let productsObservable$ = Observable.create((observer: Observer) => { //return products if it was previously fetched if(this.products){ console.log('## returning existing products'); observer.next(this.products); return observer.complete(); } //Fetch products from REST API console.log('** products do not yet exist; fetching from rest api...'); let headers = new Headers(); this.http.get('http://localhost:3000/products/', {headers: headers}) .map(res => res.json()).subscribe((response:ProductResponse) => { console.log('productResponse: ', response); let productlist = Product.fromJsonList(response.products); //convert service observable to product[] this.products = productlist; observer.next(productlist); }); }); return productsObservable$; } } 

    product.ts (el modelo)

     export interface ProductResponse { success: boolean; msg: string; products: Product[]; } export class Product { product_id: number; sku: string; product_title: string; ..etc... constructor(product_id: number, sku: string, product_title: string, ...etc... ){ //typescript will not autoassign the formal parameters to related properties for exported classes. this.product_id = product_id; this.sku = sku; this.product_title = product_title; ...etc... } //Class method to convert products within http response to pure array of Product objects. //Caller: product.service:getProducts() static fromJsonList(products:any): Product[] { let mappedArray = products.map(Product.fromJson); return mappedArray; } //add more parameters depending on your database entries and constructor static fromJson({ product_id, sku, product_title, ...etc... }): Product { return new Product( product_id, sku, product_title, ...etc... ); } } 

    Aquí hay una muestra de la salida que veo cuando cargo la página en Chrome. Tenga en cuenta que en la carga inicial, los productos se obtienen de http (llame al servicio de descanso de mi nodo, que se ejecuta localmente en el puerto 3000). Cuando hago clic para navegar a una vista ‘filtrada’ de los productos, los productos se encuentran en el caché.

    Mi registro de Chrome (consola):

     core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode. app.component.ts:19 app.component url: /products product.service.ts:15 product service init. calling http to get products... product-list.component.ts:18 product-list init... product.service.ts:29 ** products do not yet exist; fetching from rest api... product.service.ts:33 productResponse: {success: true, msg: "Products found", products: Array(23)} product-list.component.ts:20 product-list received updated products 

    … [hizo clic en un botón de menú para filtrar los productos] …

     app.component.ts:19 app.component url: /products/chocolatechip product-list.component.ts:18 product-list init... product.service.ts:24 ## returning existing products product-list.component.ts:20 product-list received updated products 

    Conclusión: Esta es la forma más sencilla que he encontrado (hasta ahora) para implementar datos de respuesta HTTP en caché. En mi aplicación angular, cada vez que navego hacia una vista diferente de los productos, el componente de la lista de productos se vuelve a cargar. ProductService parece ser una instancia compartida, por lo que la memoria caché local de ‘products: Product []’ en ProductService se conserva durante la navegación, y las llamadas posteriores a “GetProducts ()” devuelven el valor en caché. Una nota final: he leído comentarios sobre cómo las observaciones / suscripciones deben cerrarse cuando hayas terminado para evitar “pérdidas de memoria”. No he incluido esto aquí, pero es algo a tener en cuenta.

    Supongo que @ ngx-cache / core podría ser útil para mantener las características de caché para las llamadas http, especialmente si la llamada HTTP se realiza tanto en el navegador como en las plataformas de servidor .

    Digamos que tenemos el siguiente método:

     getCustomer() { return this.http.get('/someUrl').map(res => res.json()); } 

    Puede usar el decorador en Cached de @ ngx-cache / core para almacenar el valor devuelto del método que realiza la llamada HTTP en el cache storage en cache storage ( el storage puede ser configurable, verifique la implementación en ng-seed / universal ) – right on la primera ejecución. Las próximas veces que se invoca el método (no importa en el navegador ni en la plataforma del servidor ), el valor se recupera del cache storage .

     import { Cached } from '@ngx-cache/core'; ... @Cached('get-customer') // the cache key/identifier getCustomer() { return this.http.get('/someUrl').map(res => res.json()); } 

    También existe la posibilidad de usar métodos de almacenamiento en caché ( has , get , set ) usando la API de almacenamiento en caché .

    anyclass.ts

     ... import { CacheService } from '@ngx-cache/core'; @Injectable() export class AnyClass { constructor(private readonly cache: CacheService) { // note that CacheService is injected into a private property of AnyClass } // will retrieve 'some string value' getSomeStringValue(): string { if (this.cache.has('some-string')) return this.cache.get('some-string'); this.cache.set('some-string', 'some string value'); return 'some string value'; } } 

    Aquí está la lista de paquetes, tanto para el caché del lado del cliente como del lado del servidor:

    • @ ngx-cache / core : utilidad de caché
    • @ ngx-cache / platform-browser : implementación de la plataforma SPA / Browser
    • @ ngx-cache / platform-server : implementación de la plataforma del servidor
    • @ ngx-cache / fs-storage : utilidad de almacenamiento (requerida para la plataforma del servidor)

    rxjs 5.3.0

    No me he sentido satisfecho con .map(myFunction).publishReplay(1).refCount()

    Con múltiples suscriptores, .map() ejecuta myFunction dos veces en algunos casos (espero que solo se ejecute una vez). Una solución parece ser publishReplay(1).refCount().take(1)

    Otra cosa que puede hacer es simplemente no usar refCount() y hacer que el Observable se caliente inmediatamente:

     let obs = this.http.get('my/data.json').publishReplay(1); obs.connect(); return obs; 

    Esto iniciará la solicitud HTTP independientemente de los suscriptores. No estoy seguro si cancelar la suscripción antes de que termine HTTP GET lo cancelará o no.

    Lo que queremos hacer es asegurarnos de que esto no cause múltiples solicitudes de red.

    Mi favorito personal es hacer uso de métodos async para llamadas que realizan solicitudes de red. Los métodos en sí mismos no devuelven un valor, sino que actualizan un BehaviorSubject dentro del mismo servicio, a qué componentes se suscribirán.

    Ahora ¿Por qué usar un BehaviorSubject lugar de un Observable ? Porque,

    • Tras la suscripción, BehaviorSubject devuelve el último valor, mientras que Un observable regular solo se activa cuando recibe un onnext .
    • Si desea recuperar el último valor de BehaviorSubject en un código no observable (sin suscripción), puede usar el método getValue() .

    Ejemplo:

    customer.service.ts

     public customers$: BehaviorSubject = new BehaviorSubject([]); public async getCustomers(): Promise { let customers = await this.httpClient.post(this.endPoint, criteria).toPromise(); if (customers) this.customers$.next(customers); } 

    Luego, cuando sea necesario, podemos suscribir customers$ .

     public ngOnInit(): void { this.customerService.customers$ .subscribe((customers: Customer[]) => this.customerList = customers); } 

    O tal vez quieras usarlo directamente en una plantilla

     
  • ...
  • Entonces, hasta que realice otra llamada a getCustomers , los datos se conservan en los customers$ BehaviorSubject.

    Entonces, ¿qué pasa si quieres actualizar estos datos? solo haga una llamada a getCustomers()

     public async refresh(): Promise { try { await this.customerService.getCustomers(); } catch (e) { // request failed, handle exception console.error(e); } } 

    Al usar este método, no es necesario que retengamos explícitamente los datos entre las llamadas de red subsiguientes ya que BehaviorSubject maneja.

    PD: generalmente, cuando se destruye un componente, es una buena práctica deshacerse de las suscripciones, para eso puedes usar el método sugerido en esta respuesta.

    Simplemente llame a share () después del mapa y antes de cualquier suscripción .

    En mi caso, tengo un servicio genérico (RestClientService.ts) que realiza la llamada de reposo, extrae datos, comprueba errores y devuelve observable a un servicio de implementación concreto (por ejemplo, ContractClientService.ts), finalmente esta implementación concreta devuelve observable a de ContractComponent.ts, y este se suscribe para actualizar la vista.

    RestClientService.ts:

     export abstract class RestClientService { public GetAll = (path: string, property: string): Observable => { let fullPath = this.actionUrl + path; let observable = this._http.get(fullPath).map(res => this.extractData(res, property)); observable = observable.share(); //allows multiple subscribers without making again the http request observable.subscribe( (res) => {}, error => this.handleError2(error, "GetAll", fullPath), () => {} ); return observable; } private extractData(res: Response, property: string) { ... } private handleError2(error: any, method: string, path: string) { ... } } 

    ContractService.ts:

     export class ContractService extends RestClientService { private GET_ALL_ITEMS_REST_URI_PATH = "search"; private GET_ALL_ITEMS_PROPERTY_PATH = "contract"; public getAllItems(): Observable { return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH); } } 

    ContractComponent.ts:

     export class ContractComponent implements OnInit { getAllItems() { this.rcService.getAllItems().subscribe((data) => { this.items = data; }); } } 

    Escribí una clase de caché,

     /** * Caches results returned from given fetcher callback for given key, * up to maxItems results, deletes the oldest results when full (FIFO). */ export class StaticCache { static cachedData: Map = new Map(); static maxItems: number = 400; static get(key: string){ return this.cachedData.get(key); } static getOrFetch(key: string, fetcher: (string) => any): any { let value = this.cachedData.get(key); if (value != null){ console.log("Cache HIT! (fetcher)"); return value; } console.log("Cache MISS... (fetcher)"); value = fetcher(key); this.add(key, value); return value; } static add(key, value){ this.cachedData.set(key, value); this.deleteOverflowing(); } static deleteOverflowing(): void { if (this.cachedData.size > this.maxItems) { this.deleteOldest(this.cachedData.size - this.maxItems); } } /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration. /// However that seems not to work. Trying with forEach. static deleteOldest(howMany: number): void { //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size); let iterKeys = this.cachedData.keys(); let item: IteratorResult; while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){ //console.debug(" Deleting: " + item.value); this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS. } } static clear(): void { this.cachedData = new Map(); } } 

    It’s all static because of how we use it, but feel free to make it a normal class and a service. I’m not sure if angular keeps a single instance for the whole time though (new to Angular2).

    And this is how I use it:

      let httpService: Http = this.http; function fetcher(url: string): Observable { console.log(" Fetching URL: " + url); return httpService.get(url).map((response: Response) => { if (!response) return null; if (typeof response.json() !== "array") throw new Error("Graph REST should return an array of vertices."); let items: any[] = graphService.fromJSONarray(response.json(), httpService); return array ? items : items[0]; }); } // If data is a link, return a result of a service call. if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link") { // Make an HTTP call. let url = this.data[verticesLabel][name]["link"]; let cachedObservable: Observable = StaticCache.getOrFetch(url, fetcher); if (!cachedObservable) throw new Error("Failed loading link: " + url); return cachedObservable; } 

    I assume there could be a more clever way, which would use some Observable tricks but this was just fine for my purposes.

    Just use this cache layer, it does everything you requires, and even manage cache for ajax requests.

    http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

    It’s this much easy to use

     @Component({ selector: 'home', templateUrl: './html/home.component.html', styleUrls: ['./css/home.component.css'], }) export class HomeComponent { constructor(AjaxService:AjaxService){ AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;}); } articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]}; } 

    The layer(as an inject-able angular service) is

     import { Injectable } from '@angular/core'; import { Http, Response} from '@angular/http'; import { Observable } from 'rxjs/Observable'; import './../rxjs/operator' @Injectable() export class AjaxService { public data:Object={}; /* private dataObservable:Observable; */ private dataObserver:Array=[]; private loading:Object={}; private links:Object={}; counter:number=-1; constructor (private http: Http) { } private loadPostCache(link:string){ if(!this.loading[link]){ this.loading[link]=true; this.links[link].forEach(a=>this.dataObserver[a].next(false)); this.http.get(link) .map(this.setValue) .catch(this.handleError).subscribe( values => { this.data[link] = values; delete this.loading[link]; this.links[link].forEach(a=>this.dataObserver[a].next(false)); }, error => { delete this.loading[link]; } ); } } private setValue(res: Response) { return res.json() || { }; } private handleError (error: Response | any) { // In a real world app, we might use a remote logging infrastructure let errMsg: string; if (error instanceof Response) { const body = error.json() || ''; const err = body.error || JSON.stringify(body); errMsg = `${error.status} - ${error.statusText || ''} ${err}`; } else { errMsg = error.message ? error.message : error.toString(); } console.error(errMsg); return Observable.throw(errMsg); } postCache(link:string): Observable{ return Observable.create(observer=> { if(this.data.hasOwnProperty(link)){ observer.next(this.data[link]); } else{ let _observable=Observable.create(_observer=>{ this.counter=this.counter+1; this.dataObserver[this.counter]=_observer; this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]); _observer.next(false); }); this.loadPostCache(link); _observable.subscribe(status=>{ if(status){ observer.next(this.data[link]); } } ); } }); } } 

    It’s .publishReplay(1).refCount(); or .publishLast().refCount(); since Angular Http observables complete after request.

    This simple class caches the result so you can subscribe to .value many times and makes only 1 request. You can also use .reload() to make new request and publish data.

    You can use it like:

     let res = new RestResource(() => this.http.get('inline.bundleo.js')); res.status.subscribe((loading)=>{ console.log('STATUS=',loading); }); res.value.subscribe((value) => { console.log('VALUE=', value); }); 

    and the source:

     export class RestResource { static readonly LOADING: string = 'RestResource_Loading'; static readonly ERROR: string = 'RestResource_Error'; static readonly IDLE: string = 'RestResource_Idle'; public value: Observable; public status: Observable; private loadStatus: Observer; private reloader: Observable; private reloadTrigger: Observer; constructor(requestObservableFn: () => Observable) { this.status = Observable.create((o) => { this.loadStatus = o; }); this.reloader = Observable.create((o: Observer) => { this.reloadTrigger = o; }); this.value = this.reloader.startWith(null).switchMap(() => { if (this.loadStatus) { this.loadStatus.next(RestResource.LOADING); } return requestObservableFn() .map((res) => { if (this.loadStatus) { this.loadStatus.next(RestResource.IDLE); } return res; }).catch((err)=>{ if (this.loadStatus) { this.loadStatus.next(RestResource.ERROR); } return Observable.of(null); }); }).publishReplay(1).refCount(); } reload() { this.reloadTrigger.next(null); } } 

    You can build simple class Cacheable<> that helps managing data retrieved from http server with multiple subscribers:

     declare type GetDataHandler = () => Observable; export class Cacheable { protected data: T; protected subjectData: Subject; protected observableData: Observable; public getHandler: GetDataHandler; constructor() { this.subjectData = new ReplaySubject(1); this.observableData = this.subjectData.asObservable(); } public getData(): Observable { if (!this.getHandler) { throw new Error("getHandler is not defined"); } if (!this.data) { this.getHandler().map((r: T) => { this.data = r; return r; }).subscribe( result => this.subjectData.next(result), err => this.subjectData.error(err) ); } return this.observableData; } public resetCache(): void { this.data = null; } public refresh(): void { this.resetCache(); this.getData(); } } 

    Uso

    Declare Cacheable<> object (presumably as part of the service):

     list: Cacheable = new Cacheable(); 

    and handler:

     this.list.getHandler = () => { // get data from server return this.http.get(url) .map((r: Response) => r.json() as string[]); } 

    Call from a component:

     //gets data from server List.getData().subscribe(…) 

    You can have several components subscribed to it.

    More details and code example are here: http://devinstance.net/articles/20171021/rxjs-cacheable

    Great answers.

    Or you could do this:

    This is from latest version of rxjs. I am using 5.5.7 version of RxJS

     import {share} from "rxjs/operators"; this.http.get('/someUrl').pipe(share()); 

    Have you tried running the code you already have?

    Because you are constructing the Observable from the promise resulting from getJSON() , the network request is made before anyone subscribes. And the resulting promise is shared by all subscribers.

     var promise = jQuery.getJSON(requestUrl); // network call is executed now var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable o.subscribe(...); // does not trigger network call o.subscribe(...); // does not trigger network call // ...