From f5f7efd4a4dff31e192066b5b66b4822e5d16d65 Mon Sep 17 00:00:00 2001 From: Bent Cardan Date: Mon, 16 Mar 2015 09:47:57 -0400 Subject: [PATCH] pubsub: the channel api Fixes: #31 PR: #65 --- lib/index.js | 63 +++++++++-- src/node_nanomsg.cc | 101 +++--------------- test/chan.js | 247 ++++++++++++++++++++++++++++++++++++++++++++ test/getsockopt.js | 107 ------------------- test/setsockopt.js | 135 ------------------------ 5 files changed, 317 insertions(+), 336 deletions(-) create mode 100644 test/chan.js delete mode 100644 test/getsockopt.js delete mode 100644 test/setsockopt.js diff --git a/lib/index.js b/lib/index.js index 40c9e69..fe473ec 100644 --- a/lib/index.js +++ b/lib/index.js @@ -96,16 +96,32 @@ function Socket (type, opts) { break; } - this.binding = nn.Socket(this.af_domain, this.protocol); - this.queue = []; + this.binding = nn.Socket(this.af_domain, this.protocol); + this.queue = []; - for(var sokopt in sol){ - if(opts.hasOwnProperty(sokopt)) this[sokopt](opts[sokopt]); - } + /* subscription filter control */ + this.channels = {}; - if(this.af_domain == nn.AF_SP) { - if (this.receiver) this._startPollReceive(); + /* subscription handling at initialization */ + if (opts.hasOwnProperty('chan')) { + if (Array.isArray(opts.chan)) { + opts.chan.forEach(this._register.bind(this)); + } else { + throw new TypeError('chan requires an Array'); } + } else if (type === 'sub') { + this._register(''); //default topic is an empty string + } + + /* sockopt api handling at initialization */ + for(var sokopt in sol){ + if(opts.hasOwnProperty(sokopt)) this[sokopt](opts[sokopt]); + } + + /* start listening for inbound messages */ + if(this.af_domain === nn.AF_SP) { + if (this.receiver) this._startPollReceive(); + } } util.inherits(Socket, EventEmitter); @@ -192,6 +208,17 @@ Socket.prototype._stopPollReceive = function () { this._pollReceive = null; } +Socket.prototype._register = function(chan){ + if (this.channels.hasOwnProperty('')) { + this.rmchan(''); + this._register(chan); + } else if (nn.Chan(this.binding, nn.NN_SUB_SUBSCRIBE, chan) !== -1) { + this.channels[chan] = true; + } else { + this.emit('error', new Error( nn.Err() + ' : ' + chan)); + } +}; + /** * Socket API */ @@ -373,6 +400,28 @@ function opt (option) { } }; +/* chan and rmchan sockopt methods. only relevant for subscription sockets */ +Socket.prototype.chan = function (list) { + if (Array.isArray(list)) { + list.forEach(this._register.bind(this)); + } else { + throw new TypeError('chan requires an Array'); + } +} + +Socket.prototype.rmchan = function() { + var i = arguments.length; + while(i--) { + if (this.channels[arguments[i]]) { + if (nn.Chan(this.binding, nn.NN_SUB_UNSUBSCRIBE, arguments[i]) > -1) { + return delete this.channels[arguments[i]]; + } else { + this.emit('error', new Error( nn.Err() + ' : ' + chan)); + } + } + }; +} + /** * module API */ diff --git a/src/node_nanomsg.cc b/src/node_nanomsg.cc index 6e4fe3c..710f505 100644 --- a/src/node_nanomsg.cc +++ b/src/node_nanomsg.cc @@ -37,12 +37,6 @@ NAN_METHOD(Socket) { // Invoke nanomsg function. int ret = nn_socket(domain, protocol); - if ((ret >= 0) && (protocol == NN_SUB)) { - if (nn_setsockopt(ret, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) != 0) { - return NanThrowError("Could not set subscribe option."); - } - } - NanReturnValue(NanNew(ret)); } @@ -57,86 +51,14 @@ NAN_METHOD(Close) { NanReturnValue(NanNew(ret)); } -NAN_METHOD(Setsockopt) { - NanScope(); - - int s = args[0]->Uint32Value(); - int level = args[1]->Uint32Value(); - int option = args[2]->Uint32Value(); - int ret; - - switch (option) { - /* string setters */ - case NN_SOCKET_NAME: { - String::Utf8Value str(args[3]); - - // Invoke nanomsg function. - ret = nn_setsockopt(s, level, option, *str, str.length()); - } break; - - /* int setters */ - default: { - int optval = args[3]->Uint32Value(); - - // Invoke nanomsg function. - ret = nn_setsockopt(s, level, option, &optval, sizeof(optval)); - } break; - } - - NanReturnValue(NanNew(ret)); -} - -// returns an array n where: -// n[0] is the return code (0 good, negative bad) -// n[1] is an int or string representing the option's value -NAN_METHOD(Getsockopt) { - NanScope(); - - int s = args[0]->Uint32Value(); - int level = args[1]->Uint32Value(); - int option = args[2]->Uint32Value(); - // int optval = args[3]->Uint32Value(); - int optval[64]; - - // Invoke nanomsg function. - size_t optsize = sizeof(optval); - int ret = nn_getsockopt(s, level, option, optval, &optsize); - - Local obj = NanNew(2); - obj->Set(0, NanNew(ret)); - - if (ret == 0) { - switch (option) { - /* string return values */ - case NN_SOCKET_NAME: - obj->Set(1, NanNew((char *)optval)); - break; - - /* int return values */ - default: - obj->Set(1, NanNew(optval[0])); - break; - } - } - - // otherwise pass the error back - NanReturnValue(obj); -} - NAN_METHOD(Setopt) { NanScope(); int level = args[1].integer; int option = args[2].integer; + int optval = args[3].integer; - if (option == NN_SOCKET_NAME) { - utf8 str(args[3]); - ret(NanNew(nn_setsockopt(S, level, option, *str, str.length()))); - } else { - int optval = args[3].integer; - ret(NanNew( - nn_setsockopt(S, level, option, &optval, sizeof(optval)))); - } + ret(NanNew(nn_setsockopt(S, level, option, &optval, sizeof(optval)))); } NAN_METHOD(Getopt) { @@ -148,18 +70,24 @@ NAN_METHOD(Getopt) { // check if the function succeeds if (nn_getsockopt(S, args[1].integer, option, optval, &optsize) == 0) { - - if (option == NN_SOCKET_NAME) - ret(NanNew((char *)optval)); - ret(NanNew(optval[0])); - } else { // pass the error back as an undefined return NanReturnUndefined(); } } +NAN_METHOD(Chan) { + NanScope(); + + int level = NN_SUB; + int option = args[1].integer; + + utf8 str(args[2]); + + ret(NanNew(nn_setsockopt(S, level, option, *str, str.length()))); +} + NAN_METHOD(Bind) { NanScope(); @@ -455,8 +383,7 @@ void InitAll(Handle exports) { // Export functions. EXPORT_METHOD(exports, Socket); EXPORT_METHOD(exports, Close); - EXPORT_METHOD(exports, Setsockopt); - EXPORT_METHOD(exports, Getsockopt); + EXPORT_METHOD(exports, Chan); EXPORT_METHOD(exports, Bind); EXPORT_METHOD(exports, Connect); EXPORT_METHOD(exports, Shutdown); diff --git a/test/chan.js b/test/chan.js new file mode 100644 index 0000000..323058d --- /dev/null +++ b/test/chan.js @@ -0,0 +1,247 @@ +var nano = require('..'); +var test = require('tape'); + +test('adding and removing subscription channels', function (t) { + + t.plan(7); + + var pub = nano.socket('pub'); + + //register some channels: hello, foo, bar + var sub = nano.socket('sub', { + chan: ['hello', 'foo', 'bar'] + }); + + pub.bind('inproc://filter'); + sub.connect('inproc://filter'); + + //the main array we'll use to test channel registration and removal + var msgs = ['hello world','hello world','hello world','bar world','foo world']; + var msgqty = -1; //starting here so first msg counted can be zero + var sent = 0; + + sub.on('message', function (buf) { + + var msg = String(buf); + + //increment msgqty when a msg is received (happening about every 100ms) + //we'll call finish() on the 5th msg received + switch(++msgqty){ + case 0: return t.equal(msg, msgs[msgqty]); //msgs[0], 'hello world' + case 1: return t.equal(msg, msgs[msgqty]); //msgs[1], 'hello world' + case 2: return t.equal(msg, msgs[msgqty]); //msgs[2], 'hello world' + case 3: return t.equal(msg, msgs[msgqty]); //msgs[3], 'bar world' + case 4: return finish (msg); + } + + }); + + setTimeout(send, 0); //send msgs[0], 'hello world' + setTimeout(send, 100); //send msgs[1], 'hello world' + setTimeout(send, 200); //send msgs[2], 'hello world' + setTimeout(removeHello, 300); //send msgs[3], 'bar world' and remove hello + setTimeout(send, 400); //send msgs[4], 'foo world' + + function send(){ + pub.send(msgs[sent++]); //lazy incrementing + } + + function removeHello(){ + + //stop listening for msg prefix: hello + sub.rmchan('hello'); + + //publish about 10 extra hello worlds to see if we can increment msgqty + var i = 0; + while(i++ < 10) pub.send('hello world'); + + send(); // send something with a registered prefix: 'bar world' + } + + function finish(msg){ + t.equal(msg, msgs[msgqty]); //'foo world' + t.equal(msg, msgs[4]); //msgqty count + t.equal(msg, 'foo world'); //prove case 4, the fifth msg + sub.close(); + pub.close(); + } +}); + +test('without explicitly registering any channels, socket recvs 2+ msgs of different prefixes', function(t){ + t.plan(2); + + var pub = nano.socket('pub'); + var sub = nano.socket('sub'); + + var msgs = 0; + var msg1 = 'foo world'; + var msg2 = 'bar world'; + var addr = 'inproc://default'; + + pub.bind(addr); + sub.connect(addr); + + sub.on('message', function (buf) { + + var msg = String(buf); + + if(++msgs < 2){ + t.equal(msg, 'foo world'); + } else { + t.equal(msg, 'bar world'); + sub.close(); + pub.close(); + } + }); + + pub.send(msg1); + + setTimeout(function () { + pub.send(msg2); + }, 100); + +}); + +test('registration after socket creation: chan receives correctly prefixed messages but not others', function(t){ + t.plan(2); + + var pub = nano.socket('pub'); + var sub = nano.socket('sub'); + + var msgs = 0; + var msg1 = 'foo world'; + var msg2 = 'bar world'; + var msg3 = 'hi world'; + var msg4 = 'hello world'; + var addr = 'inproc://prefixed'; + + pub.bind(addr); + sub.connect(addr); + + sub.chan(['foo','hello']); + + sub.on('message', function (buf) { + + var msg = String(buf); + + if(++msgs < 2){ + t.equal(msg, 'foo world'); + } else { + t.equal(msg, 'hello world'); + sub.close(); + pub.close(); + } + }); + + pub.send(msg1); + pub.send(msg2); + pub.send(msg3); + process.nextTick(function(){ + pub.send(msg4); + }) +}); + +test('channels registered by constructor get appropriately prefixed messages but not others', function(t){ + t.plan(2); + + var pub = nano.socket('pub'); + var sub = nano.socket('sub',{ + chan: ['foo','hello'] + }); + + var msgs = 0; + var msg1 = 'foo world'; + var msg2 = 'bar world'; + var msg3 = 'hi world'; + var msg4 = 'hello world'; + var addr = 'inproc://prefixed'; + + pub.bind(addr); + sub.connect(addr); + + sub.on('message', function (buf) { + + var msg = String(buf); + + if(++msgs < 2){ + t.equal(msg, 'foo world'); + } else { + t.equal(msg, 'hello world'); + sub.close(); + pub.close(); + } + }); + + pub.send(msg1); + pub.send(msg2); + pub.send(msg3); + process.nextTick(function(){ + pub.send(msg4); + }) +}); + +test('multi-topic registration followed by calls to rmchan on all but one stops all channels no longer registerd', function(t){ + t.plan(5); + + var pub = nano.socket('pub'); + var sub = nano.socket('sub',{ + chan: ['foo','bar','hi','hello'] + }); + + var msgs = 0; + var msg1 = 'foo world'; + var msg2 = 'bar world'; + var msg3 = 'hi world'; + var msg4 = 'hello world'; + var addr = 'inproc://prefixed'; + + pub.bind(addr); + sub.connect(addr); + + sub.on('message', function (buf) { + + var msg = String(buf); + + if(++msgs === 1) { + t.equal(msg, 'foo world'); + sub.rmchan('foo') + } else if(msgs === 2) { + t.equal(msg, 'bar world'); + sub.rmchan('bar'); + } else if (msgs === 3) { + t.equal(msg, 'hi world'); + sub.rmchan('hi'); + } else if (msgs === 4) { + t.equal(msg, 'hello world'); + } else if (msgs === 5) { + t.equal(msg, 'hello world'); + sub.close(); + pub.close(); + } + }); + + pub.send(msg1); // 1st msg + process.nextTick(function(){ + pub.send(msg2); // 2nd msg + pub.send(msg1); + process.nextTick(function(){ + pub.send(msg3); // 3rd msg + pub.send(msg2); + pub.send(msg1); + process.nextTick(function(){ + pub.send(msg4); // 4th msg + pub.send(msg3); + pub.send(msg2); + pub.send(msg1); + process.nextTick(function(){ + pub.send(msg3); + pub.send(msg2); + pub.send(msg1); + process.nextTick(function(){ + pub.send(msg4); //5th msg recv'd ends test + }); + }); + }); + }); + }); +}); diff --git a/test/getsockopt.js b/test/getsockopt.js deleted file mode 100644 index 7cbbc89..0000000 --- a/test/getsockopt.js +++ /dev/null @@ -1,107 +0,0 @@ -//https://github.com/chuckremes/nn-core/blob/master/spec/nn_getsockopt_spec.rb - -var nano = require('../'); -var nn = nano._bindings; -var test = require('tape'); - -test('NN_LINGER returns a default value of 1000', function (t) { - t.plan(1); - - var sock = nano.socket('pub'); - var rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_LINGER); - t.equal(rc, 1000); - sock.close(); -}); - -test('NN_SNDBUF returns a default value of 128KB', function (t) { - t.plan(1); - - var sock = nano.socket('pub'); - var rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDBUF); - t.equal(rc, 131072); - sock.close(); -}); - -test('NN_RCVBUF returns a default value of 128KB', function (t) { - t.plan(1); - - var sock = nano.socket('pub'); - var rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_RCVBUF); - t.equal(rc, 131072); - sock.close(); -}); - -test('NN_SNDTIMEO returns a default value of -1', function (t) { - t.plan(1); - - var sock = nano.socket('pub'); - var rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDTIMEO); - t.equal(rc, -1); - sock.close(); -}); - -test('NN_RCVTIMEO returns a default value of -1', function (t) { - t.plan(1); - - var sock = nano.socket('pub'); - var rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_RCVTIMEO); - t.equal(rc, -1); - sock.close(); -}); - -test('NN_RECONNECT_IVL returns a default value of 100', function (t) { - t.plan(1); - - var sock = nano.socket('pub'); - var rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_RECONNECT_IVL); - t.equal(rc, 100); - sock.close(); -}); - -test('NN_RECONNECT_IVL_MAX returns a default value of 0', function (t) { - t.plan(1); - - var sock = nano.socket('pub'); - var rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_RECONNECT_IVL_MAX); - t.equal(rc, 0); - sock.close(); -}); - -test('NN_SNDPRIO returns a default value of 8', function (t) { - t.plan(1); - - var sock = nano.socket('pub'); - var rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDPRIO); - t.equal(rc, 8); - sock.close(); -}); - -test('getsockopt throws exception for unsupported socket level', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - - sock.on('error', function (err) { - t.ok(err, 'exception thrown for unsupported socket level'); - t.equal(nn.Errno(), nn.ENOPROTOOPT); - sock.close(); - }); - - sock.getsockopt(999999, nn.NN_SNDPRIO); -}); - -test('getsockopt throws exception for unsupported socket option', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - - sock.on('error', function (err) { - t.ok(err, 'exception thrown for unsupported socket option'); - t.equal(nn.Errno(), nn.ENOPROTOOPT); - sock.close(); - }); - - sock.getsockopt(nn.NN_SOL_SOCKET, 999999); -}); - - diff --git a/test/setsockopt.js b/test/setsockopt.js deleted file mode 100644 index 5bd07fc..0000000 --- a/test/setsockopt.js +++ /dev/null @@ -1,135 +0,0 @@ -// https://github.com/chuckremes/nn-core/blob/master/spec/nn_setsockopt_spec.rb - -var nano = require('../'); -var nn = nano._bindings; -var test = require('tape'); - -test('NN_LINGER can be set to 1500', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - var rc = sock.setsockopt(nn.NN_SOL_SOCKET, nn.NN_LINGER, 1500); - t.equal(rc, 0); - rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_LINGER); - t.equal(rc, 1500); - sock.close(); -}); - -test('NN_SNDBUF can be set to 64KB', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - var rc = sock.setsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDBUF, 65536); - t.equal(rc, 0); - rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDBUF); - t.equal(rc, 65536); - sock.close(); -}); - -test('NN_RCVBUF can be set to 64KB', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - var rc = sock.setsockopt(nn.NN_SOL_SOCKET, nn.NN_RCVBUF, 65536); - t.equal(rc, 0); - rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_RCVBUF); - t.equal(rc, 65536); - sock.close(); -}); - -test('NN_SNDTIMEO can be set to 1000', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - var rc = sock.setsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDTIMEO, 1000); - t.equal(rc, 0); - rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDTIMEO); - t.equal(rc, 1000); - sock.close(); -}); - -test('NN_RCVTIMEO can be set to 1000', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - var rc = sock.setsockopt(nn.NN_SOL_SOCKET, nn.NN_RCVTIMEO, 1000); - t.equal(rc, 0); - rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_RCVTIMEO); - t.equal(rc, 1000); - sock.close(); -}); - -test('NN_RECONNECT_IVL can be set to 1000', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - var rc = sock.setsockopt(nn.NN_SOL_SOCKET, nn.NN_RECONNECT_IVL, 1000); - t.equal(rc, 0); - rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_RECONNECT_IVL); - t.equal(rc, 1000); - sock.close(); -}); - -test('NN_RECONNECT_IVL_MAX can be set to 1000', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - var rc = sock.setsockopt(nn.NN_SOL_SOCKET, nn.NN_RECONNECT_IVL_MAX, 1000); - t.equal(rc, 0); - rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_RECONNECT_IVL_MAX); - t.equal(rc, 1000); - sock.close(); -}); - -test('NN_SNDPRIO can be set to 16', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - var rc = sock.setsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDPRIO, 16); - t.equal(rc, 0); - rc = sock.getsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDPRIO); - t.equal(rc, 16); - sock.close(); -}); - -test('setsockopt throws exception for unsupported send priority', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - - sock.on('error', function (err) { - t.ok(err, 'exception thrown for unsupported send priority'); - t.equal(nn.Errno(), nn.EINVAL); - sock.close(); - }); - - sock.setsockopt(nn.NN_SOL_SOCKET, nn.NN_SNDPRIO, 32); -}); - -test('setsockopt throws exception for unsupported socket level', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - - sock.on('error', function (err) { - t.ok(err, 'exception thrown for unsupported socket level'); - t.equal(nn.Errno(), nn.ENOPROTOOPT); - sock.close(); - }); - - sock.setsockopt(999999, nn.NN_SNDPRIO, 8); -}); - -test('setsockopt throws exception for unsupported socket option', function (t) { - t.plan(2); - - var sock = nano.socket('pub'); - - sock.on('error', function (err) { - t.ok(err, 'exception thrown for unsupported socket option'); - t.equal(nn.Errno(), nn.ENOPROTOOPT); - sock.close(); - }); - - sock.getsockopt(nn.NN_SOL_SOCKET, 999999, 8); -});