From 5a0f5016d42fafe95d422088cb240614d8f60d1f Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Thu, 24 Sep 2020 11:31:08 +1000 Subject: [PATCH 1/7] test: enable timeoutWith firehose test --- spec/operators/timeoutWith-spec.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spec/operators/timeoutWith-spec.ts b/spec/operators/timeoutWith-spec.ts index 1b36aa161d..368a10383a 100644 --- a/spec/operators/timeoutWith-spec.ts +++ b/spec/operators/timeoutWith-spec.ts @@ -276,8 +276,7 @@ describe('timeoutWith operator', () => { }); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop From e06e32cb28642650b4c2b7a4943afd3518cab6d0 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Thu, 24 Sep 2020 11:36:03 +1000 Subject: [PATCH 2/7] test: enable repeat/retry firehose tests --- spec/operators/repeat-spec.ts | 3 +-- spec/operators/repeatWhen-spec.ts | 3 +-- spec/operators/retry-spec.ts | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/spec/operators/repeat-spec.ts b/spec/operators/repeat-spec.ts index b245b35536..c7440012f2 100644 --- a/spec/operators/repeat-spec.ts +++ b/spec/operators/repeat-spec.ts @@ -281,8 +281,7 @@ describe('repeat operator', () => { }); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/repeatWhen-spec.ts b/spec/operators/repeatWhen-spec.ts index af62704c98..878c8260dd 100644 --- a/spec/operators/repeatWhen-spec.ts +++ b/spec/operators/repeatWhen-spec.ts @@ -408,8 +408,7 @@ describe('repeatWhen operator', () => { expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown']) }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/retry-spec.ts b/spec/operators/retry-spec.ts index 29dd295783..484a3659c2 100644 --- a/spec/operators/retry-spec.ts +++ b/spec/operators/retry-spec.ts @@ -308,8 +308,7 @@ describe('retry operator', () => { }); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop From f2d655802a9e08e2380d152f8928c193e6844ed3 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Thu, 24 Sep 2020 11:54:39 +1000 Subject: [PATCH 3/7] test: fix skipLast firehose expectation --- spec/operators/skipLast-spec.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spec/operators/skipLast-spec.ts b/spec/operators/skipLast-spec.ts index f29b563f6b..6b491ea1fd 100644 --- a/spec/operators/skipLast-spec.ts +++ b/spec/operators/skipLast-spec.ts @@ -148,8 +148,7 @@ describe('skipLast operator', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop @@ -165,6 +164,9 @@ describe('skipLast operator', () => { take(3), ).subscribe(() => { /* noop */ }); - expect(sideEffects).to.deep.equal([0, 1, 2]); + // This expectation might seem a little strange, but the implementation of + // skipLast works by eating the number of elements that are to be skipped, + // so it will consume the number skipped in addition to the number taken. + expect(sideEffects).to.deep.equal([0, 1, 2, 3]); }); }); From cc7ed4b2240359c512ff4022b97182599e9783ed Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Thu, 24 Sep 2020 12:23:51 +1000 Subject: [PATCH 4/7] test: enable throttle firehose test --- spec/operators/throttle-spec.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index 140ce6c9c1..fa91f65003 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -409,8 +409,7 @@ describe('throttle operator', () => { }) }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop From 48380d554234ddab66b77b45f92bef3be90aacaf Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Thu, 24 Sep 2020 12:24:09 +1000 Subject: [PATCH 5/7] fix(throttle): support sync duration selector --- src/internal/operators/throttle.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index 6ccceb70a6..1a7594aa80 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -95,15 +95,20 @@ export function throttle( source.subscribe( new OperatorSubscriber( subscriber, + // Regarding the presence of throttled.closed in the following + // conditions, if a synchronous duration selector is specified - weird, + // but legal - an already-closed subscription will be assigned to + // throttled, so the subscription's closed property needs to be checked, + // too. (value) => { hasValue = true; sendValue = value; - !throttled && (leading ? send() : throttle(value)); + !(throttled && !throttled.closed) && (leading ? send() : throttle(value)); }, undefined, () => { isComplete = true; - !(trailing && hasValue && throttled) && subscriber.complete(); + !(trailing && hasValue && throttled && !throttled.closed) && subscriber.complete(); } ) ); From d5f15e1e89caab2e719888a6b84213b7fe17a2eb Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Thu, 24 Sep 2020 12:32:04 +1000 Subject: [PATCH 6/7] chore: fix spelling in comment --- src/internal/operators/multicast.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal/operators/multicast.ts b/src/internal/operators/multicast.ts index 5f3271516f..799cab3dfe 100644 --- a/src/internal/operators/multicast.ts +++ b/src/internal/operators/multicast.ts @@ -48,7 +48,7 @@ export function multicast( // Intentionally terse code: Subscribe to the result of the selector, // then immediately connect the source through the subject, adding // that to the resulting subscription. The act of subscribing with `this`, - // the primary destination subscriber, will automatically add the subcription + // the primary destination subscriber, will automatically add the subscription // to the result. selector(subject).subscribe(subscriber).add(source.subscribe(subject)); }); From 2a0aa7e77e4f4b69d26bf8c7bd0ecad537f198c7 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Thu, 24 Sep 2020 12:51:13 +1000 Subject: [PATCH 7/7] test: add comment re: closed chain breakage Closes #5658. --- spec/operators/multicast-spec.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/spec/operators/multicast-spec.ts b/spec/operators/multicast-spec.ts index 1f42e3ebfc..78ba78abf3 100644 --- a/spec/operators/multicast-spec.ts +++ b/spec/operators/multicast-spec.ts @@ -702,6 +702,22 @@ describe('multicast operator', () => { }); // TODO: fix firehose unsubscription + // AFAICT, it's not possible for multicast observables to support ASAP + // unsubscription from synchronous firehose sources. The problem is that the + // chaining of the closed 'signal' is broken by the subject. For example, + // here: + // + // https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/operators/multicast.ts#L53 + // + // The subject is passed to subscribe. However, in the subscribe + // implementation a SafeSubcriber is created with the subject as the + // observer: + // + // https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/Observable.ts#L210 + // + // That breaks the chaining of closed - i.e. even if the unsubscribe is + // called on the subject, closing it, the SafeSubscriber's closed property + // won't reflect that. it.skip('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => {