Skip to content

Commit

Permalink
fix(race): unsubscribe raced observables with immediate scheduler
Browse files Browse the repository at this point in the history
Fixed the bug where racing an Observable that emits immediately,
with default scheduler, leaves other raced Observables unsubscribed
forever. Now immediate emit makes the race ignore following Observables.
  • Loading branch information
ktkiiski committed Nov 29, 2016
1 parent f51b8f9 commit cb98652
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
36 changes: 36 additions & 0 deletions spec/operators/race-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {expect} from 'chai';
import * as sinon from 'sinon';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, expectObservable, expectSubscriptions};

Expand Down Expand Up @@ -171,4 +172,39 @@ describe('Observable.prototype.race', () => {
expect(x).to.be.true;
}, done, done);
});

it('should ignore latter observables if a former one emits immediately', () => {
const onNext = sinon.spy();
const onSubscribe = sinon.spy();
const e1 = Observable.of('a'); // Wins the race
const e2 = Observable.defer(onSubscribe); // Should be ignored

e1.race(e2).subscribe(onNext);
expect(onNext.calledWithExactly('a')).to.be.true;
expect(onSubscribe.called).to.be.false;
});

it('should unsubscribe former observables if a latter one emits immediately', () => {
const onNext = sinon.spy();
const onUnsubscribe = sinon.spy();
const e1 = Observable.never<string>().finally(onUnsubscribe); // Should be unsubscribed
const e2 = Observable.of('b'); // Wins the race

e1.race(e2).subscribe(onNext);
expect(onNext.calledWithExactly('b')).to.be.true;
expect(onUnsubscribe.calledOnce).to.be.true;
});

it('should unsubscribe from immediately emitting observable on unsubscription', () => {
const onNext = sinon.spy();
const onUnsubscribe = sinon.spy();
const e1 = Observable.never<string>().startWith('a').finally(onUnsubscribe); // Wins the race
const e2 = Observable.never<string>(); // Loses the race

const subscription = e1.race(e2).subscribe(onNext);
expect(onNext.calledWithExactly('a')).to.be.true;
expect(onUnsubscribe.called).to.be.false;
subscription.unsubscribe();
expect(onUnsubscribe.calledOnce).to.be.true;
});
});
4 changes: 2 additions & 2 deletions src/operator/race.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
if (len === 0) {
this.destination.complete();
} else {
for (let i = 0; i < len; i++) {
for (let i = 0; i < len && !this.hasFirst; i++) {
let observable = observables[i];
let subscription = subscribeToResult(this, observable, observable, i);

if (this.subscriptions) {
this.subscriptions.push(subscription);
this.add(subscription);
}
this.add(subscription);
}
this.observables = null;
}
Expand Down

0 comments on commit cb98652

Please sign in to comment.