diff --git a/spec/operators/multicast-spec.ts b/spec/operators/multicast-spec.ts index 1f42e3ebfc..78ba78abf3 100644 --- a/spec/operators/multicast-spec.ts +++ b/spec/operators/multicast-spec.ts @@ -702,6 +702,22 @@ describe('multicast operator', () => { }); // TODO: fix firehose unsubscription + // AFAICT, it's not possible for multicast observables to support ASAP + // unsubscription from synchronous firehose sources. The problem is that the + // chaining of the closed 'signal' is broken by the subject. For example, + // here: + // + // https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/operators/multicast.ts#L53 + // + // The subject is passed to subscribe. However, in the subscribe + // implementation a SafeSubcriber is created with the subject as the + // observer: + // + // https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/Observable.ts#L210 + // + // That breaks the chaining of closed - i.e. even if the unsubscribe is + // called on the subject, closing it, the SafeSubscriber's closed property + // won't reflect that. it.skip('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { diff --git a/spec/operators/repeat-spec.ts b/spec/operators/repeat-spec.ts index b245b35536..c7440012f2 100644 --- a/spec/operators/repeat-spec.ts +++ b/spec/operators/repeat-spec.ts @@ -281,8 +281,7 @@ describe('repeat operator', () => { }); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/repeatWhen-spec.ts b/spec/operators/repeatWhen-spec.ts index af62704c98..878c8260dd 100644 --- a/spec/operators/repeatWhen-spec.ts +++ b/spec/operators/repeatWhen-spec.ts @@ -408,8 +408,7 @@ describe('repeatWhen operator', () => { expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown']) }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/retry-spec.ts b/spec/operators/retry-spec.ts index 29dd295783..484a3659c2 100644 --- a/spec/operators/retry-spec.ts +++ b/spec/operators/retry-spec.ts @@ -308,8 +308,7 @@ describe('retry operator', () => { }); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/skipLast-spec.ts b/spec/operators/skipLast-spec.ts index f29b563f6b..6b491ea1fd 100644 --- a/spec/operators/skipLast-spec.ts +++ b/spec/operators/skipLast-spec.ts @@ -148,8 +148,7 @@ describe('skipLast operator', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop @@ -165,6 +164,9 @@ describe('skipLast operator', () => { take(3), ).subscribe(() => { /* noop */ }); - expect(sideEffects).to.deep.equal([0, 1, 2]); + // This expectation might seem a little strange, but the implementation of + // skipLast works by eating the number of elements that are to be skipped, + // so it will consume the number skipped in addition to the number taken. + expect(sideEffects).to.deep.equal([0, 1, 2, 3]); }); }); diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index 140ce6c9c1..fa91f65003 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -409,8 +409,7 @@ describe('throttle operator', () => { }) }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/timeoutWith-spec.ts b/spec/operators/timeoutWith-spec.ts index 1b36aa161d..368a10383a 100644 --- a/spec/operators/timeoutWith-spec.ts +++ b/spec/operators/timeoutWith-spec.ts @@ -276,8 +276,7 @@ describe('timeoutWith operator', () => { }); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/src/internal/operators/multicast.ts b/src/internal/operators/multicast.ts index 5f3271516f..799cab3dfe 100644 --- a/src/internal/operators/multicast.ts +++ b/src/internal/operators/multicast.ts @@ -48,7 +48,7 @@ export function multicast( // Intentionally terse code: Subscribe to the result of the selector, // then immediately connect the source through the subject, adding // that to the resulting subscription. The act of subscribing with `this`, - // the primary destination subscriber, will automatically add the subcription + // the primary destination subscriber, will automatically add the subscription // to the result. selector(subject).subscribe(subscriber).add(source.subscribe(subject)); }); diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index 6ccceb70a6..1a7594aa80 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -95,15 +95,20 @@ export function throttle( source.subscribe( new OperatorSubscriber( subscriber, + // Regarding the presence of throttled.closed in the following + // conditions, if a synchronous duration selector is specified - weird, + // but legal - an already-closed subscription will be assigned to + // throttled, so the subscription's closed property needs to be checked, + // too. (value) => { hasValue = true; sendValue = value; - !throttled && (leading ? send() : throttle(value)); + !(throttled && !throttled.closed) && (leading ? send() : throttle(value)); }, undefined, () => { isComplete = true; - !(trailing && hasValue && throttled) && subscriber.complete(); + !(trailing && hasValue && throttled && !throttled.closed) && subscriber.complete(); } ) );