Skip to content

Commit

Permalink
feat: add lastValueFrom and firstValueFrom methods (#5295)
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh authored Mar 30, 2020
1 parent 2979ec1 commit e69b765
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 0 deletions.
8 changes: 8 additions & 0 deletions spec-dtslint/firstValueFrom-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { firstValueFrom } from 'rxjs';
import { a$ } from 'helpers';

describe('firstValueFrom', () => {
const r0 = firstValueFrom(a$); // $ExpectType Promise<A>
const r1 = firstValueFrom(); // $ExpectError
const r2 = firstValueFrom(Promise.resolve(42)); // $ExpectError
});
8 changes: 8 additions & 0 deletions spec-dtslint/lastValueFrom-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { lastValueFrom } from 'rxjs';
import { a$ } from 'helpers';

describe('lastValueFrom', () => {
const r0 = lastValueFrom(a$); // $ExpectType Promise<A>
const r1 = lastValueFrom(); // $ExpectError
const r2 = lastValueFrom(Promise.resolve(42)); // $ExpectError
});
44 changes: 44 additions & 0 deletions spec/firstValueFrom-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { interval, firstValueFrom, EMPTY, EmptyError, throwError, of } from 'rxjs';
import { expect } from 'chai';
import { finalize } from 'rxjs/operators';

describe('firstValueFrom', () => {
it('should emit the first value as a promise', async () => {
let finalized = false;
const source = interval(10).pipe(finalize(() => (finalized = true)));
const result = await firstValueFrom(source);
expect(result).to.equal(0);
expect(finalized).to.be.true;
});

it('should error for empty observables', async () => {
const source = EMPTY;
let error: any = null;
try {
await firstValueFrom(source);
} catch (err) {
error = err;
}
expect(error).to.be.an.instanceOf(EmptyError);
});

it('should error for errored observables', async () => {
const source = throwError(new Error('blorp!'));
let error: any = null;
try {
await firstValueFrom(source);
} catch (err) {
error = err;
}
expect(error).to.be.an.instanceOf(Error);
expect(error.message).to.equal('blorp!');
});

it('should work with a synchronous observable', async () => {
let finalized = false;
const source = of('apples', 'bananas').pipe(finalize(() => (finalized = true)));
const result = await firstValueFrom(source);
expect(result).to.equal('apples');
expect(finalized).to.be.true;
});
});
13 changes: 13 additions & 0 deletions spec/helpers/test-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ import { iterator } from 'rxjs/internal/symbol/iterator';
import * as sinon from 'sinon';
import { expect } from 'chai';

if (process && process.on) {
/**
* With async/await functions in Node, mocha seems to allow
* tests to pass, even they shouldn't there's something about how
* it handles the rejected promise where it does not notice
* that the test failed.
*/
process.on('unhandledRejection', err => {
console.error(err);
process.exit(1);
});
}

export function lowerCaseO<T>(...args: Array<any>): Observable<T> {
const o: any = {
subscribe(observer: any) {
Expand Down
47 changes: 47 additions & 0 deletions spec/lastValueFrom-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { interval, lastValueFrom, EMPTY, EmptyError, throwError, of } from 'rxjs';
import { expect } from 'chai';
import { finalize, take } from 'rxjs/operators';

describe('lastValueFrom', () => {
it('should emit the last value as a promise', async () => {
let finalized = false;
const source = interval(2).pipe(
take(10),
finalize(() => (finalized = true))
);
const result = await lastValueFrom(source);
expect(result).to.equal(9);
expect(finalized).to.be.true;
});

it('should error for empty observables', async () => {
const source = EMPTY;
let error: any = null;
try {
await lastValueFrom(source);
} catch (err) {
error = err;
}
expect(error).to.be.an.instanceOf(EmptyError);
});

it('should error for errored observables', async () => {
const source = throwError(new Error('blorp!'));
let error: any = null;
try {
await lastValueFrom(source);
} catch (err) {
error = err;
}
expect(error).to.be.an.instanceOf(Error);
expect(error.message).to.equal('blorp!');
});

it('should work with a synchronous observable', async () => {
let finalized = false;
const source = of('apples', 'bananas').pipe(finalize(() => (finalized = true)));
const result = await lastValueFrom(source);
expect(result).to.equal('bananas');
expect(finalized).to.be.true;
});
});
4 changes: 4 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ export { noop } from './internal/util/noop';
export { identity } from './internal/util/identity';
export { isObservable } from './internal/util/isObservable';

/* Promise Conversion */
export { lastValueFrom } from './internal/lastValueFrom';
export { firstValueFrom } from './internal/firstValueFrom';

/* Error types */
export { ArgumentOutOfRangeError } from './internal/util/ArgumentOutOfRangeError';
export { EmptyError } from './internal/util/EmptyError';
Expand Down
54 changes: 54 additions & 0 deletions src/internal/firstValueFrom.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { Observable } from './Observable';
import { EmptyError } from './util/EmptyError';
import { Subscription } from './Subscription';

/**
* Converts an observable to a promise by subscribing to the observable,
* and returning a promise that will resolve as soon as the first value
* arrives from the observable. The subscription will then be closed.
*
* If the observable stream completes before any values were emitted, the
* returned promise will reject with {@link EmptyError}.
*
* If the observable stream emits an error, the returned promise will reject
* with that error.
*
* ### Example
*
* Wait for the first value from a stream and emit it from a promise in
* an async function.
*
* ```ts
* import { interval, firstValueFrom } from 'rxjs';
*
* async function execute() {
* const source$ = interval(2000);
* const firstNumber = await firstValueFrom(source);
* console.log(`The first number is ${firstNumber}`);
* }
*
* execute();
*
* // Expected output:
* // "The first number is 0"
* ```
*
* @param source the observable to convert to a promise
*/
export function firstValueFrom<T>(source$: Observable<T>) {
return new Promise<T>((resolve, reject) => {
const subs = new Subscription();
subs.add(
source$.subscribe({
next: value => {
resolve(value);
subs.unsubscribe();
},
error: reject,
complete: () => {
reject(new EmptyError());
},
})
);
});
}
57 changes: 57 additions & 0 deletions src/internal/lastValueFrom.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Observable } from './Observable';
import { EmptyError } from './util/EmptyError';

/**
* Converts an observable to a promise by subscribing to the observable,
* waiting for it to complete, and resolving the returned promise with the
* last value from the observed stream.
*
* If the observable stream completes before any values were emitted, the
* returned promise will reject with {@link EmptyError}.
*
* If the observable stream emits an error, the returned promise will reject
* with that error.
*
* ### Example
*
* Wait for the last value from a stream and emit it from a promise in
* an async function.
*
* ```ts
* import { interval, lastValueFrom } from 'rxjs';
* import { take } from 'rxjs/operators';
*
* async function execute() {
* const source$ = interval(2000).pipe(take(10));
* const finalNumber = await lastValueFrom(source);
* console.log(`The final number is ${finalNumber}`);
* }
*
* execute();
*
* // Expected output:
* // "The final number is 9"
* ```
*
* @param source the observable to convert to a promise
*/
export function lastValueFrom<T>(source: Observable<T>) {
return new Promise<T>((resolve, reject) => {
let _hasValue = false;
let _value: T;
source.subscribe({
next: value => {
_value = value;
_hasValue = true;
},
error: reject,
complete: () => {
if (_hasValue) {
resolve(_value);
} else {
reject(new EmptyError());
}
},
});
});
}

0 comments on commit e69b765

Please sign in to comment.