Skip to content

Commit

Permalink
Connections fix (#27)
Browse files Browse the repository at this point in the history
* Fix how the client is setup and torn down so that it:
- Tracks connections that have been opened so they can be closed.
- Tracks background tasks that have been setup so they can be shut down.
- Closes connection sockets.
- Shuts down background tasks.

This was needed to run the test cases as a suite. Previously they only worked individually.

Bound client exit to exit function and also improved debug logging.

This is a step towards fixing #11 but more needs to be done. At least the test suite runs now :)
  • Loading branch information
mattyouill authored and Ning Shi committed Sep 15, 2017
1 parent 41e6522 commit 293b6c2
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 32 deletions.
33 changes: 28 additions & 5 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var EventEmitter = require('events').EventEmitter,
util = require('util'),
VoltConnection = require('./connection'),
VoltConstants = require('./voltconstants');
const debug = require("debug")("voltdb-client-nodejs:VoltClient");

VoltClient = function(configuration) {
EventEmitter.call(this);
Expand All @@ -35,7 +36,7 @@ VoltClient = function(configuration) {
this._getConnections = this.connect.bind(this);
this.call = this.call.bind(this);
this.callProcedure = this.callProcedure.bind(this);
this.exit = this.connect.bind(this);
this.exit = this.exit.bind(this);
this.connectionStats = this.connectionStats.bind(this);

this._connectListener = this._connectListener.bind(this);
Expand Down Expand Up @@ -67,7 +68,13 @@ VoltClient.prototype.connect = function(callback) {
con.on(VoltConstants.SESSION_EVENT.QUERY_RESPONSE_ERROR,this._queryResponseErrorListener);
con.on(VoltConstants.SESSION_EVENT.QUERY_DISPATCH_ERROR,this._queryDispatchErrorListener);
con.on(VoltConstants.SESSION_EVENT.FATAL_ERROR,this._fatalErrorListener);


/**
* Need to register the connection even before the socket connects otherwise
* it can't be torn down in the event of a socket failure.
*/
this._connections.push(con);

con.connect();
}
}
Expand Down Expand Up @@ -117,6 +124,16 @@ VoltClient.prototype.call = function(query, readCallback, writeCallback) {
}

VoltClient.prototype.exit = function(callback) {

debug("Exiting | Connections Length: %o", this._connections.length);

while(this._connections.length > 0){
var c = this._connections[0];
c.close();
this._connections.splice(0, 1);
}

if(callback) callback();
}

VoltClient.prototype.connectionStats = function() {
Expand All @@ -139,11 +156,17 @@ VoltClient.prototype._displayConnectionArrayStats = function(array) {
}
}

/**
* TODO: Not sure why SUCCESS can be both null and 1. Will leave it as is until
* I know why and just brute force map the null to 1 to get it as a String.
*/
function statusCodeToString(code){
return code === null ? VoltConstants.STATUS_CODE_STRINGS[1] : VoltConstants.STATUS_CODE_STRINGS[code];
}

VoltClient.prototype._connectListener = function(code, event, connection) {

if ( VoltConstants.STATUS_CODES.SUCCESS == code) {
this._connections.push(connection);
}
debug("Connected | Code: %o, Event: %o", statusCodeToString(code), event);

this.emit(VoltConstants.SESSION_EVENT.CONNECTION,
code,
Expand Down
19 changes: 16 additions & 3 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ crypto = require('crypto'),
util = require('util'),
Message = require('./message').Message,
VoltConstants = require('./voltconstants');
const debug = require("debug")("voltdb-client-nodejs:VoltConnection");

function VoltMessageManager(configuration) {
EventEmitter.call(this);
Expand Down Expand Up @@ -70,6 +71,9 @@ function VoltConnection(configuration) {
this.invocations = 0;
this.validConnection = true;
this.blocked = false;

this.outstandingQueryManager = null;
this.flusher = null;

}

Expand All @@ -86,8 +90,8 @@ VoltConnection.prototype.initSocket = function(socket) {
VoltConnection.prototype.connect = function() {
this.initSocket(new Socket())
this.socket.connect(this.config.port, this.config.host);
setInterval(this._manageOustandingQueries, this.config.queryTimeoutInterval);
setInterval(this._flush, this.config.flushInterval);
this.outstandingQueryManager = setInterval(this._manageOustandingQueries, this.config.queryTimeoutInterval);
this.flusher = setInterval(this._flush, this.config.flushInterval);
}

VoltConnection.prototype.isValidConnection = function() {
Expand Down Expand Up @@ -142,7 +146,15 @@ VoltConnection.prototype._zeros = function(num) {
}

VoltConnection.prototype.close = function(callback) {
this.socket.end();

debug("Closing");

if(this.outstandingQueryManager) clearInterval(this.outstandingQueryManager);
if(this.flusher) clearInterval(this.flusher);

if(this.socket) this.socket.end();

if(callback) callback();
}

VoltConnection.prototype.onConnect = function(results) {
Expand Down Expand Up @@ -257,6 +269,7 @@ VoltConnection.prototype._queue = function(buffer, track) {
};

VoltConnection.prototype._flush = function() {
debug("Flushing | Send Queue Length: %o", this._sendQueue.length);
var bytes = this._sendQueue.reduce(function(bytes, buffer) {
return bytes + buffer.length;
}, 0);
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
"dependencies": {
"bignumber": "^1.1.0",
"ctype": "^0.5.0",
"cli": "^1.0.0"
"cli": "^1.0.0",
"debug": "^3.0.1",
"supports-color": "^4.4.0"
},
"devDependencies": {
"nodeunit": "^0.11.0"
Expand Down
15 changes: 9 additions & 6 deletions test/cases/connections.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var VoltClient = require('../../lib/client');
var VoltConfiguration = require('../../lib/configuration');
var util = require('util');
var testCase = require('nodeunit');
const debug = require("debug")("voltdb-client-nodejs:ConnectionsTest");

function goodConfig() {
return config('localhost');
Expand All @@ -35,7 +36,7 @@ function badConfig() {
}

function config(host) {
console.log('this config got called');
debug('this config got called');
var config = new VoltConfiguration();
config.host = host;
var configs = [];
Expand All @@ -46,29 +47,31 @@ function config(host) {
exports.connections = {

setUp : function(callback) {
console.log('connections setup called');
debug('connections setup called');
callback();
},
tearDown : function(callback) {
console.log('connections teardown called');
debug('connections teardown called');
callback();
},
'Bad connection results' : function(test) {
console.log('running bad connection test');
debug('running bad connection test');
var client = new VoltClient(badConfig())
client.connect(function startup(code, event, results) {
console.log('bad connection test');
debug('bad connection test');
test.expect(1);
test.notEqual(code, null, 'There should not be a host named idontexists');
client.exit();
test.done();
});
},
'Good connection results' : function(test) {
console.log('running good connection test');
debug('running good connection test');
var client = new VoltClient(goodConfig())
client.connect(function startup(code, event, results) {
test.expect(1);
test.equal(code, null, 'Should have been able to connect, is Volt running on localhost?');
client.exit();
test.done();
});
}
Expand Down
32 changes: 15 additions & 17 deletions test/cases/typestest.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var VoltClient = require('../../lib/client');
var VoltConfiguration = require('../../lib/configuration');
var VoltProcedure = require('../../lib/query');
var VoltQuery = require('../../lib/query');
const debug = require("debug")("voltdb-client-nodejs:TypeTest");

var util = require('util');
var testCase = require('nodeunit');
Expand All @@ -46,40 +47,37 @@ function config() {
exports.typetest = {

setUp : function(callback) {
if(client == null) {
console.log('typetest setup called');
client = new VoltClient(config());
client.connect(function startup(code, event, results) {
console.log('dasda connected');
callback();
});
} else {
debug('typetest setup called');
client = new VoltClient(config());
client.connect(function startup(code, event, results) {
debug('dasda connected');
callback();
}
});
},
tearDown : function(callback) {
console.log('typetest teardown called');
debug('typetest teardown called');
client.exit();
callback();
},
'Init test' : function(test) {
console.log('init test');
debug('init test');
test.expect(2);

var initProc = new VoltProcedure('InitTestType', ['int']);
var query = initProc.getQuery();
query.setParameters([0]);

client.callProcedure(query, function read(code, event, results) {
console.log('results', results);
debug('results %o', results);
test.equals(code, null , 'did I get called');
test.done();
}, function write(code, event, results) {
test.equals(code, null, 'Write didn\'t had an error');
console.log('write ok');
debug('write ok');
});
},
'select test' : function(test) {
console.log('select test');
debug('select test');
test.expect(11);

var initProc = new VoltProcedure('TYPETEST.select', ['int']);
Expand All @@ -89,8 +87,8 @@ exports.typetest = {
client.callProcedure(query, function read(code, event, results) {

var testBuffer = new Buffer(4);
console.log('results inspection: ', results.table[0][0].TEST_TIMESTAMP);
console.log('inspect', util.inspect(results.table[0][0]));
debug('results inspection: %o', results.table[0][0].TEST_TIMESTAMP);
debug('inspect %s', util.inspect(results.table[0][0]));

test.equals(code, null, 'Invalid status: ' + results.status + 'should be 1');

Expand All @@ -109,7 +107,7 @@ exports.typetest = {

test.done();
}, function write(code, event, results) {
console.log('write ok');
debug('write ok');
test.ok(true, 'Write didn\'t get called');
});
}
Expand Down

0 comments on commit 293b6c2

Please sign in to comment.