Skip to content

Commit

Permalink
pubsub: start work on the channel api
Browse files Browse the repository at this point in the history
fixes #31
  • Loading branch information
reqshark committed Mar 16, 2015
1 parent bdb36f2 commit fd950d6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
21 changes: 21 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ function Socket (type, opts) {

this.binding = nn.Socket(this.af_domain, this.protocol);
this.queue = [];
this.channels = {};

for(var sokopt in sol){
if(opts.hasOwnProperty(sokopt)) this[sokopt](opts[sokopt]);
Expand Down Expand Up @@ -315,6 +316,26 @@ Socket.prototype.maxreconn = opt('maxreconn');
Socket.prototype.sndprio = opt('sndprio');
Socket.prototype.rcvprio = opt('rcvprio');

/* chan and rmchan sockopt methods. only relevant for subscription sockets */
Socket.prototype.chan = function(){
var args = arguments;
var channels = args.length;
while(--channels > -1) {
if(nn.Setopt(this.binding, nn.NN_SUB, nn.NN_SUB_SUBSCRIBE, args[channels]) == 0)
this.channels[args[channels]] = 'active';
};
}

Socket.prototype.rmchan = function(channel){
var args = arguments;
var channels = args.length;
while(--channels > -1) {
if(nn.Setopt(this.binding, nn.NN_SUB, nn.NN_SUB_UNSUBSCRIBE, args[channels]) == 0)
this.channels[args[channels]] = 'active';
};
}


/* tcpnodelay sockopt method. this one is a little different */
Socket.prototype.tcpnodelay = function (bool) {
if(arguments.length){
Expand Down
12 changes: 6 additions & 6 deletions src/node_nanomsg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ NAN_METHOD(Socket) {
// Invoke nanomsg function.
int ret = nn_socket(domain, protocol);

if( (ret >= 0) && (protocol == NN_SUB)) {
if (nn_setsockopt(ret, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) != 0) {
return NanThrowError("Could not set subscribe option.");
}
}
//if( (ret >= 0) && (protocol == NN_SUB)) {
//if (nn_setsockopt(ret, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) != 0) {
//return NanThrowError("Could not set subscribe option.");
//}
//}

NanReturnValue(NanNew<Number>(ret));
}
Expand Down Expand Up @@ -136,7 +136,7 @@ NAN_METHOD(Setopt) {
int level = args[1].integer;
int option = args[2].integer;

if(option == NN_SOCKET_NAME){
if(option == NN_SOCKET_NAME || option == NN_SUB_SUBSCRIBE){
utf8 str(args[3]);
ret(NanNew<Number>(nn_setsockopt(S, level, option, *str, str.length())));
} else {
Expand Down

0 comments on commit fd950d6

Please sign in to comment.