diff --git a/spec-dtslint/firstValueFrom-spec.ts b/spec-dtslint/firstValueFrom-spec.ts new file mode 100644 index 0000000000..7711a2d90a --- /dev/null +++ b/spec-dtslint/firstValueFrom-spec.ts @@ -0,0 +1,8 @@ +import { firstValueFrom } from 'rxjs'; +import { a$ } from 'helpers'; + +describe('firstValueFrom', () => { + const r0 = firstValueFrom(a$); // $ExpectType Promise + const r1 = firstValueFrom(); // $ExpectError + const r2 = firstValueFrom(Promise.resolve(42)); // $ExpectError +}); diff --git a/spec-dtslint/lastValueFrom-spec.ts b/spec-dtslint/lastValueFrom-spec.ts new file mode 100644 index 0000000000..2228b8cfcc --- /dev/null +++ b/spec-dtslint/lastValueFrom-spec.ts @@ -0,0 +1,8 @@ +import { lastValueFrom } from 'rxjs'; +import { a$ } from 'helpers'; + +describe('lastValueFrom', () => { + const r0 = lastValueFrom(a$); // $ExpectType Promise + const r1 = lastValueFrom(); // $ExpectError + const r2 = lastValueFrom(Promise.resolve(42)); // $ExpectError +}); diff --git a/spec/firstValueFrom-spec.ts b/spec/firstValueFrom-spec.ts new file mode 100644 index 0000000000..7c6c691aca --- /dev/null +++ b/spec/firstValueFrom-spec.ts @@ -0,0 +1,44 @@ +import { interval, firstValueFrom, EMPTY, EmptyError, throwError, of } from 'rxjs'; +import { expect } from 'chai'; +import { finalize } from 'rxjs/operators'; + +describe('firstValueFrom', () => { + it('should emit the first value as a promise', async () => { + let finalized = false; + const source = interval(10).pipe(finalize(() => (finalized = true))); + const result = await firstValueFrom(source); + expect(result).to.equal(0); + expect(finalized).to.be.true; + }); + + it('should error for empty observables', async () => { + const source = EMPTY; + let error: any = null; + try { + await firstValueFrom(source); + } catch (err) { + error = err; + } + expect(error).to.be.an.instanceOf(EmptyError); + }); + + it('should error for errored observables', async () => { + const source = throwError(new Error('blorp!')); + let error: any = null; + try { + await firstValueFrom(source); + } catch (err) { + error = err; + } + expect(error).to.be.an.instanceOf(Error); + expect(error.message).to.equal('blorp!'); + }); + + it('should work with a synchronous observable', async () => { + let finalized = false; + const source = of('apples', 'bananas').pipe(finalize(() => (finalized = true))); + const result = await firstValueFrom(source); + expect(result).to.equal('apples'); + expect(finalized).to.be.true; + }); +}); diff --git a/spec/helpers/test-helper.ts b/spec/helpers/test-helper.ts index b9af36acf4..e37924a9c2 100644 --- a/spec/helpers/test-helper.ts +++ b/spec/helpers/test-helper.ts @@ -7,6 +7,19 @@ import { iterator } from 'rxjs/internal/symbol/iterator'; import * as sinon from 'sinon'; import { expect } from 'chai'; +if (process && process.on) { + /** + * With async/await functions in Node, mocha seems to allow + * tests to pass, even they shouldn't there's something about how + * it handles the rejected promise where it does not notice + * that the test failed. + */ + process.on('unhandledRejection', err => { + console.error(err); + process.exit(1); + }); +} + export function lowerCaseO(...args: Array): Observable { const o: any = { subscribe(observer: any) { diff --git a/spec/lastValueFrom-spec.ts b/spec/lastValueFrom-spec.ts new file mode 100644 index 0000000000..a4540b8dab --- /dev/null +++ b/spec/lastValueFrom-spec.ts @@ -0,0 +1,47 @@ +import { interval, lastValueFrom, EMPTY, EmptyError, throwError, of } from 'rxjs'; +import { expect } from 'chai'; +import { finalize, take } from 'rxjs/operators'; + +describe('lastValueFrom', () => { + it('should emit the last value as a promise', async () => { + let finalized = false; + const source = interval(2).pipe( + take(10), + finalize(() => (finalized = true)) + ); + const result = await lastValueFrom(source); + expect(result).to.equal(9); + expect(finalized).to.be.true; + }); + + it('should error for empty observables', async () => { + const source = EMPTY; + let error: any = null; + try { + await lastValueFrom(source); + } catch (err) { + error = err; + } + expect(error).to.be.an.instanceOf(EmptyError); + }); + + it('should error for errored observables', async () => { + const source = throwError(new Error('blorp!')); + let error: any = null; + try { + await lastValueFrom(source); + } catch (err) { + error = err; + } + expect(error).to.be.an.instanceOf(Error); + expect(error.message).to.equal('blorp!'); + }); + + it('should work with a synchronous observable', async () => { + let finalized = false; + const source = of('apples', 'bananas').pipe(finalize(() => (finalized = true))); + const result = await lastValueFrom(source); + expect(result).to.equal('bananas'); + expect(finalized).to.be.true; + }); +}); diff --git a/src/index.ts b/src/index.ts index 09f8c5df4e..80f5c4f38b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -33,6 +33,10 @@ export { noop } from './internal/util/noop'; export { identity } from './internal/util/identity'; export { isObservable } from './internal/util/isObservable'; +/* Promise Conversion */ +export { lastValueFrom } from './internal/lastValueFrom'; +export { firstValueFrom } from './internal/firstValueFrom'; + /* Error types */ export { ArgumentOutOfRangeError } from './internal/util/ArgumentOutOfRangeError'; export { EmptyError } from './internal/util/EmptyError'; diff --git a/src/internal/firstValueFrom.ts b/src/internal/firstValueFrom.ts new file mode 100644 index 0000000000..7b5b61bae0 --- /dev/null +++ b/src/internal/firstValueFrom.ts @@ -0,0 +1,54 @@ +import { Observable } from './Observable'; +import { EmptyError } from './util/EmptyError'; +import { Subscription } from './Subscription'; + +/** + * Converts an observable to a promise by subscribing to the observable, + * and returning a promise that will resolve as soon as the first value + * arrives from the observable. The subscription will then be closed. + * + * If the observable stream completes before any values were emitted, the + * returned promise will reject with {@link EmptyError}. + * + * If the observable stream emits an error, the returned promise will reject + * with that error. + * + * ### Example + * + * Wait for the first value from a stream and emit it from a promise in + * an async function. + * + * ```ts + * import { interval, firstValueFrom } from 'rxjs'; + * + * async function execute() { + * const source$ = interval(2000); + * const firstNumber = await firstValueFrom(source); + * console.log(`The first number is ${firstNumber}`); + * } + * + * execute(); + * + * // Expected output: + * // "The first number is 0" + * ``` + * + * @param source the observable to convert to a promise + */ +export function firstValueFrom(source$: Observable) { + return new Promise((resolve, reject) => { + const subs = new Subscription(); + subs.add( + source$.subscribe({ + next: value => { + resolve(value); + subs.unsubscribe(); + }, + error: reject, + complete: () => { + reject(new EmptyError()); + }, + }) + ); + }); +} diff --git a/src/internal/lastValueFrom.ts b/src/internal/lastValueFrom.ts new file mode 100644 index 0000000000..c4f8e681c7 --- /dev/null +++ b/src/internal/lastValueFrom.ts @@ -0,0 +1,57 @@ +import { Observable } from './Observable'; +import { EmptyError } from './util/EmptyError'; + +/** + * Converts an observable to a promise by subscribing to the observable, + * waiting for it to complete, and resolving the returned promise with the + * last value from the observed stream. + * + * If the observable stream completes before any values were emitted, the + * returned promise will reject with {@link EmptyError}. + * + * If the observable stream emits an error, the returned promise will reject + * with that error. + * + * ### Example + * + * Wait for the last value from a stream and emit it from a promise in + * an async function. + * + * ```ts + * import { interval, lastValueFrom } from 'rxjs'; + * import { take } from 'rxjs/operators'; + * + * async function execute() { + * const source$ = interval(2000).pipe(take(10)); + * const finalNumber = await lastValueFrom(source); + * console.log(`The final number is ${finalNumber}`); + * } + * + * execute(); + * + * // Expected output: + * // "The final number is 9" + * ``` + * + * @param source the observable to convert to a promise + */ +export function lastValueFrom(source: Observable) { + return new Promise((resolve, reject) => { + let _hasValue = false; + let _value: T; + source.subscribe({ + next: value => { + _value = value; + _hasValue = true; + }, + error: reject, + complete: () => { + if (_hasValue) { + resolve(_value); + } else { + reject(new EmptyError()); + } + }, + }); + }); +}