Skip to content

Commit

Permalink
Merge pull request #227 from deepstreamIO/enhancement/#207-api-checks
Browse files Browse the repository at this point in the history
Enhancement/#207 api checks
  • Loading branch information
yasserf authored Aug 28, 2016
2 parents 591f0ca + cb72b9e commit d30e4e7
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 19 deletions.
68 changes: 50 additions & 18 deletions src/event/event-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,52 +31,69 @@ var EventHandler = function( options, connection, client ) {
* Subscribe to an event. This will receive both locally emitted events
* as well as events emitted by other connected clients.
*
* @param {String} eventName
* @param {String} name
* @param {Function} callback
*
* @public
* @returns {void}
*/
EventHandler.prototype.subscribe = function( eventName, callback ) {
if( !this._emitter.hasListeners( eventName ) ) {
this._ackTimeoutRegistry.add( eventName, C.ACTIONS.SUBSCRIBE );
this._connection.sendMsg( C.TOPIC.EVENT, C.ACTIONS.SUBSCRIBE, [ eventName ] );
EventHandler.prototype.subscribe = function( name, callback ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}
if ( typeof callback !== 'function' ) {
throw new Error( 'invalid argument callback' );
}

this._emitter.on( eventName, callback );
if( !this._emitter.hasListeners( name ) ) {
this._ackTimeoutRegistry.add( name, C.ACTIONS.SUBSCRIBE );
this._connection.sendMsg( C.TOPIC.EVENT, C.ACTIONS.SUBSCRIBE, [ name ] );
}

this._emitter.on( name, callback );
};

/**
* Removes a callback for a specified event. If all callbacks
* for an event have been removed, the server will be notified
* that the client is unsubscribed as a listener
*
* @param {String} eventName
* @param {String} name
* @param {Function} callback
*
* @public
* @returns {void}
*/
EventHandler.prototype.unsubscribe = function( eventName, callback ) {
this._emitter.off( eventName, callback );

if( !this._emitter.hasListeners( eventName ) ) {
this._ackTimeoutRegistry.add( eventName, C.ACTIONS.UNSUBSCRIBE );
this._connection.sendMsg( C.TOPIC.EVENT, C.ACTIONS.UNSUBSCRIBE, [ eventName ] );
EventHandler.prototype.unsubscribe = function( name, callback ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}
if ( callback !== undefined && typeof callback !== 'function' ) {
throw new Error( 'invalid argument callback' );
}
this._emitter.off( name, callback );

if( !this._emitter.hasListeners( name ) ) {
this._ackTimeoutRegistry.add( name, C.ACTIONS.UNSUBSCRIBE );
this._connection.sendMsg( C.TOPIC.EVENT, C.ACTIONS.UNSUBSCRIBE, [ name ] );
}
};

/**
* Emits an event locally and sends a message to the server to
* Emits an event locally and sends a message to the server to
* broadcast the event to the other connected clients
*
* @param {String} name
* @param {String} name
* @param {Mixed} data will be serialized and deserialized to its original type.
*
* @public
* @returns {void}
*/
EventHandler.prototype.emit = function( name, data ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}

this._connection.sendMsg( C.TOPIC.EVENT, C.ACTIONS.EVENT, [ name, messageBuilder.typed( data ) ] );
this._emitter.emit( name, data );
};
Expand All @@ -93,13 +110,19 @@ EventHandler.prototype.emit = function( name, data ) {
* @returns {void}
*/
EventHandler.prototype.listen = function( pattern, callback ) {
if( this._listener[ pattern ] && !this._listener[ pattern ].destroyPending ) {
return this._client._$onError( C.TOPIC.EVENT, C.EVENT.LISTENER_EXISTS, pattern );
if ( typeof pattern !== 'string' || pattern.length === 0 ) {
throw new Error( 'invalid argument pattern' );
}
if ( typeof callback !== 'function' ) {
throw new Error( 'invalid argument callback' );
}

if( this._listener[ pattern ] ) {
if( this._listener[ pattern ] && !this._listener[ pattern ].destroyPending ) {
return this._client._$onError( C.TOPIC.EVENT, C.EVENT.LISTENER_EXISTS, pattern );
} else if( this._listener[ pattern ] ) {
this._listener[ pattern ].destroy();
}

this._listener[ pattern ] = new Listener( C.TOPIC.EVENT, pattern, callback, this._options, this._client, this._connection );
};

Expand All @@ -113,9 +136,18 @@ EventHandler.prototype.listen = function( pattern, callback ) {
* @returns {void}
*/
EventHandler.prototype.unlisten = function( pattern ) {
if ( typeof pattern !== 'string' || pattern.length === 0 ) {
throw new Error( 'invalid argument pattern' );
}

var listener = this._listener[ pattern ];

if( listener && !listener.destroyPending ) {
listener.sendDestroy();
} else if( this._listener[ pattern ] ) {
this._ackTimeoutRegistry.add( pattern, C.EVENT.UNLISTEN );
this._listener[ pattern ].destroy();
delete this._listener[ pattern ];
} else {
this._client._$onError( C.TOPIC.RECORD, C.EVENT.NOT_LISTENING, pattern );
}
Expand Down
4 changes: 4 additions & 0 deletions src/record/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ var EventEmitter = require( 'component-emitter' ),
* @constructor
*/
var List = function( recordHandler, name, options ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}

this._recordHandler = recordHandler;
this._record = this._recordHandler.getRecord( name, options );
this._record._applyUpdate = this._applyUpdate.bind( this );
Expand Down
27 changes: 27 additions & 0 deletions src/record/record-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,17 @@ RecordHandler.prototype.getAnonymousRecord = function() {
* @returns {void}
*/
RecordHandler.prototype.listen = function( pattern, callback ) {
if ( typeof pattern !== 'string' || pattern.length === 0 ) {
throw new Error( 'invalid argument pattern' );
}
if ( typeof callback !== 'function' ) {
throw new Error( 'invalid argument callback' );
}

if( this._listener[ pattern ] && !this._listener[ pattern ].destroyPending ) {
return this._client._$onError( C.TOPIC.RECORD, C.EVENT.LISTENER_EXISTS, pattern );
}

if( this._listener[ pattern ] ) {
this._listener[ pattern ].destroy();
}
Expand All @@ -121,9 +129,16 @@ RecordHandler.prototype.listen = function( pattern, callback ) {
* @returns {void}
*/
RecordHandler.prototype.unlisten = function( pattern ) {
if ( typeof pattern !== 'string' || pattern.length === 0 ) {
throw new Error( 'invalid argument pattern' );
}

var listener = this._listener[ pattern ];
if( listener && !listener.destroyPending ) {
listener.sendDestroy();
} else if( this._listener[ pattern ] ) {
this._listener[ pattern ].destroy();
delete this._listener[ pattern ];
} else {
this._client._$onError( C.TOPIC.RECORD, C.EVENT.NOT_LISTENING, pattern );
}
Expand All @@ -138,6 +153,10 @@ RecordHandler.prototype.unlisten = function( pattern ) {
* @public
*/
RecordHandler.prototype.snapshot = function( name, callback ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}

if( this._records[ name ] && this._records[ name ].isReady ) {
callback( null, this._records[ name ].get() );
} else {
Expand All @@ -154,6 +173,10 @@ RecordHandler.prototype.snapshot = function( name, callback ) {
* @public
*/
RecordHandler.prototype.has = function( name, callback ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}

if( this._records[ name ] ) {
callback( null, true );
} else {
Expand Down Expand Up @@ -287,6 +310,10 @@ RecordHandler.prototype._onRecordError = function( recordName, error ) {
* @returns {void}
*/
RecordHandler.prototype._onDestroyPending = function( recordName ) {
if ( !this._records[ recordName ] ) {
this.emit( 'error', 'Record \'' + recordName + '\' does not exists' );
return;
}
var onMessage = this._records[ recordName ]._$onMessage.bind( this._records[ recordName ] );
this._destroyEventEmitter.once( 'destroy_ack_' + recordName, onMessage );
this._removeRecord( recordName );
Expand Down
26 changes: 25 additions & 1 deletion src/record/record.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ var JsonPath = require( './json-path' ),
* @constructor
*/
var Record = function( name, recordOptions, connection, options, client ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}

this.name = name;
this.usages = 0;
this._recordOptions = recordOptions;
Expand Down Expand Up @@ -114,7 +118,10 @@ Record.prototype.get = function( path ) {
*/
Record.prototype.set = function( pathOrData, data ) {
if( arguments.length === 1 && typeof pathOrData !== 'object' ) {
throw new Error( 'Invalid record data ' + pathOrData + ': Record data must be an object' );
throw new Error( 'invalid argument data' );
}
if( arguments.length === 2 && ( typeof pathOrData !== 'string' || pathOrData.length === 0 ) ) {
throw new Error( 'invalid argument path' )
}

if( this._checkDestroyed( 'set' ) ) {
Expand Down Expand Up @@ -179,6 +186,13 @@ Record.prototype.set = function( pathOrData, data ) {
Record.prototype.subscribe = function( path, callback, triggerNow ) {
var args = this._normalizeArguments( arguments );

if ( args.path !== undefined && ( typeof args.path !== 'string' || args.path.length === 0 ) ) {
throw new Error( 'invalid argument path' );
}
if ( typeof args.callback !== 'function' ) {
throw new Error( 'invalid argument callback' );
}

if( this._checkDestroyed( 'subscribe' ) ) {
return;
}
Expand Down Expand Up @@ -217,6 +231,13 @@ Record.prototype.subscribe = function( path, callback, triggerNow ) {
Record.prototype.unsubscribe = function( pathOrCallback, callback ) {
var args = this._normalizeArguments( arguments );

if ( args.path !== undefined && ( typeof args.path !== 'string' || args.path.length === 0 ) ) {
throw new Error( 'invalid argument path' );
}
if ( args.callback !== undefined && typeof args.callback !== 'function' ) {
throw new Error( 'invalid argument callback' );
}

if( this._checkDestroyed( 'unsubscribe' ) ) {
return;
}
Expand All @@ -235,6 +256,9 @@ Record.prototype.unsubscribe = function( pathOrCallback, callback ) {
* @returns {void}
*/
Record.prototype.discard = function() {
if( this._checkDestroyed( 'discard' ) ) {
return;
}
this.whenReady( function() {
this.usages--;
if( this.usages <= 0 ) {
Expand Down
17 changes: 17 additions & 0 deletions src/rpc/rpc-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ var RpcHandler = function( options, connection, client ) {
* @returns void
*/
RpcHandler.prototype.provide = function( name, callback ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}
if( this._providers[ name ] ) {
throw new Error( 'RPC ' + name + ' already registered' );
}
if ( typeof callback !== 'function' ) {
throw new Error( 'invalid argument callback' );
}

this._ackTimeoutRegistry.add( name, C.ACTIONS.SUBSCRIBE );
this._providers[ name ] = callback;
Expand All @@ -65,6 +71,10 @@ RpcHandler.prototype.provide = function( name, callback ) {
* @returns {void}
*/
RpcHandler.prototype.unprovide = function( name ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}

if( this._providers[ name ] ) {
delete this._providers[ name ];
this._ackTimeoutRegistry.add( name, C.ACTIONS.UNSUBSCRIBE );
Expand All @@ -84,6 +94,13 @@ RpcHandler.prototype.unprovide = function( name ) {
* @returns {void}
*/
RpcHandler.prototype.make = function( name, data, callback ) {
if ( typeof name !== 'string' || name.length === 0 ) {
throw new Error( 'invalid argument name' );
}
if ( typeof callback !== 'function' ) {
throw new Error( 'invalid argument callback' );
}

var uid = this._client.getUid(),
typedData = messageBuilder.typed( data );

Expand Down

0 comments on commit d30e4e7

Please sign in to comment.