From cb9865298945e2f26550ef858608c0f519693595 Mon Sep 17 00:00:00 2001 From: Kimmo Kiiski Date: Tue, 29 Nov 2016 06:32:31 +0200 Subject: [PATCH] fix(race): unsubscribe raced observables with immediate scheduler Fixed the bug where racing an Observable that emits immediately, with default scheduler, leaves other raced Observables unsubscribed forever. Now immediate emit makes the race ignore following Observables. --- spec/operators/race-spec.ts | 36 ++++++++++++++++++++++++++++++++++++ src/operator/race.ts | 4 ++-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/spec/operators/race-spec.ts b/spec/operators/race-spec.ts index f87dd15177..ac507cc60a 100644 --- a/spec/operators/race-spec.ts +++ b/spec/operators/race-spec.ts @@ -1,4 +1,5 @@ import {expect} from 'chai'; +import * as sinon from 'sinon'; import * as Rx from '../../dist/cjs/Rx'; declare const {hot, cold, expectObservable, expectSubscriptions}; @@ -171,4 +172,39 @@ describe('Observable.prototype.race', () => { expect(x).to.be.true; }, done, done); }); + + it('should ignore latter observables if a former one emits immediately', () => { + const onNext = sinon.spy(); + const onSubscribe = sinon.spy(); + const e1 = Observable.of('a'); // Wins the race + const e2 = Observable.defer(onSubscribe); // Should be ignored + + e1.race(e2).subscribe(onNext); + expect(onNext.calledWithExactly('a')).to.be.true; + expect(onSubscribe.called).to.be.false; + }); + + it('should unsubscribe former observables if a latter one emits immediately', () => { + const onNext = sinon.spy(); + const onUnsubscribe = sinon.spy(); + const e1 = Observable.never().finally(onUnsubscribe); // Should be unsubscribed + const e2 = Observable.of('b'); // Wins the race + + e1.race(e2).subscribe(onNext); + expect(onNext.calledWithExactly('b')).to.be.true; + expect(onUnsubscribe.calledOnce).to.be.true; + }); + + it('should unsubscribe from immediately emitting observable on unsubscription', () => { + const onNext = sinon.spy(); + const onUnsubscribe = sinon.spy(); + const e1 = Observable.never().startWith('a').finally(onUnsubscribe); // Wins the race + const e2 = Observable.never(); // Loses the race + + const subscription = e1.race(e2).subscribe(onNext); + expect(onNext.calledWithExactly('a')).to.be.true; + expect(onUnsubscribe.called).to.be.false; + subscription.unsubscribe(); + expect(onUnsubscribe.calledOnce).to.be.true; + }); }); diff --git a/src/operator/race.ts b/src/operator/race.ts index 453c934604..474f64531e 100644 --- a/src/operator/race.ts +++ b/src/operator/race.ts @@ -85,14 +85,14 @@ export class RaceSubscriber extends OuterSubscriber { if (len === 0) { this.destination.complete(); } else { - for (let i = 0; i < len; i++) { + for (let i = 0; i < len && !this.hasFirst; i++) { let observable = observables[i]; let subscription = subscribeToResult(this, observable, observable, i); if (this.subscriptions) { this.subscriptions.push(subscription); - this.add(subscription); } + this.add(subscription); } this.observables = null; }