Skip to content

Commit

Permalink
feat(scheduled): Add scheduled creation method (ReactiveX#4595)
Browse files Browse the repository at this point in the history
- Simplifies code around common observable creation and subscription
- Removes `scalar` internal impl
- Deprecates a number of APIs that accept schedulers where we would rather people use `scheduled`.
  • Loading branch information
benlesh authored and BioPhoton committed May 15, 2019
1 parent f077b55 commit b4733d7
Show file tree
Hide file tree
Showing 27 changed files with 448 additions and 269 deletions.
48 changes: 48 additions & 0 deletions spec/helpers/observableMatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import * as _ from 'lodash';

function stringify(x: any): string {
return JSON.stringify(x, function (key: string, value: any) {
if (Array.isArray(value)) {
return '[' + value
.map(function (i) {
return '\n\t' + stringify(i);
}) + '\n]';
}
return value;
})
.replace(/\\"/g, '"')
.replace(/\\t/g, '\t')
.replace(/\\n/g, '\n');
}

function deleteErrorNotificationStack(marble: any) {
const { notification } = marble;
if (notification) {
const { kind, error } = notification;
if (kind === 'E' && error instanceof Error) {
notification.error = { name: error.name, message: error.message };
}
}
return marble;
}

export function observableMatcher(actual: any, expected: any) {
if (Array.isArray(actual) && Array.isArray(expected)) {
actual = actual.map(deleteErrorNotificationStack);
expected = expected.map(deleteErrorNotificationStack);
const passed = _.isEqual(actual, expected);
if (passed) {
return;
}

let message = '\nExpected \n';
actual.forEach((x: any) => message += `\t${stringify(x)}\n`);

message += '\t\nto deep equal \n';
expected.forEach((x: any) => message += `\t${stringify(x)}\n`);

chai.assert(passed, message);
} else {
chai.assert.deepEqual(actual, expected);
}
}
17 changes: 0 additions & 17 deletions spec/observables/ScalarObservable-spec.ts

This file was deleted.

9 changes: 2 additions & 7 deletions spec/observables/of-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { empty, of, Observable } from 'rxjs';
import { of, Observable } from 'rxjs';
import { expectObservable } from '../helpers/marble-testing';
import { TestScheduler } from 'rxjs/testing';
import { concatMap, delay, concatAll } from 'rxjs/operators';
Expand Down Expand Up @@ -33,18 +33,13 @@ describe('of', () => {
});
});

it('should return an empty observable if passed no values', () => {
const obs = of();
expect(obs).to.equal(empty());
});

it('should emit one value', (done: MochaDone) => {
let calls = 0;

of(42).subscribe((x: number) => {
expect(++calls).to.equal(1);
expect(x).to.equal(42);
}, (err: any) => {
}, (err: any) => {
done(new Error('should not be called'));
}, () => {
done();
Expand Down
63 changes: 63 additions & 0 deletions spec/scheduled/scheduled-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { scheduled, of } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { lowerCaseO } from '../helpers/test-helper';
import { observableMatcher } from '../helpers/observableMatcher';
import { expect } from 'chai';

describe('scheduled', () => {
let testScheduler: TestScheduler;

beforeEach(() => {
testScheduler = new TestScheduler(observableMatcher);
});

it('should schedule a sync observable', () => {
const input = of('a', 'b', 'c');
testScheduler.run(({ expectObservable }) => {
expectObservable(scheduled(input, testScheduler)).toBe('(abc|)');
});
});

it('should schedule an array', () => {
const input = ['a', 'b', 'c'];
testScheduler.run(({ expectObservable }) => {
expectObservable(scheduled(input, testScheduler)).toBe('(abc|)');
});
});

it('should schedule an iterable', () => {
const input = 'abc'; // strings are iterables
testScheduler.run(({ expectObservable }) => {
expectObservable(scheduled(input, testScheduler)).toBe('(abc|)');
});
});

it('should schedule an observable-like', () => {
const input = lowerCaseO('a', 'b', 'c'); // strings are iterables
testScheduler.run(({ expectObservable }) => {
expectObservable(scheduled(input, testScheduler)).toBe('(abc|)');
});
});

it('should schedule a promise', done => {
const results: any[] = [];
const input = Promise.resolve('x'); // strings are iterables
scheduled(input, testScheduler).subscribe({
next(value) { results.push(value); },
complete() { results.push('done'); },
});

expect(results).to.deep.equal([]);

// Promises force async, so we can't schedule synchronously, no matter what.
testScheduler.flush();
expect(results).to.deep.equal([]);

Promise.resolve().then(() => {
// NOW it should work, as the other promise should have resolved.
testScheduler.flush();
expect(results).to.deep.equal(['x', 'done']);
done();
});
});
});
4 changes: 2 additions & 2 deletions spec/util/subscribeToResult-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import $$symbolObservable from 'symbol-observable';
import { of, range, throwError } from 'rxjs';

describe('subscribeToResult', () => {
it('should synchronously complete when subscribe to scalarObservable', () => {
it('should synchronously complete when subscribed to scalarObservable', () => {
const result = of(42);
let expected: number;
const subscriber = new OuterSubscriber<number, number>((x) => expected = x);

const subscription = subscribeToResult(subscriber, result);

expect(expected).to.be.equal(42);
expect(subscription).to.not.exist;
expect(subscription.closed).to.be.true;
});

it('should subscribe to observables that are an instanceof Observable', (done) => {
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export { throwError } from './internal/observable/throwError';
export { timer } from './internal/observable/timer';
export { using } from './internal/observable/using';
export { zip } from './internal/observable/zip';
export { scheduled } from './internal/scheduled/scheduled';

/* Constants */
export { EMPTY } from './internal/observable/empty';
Expand Down
2 changes: 1 addition & 1 deletion src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { canReportError } from './util/canReportError';
import { toSubscriber } from './util/toSubscriber';
import { iif } from './observable/iif';
import { throwError } from './observable/throwError';
import { observable as Symbol_observable } from '../internal/symbol/observable';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';

Expand Down
29 changes: 23 additions & 6 deletions src/internal/observable/concat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,30 @@ import { from } from './from';
import { concatAll } from '../operators/concatAll';

/* tslint:disable:max-line-length */
export function concat<O1 extends ObservableInput<any>>(v1: O1, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(v1: O1, v2: O2, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, scheduler?: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5> | ObservedValueOf<O6>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>>(v1: O1, scheduler: SchedulerLike): Observable<ObservedValueOf<O1>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(v1: O1, v2: O2, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, scheduler: SchedulerLike): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5> | ObservedValueOf<O6>>;

export function concat<O1 extends ObservableInput<any>>(v1: O1): Observable<ObservedValueOf<O1>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(v1: O1, v2: O2): Observable<ObservedValueOf<O1> | ObservedValueOf<O2>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5>>;
export function concat<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6): Observable<ObservedValueOf<O1> | ObservedValueOf<O2> | ObservedValueOf<O3> | ObservedValueOf<O4> | ObservedValueOf<O5> | ObservedValueOf<O6>>;
export function concat<O extends ObservableInput<any>>(...observables: O[]): Observable<ObservedValueOf<O>>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<O extends ObservableInput<any>>(...observables: (O | SchedulerLike)[]): Observable<ObservedValueOf<O>>;
export function concat<R>(...observables: ObservableInput<any>[]): Observable<R>;
/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */
export function concat<R>(...observables: (ObservableInput<any> | SchedulerLike)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
Expand Down
11 changes: 4 additions & 7 deletions src/internal/observable/empty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,16 @@ export const EMPTY = new Observable<never>(subscriber => subscriber.complete());
* @see {@link of}
* @see {@link throwError}
*
* @param {SchedulerLike} [scheduler] A {@link SchedulerLike} to use for scheduling
* @param scheduler A {@link SchedulerLike} to use for scheduling
* the emission of the complete notification.
* @return {Observable} An "empty" Observable: emits only the complete
* @return An "empty" Observable: emits only the complete
* notification.
* @static true
* @name empty
* @owner Observable
* @deprecated Deprecated in favor of using {@link index/EMPTY} constant.
* @deprecated Deprecated in favor of using {@link EMPTY} constant, or {@link scheduled} (e.g. `scheduled([], scheduler)`)
*/
export function empty(scheduler?: SchedulerLike) {
return scheduler ? emptyScheduled(scheduler) : EMPTY;
}

export function emptyScheduled(scheduler: SchedulerLike) {
function emptyScheduled(scheduler: SchedulerLike) {
return new Observable<never>(subscriber => scheduler.schedule(() => subscriber.complete()));
}
30 changes: 6 additions & 24 deletions src/internal/observable/from.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import { Observable } from '../Observable';
import { isPromise } from '../util/isPromise';
import { isArrayLike } from '../util/isArrayLike';
import { isInteropObservable } from '../util/isInteropObservable';
import { isIterable } from '../util/isIterable';
import { fromArray } from './fromArray';
import { fromPromise } from './fromPromise';
import { fromIterable } from './fromIterable';
import { fromObservable } from './fromObservable';
import { subscribeTo } from '../util/subscribeTo';
import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
import { scheduled } from '../scheduled/scheduled';

export function from<O extends ObservableInput<any>>(input: O, scheduler?: SchedulerLike): Observable<ObservedValueOf<O>>;
export function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
/** @deprecated use {@link scheduled} instead. */
export function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike): Observable<ObservedValueOf<O>>;

/**
* Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.
Expand Down Expand Up @@ -111,26 +106,13 @@ export function from<O extends ObservableInput<any>>(input: O, scheduler?: Sched
* @name from
* @owner Observable
*/

export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T> {
if (!scheduler) {
if (input instanceof Observable) {
return input;
}
return new Observable<T>(subscribeTo(input));
} else {
return scheduled(input, scheduler);
}

if (input != null) {
if (isInteropObservable(input)) {
return fromObservable(input, scheduler);
} else if (isPromise(input)) {
return fromPromise(input, scheduler);
} else if (isArrayLike(input)) {
return fromArray(input, scheduler);
} else if (isIterable(input) || typeof input === 'string') {
return fromIterable(input, scheduler);
}
}

throw new TypeError((input !== null && typeof input || input) + ' is not observable');
}
18 changes: 2 additions & 16 deletions src/internal/observable/fromArray.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,12 @@
import { Observable } from '../Observable';
import { SchedulerLike } from '../types';
import { Subscription } from '../Subscription';
import { subscribeToArray } from '../util/subscribeToArray';
import { scheduleArray } from '../scheduled/scheduleArray';

export function fromArray<T>(input: ArrayLike<T>, scheduler?: SchedulerLike) {
if (!scheduler) {
return new Observable<T>(subscribeToArray(input));
} else {
return new Observable<T>(subscriber => {
const sub = new Subscription();
let i = 0;
sub.add(scheduler.schedule(function () {
if (i === input.length) {
subscriber.complete();
return;
}
subscriber.next(input[i++]);
if (!subscriber.closed) {
sub.add(this.schedule());
}
}));
return sub;
});
return scheduleArray(input, scheduler);
}
}
41 changes: 3 additions & 38 deletions src/internal/observable/fromIterable.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,15 @@
import { Observable } from '../Observable';
import { SchedulerLike } from '../types';
import { Subscription } from '../Subscription';
import { iterator as Symbol_iterator } from '../symbol/iterator';
import { subscribeToIterable } from '../util/subscribeToIterable';
import { scheduleIterable } from '../scheduled/scheduleIterable';

export function fromIterable<T>(input: Iterable<T>, scheduler: SchedulerLike) {
export function fromIterable<T>(input: Iterable<T>, scheduler?: SchedulerLike) {
if (!input) {
throw new Error('Iterable cannot be null');
}
if (!scheduler) {
return new Observable<T>(subscribeToIterable(input));
} else {
return new Observable<T>(subscriber => {
const sub = new Subscription();
let iterator: Iterator<T>;
sub.add(() => {
// Finalize generators
if (iterator && typeof iterator.return === 'function') {
iterator.return();
}
});
sub.add(scheduler.schedule(() => {
iterator = input[Symbol_iterator]();
sub.add(scheduler.schedule(function () {
if (subscriber.closed) {
return;
}
let value: T;
let done: boolean;
try {
const result = iterator.next();
value = result.value;
done = result.done;
} catch (err) {
subscriber.error(err);
return;
}
if (done) {
subscriber.complete();
} else {
subscriber.next(value);
this.schedule();
}
}));
}));
return sub;
});
return scheduleIterable(input, scheduler);
}
}
Loading

0 comments on commit b4733d7

Please sign in to comment.