From dbdbaecbb08113b9186338302562bb71eefba166 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 26 Feb 2021 01:47:39 -0600 Subject: [PATCH] fix(throttle): no longer emits more than necessary in sync/sync trailing case Resolves an edge case where if a user provided a synchronous source and a synchronous notifier, using a trailing behavior, the returned observable would get caught in a loop emitting the final value over and over again. fixes: #6058 --- spec/operators/throttle-spec.ts | 16 +++++++++++++++- src/internal/operators/throttle.ts | 13 +++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index a854abdc95..1fe2cf68dd 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -4,7 +4,7 @@ import { throttle, mergeMap, mapTo, take } from 'rxjs/operators'; import { of, concat, timer, Observable } from 'rxjs'; /** @test {throttle} */ -describe('throttle operator', () => { +describe('throttle', () => { it('should immediately emit the first value in each time window', () => { const e1 = hot('-a-xy-----b--x--cxxx-|'); const e1subs = '^ !'; @@ -21,6 +21,20 @@ describe('throttle operator', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + it('should handle sync source with sync notifier and trailing appropriately', () => { + let results: any[] = []; + const source = of(1).pipe( + throttle(() => of(1), { leading: false, trailing: true }) + ); + + source.subscribe({ + next: value => results.push(value), + complete: () => results.push('done') + }); + + expect(results).to.deep.equal([1, 'done']) + }); + it('should simply mirror the source if values are not emitted often enough', () => { const e1 = hot('-a--------b-----c----|'); const e1subs = '^ !'; diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index af81fd0f2d..d8c221d6a0 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -90,11 +90,16 @@ export function throttle( const send = () => { if (hasValue) { - subscriber.next(sendValue!); - !isComplete && startThrottle(sendValue!); + // Ensure we clear out our value and hasValue flag + // before we emit, otherwise reentrant code can cause + // issues here. + hasValue = false; + const value = sendValue!; + sendValue = null; + // Emit the value. + subscriber.next(value); + !isComplete && startThrottle(value); } - hasValue = false; - sendValue = null; }; source.subscribe(