Skip to content

Commit

Permalink
feat(bindNodeCallback): add Observable.bindNodeCallback
Browse files Browse the repository at this point in the history
- add bindNodeCallback corresponding to fromNodeCallback in RxJS4

closes #736
  • Loading branch information
kwonoj authored and benlesh committed Jan 13, 2016
1 parent a5927ba commit 497bb0d
Show file tree
Hide file tree
Showing 7 changed files with 421 additions and 0 deletions.
2 changes: 2 additions & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ enabling "composite" subscription behavior.
|`flatMapFirst`|`exhaustMap`|
|`flatMapLatest`|`switchMap`|
|`flatMapWithMaxConcurrent`|`mergeMap` or `flatMap`(alias)|
|`fromCallback`|`bindCallback`|
|`fromNodeCallback`|`bindNodeCallback`|
|`publishValue`|`publishBehavior`|
|`replay`|`publishReplay`|
|`select`|`map`|
Expand Down
292 changes: 292 additions & 0 deletions spec/observables/bindNodeCallback-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
/* globals describe, it, expect, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.bindNodeCallback', function () {
describe('when not scheduled', function () {
it('should emit one value from a callback', function () {
function callback(datum, cb) {
cb(null, datum);
}
var boundCallback = Observable.bindNodeCallback(callback);
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

expect(results).toEqual([42, 'done']);
});

it('should emit one value chosen by a selector', function () {
function callback(datum, cb) {
cb(null, datum);
}
var boundCallback = Observable.bindNodeCallback(callback, function (datum) { return datum; });
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

expect(results).toEqual([42, 'done']);
});

it('should raise error from callback', function () {
var error = new Error();

function callback(cb) {
cb(error);
}

var boundCallback = Observable.bindNodeCallback(callback);
var results = [];

boundCallback()
.subscribe(function () {
throw 'should not next';
}, function (err) {
results.push(err);
}, function () {
throw 'should not complete';
});

expect(results).toEqual([error]);
});

it('should emit an error when the selector throws', function () {
function callback(cb) {
cb(null, 42);
}
var boundCallback = Observable.bindNodeCallback(callback, function (err) { throw new Error('Yikes!'); });
var results = [];

boundCallback()
.subscribe(function () {
throw 'should not next';
}, function (err) {
results.push(err);
}, function () {
throw 'should not complete';
});

expect(results).toEqual([new Error('Yikes!')]);
});

it('should not emit, throw or complete if immediately unsubscribed', function (done) {
var nextSpy = jasmine.createSpy('next');
var throwSpy = jasmine.createSpy('throw');
var completeSpy = jasmine.createSpy('complete');
var timeout;
function callback(datum, cb) {
// Need to cb async in order for the unsub to trigger
timeout = setTimeout(function () {
cb(null, datum);
});
}
var subscription = Observable.bindNodeCallback(callback)(42)
.subscribe(nextSpy, throwSpy, completeSpy);
subscription.unsubscribe();

setTimeout(function () {
expect(nextSpy).not.toHaveBeenCalled();
expect(throwSpy).not.toHaveBeenCalled();
expect(completeSpy).not.toHaveBeenCalled();

clearTimeout(timeout);
done();
});
});
});

describe('when scheduled', function () {
it('should emit one value from a callback', function () {
function callback(datum, cb) {
cb(null, datum);
}
var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

rxTestScheduler.flush();

expect(results).toEqual([42, 'done']);
});

it('should error if callback throws', function () {
function callback(datum, cb) {
throw new Error('haha no callback for you');
}
var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
results.push(err);
}, function () {
throw 'should not complete';
});

rxTestScheduler.flush();

expect(results).toEqual([new Error('haha no callback for you')]);
});

it('should raise error from callback', function () {
var error = new Error();

function callback(cb) {
cb(error);
}

var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler);
var results = [];

boundCallback()
.subscribe(function () {
throw 'should not next';
}, function (err) {
results.push(err);
}, function () {
throw 'should not complete';
});

rxTestScheduler.flush();

expect(results).toEqual([error]);
});

it('should error if selector throws', function () {
function callback(datum, cb) {
cb(null, datum);
}
function selector() {
throw new Error('what? a selector? I don\'t think so');
}
var boundCallback = Observable.bindNodeCallback(callback, selector, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
results.push(err);
}, function () {
throw 'should not complete';
});

rxTestScheduler.flush();

expect(results).toEqual([new Error('what? a selector? I don\'t think so')]);
});

it('should use a selector', function () {
function callback(datum, cb) {
cb(null, datum);
}
function selector(x) {
return x + '!!!';
}
var boundCallback = Observable.bindNodeCallback(callback, selector, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

rxTestScheduler.flush();

expect(results).toEqual(['42!!!', 'done']);
});
});

it('should pass multiple inner arguments as an array', function () {
function callback(datum, cb) {
cb(null, datum, 1, 2, 3);
}
var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

rxTestScheduler.flush();

expect(results).toEqual([[42, 1, 2, 3], 'done']);
});

it('should pass multiple inner arguments to the selector if there is one', function () {
function callback(datum, cb) {
cb(null, datum, 1, 2, 3);
}
function selector(a, b, c, d) {
expect([a, b, c, d]).toEqual([42, 1, 2, 3]);
return a + b + c + d;
}
var boundCallback = Observable.bindNodeCallback(callback, selector, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

rxTestScheduler.flush();

expect(results).toEqual([48, 'done']);
});

it('should cache value for next subscription and not call callbackFunc again', function () {
var calls = 0;
function callback(datum, cb) {
calls++;
cb(null, datum);
}
var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler);
var results1 = [];
var results2 = [];

var source = boundCallback(42);

source.subscribe(function (x) {
results1.push(x);
}, null, function () {
results1.push('done');
});

source.subscribe(function (x) {
results2.push(x);
}, null, function () {
results2.push('done');
});

rxTestScheduler.flush();

expect(calls).toBe(1);
expect(results1).toEqual([42, 'done']);
expect(results2).toEqual([42, 'done']);
});
});
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {concat as concatStatic} from './operator/concat-static';
import {merge as mergeStatic} from './operator/merge-static';
import {zip as zipStatic} from './operator/zip-static';
import {BoundCallbackObservable} from './observable/bindCallback';
import {BoundNodeCallbackObservable} from './observable/bindNodeCallback';
import {DeferObservable} from './observable/defer';
import {EmptyObservable} from './observable/empty';
import {ForkJoinObservable} from './observable/forkJoin';
Expand Down Expand Up @@ -165,6 +166,7 @@ export class Observable<T> implements CoreOperators<T> {
// static method stubs
static ajax: AjaxCreationMethod;
static bindCallback: typeof BoundCallbackObservable.create;
static bindNodeCallback: typeof BoundNodeCallbackObservable.create;
static combineLatest: typeof combineLatestStatic;
static concat: typeof concatStatic;
static defer: typeof DeferObservable.create;
Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import './add/operator/concat-static';
import './add/operator/merge-static';
import './add/operator/race-static';
import './add/observable/bindCallback';
import './add/observable/bindNodeCallback';
import './add/observable/defer';
import './add/observable/empty';
import './add/observable/forkJoin';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import './add/operator/concat-static';
import './add/operator/merge-static';
import './add/operator/race-static';
import './add/observable/bindCallback';
import './add/observable/bindNodeCallback';
import './add/observable/defer';
import './add/observable/empty';
import './add/observable/forkJoin';
Expand Down
5 changes: 5 additions & 0 deletions src/add/observable/bindNodeCallback.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {Observable} from '../../Observable';
import {BoundNodeCallbackObservable} from '../../observable/bindNodeCallback';
Observable.bindNodeCallback = BoundNodeCallbackObservable.create;

export var _void: void;
Loading

0 comments on commit 497bb0d

Please sign in to comment.