Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created subscribe method for Observable spec compatibility #672

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var Decoder = require('string_decoder').StringDecoder;
var IntMap = require('./intMap');
var Queue = require('./queue');
var ReadableProxy = require('./readableProxy');
var ObservableSubscription = require('./observableSubscription');

// Create quick slice reference variable for speed
var slice = Array.prototype.slice;
Expand Down Expand Up @@ -2108,6 +2109,180 @@ addMethod('toPromise', function (PromiseCtor) {
});
});

/**
* Coerce observer callbacks into observer object or return observer object
* if already created. Will throw an error if both an object and callback args
* are provided.
*
* @id createObserver
* @param {function|object} onNext Function to receive new values or observer
* @param {function} [onError] Optional callback to receive errors.
* @param {function} [onComplete] Optional callback when stream completes
* @return {object} Observer object with next, error, and complete methods
* @private
*
* createObserver(
* function (x) { console.log(x); },
* function (err) { console.error(err); },
* function () { console.log('done'); }
* )
*
* createObserver(
* null,
* null,
* function () { console.log('done'); }
* )
*
* createObserver({
* next: function (x) { console.log(x); },
* error: function (err) { console.error(err); },
* complete: function () { console.log('done'); }
* })
*/
function createObserver (onNext, onError, onComplete) {
var isObserver = onNext && !_.isFunction(onNext) && typeof onNext === 'object';

// ensure if we have an observer that we don't also have callbacks. Users
// must choose one.
if (isObserver && (onError || onComplete)) {
throw new Error('Subscribe requires either an observer object or optional callbacks.');
}

// onNext is actually an observer
if (isObserver) {
return onNext;
}

// Otherwise create an observer object
return {
next: onNext,
error: onError,
complete: onComplete,
};
}

/**
* Consumes values using the Observable subscribe signature. Unlike other
* consumption methods, subscribe can be called multiple times. Each
* subscription will receive the current value before receiving the next value.
* Subscribing to an already consumed stream will result in an error.
*
* Implements the Observable subscribe functionality as defined by the spec:
* https://tc39.github.io/proposal-observable/#observable-prototype-subscribe
*
* @id subscribe
* @section Consumption
* @name Stream.subscribe(onNext, onError, onComplete)
* @param {Function|object|null} onNext - Handler for next value or observer
* @param {Function|null} onError - Handler function for errors.
* @param {Function|null} onCompleted - Handler Function when stream is done.
* @api public
*
* // with callbacks
* _([1, 2, 3, 4]).subscribe(
* function onNext (x) {
* // Called for each value that comes downstream
* console.log('Received onNext value', x);
* },
* function onError (err) {
* // Called one time with error or zero if no errors occur upstream
* console.error('Single highland stream error', err);
* },
* function onComplete () {
* // Receives no arguments
* // Called only once when stream is completed.
* console.log('Completed!');
* }
* );
*
* // with an observer
* _([1, 2, 3, 4]).subscribe({
* next (x) {
* console.log('Received next value', x);
* },
* error (err) {
* console.error('An error occurred upstream', err);
* },
* complete () {
* console.log('Completed!')
* }
* });
*/

addMethod('subscribe', function (onNext, onError, onComplete) {
var self = this;
var observer = createObserver(onNext, onError, onComplete);

// Don't let users subscribe to an already completed stream
if (self.ended && observer.error) {
observer.error(new Error('Subscribe called on an already completed stream.'));
jaidetree marked this conversation as resolved.
Show resolved Hide resolved
jaidetree marked this conversation as resolved.
Show resolved Hide resolved
}

var source = self.fork();
var subscription = new ObservableSubscription(source);
var consumer = source.consume(function (err, x, push, next) {
if (err) {
push(null, nil);
if (observer.error) {
observer.error(err);
}
subscription._cleanup();
}
else if (x === nil) {
if (observer.complete) {
observer.complete();
}
subscription._cleanup();
}
else {
if (observer.next) {
observer.next(x);
}
next();
}
});

consumer.resume();

return subscription;
});

/*
* Create a variable we can use as a dynamic method name depending on the
* environment.
*
* If Symbols are available get the observable symbol. Otherwise use the a
* fallback string.
* https://tc39.github.io/proposal-observable/#observable-prototype-@@observable
*
* Source taken from RxJS
* https://github.com/ReactiveX/rxjs/commit/4a5aaafc99825ae9b61e410bc0b5e86c7ae75837#diff-d26bc4881b94c82f3c0ae7d3914e9577R13
*/
/* eslint-disable no-undef */
var observable = typeof Symbol === 'function' && Symbol.observable || '@@observable';
/* eslint-enable no-undef */

/**
* Returns an Observable spec-compliant instance (itself) that has a subscribe
* method and a Symbol.observable method. If Symbol is not available in the
* current environment it defaults to '@@observable'. Used by other tools and
* libraries that want to get an observable spec compliant stream interface.
*
* https://tc39.github.io/proposal-observable/#observable-prototype-@@observable
*
* @id Symbol.observable
* @section Consumption
* @name Symbol.observable
* @api public
*
* _([1, 2, 3])[Symbol.observable || "@@observable"]().subscribe(x => {
* console.log("Received value", x);
* });
*/

addMethod(observable, function () {
return this;
});

/**
* Converts the stream to a node Readable Stream for use in methods
Expand Down
62 changes: 62 additions & 0 deletions lib/observableSubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Observable Subscription
* An implementation of the TC39 Subscription object
* https://tc39.github.io/proposal-observable/#subscription-objects
*
* This class is intended for internal use only.
*
* @id ObservableSubscription
* @name ObservableSubscription
* @param {stream} source - Highland stream to subscribe to
* arguments to the callback. Only valid if `source` is a String.
* @api private
*/
function ObservableSubscription (source) {
this.source = source;
jaidetree marked this conversation as resolved.
Show resolved Hide resolved
this.closed = false;
}

// Instance Methods

/**
* Cleanup
* Perform cleanup routine on a subscription. This can only be called once per
* subscription. Once its closed the subscription cannot be cleaned up again.
*
* Note: This relies heavily upon side-effects and mutates itself.
*
* @id ObservableSubscription.prototype._cleanup(subscription)
* @name ObservableSubscription.prototype._cleanup
* @returns {undefined} Side-effectful function cleans up subscription
* @api private
*/

ObservableSubscription.prototype._cleanup = function cleanup () {
// Don't want to destroy\cleanup an already closed stream
if (this.closed) {
return;
}
this.source = null;
this.closed = true;
};

/**
* Unsubscribe
* Destroy the stream resources and cleanup the subscription.
* @id ObservableSubscription.prototype.unsubscribe()
* @name ObservableSubscription.prototype.unsubscribe()
* @returns {undefined} Side-effectful. Destroys stream and cleans up subscription.
* @api private
*/

ObservableSubscription.prototype.unsubscribe = function unsubscribe () {
// Don't want to destroy\cleanup an already closed stream
if (this.closed) {
return;
}

this.source.destroy();
jaidetree marked this conversation as resolved.
Show resolved Hide resolved
this._cleanup();
};

module.exports = ObservableSubscription;
19 changes: 13 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading