Skip to content

Commit

Permalink
fix(bindCallback): only call function once even while scheduled
Browse files Browse the repository at this point in the history
The previous implementation had issues with only calling the source function one time when scheduled. Since bindCallback shares its result with all subscribers, it is multicast, that means that it would need to maintain a list of subscribers internally. In leiu of that, I'm using AsyncSubject (which might need a better name)

closes #881
  • Loading branch information
benlesh committed Dec 4, 2015
1 parent feea9a1 commit 8637d47
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 123 deletions.
276 changes: 220 additions & 56 deletions spec/observables/bindCallback-spec.js
Original file line number Diff line number Diff line change
@@ -1,82 +1,246 @@
/* globals describe, it, expect */
/* globals describe, it, expect, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.bindCallback', function () {
it('should emit one value from a callback', function (done) {
describe('when not scheduled', function () {
it('should emit one value from a callback', function () {
function callback(datum, cb) {
cb(datum);
}
var boundCallback = Observable.bindCallback(callback);
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

expect(results).toEqual([42, 'done']);
});

it('should emit one value chosen by a selector', function () {
function callback(datum, cb) {
cb(datum);
}
var boundCallback = Observable.bindCallback(callback, function (datum) { return datum; });
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

expect(results).toEqual([42, 'done']);
});

it('should emit an error when the selector throws', function () {
function callback(cb) {
cb(42);
}
var boundCallback = Observable.bindCallback(callback, function (err) { throw new Error('Yikes!'); });
var results = [];

boundCallback()
.subscribe(function () {
throw 'should not next';
}, function (err) {
results.push(err);
}, function () {
throw 'should not complete';
});

expect(results).toEqual([new Error('Yikes!')]);
});

it('should not emit, throw or complete if immediately unsubscribed', function (done) {
var nextSpy = jasmine.createSpy('next');
var throwSpy = jasmine.createSpy('throw');
var completeSpy = jasmine.createSpy('complete');
var timeout;
function callback(datum, cb) {
// Need to cb async in order for the unsub to trigger
timeout = setTimeout(function () {
cb(datum);
});
}
var subscription = Observable.bindCallback(callback)(42)
.subscribe(nextSpy, throwSpy, completeSpy);
subscription.unsubscribe();

setTimeout(function () {
expect(nextSpy).not.toHaveBeenCalled();
expect(throwSpy).not.toHaveBeenCalled();
expect(completeSpy).not.toHaveBeenCalled();

clearTimeout(timeout);
done();
});
});
});

describe('when scheduled', function () {
it('should emit one value from a callback', function () {
function callback(datum, cb) {
cb(datum);
}
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

rxTestScheduler.flush();

expect(results).toEqual([42, 'done']);
});

it('should error if callback throws', function () {
function callback(datum, cb) {
throw new Error('haha no callback for you');
}
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
results.push(err);
}, function () {
throw 'should not complete';
});

rxTestScheduler.flush();

expect(results).toEqual([new Error('haha no callback for you')]);
});

it('should error if selector throws', function () {
function callback(datum, cb) {
cb(datum);
}
function selector() {
throw new Error('what? a selector? I don\'t think so');
}
var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
results.push(err);
}, function () {
throw 'should not complete';
});

rxTestScheduler.flush();

expect(results).toEqual([new Error('what? a selector? I don\'t think so')]);
});

it('should use a selector', function () {
function callback(datum, cb) {
cb(datum);
}
function selector(x) {
return x + '!!!';
}
var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
results.push(x);
}, null, function () {
results.push('done');
});

rxTestScheduler.flush();

expect(results).toEqual(['42!!!', 'done']);
});
});

it('should pass multiple inner arguments as an array', function () {
function callback(datum, cb) {
cb(datum);
cb(datum, 1, 2, 3);
}
var boundCallback = Observable.bindCallback(callback);
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
expect(x).toBe(42);
}, function () {
done.fail('should not be called');
},
done);
results.push(x);
}, null, function () {
results.push('done');
});

rxTestScheduler.flush();

expect(results).toEqual([[42, 1, 2, 3], 'done']);
});

it('should emit one value chosen by a selector', function (done) {
it('should pass multiple inner arguments to the selector if there is one', function () {
function callback(datum, cb) {
cb(null, datum);
cb(datum, 1, 2, 3);
}
function selector(a, b, c, d) {
expect([a, b, c, d]).toEqual([42, 1, 2, 3]);
return a + b + c + d;
}
var boundCallback = Observable.bindCallback(callback, function (err, datum) { return datum; });
var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler);
var results = [];

boundCallback(42)
.subscribe(function (x) {
expect(x).toBe(42);
}, function () {
done.fail('should not be called');
},
done);
});
results.push(x);
}, null, function () {
results.push('done');
});

it('should emit an error when the selector throws', function (done) {
function callback(cb) {
cb(42);
}
var boundCallback = Observable.bindCallback(callback, function (err) { throw new Error('Yikes!'); });

boundCallback()
.subscribe(function () {
// Considered a failure if we don't go directly to err handler
done.fail('should not be called');
},
function (err) {
expect(err.message).toBe('Yikes!');
done();
},
function () {
// Considered a failure if we don't go directly to err handler
done.fail('should not be called');
}
);
rxTestScheduler.flush();

expect(results).toEqual([48, 'done']);
});

it('should not emit, throw or complete if immediately unsubscribed', function (done) {
var nextSpy = jasmine.createSpy('next');
var throwSpy = jasmine.createSpy('throw');
var completeSpy = jasmine.createSpy('complete');
var timeout;
it('should cache value for next subscription and not call callbackFunc again', function () {
var calls = 0;
function callback(datum, cb) {
// Need to cb async in order for the unsub to trigger
timeout = setTimeout(function () {
cb(datum);
});
calls++;
cb(datum);
}
var subscription = Observable.bindCallback(callback)(42)
.subscribe(nextSpy, throwSpy, completeSpy);
subscription.unsubscribe();
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler);
var results1 = [];
var results2 = [];

setTimeout(function () {
expect(nextSpy).not.toHaveBeenCalled();
expect(throwSpy).not.toHaveBeenCalled();
expect(completeSpy).not.toHaveBeenCalled();
var source = boundCallback(42);

clearTimeout(timeout);
done();
source.subscribe(function (x) {
results1.push(x);
}, null, function () {
results1.push('done');
});

source.subscribe(function (x) {
results2.push(x);
}, null, function () {
results2.push('done');
});

rxTestScheduler.flush();

expect(calls).toBe(1);
expect(results1).toEqual([42, 'done']);
expect(results2).toEqual([42, 'done']);
});
});
38 changes: 38 additions & 0 deletions spec/observables/jasmine-is-weird-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* globals describe, it, expect, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

/**
* I'm starting this file to collect tests that when put in other files break jasmine for
* no apparent reason. It seems like maybe we should move off of jasmine, but moving >1700 tests
* sounds really gross, so I don't want to do that...
*/
describe('jasmine is weird', function () {
describe('bindCallback', function () {
// HACK: If you move this test under the bindCallback-spec.js file, it will arbitrarily
// break one bufferWhen-spec.js test.
it('should not even call the callbackFn if immediately unsubscribed', function () {
var calls = 0;
function callback(datum, cb) {
calls++;
cb(datum);
}
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler);
var results1 = [];

var source = boundCallback(42);

var subscription = source.subscribe(function (x) {
results1.push(x);
}, null, function () {
results1.push('done');
});

subscription.unsubscribe();

rxTestScheduler.flush();

expect(calls).toBe(0);
});
});
});
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import './add/operator/combineLatest-static';
import './add/operator/concat-static';
import './add/operator/merge-static';
import './add/observable/bindCallback';
import './add/observable/defer';
import './add/observable/empty';
import './add/observable/forkJoin';
import './add/observable/from';
Expand Down
3 changes: 2 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import {Observable} from './Observable';
import './add/operator/combineLatest-static';
import './add/operator/concat-static';
import './add/operator/merge-static';
import './observable/bindCallback';
import './add/observable/bindCallback';
import './add/observable/defer';
import './add/observable/empty';
import './add/observable/forkJoin';
import './add/observable/from';
Expand Down
4 changes: 2 additions & 2 deletions src/add/observable/bindCallback.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import {Observable} from '../../Observable';
import {BoundCallbackObsevable} from '../../observable/bindCallback';
Observable.bindCallback = BoundCallbackObsevable.create;
import {BoundCallbackObservable} from '../../observable/bindCallback';
Observable.bindCallback = BoundCallbackObservable.create;
Loading

0 comments on commit 8637d47

Please sign in to comment.