From cb0b806502c19592aab8a91b64724490ae097b49 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 13 Jun 2016 17:24:19 -0700 Subject: [PATCH] fix(cache): get correct caching behavior (#1765) This is an initial pass at fixing the cache operator. There is still a lot to do. Ideally, the cache operator would use the lift mechanism. Also this is not well optimized as it is introducing a lot of closures. But it works, and that's the point for now. fixes #1628 --- spec/operators/cache-spec.ts | 34 +++++++++++++++++++++++--- src/operator/cache.ts | 47 +++++++++++++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/spec/operators/cache-spec.ts b/spec/operators/cache-spec.ts index d8e5ee1750..68ec4aecf8 100644 --- a/spec/operators/cache-spec.ts +++ b/spec/operators/cache-spec.ts @@ -35,10 +35,10 @@ describe('Observable.prototype.cache', () => { rxTestScheduler.schedule(() => expectObservable(s1).toBe(expected2), time(sub2)); }); - it('should replay values and error', () => { - const s1 = hot('---^---a---b---c---# ').cache(undefined, undefined, rxTestScheduler); - const expected1 = '----a---b---c---# '; - const expected2 = ' (abc#)'; + it('should not replay values after error with a hot observable', () => { + const s1 = hot('---^---a---b---c---# ').cache(undefined, undefined, rxTestScheduler); + const expected1 = '----a---b---c---# '; + const expected2 = ' -'; const t = time( '------------------|'); expectObservable(s1).toBe(expected1); @@ -48,6 +48,19 @@ describe('Observable.prototype.cache', () => { }, t); }); + it('should be resubscribable after error with a cold observable', () => { + const s1 = cold( '----a---b---c---# ').cache(undefined, undefined, rxTestScheduler); + const expected1 = '----a---b---c---# '; + const expected2 = ' ----a---b---c---#'; + const t = time( '------------------| '); + + expectObservable(s1).toBe(expected1); + + rxTestScheduler.schedule(() => { + expectObservable(s1).toBe(expected2); + }, t); + }); + it('should replay values and and share', () => { const s1 = hot('---^---a---b---c------------d--e--f-|').cache(undefined, undefined, rxTestScheduler); const expected1 = '----a---b---c------------d--e--f-|'; @@ -193,4 +206,17 @@ describe('Observable.prototype.cache', () => { expectObservable(s1).toBe(e3); }, t2); }); + + it('should be retryable', () => { + const source = cold('--1-2-3-#'); + const subs = ['^ ! ', + ' ^ ! ', + ' ^ !']; + const expected = '--1-2-3---1-2-3---1-2-3-#'; + + const result = source.cache(undefined, undefined, rxTestScheduler).retry(2); + + expectObservable(result).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); }); \ No newline at end of file diff --git a/src/operator/cache.ts b/src/operator/cache.ts index b089f97f28..df56ce3166 100644 --- a/src/operator/cache.ts +++ b/src/operator/cache.ts @@ -1,7 +1,8 @@ import {Observable} from '../Observable'; -import {publishReplay} from './publishReplay'; import {Scheduler} from '../Scheduler'; -import {ConnectableObservable} from '../observable/ConnectableObservable'; +import {ReplaySubject} from '../ReplaySubject'; +import {Observer} from '../Observer'; +import {Subscription} from '../Subscription'; /** * @param bufferSize @@ -14,7 +15,47 @@ import {ConnectableObservable} from '../observable/ConnectableObservable'; export function cache(bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, scheduler?: Scheduler): Observable { - return (>publishReplay.call(this, bufferSize, windowTime, scheduler)).refCount(); + let subject: ReplaySubject; + let source = this; + let refs = 0; + let outerSub: Subscription; + + const getSubject = () => { + subject = new ReplaySubject(bufferSize, windowTime, scheduler); + return subject; + }; + + return new Observable((observer: Observer) => { + if (!subject) { + subject = getSubject(); + outerSub = source.subscribe( + (value: T) => subject.next(value), + (err: any) => { + let s = subject; + subject = null; + s.error(err); + }, + () => subject.complete() + ); + } + + refs++; + + if (!subject) { + subject = getSubject(); + } + let innerSub = subject.subscribe(observer); + + return () => { + refs--; + if (innerSub) { + innerSub.unsubscribe(); + } + if (refs === 0) { + outerSub.unsubscribe(); + } + }; + }); } export interface CacheSignature {