Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
Added the support for the connection of peers in ZeromqAscoltatore.
Browse files Browse the repository at this point in the history
Closes #26.
  • Loading branch information
mcollina committed Mar 2, 2013
1 parent 08015cd commit 4b5ba36
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 14 deletions.
105 changes: 98 additions & 7 deletions lib/zeromq_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ var util = require("./util");
var wrap = util.wrap;
var MemoryAscoltatore = require("./memory_ascoltatore");
var AbstractAscoltatore = require('./abstract_ascoltatore');
var debug = require("debug")("ascoltatore:zmq");
var debug = require("debug")("ascoltatori:zmq");
var async = require("async");

/**
* ZeromqAscoltatore is a class that inherits from AbstractAscoltatore.
Expand All @@ -17,8 +18,9 @@ var debug = require("debug")("ascoltatore:zmq");
* however loops are avoided.
*
* The options are:
* - `port`, the zmq port to listen from;
* - `remotePorts`, the remote ports to connect to;
* - `port`, the zmq port where messages will be published;
* - `controlPort`, the zmq port where control messages will be exchanged;
* - `remotePorts`, the remote control ports that will be connected to;
* - `zmq`, the zmq module (it will automatically be required if not present);
* - `delay`, a delay that is applied to the `ready` and `closed` events (the default is 5ms);
*
Expand All @@ -34,7 +36,10 @@ function ZeromqAscoltatore(opts) {
this._ascoltatore = new MemoryAscoltatore();
this._startSubs();
this._startPub();
this._startControl();
this._ascoltatore.on("newTopic", this.emit.bind(this, "newTopic"));

this._connectedControls = [];
}

/**
Expand Down Expand Up @@ -78,15 +83,16 @@ ZeromqAscoltatore.prototype._startPub = function () {
var that = this;
if (that._pub_conn === undefined) {
that._pub_conn = createConn(that._opts, "pub");
debug("opening pub port " + that._opts.port);
that._pub_conn.bind(that._opts.port, function (err) {
if (err) {
throw err;
}

debug("bound the connection to port " + that._opts.port);
debug("bound the publish connection to port " + that._opts.port);

setTimeout(function () {
that.connect(that._opts.port, function () {
that._connectSub(that._opts.port, function () {
that.emit("ready");
});
}, that._opts.delay);
Expand All @@ -95,17 +101,91 @@ ZeromqAscoltatore.prototype._startPub = function () {
return that._pub_conn;
};

ZeromqAscoltatore.prototype._startControl = function () {
var that = this;
if (that._control_conn === undefined) {
that._control_conn = createConn(that._opts, "req");
debug("opening control port " + that._opts.controlPort);
that._control_conn.bind(that._opts.controlPort, function (err) {
if (err) {
throw err;
}

debug("bound the control connection to port " + that._opts.controlPort);

that._control_conn_interval = setInterval(function () {
var packet = that._sub_conns.map(function (c) {
return c.port;
}).join(",");
debug("sending control packet " + packet);
that._control_conn.send(packet);
}, 250);

that._control_conn.on("message", function (data) {
debug("received connect response from " + data);
that._connectSub(data);
});
});
}
};

/**
* Connect the Ascoltatore to a remote ZMQ port.
* Connect the Ascoltatore to the remote ZeromqAscoltatore exposed
* through the given port
*
* @param {String} port
* @param {String} port The control port of the remote ascoltatore
* @param {Function} callback
* @api public
*/
ZeromqAscoltatore.prototype.connect = function connect(port, callback) {
var that = this,
conn = createConn(that._opts, "rep");

conn.connect(port);

that._connectedControls.push(conn);

conn.on("message", function (data) {
debug("received connect request from " + data);
conn.send(that._opts.port);

var dests = String(data).split(",").filter(function (dest) {
var found = true;
that._sub_conns.forEach(function (conn) {
if(conn.port === dest) {
found = false;
}
});
return found;
}).map(function (dest) {
return function (cb) {
that._connectSub(dest, cb);
};
});

async.parallel(dests, function () {
setTimeout(function () {
wrap(callback)();
}, that._opts.delay);
});
});
};

/**
* Connect the Ascoltatore to the remote ZMQ port.
*
* @param {String} port The control port of the remote ascoltatore
* @param {Function} callback
* @api private
*/
ZeromqAscoltatore.prototype._connectSub = function (port, callback) {
var that = this,
conn = createConn(that._opts, "sub");

port = String(port);

debug("connecting to port " + port);
conn.port = port;
conn.connect(port);
conn.subscribe("");
that._sub_conns.push(conn);
Expand Down Expand Up @@ -172,11 +252,22 @@ ZeromqAscoltatore.prototype.close = function close(done) {
});
delete that._sub_conns;
}

that._connectedControls.forEach(function (s) {
s.close();
});

if (that._pub_conn !== undefined) {
that._pub_conn.close();
delete that._pub_conn;
}

if (that._control_conn !== undefined && that._control_conn._zmq.state === 0) {
that._control_conn.close();
clearInterval(that._control_conn_interval);
delete that._control_conn_interval;
}

setTimeout(function () {
debug("closed");
that._ascoltatore.close();
Expand Down
6 changes: 4 additions & 2 deletions test/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ global.redisSettings = function () {
};
};

var portCounter = 50042;
var portCounter = 9042;
global.nextPort = function () {
return ++portCounter;
};
Expand All @@ -19,7 +19,9 @@ global.zeromqSettings = function (remote_ports) {
return {
json: false,
zmq: require("zmq"),
port: "tcp://127.0.0.1:" + global.nextPort()
port: "tcp://127.0.0.1:" + global.nextPort(),
controlPort: "tcp://127.0.0.1:" + global.nextPort(),
delay: 10
};
};

Expand Down
54 changes: 49 additions & 5 deletions test/zeromq_ascoltatore_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ describe("ascoltatori.ZeromqAscoltatore", function () {

behaveLikeAnAscoltatore();

var toClose = null;

beforeEach(function (done) {
this.instance = new ascoltatori.ZeromqAscoltatore(zeromqSettings());
this.instance.on("ready", done);
toClose = [this.instance];
});

afterEach(function (done) {
this.instance.close(done);
async.parallel(toClose.map(function (i) {
return function (cb) {
i.close(cb);
};
}), done);
});

it("should sync two instances", function (done) {
Expand All @@ -19,10 +26,9 @@ describe("ascoltatori.ZeromqAscoltatore", function () {
other.on("ready", cb);
},
function (cb) {
instance.connect(other._opts.port, cb);
},
function (cb) {
other.connect(instance._opts.port, cb);
toClose.push(other);
// we connect to the other instance control channel
other.connect(instance._opts.controlPort, cb);
},
function (cb) {
instance.subscribe("world", wrap(done), cb);
Expand All @@ -32,4 +38,42 @@ describe("ascoltatori.ZeromqAscoltatore", function () {
}
]);
});

it("should sync three instances", function (done) {
var instance = this.instance;
var other = new ascoltatori.ZeromqAscoltatore(zeromqSettings());
var other2 = new ascoltatori.ZeromqAscoltatore(zeromqSettings());

var count = 2;
var donner = function () {
if(--count === 0) {
done();
}
};

async.series([
function (cb) {
other.on("ready", cb);
},
function (cb) {
toClose.push(other);
other2.on("ready", cb);
},
function (cb) {
toClose.push(other2);
other.connect(instance._opts.controlPort);
other2.connect(instance._opts.controlPort);
setTimeout(cb, 500);
},
function (cb) {
other2.subscribe("world", donner, cb);
},
function (cb) {
other.subscribe("world", donner, cb);
},
function (cb) {
instance.publish("world", null, cb);
}
]);
});
});

0 comments on commit 4b5ba36

Please sign in to comment.