diff --git a/spec/asynciterable/from-spec.ts b/spec/asynciterable/from-spec.ts index 130c6e8e..f0d4fa80 100644 --- a/spec/asynciterable/from-spec.ts +++ b/spec/asynciterable/from-spec.ts @@ -2,6 +2,7 @@ import * as Ix from '../Ix'; import * as test from 'tape-async'; const { from } = Ix.AsyncIterable; import { hasNext, noNext } from '../asynciterablehelpers'; +import { setInterval, clearInterval } from 'timers'; test('AsyncIterable#from from promise list', async t => { const xs: Iterable> = [ @@ -206,6 +207,50 @@ test('AsyncIterable#fromObservable with completion', async t => { t.end(); }); +test('AsyncIterable#fromObservable without completion', async t => { + const xs = new TestObservable(obs => { + let count = 0; + const interval = setInterval(() => { + obs.next(count++); + if (count === 3) { + clearInterval(interval); + obs.complete(); + } + }, 10); + return new EmptySubscription(); + }); + const ys = from(xs); + + const it = ys[Symbol.asyncIterator](); + await hasNext(t, it, 0); + await hasNext(t, it, 1); + await hasNext(t, it, 2); + await noNext(t, it); + t.end(); +}); + +test('AsyncIterable#fromObservable without completion', async t => { + const xs = new TestObservable(obs => { + let count = 0; + const interval = setInterval(() => { + obs.next(count++); + if (count === 3) { + clearInterval(interval); + obs.complete(); + } + }, 10); + return new EmptySubscription(); + }); + const ys = from(xs, (x, i) => x + i); + + const it = ys[Symbol.asyncIterator](); + await hasNext(t, it, 0); + await hasNext(t, it, 2); + await hasNext(t, it, 4); + await noNext(t, it); + t.end(); +}); + test('AsyncIterable#fromObservable with error', async t => { const err = new Error(); const xs = new TestObservable(obs => { diff --git a/src/asynciterable/asynciterablex.ts b/src/asynciterable/asynciterablex.ts index 79906043..56bc7885 100644 --- a/src/asynciterable/asynciterablex.ts +++ b/src/asynciterable/asynciterablex.ts @@ -1,3 +1,4 @@ +import { AsyncSink } from './../asyncsink'; import { OperatorAsyncFunction } from '../interfaces'; import { bindCallback } from '../internal/bindcallback'; import { identityAsync } from '../internal/identity'; @@ -131,42 +132,6 @@ class FromPromiseIterable extends AsyncIterableX { - public values: TSource[]; - public hasError: boolean; - public hasCompleted: boolean; - public errorValue: any; - public closed: boolean; - - constructor() { - this.values = []; - this.hasCompleted = false; - this.hasError = false; - this.errorValue = null; - this.closed = false; - } - - next(value: TSource) { - if (!this.closed) { - this.values.push(value); - } - } - - error(err: any) { - if (!this.closed) { - this.closed = true; - this.hasError = true; - this.errorValue = err; - } - } - - complete() { - if (!this.closed) { - this.closed = true; - } - } -} - class FromObservableAsyncIterable extends AsyncIterableX { private _observable: Observable; private _selector: (value: TSource, index: number) => TResult | Promise; @@ -181,21 +146,26 @@ class FromObservableAsyncIterable extends AsyncItera } async *[Symbol.asyncIterator]() { - const observer = new AsyncObserver(); - const subscription = this._observable.subscribe(observer); + const sink: AsyncSink = new AsyncSink(); + const subscription = this._observable.subscribe({ + next(value: TSource) { + sink.write(value); + }, + error(err: any) { + sink.error(err); + }, + complete() { + sink.end(); + } + }); let i = 0; - while (1) { - if (observer.values.length > 0) { - yield await this._selector(observer.values.shift()!, i++); - } else if (observer.closed) { - subscription.unsubscribe(); - if (observer.hasError) { - throw observer.errorValue; - } else { - break; - } + try { + for (let next; !(next = await sink.next()).done; ) { + yield await this._selector(next.value!, i++); } + } finally { + subscription.unsubscribe(); } } }