Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX #103 - Implementing discovered servers API #143

Merged
merged 1 commit into from
May 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions lib/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var net = require('net'),
* Constants
*/

var VERSION = '0.7.16',
var VERSION = '0.7.18',

DEFAULT_PORT = 4222,
DEFAULT_PRE = 'nats://localhost:',
Expand Down Expand Up @@ -482,7 +482,8 @@ Client.prototype.sendConnect = function() {
'lang' : 'node',
'version' : VERSION,
'verbose' : this.options.verbose,
'pedantic': this.options.pedantic
'pedantic': this.options.pedantic,
'protocol': 1,
};
if (this.user !== undefined) {
cs.user = this.user;
Expand All @@ -494,7 +495,6 @@ Client.prototype.sendConnect = function() {
if (this.options.name !== undefined) {
cs.name = this.options.name;
}

// If we enqueued requests before we received INFO from the server, or we
// reconnected, there be other data pending, write this immediately instead
// of adding it to the queue.
Expand Down Expand Up @@ -797,6 +797,33 @@ Client.prototype.processInbound = function() {
if (client.checkTLSMismatch() === true) {
return;
}

// Always try to read the connect_urls from info
if(client.info.connect_urls && client.info.connect_urls.length > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First, indentation looks strange.
Second, are you not going to do that for any protocol message received by the client? Looks to me that you should ensure that protocol being processed is an INFO here.
Also, not sure if you support randomization, but what the other clients do is randomize (if allowed) the array or connect_urls prior to add the URLs to the server pool.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I preserved the indentation that was in the file :)
The block we are in is only handling INFO messages. See https://sourcegraph.com/github.com/nats-io/node-nats@b5db73ee06428471568a730c3b0ff7f907fa6943/-/blob/lib/nats.js#L798:1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with this link, I am sorry but it looks horrible. It is very difficult to know what is in which block.

// don't add duplicates
var known = [];
client.servers.forEach(function(server) {
known.push(server.url.href);
});
// add new ones
var toAdd = [];
client.info.connect_urls.forEach(function(server) {
var u = 'nats://' + server;
if(known.indexOf(u) === -1) {
toAdd.push(new Server(url.parse(u)));
}
});

if(toAdd.length > 0) {
if(client.options.noRandomize !== true) {
shuffle(toAdd);
}
toAdd.forEach(function(s) {
client.servers.push(s);
});
}
}

// Process first INFO
if (client.infoReceived === false) {
// Switch over to TLS as needed.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nats",
"version": "0.7.16",
"version": "0.7.18",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",
"keywords": [
"nats",
Expand Down
143 changes: 143 additions & 0 deletions test/dyncluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/* jslint node: true */
/* global describe: false, before: false, after: false, it: false, afterEach: false, beforeEach: false */
/* jshint -W030 */
'use strict';

var NATS = require ('../'),
nsc = require('./support/nats_server_control'),
should = require('should');

describe('Dynamic Cluster - Connect URLs', function() {
this.timeout(10000);

// this to enable per test cleanup
var servers;
// Shutdown our servers
afterEach(function() {
nsc.stop_cluster(servers);
servers = [];
});

it('adding cluster performs update', function(done) {
var route_port = 54220;
var port = 54221;

// start a new cluster with single server
servers = nsc.start_cluster([port], route_port, function() {
should(servers.length).be.equal(1);

// connect the client
var nc = NATS.connect({'port': port, 'reconnectTimeWait': 100});
nc.on('connect', function () {
// start adding servers
process.nextTick(function () {
var others = nsc.add_member_with_delay([port + 1, port + 2], route_port, 250, function () {
// verify that 2 servers were added
should(others.length).be.equal(2);
others.forEach(function (o) {
// add them so they can be reaped
servers.push(o);
});
// give some time for the server to send infos
setTimeout(function () {
// we should know of 3 servers - the one we connected and the 2 we added
should(nc.servers.length).be.equal(3);
done();
}, 1000);
});
});
});
});
});

it('added servers are shuffled at the end of the list', function(done) {
var route_port = 54320;
var port = 54321;
// start a cluster of one server
var ports = [];
for (var i = 0; i < 10; i++) {
ports.push(port + i);
}
var map = {};
servers = nsc.start_cluster(ports, route_port, function () {
should(servers.length).be.equal(10);

var connectCount = 0;
function connectAndRecordPorts(check) {
var nc = NATS.connect({'port': port, 'reconnectTimeWait': 100});
nc.on('connect', function () {
var have = [];
nc.servers.forEach(function (s) {
have.push(s.url.port);
});

connectCount++;
should.ok(have[0] == port);
var key = have.join("_");
map[key] = map[key] ? map[key] + 1 : 1;
nc.close();
if (connectCount === 10) {
check();
}
});
}

// we should have more than one property if there was randomization
function check() {
var keys = Object.getOwnPropertyNames(map);
should.ok(keys.length > 1);
done();
}
// connect several times...
for (var i = 0; i < 10; i++) {
connectAndRecordPorts(check);
}
});
});

it('added servers not shuffled when noRandomize is set', function(done) {
var route_port = 54320;
var port = 54321;
// start a cluster of one server
var ports = [];
for (var i = 0; i < 10; i++) {
ports.push(port + i);
}
var map = {};
servers = nsc.start_cluster(ports, route_port, function () {
should(servers.length).be.equal(10);

var connectCount = 0;
function connectAndRecordPorts(check) {
var nc = NATS.connect({'port': port, 'reconnectTimeWait': 100, 'noRandomize': true});
nc.on('connect', function () {
var have = [];
nc.servers.forEach(function (s) {
have.push(s.url.port);
});

connectCount++;
should.ok(have[0] == port);
var key = have.join("_");
map[key] = map[key] ? map[key] + 1 : 1;
nc.close();
if (connectCount === 10) {
check();
}
});
}

// we should have more than one property if there was randomization
function check() {
var keys = Object.getOwnPropertyNames(map);
should.ok(keys.length === 1);
should.ok(map[keys[0]] === 10);
done();
}
// connect several times...
for (var i = 0; i < 10; i++) {
connectAndRecordPorts(check);
}
});
});
});
85 changes: 82 additions & 3 deletions test/support/nats_server_control.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ var net = require('net');
var SERVER = (process.env.TRAVIS) ? 'gnatsd/gnatsd' : 'gnatsd';
var DEFAULT_PORT = 4222;

exports.start_server = function(port, opt_flags, done) {
function start_server(port, opt_flags, done) {
if (!port) {
port = DEFAULT_PORT;
}
Expand Down Expand Up @@ -94,10 +94,89 @@ exports.start_server = function(port, opt_flags, done) {
});

return server;
};
}

exports.start_server = start_server;

exports.stop_server = function(server) {
function stop_server(server) {
if (server !== undefined) {
server.kill();
}
}

exports.stop_server = stop_server;

// starts a number of servers in a cluster at the specified ports.
// must call with at least one port.
function start_cluster(ports, route_port, opt_flags, done) {
if (typeof opt_flags == 'function') {
done = opt_flags;
opt_flags = null;
}
var servers = [];
var started = 0;
var server = add_member(ports[0], route_port, route_port, function() {
started++;
servers.push(server);
if(started === ports.length) {
done();
}
});

var others = ports.slice(1);
others.forEach(function(p){
var s = add_member(p, route_port, p+1000, opt_flags, function() {
started++;
servers.push(s);
if(started === ports.length) {
done();
}
});
});
return servers;
}

// adds more cluster members, if more than one server is added additional
// servers are added after the specified delay.
function add_member_with_delay(ports, route_port, delay, opt_flags, done) {
if (typeof opt_flags == 'function') {
done = opt_flags;
opt_flags = null;
}
var servers = [];
ports.forEach(function(p, i) {
setTimeout(function() {
var s = add_member(p, route_port, p+1000, opt_flags, function() {
servers.push(s);
if(servers.length === ports.length) {
done();
}
});
}, i*delay);
});

return servers;
}
exports.add_member_with_delay = add_member_with_delay;

function add_member(port, route_port, cluster_port, opt_flags, done) {
if (typeof opt_flags == 'function') {
done = opt_flags;
opt_flags = null;
}
opt_flags = opt_flags || [];
var opts = JSON.parse(JSON.stringify(opt_flags));
opts.push('--routes', 'nats://localhost:' + route_port);
opts.push('--cluster', 'nats://localhost:' + cluster_port);
return start_server(port, opts, done);
}

exports.start_cluster = start_cluster;
exports.add_member = add_member;

exports.stop_cluster = function(servers) {
servers.forEach(function(s) {
stop_server(s);
});
};