From 6e66ac331aeb68c2bad1d5b9b2fc31104e34e4d5 Mon Sep 17 00:00:00 2001 From: Jay Date: Wed, 27 Feb 2019 13:04:33 -0500 Subject: [PATCH 01/15] Created subscribe method for RxJS\Observable compatibility --- lib/index.js | 56 ++++++++++++++++++++++++++++++++++++++ test/test.js | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+) diff --git a/lib/index.js b/lib/index.js index a83a11a..362ffdf 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2108,6 +2108,62 @@ addMethod('toPromise', function (PromiseCtor) { }); }); +/** + * Consumes values using the Observable subscribe signature. + * + * Some existing libraries like Gulp v4 can automatically handle return values + * of observables by checking for a subscribe function. This method adds + * compatability for those use cases by providing the same subscribe method + * used by libraries like RxJS. + * + * @id toPromise + * @section Consumption + * @name Stream.subscribe(onNext, onError, onComplete) + * @param {Function|null} onNext - Handler function for each value. + * @param {Function|null} onError - Handler function for errors. + * @param {Function|null} onCompleted - Handler Function when stream is done. + * @api public + * + * _([1, 2, 3, 4]).subscribe( + * function onNext (x) { + * // Called for each value that comes downstream + * console.log('Received onNext value', x); + * }, + * function onError (err) { + * // Called one time with error or zero if no errors occur upstream + * console.error('Single highland stream error', err); + * }, + * function onComplete () { + * // Receives no arguments + * // Called only once when stream is completed. + * console.log('Completed the page.'); + * } + * ); + */ + +addMethod('subscribe', function (onNext, onError, onComplete) { + var self = this; + return self.consume(function (err, x, push, next) { + if (err) { + push(null, nil); + + if (_.isFunction(onError)) { + onError(err); + } + } + else if (x === nil) { + if (_.isFunction(onComplete)) { + onComplete(); + } + } + else { + if (_.isFunction(onNext)) { + onNext(x); + } + next(); + } + }).resume(); +}); /** * Converts the stream to a node Readable Stream for use in methods diff --git a/test/test.js b/test/test.js index 8f5f511..02b9590 100755 --- a/test/test.js +++ b/test/test.js @@ -2811,6 +2811,83 @@ exports.toNodeStream = { }, }; +exports.subscribe = { + 'subscribe method exists and is a function': function (test) { + test.expect(1); + test.ok(_.isFunction(_.of(1).subscribe)); + test.done(); + }, + 'calls onNext handler with a single-value stream': function (test) { + test.expect(1); + _.of(1).subscribe(function (x) { + test.equal(x, 1); + test.done(); + }); + }, + 'calls onNext handler for an array stream': function (test) { + test.expect(3); + _([1, 2, 3]).subscribe(function (x) { + test.ok(x); + + if (x === 3) { + test.done(); + } + }); + }, + 'calls onError handler for a single-error stream': function (test) { + test.expect(2); + _.fromError(new Error('test error')).subscribe(null, function (err) { + test.ok(err instanceof Error); + test.equal(err.message, 'test error'); + test.done(); + }); + }, + 'calls onError once for a multi-error stream': function (test) { + test.expect(2); + var count = 0; + _([new Error('err1'), new Error('err2')]) + .flatMap(_.fromError) + .subscribe(null, function (err) { + count++; + test.ok(err instanceof Error); + test.equal(err.message, 'err1'); + test.done(); + }); + }, + 'calls onComplete handler for a single-value stream': function (test) { + test.expect(1); + _.of(1).subscribe(null, null, function (x) { + test.ok(typeof x === 'undefined'); + test.done(); + }); + }, + 'calls onComplete handler once for a multi-value stream': function (test) { + test.expect(1); + _.of([1, 2, 3]).subscribe(null, null, function (x) { + test.ok(typeof x === 'undefined'); + test.done(); + }); + }, + 'calls onComplete handler for an empty, complete stream': function (test) { + test.expect(1); + _.of([]).subscribe(null, null, function (x) { + test.ok(typeof x === 'undefined'); + test.done(); + }); + }, + 'consumes the stream without any handlers': function (test) { + test.expect(3); + _([1, 2, 3]) + .tap(function (x) { + test.ok(x); + if (x === 3) { + test.done(); + } + }) + .subscribe(); + }, +}; + exports['calls generator on read'] = function (test) { test.expect(5); var gen_calls = 0; From 0831bdc7bd0917ff0c549de3d2a97371fd02fbca Mon Sep 17 00:00:00 2001 From: Jay Date: Thu, 28 Feb 2019 12:22:44 -0500 Subject: [PATCH 02/15] Implemented subscribe according to observable spec caolan/highland#672 --- lib/index.js | 117 +++++++++++++++++++++++++++++++++++++++++++++------ test/test.js | 69 +++++++++++++++++++++++++++++- 2 files changed, 171 insertions(+), 15 deletions(-) diff --git a/lib/index.js b/lib/index.js index 362ffdf..1f3753d 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2108,22 +2108,83 @@ addMethod('toPromise', function (PromiseCtor) { }); }); +/** + * Coerce observer callbacks into observer object or return observer object + * if already created. Will throw an error if both an object and callback args + * are provided. + * + * @id createObserver + * @param {function|object} onNext Function to receive new values or observer + * @param {function} [onError] Optional callback to receive errors. + * @param {function} [onComplete] Optional callback when stream completes + * @return {object} Observer object with next, error, and complete methods + * @private + * + * createObserver( + * function (x) { console.log(x); }, + * function (err) { console.error(err); }, + * function () { console.log('done'); } + * ) + * + * createObserver( + * null, + * null, + * function () { console.log('done'); } + * ) + * + * createObserver({ + * next: function (x) { console.log(x); }, + * error: function (err) { console.error(err); }, + * complete: function () { console.log('done'); } + * }) + */ +function createObserver (onNext, onError, onComplete) { + var isObserver = onNext && !onNext.call && typeof onNext === 'object'; + + // ensure if we have an observer that we don't also have callbacks. Users + // must choose one. + if (isObserver && (onError || onComplete)) { + throw new Error('Subscribe requires either an observer object or optional callbacks.'); + } + + // onNext is actually an observer + if (isObserver) { + return onNext; + } + + // Otherwise create an observer object + var observer = {}; + + if (_.isFunction(onNext)) { + observer.next = onNext; + } + + if (_.isFunction(onError)) { + observer.error = onError; + } + + if (_.isFunction(onComplete)) { + observer.complete = onComplete; + } + + return observer; +} + /** * Consumes values using the Observable subscribe signature. * - * Some existing libraries like Gulp v4 can automatically handle return values - * of observables by checking for a subscribe function. This method adds - * compatability for those use cases by providing the same subscribe method - * used by libraries like RxJS. + * Implements the Observable subscribe functionality as defined by the spec: + * https://tc39.github.io/proposal-observable/#observable-prototype-subscribe * * @id toPromise * @section Consumption * @name Stream.subscribe(onNext, onError, onComplete) - * @param {Function|null} onNext - Handler function for each value. + * @param {Function|object|null} onNext - Handler for next value or observer * @param {Function|null} onError - Handler function for errors. * @param {Function|null} onCompleted - Handler Function when stream is done. * @api public * + * // with callbacks * _([1, 2, 3, 4]).subscribe( * function onNext (x) { * // Called for each value that comes downstream @@ -2136,33 +2197,61 @@ addMethod('toPromise', function (PromiseCtor) { * function onComplete () { * // Receives no arguments * // Called only once when stream is completed. - * console.log('Completed the page.'); + * console.log('Completed!'); * } * ); + * + * // with an observer + * _[1, 2, 3, 4]).subscribe({ + * next (x) { + * console.log('Received next value', x); + * }, + * error (err) { + * console.error('An error occurred upstream', err); + * }, + * complete () { + * console.log('Completed!') + * } + * }); */ addMethod('subscribe', function (onNext, onError, onComplete) { var self = this; - return self.consume(function (err, x, push, next) { + + // Don't let users subscribe to an already completed stream + if (self._nil_pushed) { + throw new Error('Subscribe called on an already completed stream.'); + } + + var observer = createObserver(onNext, onError, onComplete); + var subscription = self.fork().consume(function (err, x, push, next) { if (err) { push(null, nil); - if (_.isFunction(onError)) { - onError(err); + if (_.isFunction(observer.error)) { + observer.error(err); } } else if (x === nil) { - if (_.isFunction(onComplete)) { - onComplete(); + if (_.isFunction(observer.complete)) { + observer.complete(); } } else { - if (_.isFunction(onNext)) { - onNext(x); + if (_.isFunction(observer.next)) { + observer.next(x); } next(); } - }).resume(); + }); + + subscription.resume(); + + return { + unsubscribe: function () { + subscription.destroy(); + }, + }; }); /** diff --git a/test/test.js b/test/test.js index 02b9590..989a34c 100755 --- a/test/test.js +++ b/test/test.js @@ -2834,6 +2834,15 @@ exports.subscribe = { } }); }, + 'calls next when provided in observer object': function (test) { + test.expect(1); + _.of(1).subscribe({ + next: function (x) { + test.equal(x, 1); + test.done(); + }, + }); + }, 'calls onError handler for a single-error stream': function (test) { test.expect(2); _.fromError(new Error('test error')).subscribe(null, function (err) { @@ -2854,6 +2863,16 @@ exports.subscribe = { test.done(); }); }, + 'calls error when provided in observer object': function (test) { + test.expect(2); + _.fromError(new Error('test error')).subscribe({ + error: function (err) { + test.ok(err instanceof Error); + test.equal(err.message, 'test error'); + test.done(); + }, + }); + }, 'calls onComplete handler for a single-value stream': function (test) { test.expect(1); _.of(1).subscribe(null, null, function (x) { @@ -2870,11 +2889,20 @@ exports.subscribe = { }, 'calls onComplete handler for an empty, complete stream': function (test) { test.expect(1); - _.of([]).subscribe(null, null, function (x) { + _([]).subscribe(null, null, function (x) { test.ok(typeof x === 'undefined'); test.done(); }); }, + 'calls complete when provided in observer object': function (test) { + test.expect(1); + _.of(1).subscribe({ + complete: function (x) { + test.ok(typeof x === 'undefined'); + test.done(); + }, + }); + }, 'consumes the stream without any handlers': function (test) { test.expect(3); _([1, 2, 3]) @@ -2886,6 +2914,45 @@ exports.subscribe = { }) .subscribe(); }, + 'a subscription can be unsubscribed': function (test) { + test.expect(1); + var stream = _(); + var sub1 = stream.subscribe(function (x) { + test.ok(x); + }); + stream.write(1); + sub1.unsubscribe(); + stream.write(1); + stream.end(); + test.done(); + }, + 'a stream can be subscribed to multiple times': function (test) { + test.expect(3); + var stream = _(); + var sub1 = stream.subscribe(function (x) { + test.ok(x === 1 || x === 2); + }); + var sub2 = stream.subscribe(function (x) { + test.ok(x === 1); + }); + stream.write(1); + sub2.unsubscribe(); + stream.write(2); + stream.end(); + test.done(); + }, + 'completed streams will throw an error': function (test) { + test.expect(1); + var stream = _(); + + stream.end(); + test.throws(function () { + stream.subscribe(function (x) { + test.ok(x); + }); + }); + test.done(); + }, }; exports['calls generator on read'] = function (test) { From 2a4f1212be8c69893f2115f86c0e39accda5a309 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Fri, 1 Mar 2019 00:22:52 -0500 Subject: [PATCH 03/15] Added Symbol.observable support --- lib/index.js | 29 +++++++++++++++++++++++++++++ test/test.js | 13 +++++++++++++ 2 files changed, 42 insertions(+) diff --git a/lib/index.js b/lib/index.js index 1f3753d..fe1a878 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2254,6 +2254,35 @@ addMethod('subscribe', function (onNext, onError, onComplete) { }; }); +/* eslint-disable no-undef */ +// If Symbols are available get the observable symbol. Otherwise use the a +// fallback string. +// https://tc39.github.io/proposal-observable/#observable-prototype-@@observable +var observable = typeof Symbol === 'function' && Symbol.observable || '@@observable'; +/* eslint-enable no-undef */ + +/** + * Returns an Observable spec-compliant instance (itself) that has a subscribe + * method and a Symbol.observable method. If Symbol is not available in the + * current environment it defaults to '@@observable'. Used by other tools and + * libraries that want to get an observable spec compliant stream interface. + * + * https://tc39.github.io/proposal-observable/#observable-prototype-@@observable + * + * @id Symbol.observable + * @section Consumption + * @name Symbol.observable + * @api public + * + * _([1, 2, 3])[Symbol.observable || "@@observable"]().subscribe(x => { + * console.log("Received value", x); + * }); + */ + +addMethod(observable, function () { + return this; +}); + /** * Converts the stream to a node Readable Stream for use in methods * or pipes that depend on the native stream type. diff --git a/test/test.js b/test/test.js index 989a34c..7bf93d9 100755 --- a/test/test.js +++ b/test/test.js @@ -2953,6 +2953,19 @@ exports.subscribe = { }); test.done(); }, + 'supports Symbol.observable or @@observable': function (test) { + test.expect(1); + /* eslint-disable no-undef */ + var observable = typeof Symbol === 'function' + && Symbol.observable + || '@@observable'; + /* eslint-enable no-undef */ + _([1])[observable]() + .subscribe(function (x) { + test.equals(x, 1); + test.done(); + }); + }, }; exports['calls generator on read'] = function (test) { From 934daabf76fadc21a35c117e62a478d0ea4672ed Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Fri, 1 Mar 2019 00:32:39 -0500 Subject: [PATCH 04/15] Credit RxJS as source of observable symbol resolution --- lib/index.js | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/index.js b/lib/index.js index fe1a878..0e1c66b 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2254,10 +2254,18 @@ addMethod('subscribe', function (onNext, onError, onComplete) { }; }); +/* + * Create a variable we can use as a dynamic method name depending on the + * environment. + * + * If Symbols are available get the observable symbol. Otherwise use the a + * fallback string. + * https://tc39.github.io/proposal-observable/#observable-prototype-@@observable + * + * Source taken from RxJS + * https://github.com/ReactiveX/rxjs/commit/4a5aaafc99825ae9b61e410bc0b5e86c7ae75837#diff-d26bc4881b94c82f3c0ae7d3914e9577R13 + */ /* eslint-disable no-undef */ -// If Symbols are available get the observable symbol. Otherwise use the a -// fallback string. -// https://tc39.github.io/proposal-observable/#observable-prototype-@@observable var observable = typeof Symbol === 'function' && Symbol.observable || '@@observable'; /* eslint-enable no-undef */ From 48fa72a10c46c753025f533e5a727c6e93181f48 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sat, 2 Mar 2019 02:52:43 -0500 Subject: [PATCH 05/15] Pull review requested changes for caolan/highland#672 Resolves comments: https://github.com/caolan/highland/pull/672#discussion_r261811626 https://github.com/caolan/highland/pull/672#discussion_r261811702 https://github.com/caolan/highland/pull/672#discussion_r261812558 https://github.com/caolan/highland/pull/672#discussion_r261812624 --- lib/index.js | 13 ++++++++----- package-lock.json | 19 +++++++++++++------ test/test.js | 15 ++++++++++++--- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/lib/index.js b/lib/index.js index 0e1c66b..9e0c55e 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2139,7 +2139,7 @@ addMethod('toPromise', function (PromiseCtor) { * }) */ function createObserver (onNext, onError, onComplete) { - var isObserver = onNext && !onNext.call && typeof onNext === 'object'; + var isObserver = onNext && !_.isFunction(onNext) && typeof onNext === 'object'; // ensure if we have an observer that we don't also have callbacks. Users // must choose one. @@ -2171,7 +2171,10 @@ function createObserver (onNext, onError, onComplete) { } /** - * Consumes values using the Observable subscribe signature. + * Consumes values using the Observable subscribe signature. Unlike other + * consumption methods, subscribe can be called multiple times. Each + * subscription will receive the current value before receiving the next value. + * Subscribing to an already consumed stream will result in an error. * * Implements the Observable subscribe functionality as defined by the spec: * https://tc39.github.io/proposal-observable/#observable-prototype-subscribe @@ -2184,7 +2187,7 @@ function createObserver (onNext, onError, onComplete) { * @param {Function|null} onCompleted - Handler Function when stream is done. * @api public * - * // with callbacks + * // with callbacks * _([1, 2, 3, 4]).subscribe( * function onNext (x) { * // Called for each value that comes downstream @@ -2202,7 +2205,7 @@ function createObserver (onNext, onError, onComplete) { * ); * * // with an observer - * _[1, 2, 3, 4]).subscribe({ + * _([1, 2, 3, 4]).subscribe({ * next (x) { * console.log('Received next value', x); * }, @@ -2219,7 +2222,7 @@ addMethod('subscribe', function (onNext, onError, onComplete) { var self = this; // Don't let users subscribe to an already completed stream - if (self._nil_pushed) { + if (self.ended) { throw new Error('Subscribe called on an already completed stream.'); } diff --git a/package-lock.json b/package-lock.json index c85ddb5..10ef02d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "highland", - "version": "3.0.0-beta.7", + "version": "3.0.0-beta.9", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -2538,7 +2538,8 @@ "ansi-regex": { "version": "2.1.1", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "aproba": { "version": "1.2.0", @@ -2953,7 +2954,8 @@ "safe-buffer": { "version": "5.1.2", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "safer-buffer": { "version": "2.1.2", @@ -3009,6 +3011,7 @@ "version": "3.0.1", "bundled": true, "dev": true, + "optional": true, "requires": { "ansi-regex": "^2.0.0" } @@ -3052,12 +3055,14 @@ "wrappy": { "version": "1.0.2", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "yallist": { "version": "3.0.3", "bundled": true, - "dev": true + "dev": true, + "optional": true } } }, @@ -4959,6 +4964,7 @@ "version": "0.1.4", "bundled": true, "dev": true, + "optional": true, "requires": { "kind-of": "^3.0.2", "longest": "^1.0.1", @@ -6141,7 +6147,8 @@ "longest": { "version": "1.0.1", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "loose-envify": { "version": "1.3.1", diff --git a/test/test.js b/test/test.js index 7bf93d9..59c53b2 100755 --- a/test/test.js +++ b/test/test.js @@ -2941,11 +2941,20 @@ exports.subscribe = { stream.end(); test.done(); }, - 'completed streams will throw an error': function (test) { + 'completed streams will not throw an error': function (test) { + test.expect(0); + var stream = _.empty(); + + stream.subscribe(function (x) { + throw new Error('I should not fire'); + }, null, test.done); + }, + 'consumed streams will throw an error': function (test) { test.expect(1); - var stream = _(); + var stream = _.empty(); + + stream.done(function () {}); - stream.end(); test.throws(function () { stream.subscribe(function (x) { test.ok(x); From a1edb49b38524972044a9ee1c91a301c1530e889 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sat, 2 Mar 2019 19:08:26 -0500 Subject: [PATCH 06/15] Fixed subscribe method PR requested changes Steps: - https://github.com/caolan/highland/pull/672#discussion_r261845340 - https://github.com/caolan/highland/pull/672#discussion_r261811779 - https://github.com/caolan/highland/pull/672#discussion_r261812558 --- lib/index.js | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/lib/index.js b/lib/index.js index 9e0c55e..c148027 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2153,21 +2153,11 @@ function createObserver (onNext, onError, onComplete) { } // Otherwise create an observer object - var observer = {}; - - if (_.isFunction(onNext)) { - observer.next = onNext; - } - - if (_.isFunction(onError)) { - observer.error = onError; - } - - if (_.isFunction(onComplete)) { - observer.complete = onComplete; - } - - return observer; + return { + next: onNext, + error: onError, + complete: onComplete, + }; } /** @@ -2179,7 +2169,7 @@ function createObserver (onNext, onError, onComplete) { * Implements the Observable subscribe functionality as defined by the spec: * https://tc39.github.io/proposal-observable/#observable-prototype-subscribe * - * @id toPromise + * @id subscribe * @section Consumption * @name Stream.subscribe(onNext, onError, onComplete) * @param {Function|object|null} onNext - Handler for next value or observer @@ -2220,28 +2210,28 @@ function createObserver (onNext, onError, onComplete) { addMethod('subscribe', function (onNext, onError, onComplete) { var self = this; + var observer = createObserver(onNext, onError, onComplete); // Don't let users subscribe to an already completed stream - if (self.ended) { - throw new Error('Subscribe called on an already completed stream.'); + if (self.ended && observer.error) { + observer.error(new Error('Subscribe called on an already completed stream.')); } - var observer = createObserver(onNext, onError, onComplete); var subscription = self.fork().consume(function (err, x, push, next) { if (err) { push(null, nil); - if (_.isFunction(observer.error)) { + if (observer.error) { observer.error(err); } } else if (x === nil) { - if (_.isFunction(observer.complete)) { + if (observer.complete) { observer.complete(); } } else { - if (_.isFunction(observer.next)) { + if (observer.next) { observer.next(x); } next(); From 01e77839b4c2933773d972b5613ff4464a500681 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sat, 2 Mar 2019 19:35:23 -0500 Subject: [PATCH 07/15] Fixed subscribe tests when subscribing to completed stream --- test/test.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/test.js b/test/test.js index 59c53b2..990c557 100755 --- a/test/test.js +++ b/test/test.js @@ -2955,12 +2955,10 @@ exports.subscribe = { stream.done(function () {}); - test.throws(function () { - stream.subscribe(function (x) { - test.ok(x); - }); + stream.subscribe(null, function (err) { + test.ok(err instanceof Error); + test.done(); }); - test.done(); }, 'supports Symbol.observable or @@observable': function (test) { test.expect(1); From 890aa05a234776948096fd2dc0f8583068838e55 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sun, 3 Mar 2019 14:19:54 -0500 Subject: [PATCH 08/15] Tests for observableSubscription behavior --- lib/index.js | 11 ++++------- lib/observableSubscription.js | 34 ++++++++++++++++++++++++++++++++++ test/test.js | 25 ++++++++++++++++++++++++- 3 files changed, 62 insertions(+), 8 deletions(-) create mode 100644 lib/observableSubscription.js diff --git a/lib/index.js b/lib/index.js index c148027..2861780 100755 --- a/lib/index.js +++ b/lib/index.js @@ -15,6 +15,7 @@ var Decoder = require('string_decoder').StringDecoder; var IntMap = require('./intMap'); var Queue = require('./queue'); var ReadableProxy = require('./readableProxy'); +var ObservableSubscription = require('./observableSubscription'); // Create quick slice reference variable for speed var slice = Array.prototype.slice; @@ -2217,7 +2218,7 @@ addMethod('subscribe', function (onNext, onError, onComplete) { observer.error(new Error('Subscribe called on an already completed stream.')); } - var subscription = self.fork().consume(function (err, x, push, next) { + var source = self.fork().consume(function (err, x, push, next) { if (err) { push(null, nil); @@ -2238,13 +2239,9 @@ addMethod('subscribe', function (onNext, onError, onComplete) { } }); - subscription.resume(); + source.resume(); - return { - unsubscribe: function () { - subscription.destroy(); - }, - }; + return ObservableSubscription.create(source); }); /* diff --git a/lib/observableSubscription.js b/lib/observableSubscription.js new file mode 100644 index 0000000..6fad7b4 --- /dev/null +++ b/lib/observableSubscription.js @@ -0,0 +1,34 @@ +/** + * Observable Subscription + * An implementation of the TC39 Subscription object + * https://tc39.github.io/proposal-observable/#subscription-objects + * + * @id ObservableSubscription + * @name ObservableSubscription + * @param {stream} source - Highland stream to subscribe to + * arguments to the callback. Only valid if `source` is a String. + * @api private + */ +function ObservableSubscription (source) { + this.source = source; + this.closed = false; +} + +// Static Methods + +ObservableSubscription.create = function create (source) { + return new ObservableSubscription(source); +}; + +// Instance Methods + +ObservableSubscription.prototype.unsubscribe = function unsubscribe () { + if (this.closed) { + return; + } + this.closed = true; + this.source.destroy(); + this.source = null; +}; + +module.exports = ObservableSubscription; diff --git a/test/test.js b/test/test.js index 990c557..7988d45 100755 --- a/test/test.js +++ b/test/test.js @@ -2941,6 +2941,29 @@ exports.subscribe = { stream.end(); test.done(); }, + 'subscription is closed after complete': function (test) { + test.expect(2); + var sub = _([1]).subscribe(null, null, function (x) { + test.ok(true); + }); + test.equal(sub.closed, true); + }, + 'subscription is closed after error': function (test) { + test.expect(2); + var sub = _.fromError(new Error('test error')).subscribe(null, function (x) { + test.ok(true); + }); + test.equal(sub.closed, true); + }, + 'subscription is closed after unsubscribe': function (test) { + test.expect(3); + var sub = _([1]).subscribe(function (x) { + test.equal(x, 1); + }); + test.equal(sub.closed, false); + sub.unsubscribe(); + test.equal(sub.closed, true); + }, 'completed streams will not throw an error': function (test) { test.expect(0); var stream = _.empty(); @@ -2955,7 +2978,7 @@ exports.subscribe = { stream.done(function () {}); - stream.subscribe(null, function (err) { + stream.subscribe(null, null, function (err) { test.ok(err instanceof Error); test.done(); }); From f2dae557b0fb2f3de183435db3cf135f602302c9 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sun, 3 Mar 2019 14:39:35 -0500 Subject: [PATCH 09/15] Finished first draft of observableSubscription --- lib/index.js | 6 +++++- lib/observableSubscription.js | 16 ++++++++++------ test/test.js | 9 +++++++-- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/lib/index.js b/lib/index.js index 2861780..c770be2 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2218,6 +2218,7 @@ addMethod('subscribe', function (onNext, onError, onComplete) { observer.error(new Error('Subscribe called on an already completed stream.')); } + var subscription = ObservableSubscription.create(); var source = self.fork().consume(function (err, x, push, next) { if (err) { push(null, nil); @@ -2225,11 +2226,13 @@ addMethod('subscribe', function (onNext, onError, onComplete) { if (observer.error) { observer.error(err); } + ObservableSubscription.cleanup(subscription); } else if (x === nil) { if (observer.complete) { observer.complete(); } + ObservableSubscription.cleanup(subscription); } else { if (observer.next) { @@ -2238,10 +2241,11 @@ addMethod('subscribe', function (onNext, onError, onComplete) { next(); } }); + subscription.source = source; source.resume(); - return ObservableSubscription.create(source); + return subscription; }); /* diff --git a/lib/observableSubscription.js b/lib/observableSubscription.js index 6fad7b4..6f95085 100644 --- a/lib/observableSubscription.js +++ b/lib/observableSubscription.js @@ -20,15 +20,19 @@ ObservableSubscription.create = function create (source) { return new ObservableSubscription(source); }; +ObservableSubscription.cleanup = function cleanup (subscription) { + if (subscription.closed) { + return; + } + subscription.source.destroy(); + subscription.source = null; + subscription.closed = true; +}; + // Instance Methods ObservableSubscription.prototype.unsubscribe = function unsubscribe () { - if (this.closed) { - return; - } - this.closed = true; - this.source.destroy(); - this.source = null; + ObservableSubscription.cleanup(this); }; module.exports = ObservableSubscription; diff --git a/test/test.js b/test/test.js index 7988d45..fefce38 100755 --- a/test/test.js +++ b/test/test.js @@ -2947,6 +2947,7 @@ exports.subscribe = { test.ok(true); }); test.equal(sub.closed, true); + test.done(); }, 'subscription is closed after error': function (test) { test.expect(2); @@ -2954,15 +2955,19 @@ exports.subscribe = { test.ok(true); }); test.equal(sub.closed, true); + test.done(); }, 'subscription is closed after unsubscribe': function (test) { test.expect(3); - var sub = _([1]).subscribe(function (x) { + var stream = _(); + var sub = stream.subscribe(function (x) { test.equal(x, 1); }); + stream.write(1); test.equal(sub.closed, false); sub.unsubscribe(); test.equal(sub.closed, true); + test.done(); }, 'completed streams will not throw an error': function (test) { test.expect(0); @@ -2978,7 +2983,7 @@ exports.subscribe = { stream.done(function () {}); - stream.subscribe(null, null, function (err) { + stream.subscribe(null, function (err) { test.ok(err instanceof Error); test.done(); }); From 7a053ae79601a900ae6abba07da2317fa08dc617 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sun, 3 Mar 2019 14:50:59 -0500 Subject: [PATCH 10/15] Added more documentation to observableSubscription --- lib/observableSubscription.js | 38 ++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/lib/observableSubscription.js b/lib/observableSubscription.js index 6f95085..5f02258 100644 --- a/lib/observableSubscription.js +++ b/lib/observableSubscription.js @@ -3,6 +3,8 @@ * An implementation of the TC39 Subscription object * https://tc39.github.io/proposal-observable/#subscription-objects * + * This class is intended for internal use only. + * * @id ObservableSubscription * @name ObservableSubscription * @param {stream} source - Highland stream to subscribe to @@ -16,22 +18,56 @@ function ObservableSubscription (source) { // Static Methods +/** + * Create + * Return an instance of an ObservableSubscription class + * + * @id ObservableSubscription.create(source) + * @name ObservableSubscription.create + * @param {stream} [source] - Optional highland stream source + * @returns {ObservableSubscription} A subscription that can be unsubscribed + * @api private + */ + ObservableSubscription.create = function create (source) { return new ObservableSubscription(source); }; +/** + * Cleanup + * Perform cleanup routine on a subscription. This can only be called once per + * subscription. Once its closed the subscription cannot be cleaned up again. + * + * Note: This relies heavily upon side-effects and mutates the subscription. + * + * @id ObservableSubscription.cleanup(subscription) + * @name ObservableSubscription.cleanup + * @param {ObservableSubscription} subscription - Subscription to cleanup + * @returns {undefined} Side-effectful function cleans up subscription + * @api private + */ + ObservableSubscription.cleanup = function cleanup (subscription) { if (subscription.closed) { return; } - subscription.source.destroy(); subscription.source = null; subscription.closed = true; }; // Instance Methods +/** + * Unsubscribe + * Destroy the stream resources and cleanup the subscription. + * @id ObservableSubscription.prototype.unsubscribe() + * @name ObservableSubscription.prototype.unsubscribe() + * @returns {undefined} Side-effectful. Destroys stream and cleans up subscription. + * @api private + */ + ObservableSubscription.prototype.unsubscribe = function unsubscribe () { + this.source.destroy(); ObservableSubscription.cleanup(this); }; From ffbb15a1406d840670db74bbf60c3aca2964fb23 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sun, 3 Mar 2019 17:49:12 -0500 Subject: [PATCH 11/15] Update ObservableSubscription for PR requested changes: Requested Changes: - https://github.com/caolan/highland/pull/672#discussion_r261888538 - https://github.com/caolan/highland/pull/672#discussion_r261888322 - https://github.com/caolan/highland/pull/672#discussion_r261888306 - https://github.com/caolan/highland/pull/672#discussion_r261888122 --- lib/index.js | 13 +++++------ lib/observableSubscription.js | 42 +++++++++++++---------------------- 2 files changed, 21 insertions(+), 34 deletions(-) diff --git a/lib/index.js b/lib/index.js index c770be2..9bf66fd 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2218,21 +2218,21 @@ addMethod('subscribe', function (onNext, onError, onComplete) { observer.error(new Error('Subscribe called on an already completed stream.')); } - var subscription = ObservableSubscription.create(); - var source = self.fork().consume(function (err, x, push, next) { + var source = self.fork(); + var subscription = new ObservableSubscription(source); + var consumer = source.consume(function (err, x, push, next) { if (err) { push(null, nil); - if (observer.error) { observer.error(err); } - ObservableSubscription.cleanup(subscription); + subscription._cleanup(); } else if (x === nil) { if (observer.complete) { observer.complete(); } - ObservableSubscription.cleanup(subscription); + subscription._cleanup(); } else { if (observer.next) { @@ -2241,9 +2241,8 @@ addMethod('subscribe', function (onNext, onError, onComplete) { next(); } }); - subscription.source = source; - source.resume(); + consumer.resume(); return subscription; }); diff --git a/lib/observableSubscription.js b/lib/observableSubscription.js index 5f02258..7d6d862 100644 --- a/lib/observableSubscription.js +++ b/lib/observableSubscription.js @@ -16,47 +16,30 @@ function ObservableSubscription (source) { this.closed = false; } -// Static Methods - -/** - * Create - * Return an instance of an ObservableSubscription class - * - * @id ObservableSubscription.create(source) - * @name ObservableSubscription.create - * @param {stream} [source] - Optional highland stream source - * @returns {ObservableSubscription} A subscription that can be unsubscribed - * @api private - */ - -ObservableSubscription.create = function create (source) { - return new ObservableSubscription(source); -}; +// Instance Methods /** * Cleanup * Perform cleanup routine on a subscription. This can only be called once per * subscription. Once its closed the subscription cannot be cleaned up again. * - * Note: This relies heavily upon side-effects and mutates the subscription. + * Note: This relies heavily upon side-effects and mutates itself. * - * @id ObservableSubscription.cleanup(subscription) - * @name ObservableSubscription.cleanup - * @param {ObservableSubscription} subscription - Subscription to cleanup + * @id ObservableSubscription.prototype._cleanup(subscription) + * @name ObservableSubscription.prototype._cleanup * @returns {undefined} Side-effectful function cleans up subscription * @api private */ -ObservableSubscription.cleanup = function cleanup (subscription) { - if (subscription.closed) { +ObservableSubscription.prototype._cleanup = function cleanup () { + // Don't want to destroy\cleanup an already closed stream + if (this.closed) { return; } - subscription.source = null; - subscription.closed = true; + this.source = null; + this.closed = true; }; -// Instance Methods - /** * Unsubscribe * Destroy the stream resources and cleanup the subscription. @@ -67,8 +50,13 @@ ObservableSubscription.cleanup = function cleanup (subscription) { */ ObservableSubscription.prototype.unsubscribe = function unsubscribe () { + // Don't want to destroy\cleanup an already closed stream + if (this.closed) { + return; + } + this.source.destroy(); - ObservableSubscription.cleanup(this); + this._cleanup(); }; module.exports = ObservableSubscription; From fb7c14960152100158af905e0ca89df19ec22587 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sun, 3 Mar 2019 20:34:56 -0500 Subject: [PATCH 12/15] Fixed subscribe PR changes in caolan/highland#672 PR Comments: - https://github.com/caolan/highland/pull/672#discussion_r261901290 --- lib/index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/index.js b/lib/index.js index 9bf66fd..c9078ba 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2216,6 +2216,7 @@ addMethod('subscribe', function (onNext, onError, onComplete) { // Don't let users subscribe to an already completed stream if (self.ended && observer.error) { observer.error(new Error('Subscribe called on an already completed stream.')); + return; } var source = self.fork(); From 1be35e9f18c3b1fca8c010b7f83ade0910821a71 Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sun, 3 Mar 2019 21:52:27 -0500 Subject: [PATCH 13/15] Fixed lint error in subscribe method --- lib/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/index.js b/lib/index.js index c9078ba..799e7d0 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2216,7 +2216,7 @@ addMethod('subscribe', function (onNext, onError, onComplete) { // Don't let users subscribe to an already completed stream if (self.ended && observer.error) { observer.error(new Error('Subscribe called on an already completed stream.')); - return; + return self; } var source = self.fork(); From 5ca90ae696eb2a7f7ce2dd5136784f6dad1f5edb Mon Sep 17 00:00:00 2001 From: Jay Zawrotny Date: Sun, 3 Mar 2019 23:24:00 -0500 Subject: [PATCH 14/15] Fixed subscribe PR requested changes Requested Changes: - https://github.com/caolan/highland/pull/672#issuecomment-469102853 - https://github.com/caolan/highland/pull/672#pullrequestreview-209959826 --- lib/index.js | 6 +++++- lib/observableSubscription.js | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/index.js b/lib/index.js index 799e7d0..22a2d0a 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2176,6 +2176,7 @@ function createObserver (onNext, onError, onComplete) { * @param {Function|object|null} onNext - Handler for next value or observer * @param {Function|null} onError - Handler function for errors. * @param {Function|null} onCompleted - Handler Function when stream is done. + * @returns {ObservableSubscription - Subscription with unsubscribed method * @api public * * // with callbacks @@ -2216,7 +2217,10 @@ addMethod('subscribe', function (onNext, onError, onComplete) { // Don't let users subscribe to an already completed stream if (self.ended && observer.error) { observer.error(new Error('Subscribe called on an already completed stream.')); - return self; + var closedSubscription = new ObservableSubscription(); + + closedSubscription.closed = true; + return closedSubscription; } var source = self.fork(); diff --git a/lib/observableSubscription.js b/lib/observableSubscription.js index 7d6d862..b94c5f6 100644 --- a/lib/observableSubscription.js +++ b/lib/observableSubscription.js @@ -12,7 +12,7 @@ * @api private */ function ObservableSubscription (source) { - this.source = source; + this._source = source; this.closed = false; } @@ -36,7 +36,7 @@ ObservableSubscription.prototype._cleanup = function cleanup () { if (this.closed) { return; } - this.source = null; + this._source = null; this.closed = true; }; @@ -55,7 +55,7 @@ ObservableSubscription.prototype.unsubscribe = function unsubscribe () { return; } - this.source.destroy(); + this._source.destroy(); this._cleanup(); }; From a2a3b25c0100650be8566838fba4d67a9321b996 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 4 Mar 2019 12:38:36 -0500 Subject: [PATCH 15/15] Submit method PR requested changes Requested Changes: - https://github.com/caolan/highland/pull/672#discussion_r261927055 - https://github.com/caolan/highland/pull/672#discussion_r261927200 - https://github.com/caolan/highland/pull/672#discussion_r261927374 --- lib/index.js | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/index.js b/lib/index.js index 22a2d0a..efaacf7 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2176,7 +2176,7 @@ function createObserver (onNext, onError, onComplete) { * @param {Function|object|null} onNext - Handler for next value or observer * @param {Function|null} onError - Handler function for errors. * @param {Function|null} onCompleted - Handler Function when stream is done. - * @returns {ObservableSubscription - Subscription with unsubscribed method + * @returns {ObservableSubscription} - Subscription with unsubscribed method * @api public * * // with callbacks @@ -2215,11 +2215,13 @@ addMethod('subscribe', function (onNext, onError, onComplete) { var observer = createObserver(onNext, onError, onComplete); // Don't let users subscribe to an already completed stream - if (self.ended && observer.error) { - observer.error(new Error('Subscribe called on an already completed stream.')); - var closedSubscription = new ObservableSubscription(); + if (self.ended) { + if (observer.error) { + observer.error(new Error('Subscribe called on an already completed stream.')); + } - closedSubscription.closed = true; + var closedSubscription = new ObservableSubscription(); + closedSubscription._cleanup(); return closedSubscription; }