diff --git a/lib/agent.js b/lib/agent.js index 333e9d67..b3b74ffa 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -70,9 +70,6 @@ Agent.prototype._init = function initSock(socket) { try { packet = parse(msg) } catch(err) { - message = generate({ code: '5.00', payload: new Buffer('Unable to parse packet') }) - that._sock.send(message, 0, message.length, - rsinfo.port, rsinfo.address) return } @@ -150,9 +147,8 @@ Agent.prototype._handle = function handle(msg, rsinfo, outSocket) { req = this._tkToReq[packet.token.readUInt32BE(0)] } - if (packet.ack && !req) { - // nothing to do, somehow there was - // a duplicate ack + if ((packet.ack || packet.reset) && !req) { + // Nothing to do on unknown or duplicate ACK/RST packet return } @@ -191,7 +187,8 @@ Agent.prototype._handle = function handle(msg, rsinfo, outSocket) { req.sender.reset() - if (packet.code == '0.00') + // Drop empty messages (ACKs), but process RST + if (packet.code == '0.00' && !packet.reset) return var block2Buff = getOption(packet.options, 'Block2') diff --git a/lib/outgoing_message.js b/lib/outgoing_message.js index e85d3215..1979a23d 100644 --- a/lib/outgoing_message.js +++ b/lib/outgoing_message.js @@ -59,19 +59,19 @@ OutgoingMessage.prototype.end = function(a, b) { BufferList.prototype.end.call(this, a, b) var packet = this._packet - , message , that = this packet.code = toCode(this.code || this.statusCode) packet.payload = this + + if (this._ackTimer) + clearTimeout(this._ackTimer) + this._send(this, packet) // easy clean up after generating the packet delete this._packet.payload - if (this._ackTimer) - clearTimeout(this._ackTimer) - return this } @@ -79,22 +79,22 @@ OutgoingMessage.prototype.reset = function() { BufferList.prototype.end.call(this) var packet = this._packet - , message , that = this packet.code = '0.00' packet.payload = '' packet.reset = true; packet.ack = false + packet.token = '' + + if (this._ackTimer) + clearTimeout(this._ackTimer) this._send(this, packet) // easy clean up after generating the packet delete this._packet.payload - if (this._ackTimer) - clearTimeout(this._ackTimer) - return this } diff --git a/test/request.js b/test/request.js index fc75b88f..d5b61e38 100644 --- a/test/request.js +++ b/test/request.js @@ -18,11 +18,13 @@ describe('request', function() { var server , server2 , port + , clock beforeEach(function (done) { port = nextPort() server = dgram.createSocket('udp4') server.bind(port, done) + clock = sinon.useFakeTimers() }) afterEach(function () { @@ -32,8 +34,16 @@ describe('request', function() { server2.close() server = server2 = null + + clock.restore() }) + function fastForward(increase, max) { + clock.tick(increase) + if (increase < max) + setImmediate(fastForward.bind(null, increase, max - increase)) + } + function ackBack(msg, rsinfo) { var packet = parse(msg) , toSend = generate({ @@ -306,6 +316,7 @@ describe('request', function() { setTimeout(function () { done() }, 20) + fastForward(5, 25) }) req.on('response', function (res) { @@ -342,6 +353,99 @@ describe('request', function() { req.end() }) + it('should emit a response on reset', function (done) { + var req = request({ + port: port + }) + + server.on('message', function (msg, rsinfo) { + var packet = parse(msg) + , toSend = generate({ + messageId: packet.messageId + , code: '0.00' + , ack: false + , reset: true + }) + server.send(toSend, 0, toSend.length, rsinfo.port, rsinfo.address) + }) + + req.on('response', function (res) { + if (res.code === '0.00') { + done() + } else { + done(new Error('Unexpected response')) + } + }) + + req.end() + }) + + it('should stop retrying on reset', function (done) { + var req = request({ + port: port + }) + var messages = 0 + + server.on('message', function (msg, rsinfo) { + var packet = parse(msg) + , toSend = generate({ + messageId: packet.messageId + , code: '0.00' + , ack: false + , reset: true + }) + messages++ + server.send(toSend, 0, toSend.length, rsinfo.port, rsinfo.address) + }) + + req.on('response', function (res) { + if (res.code !== '0.00') { + done(new Error('Unexpected response')) + } + }) + req.end() + + setTimeout(function () { + expect(messages).to.eql(1) + done() + }, 45 * 1000) + + fastForward(100, 45 * 1000) + }) + + it('should not send response to invalid packets', function (done) { + var req = request({ + port: port + }) + var messages = 0 + + server.on('message', function (msg, rsinfo) { + var packet = parse(msg) + , toSend = generate({ + messageId: packet.messageId + , code: '0.00' + , ack: true + , payload: 'this payload invalidates empty message' + }) + expect(packet.code).to.be.eq('0.01'); + messages++ + server.send(toSend, 0, toSend.length, rsinfo.port, rsinfo.address) + }) + + req.on('response', function (res) { + done(new Error('Unexpected response')) + }) + + req.end() + + setTimeout(function () { + expect(messages).to.eql(5) + done() + }, 45 * 1000) + + fastForward(100, 45 * 1000) + }) + it('should allow to add an option', function (done) { var req = request({ port: port diff --git a/test/server.js b/test/server.js index c7046457..137ec904 100644 --- a/test/server.js +++ b/test/server.js @@ -227,12 +227,14 @@ describe('server', function() { it('should include a reset() function in the response', function(done) { var buf = new Buffer(25) - send(generate({ payload: buf })) + var tok = new Buffer(4) + send(generate({ payload: buf, token: tok })) client.on('message', function(msg, rinfo) { var result = parse(msg) expect(result.code).to.eql('0.00') expect(result.reset).to.eql(true) expect(result.ack).to.eql(false) + expect(result.token.length).to.eql(0) expect(result.payload.length).to.eql(0) done() });