diff --git a/lib/index.js b/lib/index.js index a83a11a..efaacf7 100755 --- a/lib/index.js +++ b/lib/index.js @@ -15,6 +15,7 @@ var Decoder = require('string_decoder').StringDecoder; var IntMap = require('./intMap'); var Queue = require('./queue'); var ReadableProxy = require('./readableProxy'); +var ObservableSubscription = require('./observableSubscription'); // Create quick slice reference variable for speed var slice = Array.prototype.slice; @@ -2108,6 +2109,187 @@ addMethod('toPromise', function (PromiseCtor) { }); }); +/** + * Coerce observer callbacks into observer object or return observer object + * if already created. Will throw an error if both an object and callback args + * are provided. + * + * @id createObserver + * @param {function|object} onNext Function to receive new values or observer + * @param {function} [onError] Optional callback to receive errors. + * @param {function} [onComplete] Optional callback when stream completes + * @return {object} Observer object with next, error, and complete methods + * @private + * + * createObserver( + * function (x) { console.log(x); }, + * function (err) { console.error(err); }, + * function () { console.log('done'); } + * ) + * + * createObserver( + * null, + * null, + * function () { console.log('done'); } + * ) + * + * createObserver({ + * next: function (x) { console.log(x); }, + * error: function (err) { console.error(err); }, + * complete: function () { console.log('done'); } + * }) + */ +function createObserver (onNext, onError, onComplete) { + var isObserver = onNext && !_.isFunction(onNext) && typeof onNext === 'object'; + + // ensure if we have an observer that we don't also have callbacks. Users + // must choose one. + if (isObserver && (onError || onComplete)) { + throw new Error('Subscribe requires either an observer object or optional callbacks.'); + } + + // onNext is actually an observer + if (isObserver) { + return onNext; + } + + // Otherwise create an observer object + return { + next: onNext, + error: onError, + complete: onComplete, + }; +} + +/** + * Consumes values using the Observable subscribe signature. Unlike other + * consumption methods, subscribe can be called multiple times. Each + * subscription will receive the current value before receiving the next value. + * Subscribing to an already consumed stream will result in an error. + * + * Implements the Observable subscribe functionality as defined by the spec: + * https://tc39.github.io/proposal-observable/#observable-prototype-subscribe + * + * @id subscribe + * @section Consumption + * @name Stream.subscribe(onNext, onError, onComplete) + * @param {Function|object|null} onNext - Handler for next value or observer + * @param {Function|null} onError - Handler function for errors. + * @param {Function|null} onCompleted - Handler Function when stream is done. + * @returns {ObservableSubscription} - Subscription with unsubscribed method + * @api public + * + * // with callbacks + * _([1, 2, 3, 4]).subscribe( + * function onNext (x) { + * // Called for each value that comes downstream + * console.log('Received onNext value', x); + * }, + * function onError (err) { + * // Called one time with error or zero if no errors occur upstream + * console.error('Single highland stream error', err); + * }, + * function onComplete () { + * // Receives no arguments + * // Called only once when stream is completed. + * console.log('Completed!'); + * } + * ); + * + * // with an observer + * _([1, 2, 3, 4]).subscribe({ + * next (x) { + * console.log('Received next value', x); + * }, + * error (err) { + * console.error('An error occurred upstream', err); + * }, + * complete () { + * console.log('Completed!') + * } + * }); + */ + +addMethod('subscribe', function (onNext, onError, onComplete) { + var self = this; + var observer = createObserver(onNext, onError, onComplete); + + // Don't let users subscribe to an already completed stream + if (self.ended) { + if (observer.error) { + observer.error(new Error('Subscribe called on an already completed stream.')); + } + + var closedSubscription = new ObservableSubscription(); + closedSubscription._cleanup(); + return closedSubscription; + } + + var source = self.fork(); + var subscription = new ObservableSubscription(source); + var consumer = source.consume(function (err, x, push, next) { + if (err) { + push(null, nil); + if (observer.error) { + observer.error(err); + } + subscription._cleanup(); + } + else if (x === nil) { + if (observer.complete) { + observer.complete(); + } + subscription._cleanup(); + } + else { + if (observer.next) { + observer.next(x); + } + next(); + } + }); + + consumer.resume(); + + return subscription; +}); + +/* + * Create a variable we can use as a dynamic method name depending on the + * environment. + * + * If Symbols are available get the observable symbol. Otherwise use the a + * fallback string. + * https://tc39.github.io/proposal-observable/#observable-prototype-@@observable + * + * Source taken from RxJS + * https://github.com/ReactiveX/rxjs/commit/4a5aaafc99825ae9b61e410bc0b5e86c7ae75837#diff-d26bc4881b94c82f3c0ae7d3914e9577R13 + */ +/* eslint-disable no-undef */ +var observable = typeof Symbol === 'function' && Symbol.observable || '@@observable'; +/* eslint-enable no-undef */ + +/** + * Returns an Observable spec-compliant instance (itself) that has a subscribe + * method and a Symbol.observable method. If Symbol is not available in the + * current environment it defaults to '@@observable'. Used by other tools and + * libraries that want to get an observable spec compliant stream interface. + * + * https://tc39.github.io/proposal-observable/#observable-prototype-@@observable + * + * @id Symbol.observable + * @section Consumption + * @name Symbol.observable + * @api public + * + * _([1, 2, 3])[Symbol.observable || "@@observable"]().subscribe(x => { + * console.log("Received value", x); + * }); + */ + +addMethod(observable, function () { + return this; +}); /** * Converts the stream to a node Readable Stream for use in methods diff --git a/lib/observableSubscription.js b/lib/observableSubscription.js new file mode 100644 index 0000000..b94c5f6 --- /dev/null +++ b/lib/observableSubscription.js @@ -0,0 +1,62 @@ +/** + * Observable Subscription + * An implementation of the TC39 Subscription object + * https://tc39.github.io/proposal-observable/#subscription-objects + * + * This class is intended for internal use only. + * + * @id ObservableSubscription + * @name ObservableSubscription + * @param {stream} source - Highland stream to subscribe to + * arguments to the callback. Only valid if `source` is a String. + * @api private + */ +function ObservableSubscription (source) { + this._source = source; + this.closed = false; +} + +// Instance Methods + +/** + * Cleanup + * Perform cleanup routine on a subscription. This can only be called once per + * subscription. Once its closed the subscription cannot be cleaned up again. + * + * Note: This relies heavily upon side-effects and mutates itself. + * + * @id ObservableSubscription.prototype._cleanup(subscription) + * @name ObservableSubscription.prototype._cleanup + * @returns {undefined} Side-effectful function cleans up subscription + * @api private + */ + +ObservableSubscription.prototype._cleanup = function cleanup () { + // Don't want to destroy\cleanup an already closed stream + if (this.closed) { + return; + } + this._source = null; + this.closed = true; +}; + +/** + * Unsubscribe + * Destroy the stream resources and cleanup the subscription. + * @id ObservableSubscription.prototype.unsubscribe() + * @name ObservableSubscription.prototype.unsubscribe() + * @returns {undefined} Side-effectful. Destroys stream and cleans up subscription. + * @api private + */ + +ObservableSubscription.prototype.unsubscribe = function unsubscribe () { + // Don't want to destroy\cleanup an already closed stream + if (this.closed) { + return; + } + + this._source.destroy(); + this._cleanup(); +}; + +module.exports = ObservableSubscription; diff --git a/package-lock.json b/package-lock.json index c85ddb5..10ef02d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "highland", - "version": "3.0.0-beta.7", + "version": "3.0.0-beta.9", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -2538,7 +2538,8 @@ "ansi-regex": { "version": "2.1.1", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "aproba": { "version": "1.2.0", @@ -2953,7 +2954,8 @@ "safe-buffer": { "version": "5.1.2", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "safer-buffer": { "version": "2.1.2", @@ -3009,6 +3011,7 @@ "version": "3.0.1", "bundled": true, "dev": true, + "optional": true, "requires": { "ansi-regex": "^2.0.0" } @@ -3052,12 +3055,14 @@ "wrappy": { "version": "1.0.2", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "yallist": { "version": "3.0.3", "bundled": true, - "dev": true + "dev": true, + "optional": true } } }, @@ -4959,6 +4964,7 @@ "version": "0.1.4", "bundled": true, "dev": true, + "optional": true, "requires": { "kind-of": "^3.0.2", "longest": "^1.0.1", @@ -6141,7 +6147,8 @@ "longest": { "version": "1.0.1", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "loose-envify": { "version": "1.3.1", diff --git a/test/test.js b/test/test.js index 8f5f511..fefce38 100755 --- a/test/test.js +++ b/test/test.js @@ -2811,6 +2811,198 @@ exports.toNodeStream = { }, }; +exports.subscribe = { + 'subscribe method exists and is a function': function (test) { + test.expect(1); + test.ok(_.isFunction(_.of(1).subscribe)); + test.done(); + }, + 'calls onNext handler with a single-value stream': function (test) { + test.expect(1); + _.of(1).subscribe(function (x) { + test.equal(x, 1); + test.done(); + }); + }, + 'calls onNext handler for an array stream': function (test) { + test.expect(3); + _([1, 2, 3]).subscribe(function (x) { + test.ok(x); + + if (x === 3) { + test.done(); + } + }); + }, + 'calls next when provided in observer object': function (test) { + test.expect(1); + _.of(1).subscribe({ + next: function (x) { + test.equal(x, 1); + test.done(); + }, + }); + }, + 'calls onError handler for a single-error stream': function (test) { + test.expect(2); + _.fromError(new Error('test error')).subscribe(null, function (err) { + test.ok(err instanceof Error); + test.equal(err.message, 'test error'); + test.done(); + }); + }, + 'calls onError once for a multi-error stream': function (test) { + test.expect(2); + var count = 0; + _([new Error('err1'), new Error('err2')]) + .flatMap(_.fromError) + .subscribe(null, function (err) { + count++; + test.ok(err instanceof Error); + test.equal(err.message, 'err1'); + test.done(); + }); + }, + 'calls error when provided in observer object': function (test) { + test.expect(2); + _.fromError(new Error('test error')).subscribe({ + error: function (err) { + test.ok(err instanceof Error); + test.equal(err.message, 'test error'); + test.done(); + }, + }); + }, + 'calls onComplete handler for a single-value stream': function (test) { + test.expect(1); + _.of(1).subscribe(null, null, function (x) { + test.ok(typeof x === 'undefined'); + test.done(); + }); + }, + 'calls onComplete handler once for a multi-value stream': function (test) { + test.expect(1); + _.of([1, 2, 3]).subscribe(null, null, function (x) { + test.ok(typeof x === 'undefined'); + test.done(); + }); + }, + 'calls onComplete handler for an empty, complete stream': function (test) { + test.expect(1); + _([]).subscribe(null, null, function (x) { + test.ok(typeof x === 'undefined'); + test.done(); + }); + }, + 'calls complete when provided in observer object': function (test) { + test.expect(1); + _.of(1).subscribe({ + complete: function (x) { + test.ok(typeof x === 'undefined'); + test.done(); + }, + }); + }, + 'consumes the stream without any handlers': function (test) { + test.expect(3); + _([1, 2, 3]) + .tap(function (x) { + test.ok(x); + if (x === 3) { + test.done(); + } + }) + .subscribe(); + }, + 'a subscription can be unsubscribed': function (test) { + test.expect(1); + var stream = _(); + var sub1 = stream.subscribe(function (x) { + test.ok(x); + }); + stream.write(1); + sub1.unsubscribe(); + stream.write(1); + stream.end(); + test.done(); + }, + 'a stream can be subscribed to multiple times': function (test) { + test.expect(3); + var stream = _(); + var sub1 = stream.subscribe(function (x) { + test.ok(x === 1 || x === 2); + }); + var sub2 = stream.subscribe(function (x) { + test.ok(x === 1); + }); + stream.write(1); + sub2.unsubscribe(); + stream.write(2); + stream.end(); + test.done(); + }, + 'subscription is closed after complete': function (test) { + test.expect(2); + var sub = _([1]).subscribe(null, null, function (x) { + test.ok(true); + }); + test.equal(sub.closed, true); + test.done(); + }, + 'subscription is closed after error': function (test) { + test.expect(2); + var sub = _.fromError(new Error('test error')).subscribe(null, function (x) { + test.ok(true); + }); + test.equal(sub.closed, true); + test.done(); + }, + 'subscription is closed after unsubscribe': function (test) { + test.expect(3); + var stream = _(); + var sub = stream.subscribe(function (x) { + test.equal(x, 1); + }); + stream.write(1); + test.equal(sub.closed, false); + sub.unsubscribe(); + test.equal(sub.closed, true); + test.done(); + }, + 'completed streams will not throw an error': function (test) { + test.expect(0); + var stream = _.empty(); + + stream.subscribe(function (x) { + throw new Error('I should not fire'); + }, null, test.done); + }, + 'consumed streams will throw an error': function (test) { + test.expect(1); + var stream = _.empty(); + + stream.done(function () {}); + + stream.subscribe(null, function (err) { + test.ok(err instanceof Error); + test.done(); + }); + }, + 'supports Symbol.observable or @@observable': function (test) { + test.expect(1); + /* eslint-disable no-undef */ + var observable = typeof Symbol === 'function' + && Symbol.observable + || '@@observable'; + /* eslint-enable no-undef */ + _([1])[observable]() + .subscribe(function (x) { + test.equals(x, 1); + test.done(); + }); + }, +}; + exports['calls generator on read'] = function (test) { test.expect(5); var gen_calls = 0;