Skip to content

Commit

Permalink
fix(IteratorObservable): Observables from generators will now final…
Browse files Browse the repository at this point in the history
…ize when subscription ends

In order to model the behavior of `for..of` when consuming a Generator, if a `break` is hit in the `for..of`, `return()` is called on the generator and the generator will jump to a `finally` block if it has one. Observables created from generators will now have this same behavior.
```js
Observable.from((function* () {
  try {
    yield 1;
    yield 2;
    yield 3;
  } finally {
    console.log('finalized');
  }
})())
.take(2)
.subscribe(x => console.log(x));

// should log
// 1
// 2
// finalized
```

fixes #1938
  • Loading branch information
benlesh committed Oct 14, 2016
1 parent 7ee0575 commit 22d286a
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
63 changes: 63 additions & 0 deletions spec/observables/IteratorObservable-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import {queue} from '../../dist/cjs/scheduler/queue';
import {IteratorObservable} from '../../dist/cjs/observable/IteratorObservable';

declare const expectObservable;
Expand Down Expand Up @@ -46,6 +47,68 @@ describe('IteratorObservable', () => {
);
});

it('should finalize generators if the subscription ends', () => {
const iterator = {
finalized: false,
next() {
return { value: 'duck', done: false };
},
return() {
this.finalized = true;
}
};

const iterable = {
[Rx.Symbol.iterator]() {
return iterator;
}
};

const results = [];

IteratorObservable.create(iterable)
.take(3)
.subscribe(
x => results.push(x),
null,
() => results.push('GOOSE!')
);

expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']);
expect(iterator.finalized).to.be.true;
});

it('should finalize generators if the subscription and it is scheduled', () => {
const iterator = {
finalized: false,
next() {
return { value: 'duck', done: false };
},
return() {
this.finalized = true;
}
};

const iterable = {
[Rx.Symbol.iterator]() {
return iterator;
}
};

const results = [];

IteratorObservable.create(iterable, queue)
.take(3)
.subscribe(
x => results.push(x),
null,
() => results.push('GOOSE!')
);

expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']);
expect(iterator.finalized).to.be.true;
});

it('should emit members of an array iterator on a particular scheduler', () => {
const source = IteratorObservable.create(
[10, 20, 30, 40],
Expand Down
6 changes: 6 additions & 0 deletions src/observable/IteratorObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ export class IteratorObservable<T> extends Observable<T> {
state.index = index + 1;

if (subscriber.closed) {
if (typeof iterator.return === 'function') {
iterator.return();
}
return;
}

Expand Down Expand Up @@ -71,6 +74,9 @@ export class IteratorObservable<T> extends Observable<T> {
subscriber.next(result.value);
}
if (subscriber.closed) {
if (typeof iterator.return === 'function') {
iterator.return();
}
break;
}
} while (true);
Expand Down

0 comments on commit 22d286a

Please sign in to comment.