Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not waste time compressing when socket is closed #1464

Merged
merged 13 commits into from
Nov 6, 2018
Merged
18 changes: 18 additions & 0 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ class Sender {
this._bufferedBytes = 0;
this._deflating = false;
this._queue = [];

this._socket.once('close', () => {
const err = new Error(
`WebSocket is not open: readyState CLOSING`
Evertras marked this conversation as resolved.
Show resolved Hide resolved
);

while (this._queue.length) {
const params = this._queue.shift();

this._bufferedBytes -= params[1].length;

const cb = params[params.length - 1];

if (typeof cb === 'function') {
cb(err);
}
}
});
}

/**
Expand Down
2 changes: 2 additions & 0 deletions lib/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class WebSocket extends EventEmitter {
maxPayload
);

// Note that we must instantiate the Sender before we add socket.on('close') below
// because we want the Sender to empty its queue before sending our own close event.
this._sender = new Sender(socket, this._extensions);
this._receiver = receiver;
this._socket = socket;
Expand Down
111 changes: 81 additions & 30 deletions test/sender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ const assert = require('assert');
const PerMessageDeflate = require('../lib/permessage-deflate');
const Sender = require('../lib/sender');

class MockSocket {
constructor ({ write, on, once, prependOnceListener } = {}) {
if (write) this.write = write;
if (on) this.on = on;
if (once) this.once = once;
if (prependOnceListener) this.prependOnceListener = prependOnceListener;
}

write () {}
on () {}
once () {}
prependOnceListener () {}
}

describe('Sender', function () {
describe('.frame', function () {
it('does not mutate the input buffer if data is `readOnly`', function () {
Expand Down Expand Up @@ -38,14 +52,13 @@ describe('Sender', function () {
it('compresses data if compress option is enabled', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
assert.strictEqual(data[0] & 0x40, 0x40);
if (++count === 3) done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -57,16 +70,62 @@ describe('Sender', function () {
sender.send('hi', options);
});

it('does not attempt to compress enqueued messages after socket closes', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const numMessages = 1000;
let numWritten = 0;
const mockSocket = new MockSocket({
write: (data) => {
assert.strictEqual(data[0] & 0x40, 0x40);
if (++numWritten > 1) done(new Error('Too many attempted writes'));
},
once: (ev, cb) => {
if (ev === 'close') {
process.nextTick(cb);
}
}
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

let numCompressed = 0;

perMessageDeflate.compress = (data, fin, cb) => {
if (++numCompressed > 1) {
done(new Error('Compressed too many times'));
}

setTimeout(() => cb(null, data), 1);
};

perMessageDeflate.accept([{}]);

const options = { compress: true, fin: false };
const array = new Uint8Array([0x68, 0x69]);

sender.send(array.buffer, options, () => {});
sender.send(array, options, () => {});

let numErrors = 0;
for (let i = 0; i < numMessages; ++i) {
sender.send('hi', options, (err) => {
if (!err) return;

if (++numErrors === numMessages) {
done();
}
});
}
});

it('does not compress data for small payloads', function (done) {
const perMessageDeflate = new PerMessageDeflate();
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
assert.notStrictEqual(data[0] & 0x40, 0x40);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -76,7 +135,7 @@ describe('Sender', function () {
it('compresses all frames in a fragmented message', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 3 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -87,9 +146,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 6);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -100,7 +158,7 @@ describe('Sender', function () {
it('compresses no frames in a fragmented message', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 3 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -111,9 +169,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 5);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -124,7 +181,7 @@ describe('Sender', function () {
it('compresses empty buffer as first fragment', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -135,9 +192,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 8);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -148,7 +204,7 @@ describe('Sender', function () {
it('compresses empty buffer as last fragment', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -159,9 +215,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 3);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -172,13 +227,12 @@ describe('Sender', function () {
it('handles many send calls while processing without crashing on flush', function (done) {
let count = 0;
const perMessageDeflate = new PerMessageDeflate();
const sender = new Sender({
const mockSocket = new MockSocket({
write: () => {
if (++count > 1e4) done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -196,16 +250,15 @@ describe('Sender', function () {
it('works with multiple types of data', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
if (++count === 1) return;

assert.ok(data.equals(Buffer.from([0x89, 0x02, 0x68, 0x69])));
if (count === 4) done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -222,16 +275,15 @@ describe('Sender', function () {
it('works with multiple types of data', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
if (++count === 1) return;

assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69])));
if (count === 4) done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -249,14 +301,13 @@ describe('Sender', function () {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });

let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data, cb) => {
count++;
if (cb) cb();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand Down
31 changes: 31 additions & 0 deletions test/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,37 @@ describe('WebSocket', function () {
});
});

it('reports the web socket as CLOSING in error callbacks when connection is terminated abnormally', function (done) {
const wss = new WebSocket.Server({
perMessageDeflate: { threshold: 0 },
port: 0
}, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`, {
perMessageDeflate: { threshold: 0 } });
const messages = [];

ws.on('message', (message) => messages.push(message));
ws.on('close', (code) => {
console.log('closing ws');
assert.strictEqual(code, 1006);
assert.deepStrictEqual(messages, []);
done();
});
});

wss.on('connection', (ws) => {
let checkState = () => {
if (ws._sender._queue.length) {
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
}
};
for (let i = 0; i < 1000; ++i) {
ws.send('foo', { compress: true }, checkState);
}
wss.close();
});
});

describe('#send', function () {
it('ignores the `compress` option if the extension is disabled', function (done) {
const wss = new WebSocket.Server({ port: 0 }, () => {
Expand Down