diff --git a/.gitignore b/.gitignore
index dc4673d..18418d1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,4 +14,4 @@ libtool
nanocat
perf
.libs
-Makefile
+v8
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..f79d628
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,25 @@
+.PHONY: clean check test perf bench full
+
+ALL:
+ npm i
+
+check:
+ npm t
+
+test:
+ npm t
+
+clean:
+ rm -fr build && rm -rf node_modules
+
+#perf:
+# node perf/local_lat.js tcp://127.0.0.1:5555 1 100000& node perf/remote_lat.js tcp://127.0.0.1:5555 1 100000
+# node perf/local_thr.js tcp://127.0.0.1:5556 1 100000& node perf/remote_thr.js tcp://127.0.0.1:5556 1 100000#
+
+#bench:
+# node perf/local_lat.js tcp://127.0.0.1:5555 10 1000& node perf/remote_lat.js tcp://127.0.0.1:5555 10 1000
+# node perf/local_thr.js tcp://127.0.0.1:5556 10 100000& node perf/remote_thr.js tcp://127.0.0.1:5556 10 100000
+
+full:
+ rm -fr build && rm -rf node_modules
+ npm i && npm t
diff --git a/README.md b/README.md
index a97ef8f..952f2cf 100644
--- a/README.md
+++ b/README.md
@@ -2,13 +2,13 @@
[![Build status](https://ci.appveyor.com/api/projects/status/07j7o9juuktas2uk)](https://ci.appveyor.com/project/tcr/node-nanomsg) [![Build Status](https://travis-ci.org/tcr/node-nanomsg.svg)](https://travis-ci.org/tcr/node-nanomsg)
-Install:
+### install:
```
npm install nanomsg
```
-check it out:
+### check it out:
```js
var nano = require('nanomsg');
@@ -31,12 +31,213 @@ setTimeout(function () {
}, 100);
```
-## contributing
+# API
-Issues and pull requests welcome!
+### nano.socket(type, [options,])
+
+Starts a new socket. The nanomsg socket can bind or connect to multiple heterogeneous endpoints as well as shutdown any of these established links.
+
+#### `options`
+* `'raw'` *(Boolean, default: `false`)*: determines the domain of the socket. `AF_SP`, the default, creates a standard full-blown SP socket. `AF_SP_RAW` family sockets operate over internal network protocols and interfaces. Raw sockets omit the end-to-end functionality found in `AF_SP` sockets and thus can be used to implement intermediary devices in SP topologies, see [nanomsg docs](http://nanomsg.org/v0.5/nn_socket.3.html) or consult your man page entry `socket(2)` for more info.
+```js
+//ex. starting raw sockets
+nano.socket('bus', { raw: true } )
+```
+* `'tcpnodelay'` *(Boolean, default: `false`)*: see [`socket.tcpnodelay(boolean)`](https://github.com/nickdesaulniers/node-nanomsg#sockettcpnodelayboolean).
+* `'linger'` *(Number, default: `1000`)*: see [`socket.linger(duration)`](https://github.com/nickdesaulniers/node-nanomsg#socketlingerduration).
+* `'sndbuf'` *(Number, size in bytes, default: `128kB`)*: see [`socket.sndbuf(size)`](https://github.com/nickdesaulniers/node-nanomsg#socketsndbufsize).
+* `'rcvbuf'` *(Number, size in bytes, default: `128kB`)*: see [`socket.rcvbuf(size)`](https://github.com/nickdesaulniers/node-nanomsg#socketrcvbufsize).
+* `'sndtimeo'` *(Number, default: `-1`)*: see [`socket.sndtimeo(duration)`](https://github.com/nickdesaulniers/node-nanomsg#socketsndtimeoduration).
+* `'rcvtimeo'` *(Number, default: `-1`)*: see [`socket.rcvtimeo(duration)`](https://github.com/nickdesaulniers/node-nanomsg#socketrcvtimeoduration).
+* `'reconn'` *(Number, default: `100`)*: see [`socket.reconn(duration)`](https://github.com/nickdesaulniers/node-nanomsg#socketreconnduration).
+* `'maxreconn'` *(Number, default: `0`)*: see [`socket.maxreconn(duration)`](https://github.com/nickdesaulniers/node-nanomsg#socketmaxreconnduration).
+* `'sndprio'` *(Number, default: `0`)*: see [`socket.sndprio(priority)`](https://github.com/nickdesaulniers/node-nanomsg#socketsndpriopriority).
+* `'rcvprio'` *(Number, default: `0`)*: see [`socket.rcvprio(priority)`](https://github.com/nickdesaulniers/node-nanomsg#socketrcvpriopriority).
+
+### socket.shutdown(address)
+
+*(Function, param: String)*: Removes an endpoint established by calls to `bind()` or `connect()`. The nanomsg library will try to deliver any outstanding outbound messages to the endpoint for the time specified by `linger`.
+
+```js
+socket.shutdown('tcp://127.0.0.1:5555')
+```
+
+### socket.bind(address)
+
+*(Function, param: String)*: Adds a local endpoint to the socket. The endpoint can be then used by other applications to connect.
+
+`bind()` (or `connect()`) may be called multiple times on the same socket thus allowing the socket to communicate with multiple heterogeneous endpoints.
+
+```js
+socket.bind('tcp://eth0:5555')
+```
+
+*recommend checking your machine's `ifconfig` first before using a named interface. `ipconfig` on windows.*
+
+### socket.connect(address)
+
+*(Function, param: String)*: Adds a remote endpoint to the socket. The nanomsg library would then try to connect to the specified remote endpoint.
+
+`connect()` (as well as `bind()`) may be called multiple times on the same socket thus allowing the socket to communicate with multiple heterogeneous endpoints.
+
+```js
+socket.connect('tcp://127.0.0.1:5555')
+```
+
+*When connecting over remote TCP allow `100ms` or more depending on round trip time for the operation to complete.*
+
+##### *[a note on address strings](docs/address_strings.markdown)*
+
+### socket.close()
+
+*(Function, param: Function)*: Closes the socket. Any buffered inbound messages that were not yet received by the application will be discarded. The nanomsg library will try to deliver any outstanding outbound messages for the time specified by `linger`.
+
+### socket.tcpnodelay(boolean)
+
+*(Function, param: Boolean, default: false)*: When set, disables Nagle’s algorithm. It also disables delaying of TCP acknowledgments. Using this option improves latency at the expense of throughput.
+
+Pass no parameter for current tcp nodelay setting.
+
+```js
+//default
+console.log(socket.tcpnodelay()) //tcp nodelay: off
+
+socket.tcpnodelay(true) //disabling Nagle's algorithm
+
+console.log(socket.tcpnodelay()) //tcp nodelay: on
+```
+
+### socket.linger(duration)
+
+*(Function, param: Number, default: `1000`)*: Specifies how long the socket should try to send pending outbound messages after `socket.close()` or `socket.shutdown()` is called, in milliseconds.
+
+Pass no parameter for the linger duration.
+
+```js
+socket.linger(5000)
+console.log(socket.linger()) //5000
+```
+
+### socket.sndbuf(size)
+
+*(Function, param: Number, size in bytes, default: `128kB`)*: Size of the send buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the send buffer.
+
+Pass no parameter for the socket's send buffer size.
+
+```js
+socket.sndbuf(131072)
+console.log(socket.sndbuf()) // 131072
+```
+
+### socket.rcvbuf(size)
+
+*(Function, param: Number, size in bytes, default: `128kB`)*: Size of the receive buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the receive buffer.
+
+Pass no parameter for the socket's receive buffer size.
+
+```js
+socket.rcvbuf(20480)
+console.log(socket.rcvbuf()) // 20480
+```
+
+### socket.sndtimeo(duration)
+
+*(Function, param: Number, default: `-1`)*: The timeout for send operation on the socket, in milliseconds.
+
+Pass no parameter for the socket's send timeout.
+
+```js
+socket.sndtimeo(200)
+console.log(socket.sndtimeo()) // 200
+```
+
+### socket.rcvtimeo(duration)
+
+*(Function, param: Number, default: `-1`)*: The timeout for recv operation on the socket, in milliseconds.
+
+Pass no parameter for the socket's recv timeout.
+
+```js
+socket.rcvtimeo(50)
+console.log(socket.rcvtimeo()) // 50
+```
+
+### socket.reconn(duration)
+
+*(Function, param: Number, default: `100`)*: For connection-based transports such as TCP, this option specifies how long to wait, in milliseconds, when connection is broken before trying to re-establish it. Note that actual reconnect interval may be randomized to some extent to prevent severe reconnection storms.
+
+Pass no parameter for the socket's `reconnect` interval.
+
+```js
+socket.reconn(600)
+console.log(socket.reconn()) // 600
+```
+
+### socket.maxreconn(duration)
+
+*(Function, param: Number, default: `0`)*: Only to be used in addition to `socket.reconn()`. `maxreconn()` specifies maximum reconnection interval. On each reconnect attempt, the previous interval is doubled until `maxreconn` is reached. Value of zero means that no exponential backoff is performed and reconnect interval is based only on `reconn`. If `maxreconn` is less than `reconn`, it is ignored.
+
+Pass no parameter for the socket's `maxreconn` interval.
+
+```js
+socket.maxreconn(60000)
+console.log(socket.maxreconn()) // 60000
+```
+
+### socket.sndprio(priority)
+
+*(Function, param: Number, default: `8`)*: Sets outbound priority for endpoints subsequently added to the socket.
+
+This option has no effect on socket types that send messages to all the peers. However, if the socket type sends each message to a single peer (or a limited set of peers), peers with high priority take precedence over peers with low priority.
+
+Highest priority is 1, lowest is 16. Pass no parameter for the socket's current outbound priority.
+
+```js
+socket.sndprio(2)
+console.log(socket.sndprio()) // 2
+```
+
+### socket.rcvprio(priority)
+
+*(Function, param: Number, default: `8`)*: Sets inbound priority for endpoints subsequently added to the socket.
+
+This option has no effect on socket types that are not able to receive messages.
+
+When receiving a message, messages from peer with higher priority are received before messages from peer with lower priority.
+
+Highest priority is 1, lowest is 16. Pass no parameter for the socket's current inbound priority.
+
+```js
+socket.rcvprio(10)
+console.log(socket.rcvprio()) // 10
+```
+
+# test
+
+```bash
+$ git clone https://github.com/nickdesaulniers/node-nanomsg.git nano
+$ cd nano && git submodule update --init
+
+# now you can build the project and run the test suite:
+$ make && make check
+
+# or perhaps you'd prefer to use the npm commands instead:
+$ npm i
+$ npm t
+
+# let's say you switch to another version of node/iojs, you might want to run:
+$ make clean && make && make check
+
+# for the super deluxe make clean, rebuild, and test suite:
+$ make full
+```
Note: you must `git submodule update --init` to initialize the nanomsg repository.
-# license
+## contributing
+
+Issues and pull requests welcome!
+
+## license
MIT
diff --git a/docs/address_strings.markdown b/docs/address_strings.markdown
new file mode 100644
index 0000000..d02867b
--- /dev/null
+++ b/docs/address_strings.markdown
@@ -0,0 +1,8 @@
+### socket addresses
+
+*(Strings)*
+
+Socket address strings consist of two parts as follows: `transport://address`. The transport specifies the underlying transport protocol to use. The meaning of the address part is specific to the underlying transport protocol.
+* *TCP transport mechanism*: `'tcp://127.0.0.1:65000'` When binding a TCP socket, address of the form `tcp://interface:port` should be used. Port is the TCP port number to use. Interface is either: `IPv4` or `IPv6` address of a local network interface, or DNS name of the remote box. It is possible to use named interfaces like `eth0`. For more info see [nanomsg docs](http://nanomsg.org/v0.5/nn_tcp.7.html).
+* *in-process transport mechanism*: `'inproc://bar'` The `inproc` transport allows messages between threads or modules inside a process. In-process address is an arbitrary case-sensitive string preceded by `inproc://` protocol specifier. All in-process addresses are visible from any module within the process. They are not visible from outside of the process. The overall buffer size for an inproc connection is determined by `rcvbuf` socket option on the receiving end of the connection. `sndbuf` is ignored. In addition to the buffer, one message of arbitrary size will fit into the buffer. That way, even messages larger than the buffer can be transfered via inproc connection.
+* *inter-process transport mechanism*: `'ipc:///tmp/foo.ipc'` The `ipc` transport allows for sending messages between processes within a single box. The nanomsg implementation uses native IPC mechanism provided by the local operating system and the IPC addresses are thus OS-specific. On POSIX-compliant systems, UNIX domain sockets are used and IPC addresses are file references. Note that both relative (`ipc://test.ipc`) and absolute (`ipc:///tmp/test.ipc`) paths may be used. Also note that access rights on the IPC files must be set in such a way that the appropriate applications can actually use them. On Windows, named pipes are used for IPC. The Windows IPC address is an arbitrary case-insensitive string containing any character except for backslash: internally, address `ipc://test` means that named pipe `\\.\pipe\test` will be used.
diff --git a/lib/index.js b/lib/index.js
index 73c9948..a1e8e7b 100644
--- a/lib/index.js
+++ b/lib/index.js
@@ -3,24 +3,39 @@ var nn = require('bindings')('node_nanomsg.node');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
+/**
+ * generic socket-level NN_SOL_SOCKET options
+ */
+var sol = {
+ linger : nn.NN_LINGER,
+ sndbuf : nn.NN_SNDBUF,
+ rcvbuf : nn.NN_RCVBUF,
+ sndtimeo : nn.NN_SNDTIMEO,
+ rcvtimeo : nn.NN_RCVTIMEO,
+ reconn : nn.NN_RECONNECT_IVL,
+ maxreconn : nn.NN_RECONNECT_IVL_MAX,
+ sndprio : nn.NN_SNDPRIO,
+
+ // TODO: Issue #50
+ rcvprio : 9,
+ tcpnodelay : nn.NN_TCP_NODELAY,
+}
/**
* Socket implementation
*/
-function Socket (domain, type) {
- // DO NOT attempt to rename to this.domain, unless you like EventEmitter pain!
- this.af_domain = domain;
- this.type = type;
- if((domain != nn.AF_SP) && (domain != nn.AF_SP_RAW)) {
- throw new Error('unrecognised socket domain');
- }
+function Socket (type, opts) {
+
+ opts = opts || {};
+ this.af_domain = opts.raw ? nn.AF_SP_RAW : nn.AF_SP;
+ this.type = type;
switch(type) {
case 'req':
this.protocol = nn.NN_REQ;
this.sender=true;
- this.receiver=true;
+ this.receiver=true;
break;
case 'rep':
@@ -85,6 +100,10 @@ function Socket (domain, type) {
this.binding = nn.Socket(this.af_domain, this.protocol);
this.queue = [];
+ for(var sokopt in sol){
+ if(opts.hasOwnProperty(sokopt)) this[sokopt](opts[sokopt]);
+ }
+
if(this.af_domain == nn.AF_SP) {
if (this.receiver) this._startPollReceive();
}
@@ -95,7 +114,7 @@ util.inherits(Socket, EventEmitter);
Socket.prototype._protect = function (ret, unwind) {
if(ret < 0) {
if (unwind) unwind.call(this);
- this.emit('error', new Error(nn.Strerr(nn.Errno())));
+ this.emit('error', new Error(nn.Err()));
return null;
}
return ret;
@@ -108,7 +127,7 @@ Socket.prototype._protect = function (ret, unwind) {
Socket.prototype._protectArray = function (ret, unwind) {
if(ret[0] < 0) {
if (unwind) unwind.call(this);
- this.emit('error', new Error(nn.Strerr(nn.Errno())));
+ this.emit('error', new Error(nn.Err()));
return null;
}
return ret[1];
@@ -142,7 +161,7 @@ Socket.prototype._receive = function () {
}
}
if (msg == -1) return;
-
+
if (this.restore && typeof this.restore === 'function') msg = this.restore(msg);
this.emit('message', msg);
};
@@ -274,7 +293,7 @@ function Device (sock1,sock2) {
this._timer = setImmediate(function () {
nn.DeviceWorker(that.s1, that.s2, function (err) {
- that.emit('error', new Error(nn.Strerr(err)));
+ that.emit('error', new Error('lib err: '+ err +'\n'+ nn.Err()));
});
});
@@ -285,12 +304,59 @@ function Device (sock1,sock2) {
util.inherits(Device, EventEmitter);
+/**
+ * sockopt API
+ */
+Socket.prototype.linger = opt('linger');
+Socket.prototype.sndbuf = opt('sndbuf');
+Socket.prototype.rcvbuf = opt('rcvbuf');
+Socket.prototype.sndtimeo = opt('sndtimeo');
+Socket.prototype.rcvtimeo = opt('rcvtimeo');
+Socket.prototype.reconn = opt('reconn');
+Socket.prototype.maxreconn = opt('maxreconn');
+Socket.prototype.sndprio = opt('sndprio');
+Socket.prototype.rcvprio = opt('rcvprio');
+
+/* tcpnodelay sockopt method. this one is a little different */
+Socket.prototype.tcpnodelay = function (bool) {
+ if(arguments.length){
+ if(bool){
+ if(nn.Setopt(this.binding, nn.NN_TCP, nn.NN_TCP_NODELAY, 1) > -1)
+ return true;
+ throw new Error(nn.Err() + ': '+this.type + ' nodelay@'+'activing'+'\n');
+ } else {
+ if(nn.Setopt(this.binding, nn.NN_TCP, nn.NN_TCP_NODELAY, 0) > -1)
+ return false;
+ throw new Error(nn.Err() + ': '+this.type+' nodelay@'+'deactiving'+'\n');
+ }
+ } else {
+ switch(nn.Getopt(this.binding, nn.NN_TCP, nn.NN_TCP_NODELAY)){
+ case 1: return true;
+ case 0: return false;
+ default:
+ throw new Error(nn.Err() +': '+this.type+' nodelay@'+'getsockopt'+'\n');
+ }
+ }
+}
+
+/* sockopt API workhorse */
+function opt (option) {
+ return function (value) {
+ if (value === undefined)
+ return nn.Getopt(this.binding, nn.NN_SOL_SOCKET, sol[option]);
+
+ if(nn.Setopt(this.binding, nn.NN_SOL_SOCKET, sol[option], value) > -1)
+ return true;
+
+ throw new Error(nn.Err() + ': ' + this.type + option + '@' + value + '\n');
+ }
+};
+
/**
* module API
*/
function createSocket (type, opts) {
- var domain = (opts || {}).raw ? nn.AF_SP_RAW : nn.AF_SP;
- return new Socket(domain, type);
+ return new Socket(type, opts);
}
function symbolInfo (symbol) {
diff --git a/package.json b/package.json
index 4e5b6a1..92f7e36 100644
--- a/package.json
+++ b/package.json
@@ -1,11 +1,11 @@
{
"name": "nanomsg",
- "version": "0.3.5",
+ "version": "0.4.0",
"description": "Node bindings for nanomsg",
"main": "lib/index.js",
"dependencies": {
"bindings": "1.1.1",
- "nan": "~1.5.0"
+ "nan": "~1.6.2"
},
"devDependencies": {
"mocha": "~1.18.2",
diff --git a/src/node_nanomsg.cc b/src/node_nanomsg.cc
index b188595..28af59f 100644
--- a/src/node_nanomsg.cc
+++ b/src/node_nanomsg.cc
@@ -23,6 +23,11 @@ using v8::Object;
using v8::String;
using v8::Value;
+#define ret NanReturnValue
+#define utf8 v8::String::Utf8Value
+#define integer As()->IntegerValue()
+#define S args[0].integer
+
NAN_METHOD(Socket) {
NanScope();
@@ -125,6 +130,41 @@ NAN_METHOD(Getsockopt) {
NanReturnValue(obj);
}
+NAN_METHOD(Setopt) {
+ NanScope();
+
+ int level = args[1].integer;
+ int option = args[2].integer;
+
+ if(option == NN_SOCKET_NAME){
+ utf8 str(args[3]);
+ ret(NanNew(nn_setsockopt(S, level, option, *str, str.length())));
+ } else {
+ int optval = args[3].integer;
+ ret(NanNew(nn_setsockopt(S, level, option, &optval, sizeof(optval))));
+ }
+}
+
+NAN_METHOD(Getopt) {
+ NanScope();
+
+ int optval[1];
+ int option = args[2].integer;
+ size_t optsize = sizeof(optval);
+
+ //check if the function succeeds
+ if(nn_getsockopt(S, args[1].integer, option, optval, &optsize) == 0){
+
+ if(option == NN_SOCKET_NAME) ret(NanNew((char *)optval));
+
+ ret(NanNew(optval[0]));
+
+ } else {
+ //pass the error back as an undefined return
+ NanReturnUndefined();
+ }
+}
+
NAN_METHOD(Bind) {
NanScope();
@@ -195,13 +235,21 @@ NAN_METHOD(Recv) {
char *buf = NULL;
int len = nn_recv(s, &buf, NN_MSG, flags);
- v8::Local h = NanNewBufferHandle(len);
- memcpy(node::Buffer::Data(h), buf, len);
+ if(len > -1) {
+
+ v8::Local h = NanNewBufferHandle(len);
+ memcpy(node::Buffer::Data(h), buf, len);
- //dont memory leak
- nn_freemsg (buf);
+ //dont memory leak
+ nn_freemsg (buf);
- NanReturnValue(h);
+ ret(h);
+
+ } else {
+
+ ret(NanNew(len));
+
+ }
}
NAN_METHOD(SymbolInfo) {
@@ -277,18 +325,7 @@ NAN_METHOD(Errno) {
NanReturnValue(NanNew(ret));
}
-
-NAN_METHOD(Strerr) {
- NanScope();
-
- int errnum = args[0]->Uint32Value();
-
- // Invoke nanomsg function.
- const char* err = nn_strerror(errnum);
-
- NanReturnValue(NanNew(err));
-}
-
+NAN_METHOD(Err){ NanScope(); ret(NanNew(nn_strerror(nn_errno()))); }
typedef struct nanomsg_socket_s {
uv_poll_t poll_handle;
@@ -426,7 +463,6 @@ void InitAll(Handle