Skip to content

Commit

Permalink
Merge branch 'master' into enhancement/#207-api-checks
Browse files Browse the repository at this point in the history
  • Loading branch information
yasserf committed Aug 28, 2016
2 parents 7205e55 + 591f0ca commit cb72b9e
Show file tree
Hide file tree
Showing 23 changed files with 1,118 additions and 126 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ services:
node_js:
- "stable"

script: npm run coverage
script:
- npm run coverage
- npm run new-e2e

after_script: "cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js"
140 changes: 127 additions & 13 deletions dist/deepstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4690,8 +4690,11 @@ exports.ACTIONS.SNAPSHOT = 'SN';
exports.ACTIONS.INVOKE = 'I';
exports.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND = 'SP';
exports.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED = 'SR';
exports.ACTIONS.SUBSCRIPTION_HAS_PROVIDER = 'SH';
exports.ACTIONS.LISTEN = 'L';
exports.ACTIONS.UNLISTEN = 'UL';
exports.ACTIONS.LISTEN_ACCEPT = 'LA';
exports.ACTIONS.LISTEN_REJECT = 'LR';
exports.ACTIONS.PROVIDER_UPDATE = 'PU';
exports.ACTIONS.QUERY = 'Q';
exports.ACTIONS.CREATEORREAD = 'CR';
Expand Down Expand Up @@ -5875,7 +5878,6 @@ var AnonymousRecord = function( recordHandler ) {
this._subscriptions = [];
this._proxyMethod( 'delete' );
this._proxyMethod( 'set' );
this._proxyMethod( 'unsubscribe' );
this._proxyMethod( 'discard' );
};

Expand Down Expand Up @@ -5959,7 +5961,7 @@ AnonymousRecord.prototype.unsubscribe = function() {
};

/**
* Sets the underlying record the anonymous record is boud
* Sets the underlying record the anonymous record is bound
* to. Can be called multiple times.
*
* @param {String} recordName
Expand Down Expand Up @@ -6627,11 +6629,13 @@ RecordHandler.prototype.getAnonymousRecord = function() {
* @returns {void}
*/
RecordHandler.prototype.listen = function( pattern, callback ) {
if( this._listener[ pattern ] && !this._listener[ pattern ].destroyPending ) {
return this._client._$onError( C.TOPIC.RECORD, C.EVENT.LISTENER_EXISTS, pattern );
}
if( this._listener[ pattern ] ) {
this._client._$onError( C.TOPIC.RECORD, C.EVENT.LISTENER_EXISTS, pattern );
} else {
this._listener[ pattern ] = new Listener( C.TOPIC.RECORD, pattern, callback, this._options, this._client, this._connection );
this._listener[ pattern ].destroy();
}
this._listener[ pattern ] = new Listener( C.TOPIC.RECORD, pattern, callback, this._options, this._client, this._connection );
};

/**
Expand All @@ -6644,9 +6648,9 @@ RecordHandler.prototype.listen = function( pattern, callback ) {
* @returns {void}
*/
RecordHandler.prototype.unlisten = function( pattern ) {
if( this._listener[ pattern ] ) {
this._listener[ pattern ].destroy();
delete this._listener[ pattern ];
var listener = this._listener[ pattern ];
if( listener && !listener.destroyPending ) {
listener.sendDestroy();
} else {
this._client._$onError( C.TOPIC.RECORD, C.EVENT.NOT_LISTENING, pattern );
}
Expand Down Expand Up @@ -6763,9 +6767,21 @@ RecordHandler.prototype._$handle = function( message ) {
this._hasRegistry.recieve( name, null, messageParser.convertTyped( message.data[ 1 ] ) );
}

if( this._listener[ name ] ) {
if( message.action === C.ACTIONS.ACK && message.data[ 0 ] === C.ACTIONS.UNLISTEN &&
this._listener[ name ] && this._listener[ name ].destroyPending
) {
processed = true;
this._listener[ name ].destroy();
delete this._listener[ name ];
} else if( this._listener[ name ] ) {
processed = true;
this._listener[ name ]._$onMessage( message );
} else if( message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED ) {
// An unlisten ACK was received before an PATTERN_REMOVED which is a valid case
processed = true;
} else if( message.action === C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER ) {
// record can receive a HAS_PROVIDER after discarding the record
processed = true;
}

if( !processed ) {
Expand Down Expand Up @@ -6851,6 +6867,7 @@ var Record = function( name, recordOptions, connection, options, client ) {
this._options = options;
this.isReady = false;
this.isDestroyed = false;
this.hasProvider = false;
this._$data = {};
this.version = null;
this._paths = {};
Expand Down Expand Up @@ -7129,6 +7146,9 @@ Record.prototype._$onMessage = function( message ) {
else if( message.data[ 0 ] === C.EVENT.MESSAGE_DENIED ) {
clearInterval( this._readAckTimeout );
clearInterval( this._readTimeout );
} else if( message.action === C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER ) {
this.hasProvider = message.data[ 1 ];
this.emit( 'hasProviderChanged', message.data[ 1 ] );
}
};

Expand Down Expand Up @@ -7967,6 +7987,18 @@ module.exports = AckTimeoutRegistry;
var C = _dereq_( '../constants/constants' );
var ResubscribeNotifier = _dereq_( './resubscribe-notifier' );

/*
* Creates a listener instance which is usedby deepstream Records and Events.
*
* @param {String} type One of CONSTANTS.TOPIC
* @param {String} pattern A pattern that can be compiled via new RegExp(pattern)
* @param {Function} callback The function which is called when pattern was found and removed
* @param {Connection} Connection The instance of the server connection
* @param {Object} options Deepstream options
* @param {Client} client deepstream.io client
*
* @constructor
*/
var Listener = function( type, pattern, callback, options, client, connection ) {
this._type = type;
this._callback = callback;
Expand All @@ -7977,35 +8009,117 @@ var Listener = function( type, pattern, callback, options, client, connection )
this._ackTimeout = setTimeout( this._onAckTimeout.bind( this ), this._options.subscriptionTimeout );
this._resubscribeNotifier = new ResubscribeNotifier( client, this._sendListen.bind( this ) );
this._sendListen();
this._responded = null;
this.destroyPending = false;
};

Listener.prototype.destroy = function() {
Listener.prototype.sendDestroy = function() {
this.destroyPending = true;
this._connection.sendMsg( this._type, C.ACTIONS.UNLISTEN, [ this._pattern ] );
this._resubscribeNotifier.destroy();

};

/*
* Resets internal properties. Is called when provider cals unlisten.
*
* @returns {void}
*/
Listener.prototype.destroy = function() {
this._callback = null;
this._pattern = null;
this._client = null;
this._connection = null;
};

/*
* Accepting a listener request informs deepstream that the current provider is willing to
* provide the record or event matching the subscriptionName . This will establish the current
* provider as the only publisher for the actual subscription with the deepstream cluster.
* Either accept or reject needs to be called by the listener, otherwise it prints out a deprecated warning.
*
* @returns {void}
*/
Listener.prototype.accept = function( name ) {
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN_ACCEPT, [ this._pattern, name ] );
this._responded = true;
}

/*
* Rejecting a listener request informs deepstream that the current provider is not willing
* to provide the record or event matching the subscriptionName . This will result in deepstream
* requesting another provider to do so instead. If no other provider accepts or exists, the
* record will remain unprovided.
* Either accept or reject needs to be called by the listener, otherwise it prints out a deprecated warning.
*
* @returns {void}
*/
Listener.prototype.reject = function( name ) {
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN_REJECT, [ this._pattern, name ] );
this._responded = true;
}

/*
* Wraps accept and reject as an argument for the callback function.
*
* @private
* @returns {Object}
*/
Listener.prototype._createCallbackResponse = function(message) {
return {
accept: this.accept.bind( this, message.data[ 1 ] ),
reject: this.reject.bind( this, message.data[ 1 ] )
}
}

/*
* Handles the incomming message.
*
* @private
* @returns {void}
*/
Listener.prototype._$onMessage = function( message ) {
if( message.action === C.ACTIONS.ACK ) {
clearTimeout( this._ackTimeout );
} else if ( message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND ) {
this._callback( message.data[ 1 ], true, this._createCallbackResponse( message) );
} else if ( message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED ) {
this._callback( message.data[ 1 ], false );
} else {
var isFound = message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND;
this._callback( message.data[ 1 ], isFound );
this._client._$onError( this._type, C.EVENT.UNSOLICITED_MESSAGE, message.data[ 0 ] + '|' + message.data[ 1 ] );
}

if( message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND && this._responded !== true ) {
var deprecatedMessage = 'DEPRECATED: listen should explicitly accept or reject for pattern: ' + message.data[ 0 ];
deprecatedMessage += '\nhttps://github.com/deepstreamIO/deepstream.io-client-js/issues/212';
if( console && console.warn ) {
console.warn( deprecatedMessage );
}
}
};

/*
* Sends a C.ACTIONS.LISTEN to deepstream.
*
* @private
* @returns {void}
*/
Listener.prototype._sendListen = function() {
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN, [ this._pattern ] );
this._connection.sendMsg( this._type, C.ACTIONS.LISTEN, [ this._pattern ] );
};

/*
* Sends a C.EVENT.ACK_TIMEOUT to deepstream.
*
* @private
* @returns {void}
*/
Listener.prototype._onAckTimeout = function() {
this._client._$onError( this._type, C.EVENT.ACK_TIMEOUT, 'No ACK message received in time for ' + this._pattern );
};

module.exports = Listener;

},{"../constants/constants":35,"./resubscribe-notifier":52}],52:[function(_dereq_,module,exports){
var C = _dereq_( '../constants/constants' );

Expand Down
6 changes: 3 additions & 3 deletions dist/deepstream.min.js

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"e2e::watch": "watch \"npm run e2e\" ./src ./test-e2e",
"unit": "jasmine JASMINE_CONFIG_PATH=jasmine.json test-unit/unit/**/*Spec.js",
"unit::watch": "watch \"npm run unit\" ./src ./test-unit",
"webpack": "webpack --progress test-page-webpack/index.js test-page-webpack/dist/build.js"
"webpack": "webpack --progress test-page-webpack/index.js test-page-webpack/dist/build.js",
"new-e2e": " node ./node_modules/cucumber/bin/cucumber.js ./node_modules/deepstream.io-e2e --require ./test-e2e-gherkin/steps --fail-fast"
},
"repository": {
"type": "git",
Expand All @@ -31,6 +32,8 @@
"devDependencies": {
"browserify": "13.0.1",
"coveralls": "^2.11.9",
"cucumber": "^1.2.2",
"deepstream.io-e2e": "git+https://github.com/deepstreamIO/deepstream.io-e2e.git",
"deepstream.io": "git+https://github.com/deepstreamIO/deepstream.io.git",
"deepstream.io-cache-redis": "latest",
"deepstream.io-msg-redis": "latest",
Expand All @@ -45,6 +48,7 @@
"jasmine": "^2.4.1",
"jasmine-spec-reporter": "^2.4.0",
"proxyquire": "1.7.10",
"sinon": "^1.17.5",
"watch": "^0.19.1"
},
"author": "deepstreamHub GmbH",
Expand Down
3 changes: 3 additions & 0 deletions src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ exports.ACTIONS.SNAPSHOT = 'SN';
exports.ACTIONS.INVOKE = 'I';
exports.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND = 'SP';
exports.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED = 'SR';
exports.ACTIONS.SUBSCRIPTION_HAS_PROVIDER = 'SH';
exports.ACTIONS.LISTEN = 'L';
exports.ACTIONS.UNLISTEN = 'UL';
exports.ACTIONS.LISTEN_ACCEPT = 'LA';
exports.ACTIONS.LISTEN_REJECT = 'LR';
exports.ACTIONS.PROVIDER_UPDATE = 'PU';
exports.ACTIONS.QUERY = 'Q';
exports.ACTIONS.CREATEORREAD = 'CR';
Expand Down
40 changes: 30 additions & 10 deletions src/event/event-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ EventHandler.prototype.unsubscribe = function( name, callback ) {
};

/**
* Emits an event locally and sends a message to the server to
* Emits an event locally and sends a message to the server to
* broadcast the event to the other connected clients
*
* @param {String} name
* @param {String} name
* @param {Mixed} data will be serialized and deserialized to its original type.
*
* @public
Expand Down Expand Up @@ -117,11 +117,13 @@ EventHandler.prototype.listen = function( pattern, callback ) {
throw new Error( 'invalid argument callback' );
}

if( this._listener[ pattern ] ) {
this._client._$onError( C.TOPIC.EVENT, C.EVENT.LISTENER_EXISTS, pattern );
} else {
this._listener[ pattern ] = new Listener( C.TOPIC.EVENT, pattern, callback, this._options, this._client, this._connection );
if( this._listener[ pattern ] && !this._listener[ pattern ].destroyPending ) {
return this._client._$onError( C.TOPIC.EVENT, C.EVENT.LISTENER_EXISTS, pattern );
} else if( this._listener[ pattern ] ) {
this._listener[ pattern ].destroy();
}

this._listener[ pattern ] = new Listener( C.TOPIC.EVENT, pattern, callback, this._options, this._client, this._connection );
};

/**
Expand All @@ -138,12 +140,16 @@ EventHandler.prototype.unlisten = function( pattern ) {
throw new Error( 'invalid argument pattern' );
}

if( this._listener[ pattern ] ) {
var listener = this._listener[ pattern ];

if( listener && !listener.destroyPending ) {
listener.sendDestroy();
} else if( this._listener[ pattern ] ) {
this._ackTimeoutRegistry.add( pattern, C.EVENT.UNLISTEN );
this._listener[ pattern ].destroy();
delete this._listener[ pattern ];
} else {
this._client._$onError( C.TOPIC.EVENT, C.EVENT.NOT_LISTENING, pattern );
this._client._$onError( C.TOPIC.RECORD, C.EVENT.NOT_LISTENING, pattern );
}
};

Expand All @@ -159,6 +165,7 @@ EventHandler.prototype._$handle = function( message ) {
var name = message.data[ message.action === C.ACTIONS.ACK ? 1 : 0 ];

if( message.action === C.ACTIONS.EVENT ) {
processed = true;
if( message.data && message.data.length === 2 ) {
this._emitter.emit( name, messageParser.convertTyped( message.data[ 1 ], this._client ) );
} else {
Expand All @@ -167,16 +174,29 @@ EventHandler.prototype._$handle = function( message ) {
return;
}

if( this._listener[ name ] ) {
if( message.action === C.ACTIONS.ACK && message.data[ 0 ] === C.ACTIONS.UNLISTEN &&
this._listener[ name ] && this._listener[ name ].destroyPending
) {
this._listener[ name ].destroy();
delete this._listener[ name ];
return;
} else if( this._listener[ name ] ) {
processed = true;
this._listener[ name ]._$onMessage( message );
return;
} else if( message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED ) {
// An unlisten ACK was received before an PATTERN_REMOVED which is a valid case
return;
} else if( message.action === C.ACTIONS.SUBSCRIPTION_HAS_PROVIDER ) {
// record can receive a HAS_PROVIDER after discarding the record
return;
}

if( message.action === C.ACTIONS.ACK ) {
this._ackTimeoutRegistry.clear( message );
return;
}

if( message.action === C.ACTIONS.ERROR ) {
message.processedError = true;
this._client._$onError( C.TOPIC.EVENT, message.data[ 0 ], message.data[ 1 ] );
Expand Down
3 changes: 1 addition & 2 deletions src/record/anonymous-record.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var AnonymousRecord = function( recordHandler ) {
this._subscriptions = [];
this._proxyMethod( 'delete' );
this._proxyMethod( 'set' );
this._proxyMethod( 'unsubscribe' );
this._proxyMethod( 'discard' );
};

Expand Down Expand Up @@ -108,7 +107,7 @@ AnonymousRecord.prototype.unsubscribe = function() {
};

/**
* Sets the underlying record the anonymous record is boud
* Sets the underlying record the anonymous record is bound
* to. Can be called multiple times.
*
* @param {String} recordName
Expand Down
Loading

0 comments on commit cb72b9e

Please sign in to comment.