Skip to content

Commit

Permalink
feat(from): let from handle any "observablesque"
Browse files Browse the repository at this point in the history
- `Observable.from(promise)` should work
- `Observable.from(iterable)` should work
- `Observable.from(observable)` should work

This includes any object that implements `Symbol.observer`.

closes #156
closes #236
  • Loading branch information
benlesh committed Aug 31, 2015
1 parent 44a4ee1 commit 526d4c3
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 25 deletions.
55 changes: 54 additions & 1 deletion spec/observables/from-spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/* globals describe, it, expect */
/* globals describe, it, expect, Symbol */
var Rx = require('../../dist/cjs/Rx');
var Promise = require('promise');
var Observable = Rx.Observable;
var $$iterator = require('../../dist/cjs/util/Symbol_iterator').default;

describe('Observable.from', function () {
it('should enumerate an Array', function (done) {
Expand All @@ -10,4 +12,55 @@ describe('Observable.from', function () {
expect(x).toBe(expected[i++])
}, null, done);
}, 300);

it('should handle a promise', function (done) {
var promise = Promise.resolve('pinky swear');

Observable.from(promise).subscribe(function (x) {
expect(x).toBe('pinky swear');
}, null, done);
});

it('should handle an "observableque" object', function (done) {
var observablesque = {};
observablesque[Symbol.observer] = function (observer) {
observer.next('test');
observer.complete();
};

Observable.from(observablesque).subscribe(function (x) {
expect(x).toBe('test');
}, null, done);
});

it('should handle a string', function (done) {
var expected = ['a', 'b', 'c'];
Observable.from('abc').subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
});

it('should handle any iterable thing', function (done) {
var iterable = {};
var iteratorResults = [
{ value: 'one', done: false },
{ value: 'two', done: false },
{ done: true }
];
var expected = ['one', 'two'];

expect(Symbol.iterator).toBe($$iterator);

iterable[Symbol.iterator] = function () {
return {
next: function() {
return iteratorResults.shift();
}
}
};

Observable.from(iterable).subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
});
});
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export default class Observable<T> {
// TODO: convert this to an `abstract` class in TypeScript 1.6.

static defer: <T>(observableFactory: () => Observable<T>) => Observable<T>;
static from: <T>(iterable: any, project?: (x?: any, i?: number) => T, thisArg?: any, scheduler?: Scheduler) => Observable<T>;
static from: <T>(iterable: any, scheduler?: Scheduler) => Observable<T>;
static fromArray: <T>(array: T[], scheduler?: Scheduler) => Observable<T>;
static fromEvent: <T>(element: any, eventName: string, selector: (...args:Array<any>) => T) => Observable<T>;
static fromEventPattern: <T>(addHandler: (handler:Function)=>void, removeHandler: (handler:Function) => void, selector?: (...args:Array<any>) => T) => Observable<T>;
Expand Down
4 changes: 2 additions & 2 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ import EmptyObservable from './observables/EmptyObservable';
import ErrorObservable from './observables/ErrorObservable';
import InfiniteObservable from './observables/InfiniteObservable';
import IntervalObservable from './observables/IntervalObservable';
import IteratorObservable from './observables/IteratorObservable';
import PromiseObservable from './observables/PromiseObservable';
import RangeObservable from './observables/RangeObservable';
import ScalarObservable from './observables/ScalarObservable';
import TimerObservable from './observables/TimerObservable';
import FromEventPatternObservable from './observables/FromEventPatternObservable';
import FromEventObservable from './observables/FromEventObservable';
import ForkJoinObservable from './observables/ForkJoinObservable';
import FromObservable from './observables/FromObservable';

Observable.defer = DeferObservable.create;
Observable.from = IteratorObservable.create;
Observable.from = FromObservable.create;
Observable.fromArray = ArrayObservable.create;
Observable.fromPromise = PromiseObservable.create;
Observable.of = ArrayObservable.of;
Expand Down
43 changes: 43 additions & 0 deletions src/observables/FromObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import PromiseObservable from './PromiseObservable';
import IteratorObservable from'./IteratorObservable';
import ArrayObservable from './ArrayObservable';

import isArray from '../util/isArray';
import isPromise from '../util/isPromise';
import isObservable from '../util/isObservable';
import Scheduler from '../Scheduler';
import $$observer from '../util/Symbol_observer';
import Observable from '../Observable';
import Subscriber from '../Subscriber';
import { ObserveOnSubscriber } from '../operators/observeOn';

export default class FromObservable<T> extends Observable<T> {
constructor(private observablesque: any, private scheduler: Scheduler) {
super(null);
}

static create<T>(observablesque: any, scheduler: Scheduler = Scheduler.immediate): Observable<T> {
if (isArray(observablesque)) {
return new ArrayObservable(observablesque, scheduler);
} else if (isPromise(observablesque)) {
return new PromiseObservable(observablesque, scheduler);
} else if (isObservable(observablesque)) {
if(observablesque instanceof Observable) {
return observablesque;
}
return new FromObservable(observablesque, scheduler);
} else {
return new IteratorObservable(observablesque, null, null, scheduler);
}
}

_subscribe(subscriber: Subscriber<T>) {
const observablesque = this.observablesque;
const scheduler = this.scheduler;
if(scheduler === Scheduler.immediate) {
return this.observablesque[$$observer](subscriber);
} else {
return this.observablesque[$$observer](new ObserveOnSubscriber(subscriber, scheduler, 0));
}
}
}
10 changes: 5 additions & 5 deletions src/observables/IteratorObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Scheduler from '../Scheduler';
import Observable from '../Observable';

import {root} from '../util/root';
import $iterator$ from '../util/Symbol_iterator';
import $$iterator from '../util/Symbol_iterator';
import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

Expand Down Expand Up @@ -108,7 +108,7 @@ class StringIterator {
private idx: number = 0,
private len: number = str.length) {
}
[$iterator$]() { return (this); }
[$$iterator]() { return (this); }
next() {
return this.idx < this.len ? {
done: false,
Expand All @@ -125,7 +125,7 @@ class ArrayIterator {
private idx: number = 0,
private len: number = toLength(arr)) {
}
[$iterator$]() { return this; }
[$$iterator]() { return this; }
next() {
return this.idx < this.len ? {
done: false,
Expand All @@ -138,7 +138,7 @@ class ArrayIterator {
}

function getIterator(o) {
const i = o[$iterator$];
const i = o[$$iterator];
if (!i && typeof o === 'string') {
return new StringIterator(o);
}
Expand All @@ -148,7 +148,7 @@ function getIterator(o) {
if (!i) {
throw new TypeError('Object is not iterable');
}
return o[$iterator$]();
return o[$$iterator]();
}

function toLength(o) {
Expand Down
43 changes: 28 additions & 15 deletions src/observables/PromiseObservable.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,42 @@
import Observable from '../Observable';
import Subscriber from '../Subscriber';
import Scheduler from '../Scheduler';
import Subscription from '../Subscription';

export default class PromiseObservable<T> extends Observable<T> {

static create<T>(promise: Promise<T>) {
return new PromiseObservable(promise);
static create<T>(promise: Promise<T>, scheduler: Scheduler = Scheduler.immediate) {
return new PromiseObservable(promise, scheduler);
}

constructor(protected promise: Promise<T>) {
constructor(private promise: Promise<T>, private scheduler: Scheduler) {
super();
}

_subscribe(subscriber: Subscriber<T>) {
this.promise.then(
(x) => {
if(!subscriber.isUnsubscribed) {
subscriber.next(x);
const scheduler = this.scheduler;
const promise = this.promise;

if (scheduler === Scheduler.immediate) {
promise.then(value => {
subscriber.next(value);
subscriber.complete();
}
},
(e) => {
if(!subscriber.isUnsubscribed) {
subscriber.error(e);
}
}
);
},
err => subscriber.error(err));
} else {
let subscription = new Subscription();
promise.then(value => subscription.add(scheduler.schedule(0, { value, subscriber }, dispatchNext)),
err => subscription.add(scheduler.schedule(0, { err, subscriber }, dispatchError)));
return subscription;
}
}
}

function dispatchNext({ value, subscriber }) {
subscriber.next(value);
subscriber.complete();
}

function dispatchError({ err, subscriber }) {
subscriber.error(err);
}
2 changes: 1 addition & 1 deletion src/util/Symbol_iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ if (!root.Symbol.iterator) {
}
}

export default root.Symbol.dispose;
export default root.Symbol.iterator;

// // Shim in iterator support
// export var $iterator$ = (typeof Symbol === 'function' && Symbol.iterator) || '_es6shim_iterator_';
Expand Down
3 changes: 3 additions & 0 deletions src/util/isArray.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const isArray = Array.isArray || (x => x && typeof x.length === 'number');

export default isArray;
5 changes: 5 additions & 0 deletions src/util/isObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import $$observer from './Symbol_observer';

export default function isObservable(x): boolean {
return x && typeof x[$$observer] === 'function';
}
3 changes: 3 additions & 0 deletions src/util/isPromise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export default function isPromise(x): boolean {
return x && typeof x.then === 'function';
}

0 comments on commit 526d4c3

Please sign in to comment.