Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RST packet processing fixes #156

Merged
merged 3 commits into from
Dec 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ Agent.prototype._init = function initSock(socket) {
try {
packet = parse(msg)
} catch(err) {
message = generate({ code: '5.00', payload: new Buffer('Unable to parse packet') })
that._sock.send(message, 0, message.length,
rsinfo.port, rsinfo.address)
return
}

Expand Down Expand Up @@ -150,9 +147,8 @@ Agent.prototype._handle = function handle(msg, rsinfo, outSocket) {
req = this._tkToReq[packet.token.readUInt32BE(0)]
}

if (packet.ack && !req) {
// nothing to do, somehow there was
// a duplicate ack
if ((packet.ack || packet.reset) && !req) {
// Nothing to do on unknown or duplicate ACK/RST packet
return
}

Expand Down Expand Up @@ -191,7 +187,8 @@ Agent.prototype._handle = function handle(msg, rsinfo, outSocket) {

req.sender.reset()

if (packet.code == '0.00')
// Drop empty messages (ACKs), but process RST
if (packet.code == '0.00' && !packet.reset)
return

var block2Buff = getOption(packet.options, 'Block2')
Expand Down
16 changes: 8 additions & 8 deletions lib/outgoing_message.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,42 +59,42 @@ OutgoingMessage.prototype.end = function(a, b) {
BufferList.prototype.end.call(this, a, b)

var packet = this._packet
, message
, that = this

packet.code = toCode(this.code || this.statusCode)
packet.payload = this

if (this._ackTimer)
clearTimeout(this._ackTimer)

this._send(this, packet)

// easy clean up after generating the packet
delete this._packet.payload

if (this._ackTimer)
clearTimeout(this._ackTimer)

return this
}

OutgoingMessage.prototype.reset = function() {
BufferList.prototype.end.call(this)

var packet = this._packet
, message
, that = this

packet.code = '0.00'
packet.payload = ''
packet.reset = true;
packet.ack = false
packet.token = ''

if (this._ackTimer)
clearTimeout(this._ackTimer)

this._send(this, packet)

// easy clean up after generating the packet
delete this._packet.payload

if (this._ackTimer)
clearTimeout(this._ackTimer)

return this
}

Expand Down
104 changes: 104 additions & 0 deletions test/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ describe('request', function() {
var server
, server2
, port
, clock

beforeEach(function (done) {
port = nextPort()
server = dgram.createSocket('udp4')
server.bind(port, done)
clock = sinon.useFakeTimers()
})

afterEach(function () {
Expand All @@ -32,8 +34,16 @@ describe('request', function() {
server2.close()

server = server2 = null

clock.restore()
})

function fastForward(increase, max) {
clock.tick(increase)
if (increase < max)
setImmediate(fastForward.bind(null, increase, max - increase))
}

function ackBack(msg, rsinfo) {
var packet = parse(msg)
, toSend = generate({
Expand Down Expand Up @@ -306,6 +316,7 @@ describe('request', function() {
setTimeout(function () {
done()
}, 20)
fastForward(5, 25)
})

req.on('response', function (res) {
Expand Down Expand Up @@ -342,6 +353,99 @@ describe('request', function() {
req.end()
})

it('should emit a response on reset', function (done) {
var req = request({
port: port
})

server.on('message', function (msg, rsinfo) {
var packet = parse(msg)
, toSend = generate({
messageId: packet.messageId
, code: '0.00'
, ack: false
, reset: true
})
server.send(toSend, 0, toSend.length, rsinfo.port, rsinfo.address)
})

req.on('response', function (res) {
if (res.code === '0.00') {
done()
} else {
done(new Error('Unexpected response'))
}
})

req.end()
})

it('should stop retrying on reset', function (done) {
var req = request({
port: port
})
var messages = 0

server.on('message', function (msg, rsinfo) {
var packet = parse(msg)
, toSend = generate({
messageId: packet.messageId
, code: '0.00'
, ack: false
, reset: true
})
messages++
server.send(toSend, 0, toSend.length, rsinfo.port, rsinfo.address)
})

req.on('response', function (res) {
if (res.code !== '0.00') {
done(new Error('Unexpected response'))
}
})
req.end()

setTimeout(function () {
expect(messages).to.eql(1)
done()
}, 45 * 1000)

fastForward(100, 45 * 1000)
})

it('should not send response to invalid packets', function (done) {
var req = request({
port: port
})
var messages = 0

server.on('message', function (msg, rsinfo) {
var packet = parse(msg)
, toSend = generate({
messageId: packet.messageId
, code: '0.00'
, ack: true
, payload: 'this payload invalidates empty message'
})
expect(packet.code).to.be.eq('0.01');
messages++
server.send(toSend, 0, toSend.length, rsinfo.port, rsinfo.address)
})

req.on('response', function (res) {
done(new Error('Unexpected response'))
})

req.end()

setTimeout(function () {
expect(messages).to.eql(5)
done()
}, 45 * 1000)

fastForward(100, 45 * 1000)
})

it('should allow to add an option', function (done) {
var req = request({
port: port
Expand Down
4 changes: 3 additions & 1 deletion test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,14 @@ describe('server', function() {

it('should include a reset() function in the response', function(done) {
var buf = new Buffer(25)
send(generate({ payload: buf }))
var tok = new Buffer(4)
send(generate({ payload: buf, token: tok }))
client.on('message', function(msg, rinfo) {
var result = parse(msg)
expect(result.code).to.eql('0.00')
expect(result.reset).to.eql(true)
expect(result.ack).to.eql(false)
expect(result.token.length).to.eql(0)
expect(result.payload.length).to.eql(0)
done()
});
Expand Down