Skip to content

Commit

Permalink
Merge pull request #673 from eccentric-j/subscribe-with-nil-file
Browse files Browse the repository at this point in the history
Implement the Observable spec.
  • Loading branch information
vqvu authored Mar 8, 2019
2 parents 07d737f + f14d5ef commit b1f1c92
Show file tree
Hide file tree
Showing 8 changed files with 521 additions and 20 deletions.
55 changes: 55 additions & 0 deletions lib/createObserver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
var isFunction = require('./isFunction');

/**
* Coerce observer callbacks into observer object or return observer object
* if already created. Will throw an error if both an object and callback args
* are provided.
*
* @id createObserver
* @param {function|object} onNext Function to receive new values or observer
* @param {function} [onError] Optional callback to receive errors.
* @param {function} [onComplete] Optional callback when stream completes
* @return {object} Observer object with next, error, and complete methods
* @private
*
* createObserver(
* function (x) { console.log(x); },
* function (err) { console.error(err); },
* function () { console.log('done'); }
* )
*
* createObserver(
* null,
* null,
* function () { console.log('done'); }
* )
*
* createObserver({
* next: function (x) { console.log(x); },
* error: function (err) { console.error(err); },
* complete: function () { console.log('done'); }
* })
*/
function createObserver (onNext, onError, onComplete) {
var isObserver = onNext && !isFunction(onNext) && typeof onNext === 'object';

// ensure if we have an observer that we don't also have callbacks. Users
// must choose one.
if (isObserver && (onError || onComplete)) {
throw new Error('Subscribe requires either an observer object or optional callbacks.');
}

// onNext is actually an observer
if (isObserver) {
return onNext;
}

// Otherwise create an observer object
return {
next: onNext,
error: onError,
complete: onComplete,
};
}

module.exports = createObserver;
22 changes: 22 additions & 0 deletions lib/global.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Return a global context upon which to install Highland globals. Takes a
* default namespace to use if both the node global and browser window
* namespace cannot be found.
*
* @returns {object} Global namespace context
*/

// Use the nodejs global namespace
if (typeof global !== 'undefined') {
module.exports = global;
}
// Use the browser window namespace
else if (typeof window !== 'undefined') {
module.exports = window;
}
// If neither the global namespace or browser namespace is avaiable
// Use this module as the default context
else {
module.exports = this;
}

112 changes: 98 additions & 14 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var Decoder = require('string_decoder').StringDecoder;

var createObserver = require('./createObserver');
var isFunction = require('./isFunction');
var IntMap = require('./intMap');
var ObservableSubscription = require('./observableSubscription');
var Queue = require('./queue');
var ReadableProxy = require('./readableProxy');

Expand All @@ -21,13 +24,7 @@ var slice = Array.prototype.slice;
var hasOwn = Object.prototype.hasOwnProperty;

// Set up the global object.
var _global = this;
if (typeof global !== 'undefined') {
_global = global;
}
else if (typeof window !== 'undefined') {
_global = window;
}
var _global = require('./global');

// ES5 detected value, used for switch between ES5 and ES3 code
var isES5 = (function () {
Expand Down Expand Up @@ -317,9 +314,7 @@ _.isUndefined = function (x) {
return typeof x === 'undefined';
};

_.isFunction = function (x) {
return typeof x === 'function';
};
_.isFunction = isFunction;

_.isObject = function (x) {
return typeof x === 'object' && x !== null;
Expand Down Expand Up @@ -382,10 +377,7 @@ else {

// set up a global nil object in cases where you have multiple Highland
// instances installed (often via npm)
if (!_global.nil) {
_global.nil = {};
}
var nil = _.nil = _global.nil;
var nil = _.nil = require('./nil');

/**
* Transforms a function with specific arity (all arguments must be
Expand Down Expand Up @@ -2109,6 +2101,98 @@ addMethod('toPromise', function (PromiseCtor) {
});


/**
* Consumes values using the Observable subscribe signature. Unlike other
* consumption methods, subscribe can be called multiple times. Each
* subscription will receive the current value before receiving the next value.
* Subscribing to an already consumed stream will result in an error.
*
* Implements the Observable subscribe functionality as defined by the spec:
* https://tc39.github.io/proposal-observable/#observable-prototype-subscribe
*
* @id subscribe
* @section Consumption
* @name Stream.subscribe(onNext, onError, onComplete)
* @param {Function|object|null} onNext - Handler for next value or observer
* @param {Function|null} onError - Handler function for errors.
* @param {Function|null} onCompleted - Handler Function when stream is done.
* @returns {ObservableSubscription} - Subscription with unsubscribed method
* @api public
*
* // with callbacks
* _([1, 2, 3, 4]).subscribe(
* function onNext (x) {
* // Called for each value that comes downstream
* console.log('Received onNext value', x);
* },
* function onError (err) {
* // Called one time with error or zero if no errors occur upstream
* console.error('Single highland stream error', err);
* },
* function onComplete () {
* // Receives no arguments
* // Called only once when stream is completed.
* console.log('Completed!');
* }
* );
*
* // with an observer
* _([1, 2, 3, 4]).subscribe({
* next (x) {
* console.log('Received next value', x);
* },
* error (err) {
* console.error('An error occurred upstream', err);
* },
* complete () {
* console.log('Completed!')
* }
* });
*/

addMethod('subscribe', function (onNext, onError, onComplete) {
var observer = createObserver(onNext, onError, onComplete);

return new ObservableSubscription(this, observer);
});

/*
* Create a variable we can use as a dynamic method name depending on the
* environment.
*
* If Symbols are available get the observable symbol. Otherwise use the a
* fallback string.
* https://tc39.github.io/proposal-observable/#observable-prototype-@@observable
*
* Source taken from RxJS
* https://github.com/ReactiveX/rxjs/commit/4a5aaafc99825ae9b61e410bc0b5e86c7ae75837#diff-d26bc4881b94c82f3c0ae7d3914e9577R13
*/
/* eslint-disable no-undef */
var observable = typeof Symbol === 'function' && Symbol.observable || '@@observable';
/* eslint-enable no-undef */

/**
* Returns an Observable spec-compliant instance (itself) that has a subscribe
* method and a Symbol.observable method. If Symbol is not available in the
* current environment it defaults to '@@observable'. Used by other tools and
* libraries that want to get an observable spec compliant stream interface.
*
* https://tc39.github.io/proposal-observable/#observable-prototype-@@observable
*
* @id Symbol.observable
* @section Consumption
* @name Symbol.observable
* @api public
*
* _([1, 2, 3])[Symbol.observable || "@@observable"]().subscribe(x => {
* console.log("Received value", x);
* });
*/

addMethod(observable, function () {
return this;
});

/**
* Converts the stream to a node Readable Stream for use in methods
* or pipes that depend on the native stream type.
Expand Down
14 changes: 14 additions & 0 deletions lib/isFunction.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Predicate function takes a value and returns true if it is a function.
*
* @id isFunction
* @name isFunction(x)
* @param {any} x - Any value to test against
* @returns {bool} True if x is a function
*/

function isFunction (x) {
return typeof x === 'function';
}

module.exports = isFunction;
21 changes: 21 additions & 0 deletions lib/nil.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
var _global = require('./global');

/*
* Resolve nil value from global namespace if it exists. This may happen when
* there are multiple versions of highland (like npm).
*
* nil is only equal to itself:
*
* nil === {} => false
* nil === nil => true
*
* This property makes it valuable for determining a lack of input from a
* falsey value such as nil or undefined. When a highland stream encounters
* nil it knows for sure the intention is to end the stream.
*/

if (!_global.nil) {
_global.nil = {};
}

module.exports = _global.nil;
106 changes: 106 additions & 0 deletions lib/observableSubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
var nil = require('./nil');

/**
* An implementation of the TC39 Subscription object
* https://tc39.github.io/proposal-observable/#subscription-objects
*
* This class is intended for internal use only.
*
* Constructor takes a source highland stream, and an observer object with
* an optional next, error, and complete methods.
*
* Returns a subscription object with a closed boolean and unsubscribe
* method.
*
* @id ObservableSubscription
* @name ObservableSubscription
* @param {stream} stream - Highland stream to subscribe to
* @param {object} observer - Observer to publish from stream subscription
* @api private
*/
function ObservableSubscription (stream, observer) {
var self = this;

// Set attributes
this._source = stream.fork();
this.closed = false;

// Don't let users subscribe to an already completed stream
if (stream.ended) {
if (observer.error) {
observer.error(new Error('Subscribe called on an already completed stream.'));
}

this._cleanup();

return;
}

// Consume the stream and emit data to the observer
this._source = this._source.consume(function (err, x, push, next) {
if (err) {
push(null, nil);
if (observer.error) {
observer.error(err);
}
self._cleanup();
}
else if (x === nil) {
if (observer.complete) {
observer.complete();
}
self._cleanup();
}
else {
if (observer.next) {
observer.next(x);
}
next();
}
});

this._source.resume();
}

// Instance Methods

/**
* Perform cleanup routine on a subscription. This can only be called once per
* subscription. Once its closed the subscription cannot be cleaned up again.
*
* Note: This relies heavily upon side-effects and mutates itself.
*
* @id ObservableSubscription.prototype._cleanup(subscription)
* @name ObservableSubscription.prototype._cleanup
* @returns {undefined} Side-effectful function cleans up subscription
* @api private
*/

ObservableSubscription.prototype._cleanup = function cleanup () {
// Don't want to destroy\cleanup an already closed stream
if (this.closed) {
return;
}
this._source = null;
this.closed = true;
};

/**
* Destroy the stream resources and cleanup the subscription.
* @id ObservableSubscription.prototype.unsubscribe()
* @name ObservableSubscription.prototype.unsubscribe()
* @returns {undefined} Side-effectful. Destroys stream and cleans up subscription.
* @api private
*/

ObservableSubscription.prototype.unsubscribe = function unsubscribe () {
// Don't want to destroy\cleanup an already closed stream
if (this.closed) {
return;
}

this._source.destroy();
this._cleanup();
};

module.exports = ObservableSubscription;
Loading

0 comments on commit b1f1c92

Please sign in to comment.