Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#73 Adopt Stream API #147

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,13 @@ to a single process.

#### Socket

A representation of a client. _Inherits from EventEmitter_.
A representation of a client. _Inherits from EventEmitter_. _Implements Stream_.

##### Events

- `close`
- Fired when the client is disconnected.
- `error` is fired prior to closing if the reason is other than server/transport/forced close
- **Arguments**
- `String`: reason for closing
- `Object`: description object (optional)
Expand Down Expand Up @@ -260,6 +261,29 @@ A representation of a client. _Inherits from EventEmitter_.
- `close`
- Disconnects the client
- **Returns** `Socket` for chaining
- `pipe`
- Pipes incoming data to a Writable Stream.
- **Parameters**
- `WritableStream`: destination
- `Object`: optional, does not close destination if options.end is false
- **Returns** `Socket` for chaining

##### Stream interface - see [Node.js docs](http://nodejs.org/api/stream.html#stream_event_drain)

- Readable Stream
- supported:
- events: `data`, `end`, `error`, `close`
- properties: `readable`
- methods: `destroy`
- not supported:
- methods: `setEncoding`, `pause`, `resume`
- Writable Stream
- supported:
- events: `error`, `close`
- properties: `writable`
- methods: `write`, `end`, `destroy`, `destroySoon`
- not supported:
- events: `drain`, `pipe`

### Client

Expand Down
67 changes: 67 additions & 0 deletions lib/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ function Socket (id, server, transport) {
this.server = server;
this.upgraded = false;
this.readyState = 'opening';
this.readable = true;
this.writable = true;
this.writeBuffer = [];
this.packetsFn = [];
this.sentCallbackFn = [];
this.on('error', function(){
debug('prevented error throwing');
});

this.setTransport(transport);
this.onOpen();
Expand Down Expand Up @@ -107,6 +112,26 @@ Socket.prototype.onPacket = function (packet) {
}
};


/**
* Pipes output from this Socket to given destination WritableStream.
*
* @param {WritableStream} destination
* @param {Object} optional, does not close destination if options.end is false
* @return {Socket} for chaining
* @api public
*/

Socket.prototype.pipe = function (destination, options) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use the real Stream#pipe function from node? - a lot of development work went into getting it right.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (destination.writable) {
this.on('data', destination.write.bind(destination));
if (!options || options.end !== false) {
this.on('end', destination.end.bind(destination));
}
}
return this;
}

/**
* Called upon transport error.
*
Expand Down Expand Up @@ -226,11 +251,19 @@ Socket.prototype.clearTransport = function () {

Socket.prototype.onClose = function (reason, description) {
if ('closed' != this.readyState) {
if (reason != 'server close' ||
reason != 'transport close' ||
reason != 'forced close') {
this.emit('error', description);
}
this.packetsFn = [];
this.sentCallbackFn = [];
this.clearTransport();
this.readyState = 'closed';
this.emit('close', reason, description);
this.emit('end');
this.readable = false;
this.writable = false;
}
};

Expand Down Expand Up @@ -347,13 +380,32 @@ Socket.prototype.getAvailableUpgrades = function () {
return availableUpgrades;
};


/**
* Writes any queued data and given data before closing the socket.
*
* @param {String} optional, data
* @return {Socket} for chaining
* @api public
*/
Socket.prototype.end = function (data) {
if (data) {
this.send(data);
} else {
this.flush();
}
this.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send() or flush() doesn't necessarily really flush writeBuffer to the underlying transport, so data may get lost.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close does not close the immediately, so no data will be lost.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For websocket, close() immediately invokes the close method of the underlying socket. And at the time, this.writable might be false, causing the send() or flush() buffers the data only in this.writeBuffer, not into the underlying socket.

return this;
}

/**
* Closes the socket and underlying transport.
*
* @return {Socket} for chaining
* @api public
*/

Socket.prototype.destroy =
Socket.prototype.close = function () {
if ('open' == this.readyState) {
this.readyState = 'closing';
Expand All @@ -362,4 +414,19 @@ Socket.prototype.close = function () {
self.onClose('forced close');
});
}
return this;
};


/**
* Closes the socket and underlying transport once the queue is drained.
*
* @return {Socket} for chaining
* @api public
*/

Socket.prototype.destroySoon = function () {
this.on('drain', this.close.bind(this));
this.flush();
return this;
};
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
, "superagent": "*"
, "engine.io-client": "0.4.3"
, "s": "*"
, "morestreams": "*"
}
, "scripts" : { "test" : "make test" }
, "repository": {
Expand Down
126 changes: 125 additions & 1 deletion test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
*/

var http = require('http')
WebSocket = require('ws');
WebSocket = require('ws')
fs = require('fs')
streams = require('morestreams');

/**
* Tests.
Expand Down Expand Up @@ -1121,4 +1123,126 @@ describe('server', function () {
});
});

describe('stream', function () {
it('is pipable into', function (done) {
var engine = listen({}, function (port) {
var fileSizeOnServer;
engine.on('connection', function (conn) {
// send this test file and compare the received size
fs.createReadStream(__filename).pipe(conn);
fs.stat(__filename, function (err, stats) {
fileSizeOnServer = stats.size;
});
});
var socket = new eioc.Socket('ws://localhost:%d'.s(port));
socket.on('open', function () {
var dataLength = 0;
socket.on('data', function (data) {
dataLength += data.length;
});
socket.on('close', function () {
expect(dataLength).to.be(fileSizeOnServer);
done();
});
});
});
});

it('is pipable from', function (done) {
var buffered = new streams.BufferedStream();
// create simple non-buffered stream, bufferring requires outgoing pipe
delete buffered.chunks;

var engine = listen({}, function (port) {
engine.on('connection', function (conn) {
conn.pipe(buffered);
});
var socket = new eioc.Socket('ws://localhost:%d'.s(port));
socket.on('open', function () {
for(var i = 0; i < 10; i++) {
socket.send(i);
}
});
setTimeout(function () {
socket.close();
}, 1000);
var received = "";
buffered.on('data', function (data) {
received += data;
});
buffered.on('end', function() {
expect(received).to.eql("0123456789");
done();
});
});
});

it('should trigger error on `ping timeout`', function (done) {
var opts = { allowUpgrades: false, pingInterval: 5, pingTimeout: 5 };
var engine = listen(opts, function (port) {
var socket = new eioc.Socket('http://localhost:%d'.s(port));
socket.sendPacket = function (){};
engine.on('connection', function (conn) {
conn.on('error', function (err) {
done();
})
});
});
});

it('should trigger error on `transport error`', function ($done) {
var engine = listen({ allowUpgrades: false }, function (port) {
// hack to access the sockets created by node-xmlhttprequest
// see: https://github.com/driverdan/node-XMLHttpRequest/issues/44
var request = require('http').request;
var sockets = [];
http.request = function(opts){
var req = request.apply(null, arguments);
req.on('socket', function(socket){
sockets.push(socket);
});
return req;
};

function done(){
http.request = request;
$done();
}

var socket = new eioc.Socket('ws://localhost:%d'.s(port))
, serverSocket;

engine.on('connection', function(s){
serverSocket = s;
});

socket.transport.on('poll', function(){
// we set a timer to wait for the request to actually reach
setTimeout(function(){
// kill the underlying connection
sockets[1].end();
serverSocket.on('error', function(err){
expect(err.message).to.be('poll connection closed prematurely');
done();
});
}, 50);
});
});
});

it('should trigger error on `parse error`', function (done) {
var engine = listen({}, function (port) {
engine.on('connection', function (conn) {
conn.on('error', function (err) {
done();
});
});
var socket = new eioc.Socket('ws://localhost:%d'.s(port));
socket.on('open', function () {
// send bad badly parsed packet
socket.sendPacket('error', 'ignored');
});
});
});
});
});