Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not waste time compressing when socket is closed #1464

Merged
merged 13 commits into from
Nov 6, 2018
Merged
14 changes: 14 additions & 0 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ class Sender {
this._bufferedBytes = 0;
this._deflating = false;
this._queue = [];

if (this._extensions[PerMessageDeflate.extensionName]) {
this._socket.once('close', () => {
const err = new Error('WebSocket is not open: readyState 2 (CLOSING)');

while (this._queue.length) {
const params = this._queue.shift();
const cb = params[params.length - 1];

this._bufferedBytes -= params[1].length;
if (typeof cb === 'function') cb(err);
}
});
}
}

/**
Expand Down
5 changes: 5 additions & 0 deletions lib/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ class WebSocket extends EventEmitter {
maxPayload
);

//
// `Sender` must be instantiated before adding the `socketOnClose` listener.
// This allows the sender queue, when used, to be emptied before
// `socketOnClose` is called.
//
this._sender = new Sender(socket, this._extensions);
this._receiver = receiver;
this._socket = socket;
Expand Down
98 changes: 78 additions & 20 deletions test/sender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ const assert = require('assert');
const PerMessageDeflate = require('../lib/permessage-deflate');
const Sender = require('../lib/sender');

class MockSocket {
constructor ({ write, once } = {}) {
if (write) this.write = write;
if (once) this.once = once;
}

write () {}
once () {}
}

describe('Sender', function () {
describe('.frame', function () {
it('does not mutate the input buffer if data is `readOnly`', function () {
Expand Down Expand Up @@ -38,12 +48,13 @@ describe('Sender', function () {
it('compresses data if compress option is enabled', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
assert.strictEqual(data[0] & 0x40, 0x40);
if (++count === 3) done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -57,14 +68,53 @@ describe('Sender', function () {
sender.send('hi', options);
});

it('does not compress enqueued messages after socket closes', function (done) {
const numMessages = 100;
let numErrors = 0;

const mockSocket = new MockSocket({
write: (data) => {
// Test that `PerMessageDeflate#compress()` and `Socket#write()` is
// called only once (for the first message that is not queued).
assert.strictEqual(numErrors, numMessages);
done();
},
once: (ev, cb) => {
if (ev === 'close') process.nextTick(cb);
}
});

const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
perMessageDeflate.accept([{}]);

const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

const options = { compress: true, fin: true };
sender.send('hi', options);

for (let i = 0; i < numMessages; i++) {
sender.send('hi', options, (err) => {
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
numErrors++;
});
}
});

it('does not compress data for small payloads', function (done) {
const perMessageDeflate = new PerMessageDeflate();
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
assert.notStrictEqual(data[0] & 0x40, 0x40);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -76,7 +126,7 @@ describe('Sender', function () {
it('compresses all frames in a fragmented message', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 3 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -87,7 +137,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 6);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -100,7 +151,7 @@ describe('Sender', function () {
it('compresses no frames in a fragmented message', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 3 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -111,7 +162,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 5);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -124,7 +176,7 @@ describe('Sender', function () {
it('compresses empty buffer as first fragment', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -135,7 +187,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 8);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -148,7 +201,7 @@ describe('Sender', function () {
it('compresses empty buffer as last fragment', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -159,7 +212,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 3);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -172,11 +226,12 @@ describe('Sender', function () {
it('handles many send calls while processing without crashing on flush', function (done) {
let count = 0;
const perMessageDeflate = new PerMessageDeflate();
const sender = new Sender({
const mockSocket = new MockSocket({
write: () => {
if (++count > 1e4) done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -196,14 +251,15 @@ describe('Sender', function () {
it('works with multiple types of data', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
if (++count === 1) return;

assert.ok(data.equals(Buffer.from([0x89, 0x02, 0x68, 0x69])));
if (count === 4) done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -222,14 +278,15 @@ describe('Sender', function () {
it('works with multiple types of data', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
if (++count === 1) return;

assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69])));
if (count === 4) done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -249,12 +306,13 @@ describe('Sender', function () {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });

let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data, cb) => {
count++;
if (cb) cb();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand Down
34 changes: 34 additions & 0 deletions test/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,40 @@ describe('WebSocket', function () {
});
});

it('reports the state as `CLOSING` in callbacks of discarded queued messages', function (done) {
const wss = new WebSocket.Server({
perMessageDeflate: { threshold: 0 },
port: 0
}, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
const map = () => crypto.randomBytes(16);
let count = 100;

Array.from({ length: count }).map(map).forEach((data, i) => {
ws.send(data, (err) => {
assert.ok(err instanceof Error);

if (i > 0) {
// The first message is not queued and compression completes after
// the `'close'` event is emitted on the `net.Socket`.
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
}

if (--count === 0) done();
});
});

wss.close();
});
});

describe('#send', function () {
it('ignores the `compress` option if the extension is disabled', function (done) {
const wss = new WebSocket.Server({ port: 0 }, () => {
Expand Down