diff --git a/lib/socket.js b/lib/socket.js index b1314899b..5d07ce12e 100644 --- a/lib/socket.js +++ b/lib/socket.js @@ -26,6 +26,7 @@ function Socket (id, server, transport, req) { this.writeBuffer = []; this.packetsFn = []; this.sentCallbackFn = []; + this.cleanupFn = []; this.request = req; // Cache IP since it might not be in the req later @@ -93,7 +94,6 @@ Socket.prototype.onPacket = function (packet) { break; case 'error': - this.transport.close(); this.onClose('parse error'); break; @@ -129,7 +129,6 @@ Socket.prototype.setPingTimeout = function () { var self = this; clearTimeout(self.pingTimeoutTimer); self.pingTimeoutTimer = setTimeout(function () { - self.transport.close(); self.onClose('ping timeout'); }, self.server.pingInterval + self.server.pingTimeout); }; @@ -142,13 +141,25 @@ Socket.prototype.setPingTimeout = function () { */ Socket.prototype.setTransport = function (transport) { + var onError = this.onError.bind(this); + var onPacket = this.onPacket.bind(this); + var flush = this.flush.bind(this); + var onClose = this.onClose.bind(this, 'transport close'); + this.transport = transport; - this.transport.once('error', this.onError.bind(this)); - this.transport.on('packet', this.onPacket.bind(this)); - this.transport.on('drain', this.flush.bind(this)); - this.transport.once('close', this.onClose.bind(this, 'transport close')); + this.transport.once('error', onError); + this.transport.on('packet', onPacket); + this.transport.on('drain', flush); + this.transport.once('close', onClose); //this function will manage packet events (also message callbacks) this.setupSendCallback(); + + this.cleanupFn.push(function() { + transport.removeListener('error', onError); + transport.removeListener('packet', onPacket); + transport.removeListener('drain', flush); + transport.removeListener('close', onClose); + }); }; /** @@ -178,6 +189,7 @@ Socket.prototype.maybeUpgrade = function (transport) { function onPacket(packet){ if ('ping' == packet.type && 'probe' == packet.data) { transport.send([{ type: 'pong', data: 'probe', options: { compress: true } }]); + self.emit('upgrading', transport); clearInterval(self.checkIntervalTimer); self.checkIntervalTimer = setInterval(check, 100); } else if ('upgrade' == packet.type && self.readyState != 'closed') { @@ -252,6 +264,9 @@ Socket.prototype.maybeUpgrade = function (transport) { */ Socket.prototype.clearTransport = function () { + var cleanup; + while (cleanup = this.cleanupFn.shift()) cleanup(); + // silence further transport errors and prevent uncaught exceptions this.transport.on('error', function(){ debug('error triggered by discarded transport'); @@ -271,6 +286,7 @@ Socket.prototype.clearTransport = function () { Socket.prototype.onClose = function (reason, description) { if ('closed' != this.readyState) { + this.readyState = 'closed'; clearTimeout(this.pingTimeoutTimer); clearInterval(this.checkIntervalTimer); this.checkIntervalTimer = null; @@ -284,7 +300,6 @@ Socket.prototype.onClose = function (reason, description) { this.packetsFn = []; this.sentCallbackFn = []; this.clearTransport(); - this.readyState = 'closed'; this.emit('close', reason, description); } }; @@ -297,8 +312,14 @@ Socket.prototype.onClose = function (reason, description) { Socket.prototype.setupSendCallback = function () { var self = this; + this.transport.on('drain', onDrain); + + this.cleanupFn.push(function() { + self.transport.removeListener('drain', onDrain); + }); + //the message was sent successfully, execute the callback - this.transport.on('drain', function() { + function onDrain() { if (self.sentCallbackFn.length > 0) { var seqFn = self.sentCallbackFn.splice(0,1)[0]; if ('function' == typeof seqFn) { @@ -313,7 +334,7 @@ Socket.prototype.setupSendCallback = function () { } } } - }); + } }; /** diff --git a/lib/transports/polling.js b/lib/transports/polling.js index c2d6d6436..a659d77c1 100644 --- a/lib/transports/polling.js +++ b/lib/transports/polling.js @@ -28,6 +28,8 @@ module.exports = Polling; function Polling (req) { Transport.call(this, req); + + this.closeTimeout = 30 * 1000; } /** @@ -364,6 +366,9 @@ Polling.prototype.compress = function (data, encoding, callback) { Polling.prototype.doClose = function (fn) { debug('closing'); + var self = this; + var closeTimeoutTimer; + if (this.dataReq) { debug('aborting ongoing data request'); this.dataReq.destroy(); @@ -372,9 +377,16 @@ Polling.prototype.doClose = function (fn) { if (this.writable) { debug('transport writable - closing right away'); this.send([{ type: 'close', options: { compress: true } }]); - fn(); + onClose(); } else { debug('transport not writable - buffering orderly close'); - this.shouldClose = fn; + this.shouldClose = onClose; + closeTimeoutTimer = setTimeout(onClose, this.closeTimeout); + } + + function onClose() { + clearTimeout(closeTimeoutTimer); + fn(); + self.onClose(); } }; diff --git a/test/server.js b/test/server.js index ad1f8deb0..4868076a5 100644 --- a/test/server.js +++ b/test/server.js @@ -843,6 +843,84 @@ describe('server', function () { }); }); + it('should close transport upon ping timeout (ws)', function (done) { + var opts = { allowUpgrades: false, transports: ['websocket'], pingInterval: 50, pingTimeout: 30 }; + var engine = listen(opts, function (port) { + engine.on('connection', function (conn) { + conn.transport.on('close', done); + }); + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }); + // override to simulate an inactive client + socket.sendPacket = socket.onHeartbeat = function (){}; + }); + }); + + it('should close transport upon ping timeout (polling)', function (done) { + var opts = { allowUpgrades: false, transports: ['polling'], pingInterval: 50, pingTimeout: 30 }; + var engine = listen(opts, function (port) { + engine.on('connection', function (conn) { + conn.transport.on('close', done); + }); + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + // override to simulate an inactive client + socket.sendPacket = socket.onHeartbeat = function (){}; + }); + }); + + it('should close transport upon parse error (ws)', function (done) { + var opts = { allowUpgrades: false, transports: ['websocket'] }; + var engine = listen(opts, function (port) { + engine.on('connection', function (conn) { + conn.transport.on('close', done); + }); + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }); + socket.on('open', function () { + socket.transport.ws.send('invalid'); + }); + }); + }); + + it('should close transport upon parse error (polling)', function (done) { + var opts = { allowUpgrades: false, transports: ['polling'] }; + var engine = listen(opts, function (port) { + engine.on('connection', function (conn) { + conn.transport.closeTimeout = 100; + conn.transport.on('close', done); + }); + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + socket.on('open', function () { + socket.transport.doWrite('invalid', function (){}); + }); + }); + }); + + it('should close upgrading transport upon socket close', function (done) { + var engine = listen(function (port) { + engine.on('connection', function (conn) { + conn.on('upgrading', function (transport) { + transport.on('close', done); + conn.close(); + }); + }); + new eioc.Socket('ws://localhost:%d'.s(port)); + }); + }); + + it('should close upgrading transport upon upgrade timeout', function (done) { + var opts = { upgradeTimeout: 100 }; + var engine = listen(opts, function (port) { + engine.on('connection', function (conn) { + conn.on('upgrading', function (transport) { + transport.on('close', done); + }); + }); + var socket = new eioc.Socket('ws://localhost:%d'.s(port)); + socket.on('upgrading', function (transport) { + // override not to complete upgrading + transport.send = function (){}; + }); + }); + }); }); describe('messages', function () {