Skip to content

Commit

Permalink
[fix] Resume the socket in the next tick
Browse files Browse the repository at this point in the history
Ensure that `socket.resume()` is called after `socket.pause()`.

Fixes #1940
  • Loading branch information
lpinca committed Aug 28, 2021
1 parent ea6c054 commit 869c989
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 3 deletions.
22 changes: 20 additions & 2 deletions lib/websocket.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Readable$" }] */

'use strict';

const EventEmitter = require('events');
Expand All @@ -6,6 +8,7 @@ const http = require('http');
const net = require('net');
const tls = require('tls');
const { randomBytes, createHash } = require('crypto');
const { Readable } = require('stream');
const { URL } = require('url');

const PerMessageDeflate = require('./permessage-deflate');
Expand Down Expand Up @@ -954,7 +957,7 @@ function receiverOnConclude(code, reason) {
const websocket = this[kWebSocket];

websocket._socket.removeListener('data', socketOnData);
websocket._socket.resume();
process.nextTick(resume, websocket._socket);

websocket._closeFrameReceived = true;
websocket._closeMessage = reason;
Expand Down Expand Up @@ -983,7 +986,12 @@ function receiverOnError(err) {
const websocket = this[kWebSocket];

websocket._socket.removeListener('data', socketOnData);
websocket._socket.resume();

//
// On Node.js < 14.0.0 the `'error'` event is emitted synchronously. See
// https://github.com/websockets/ws/issues/1940.
//
process.nextTick(resume, websocket._socket);

websocket.close(err[kStatusCode]);
websocket.emit('error', err);
Expand Down Expand Up @@ -1032,6 +1040,16 @@ function receiverOnPong(data) {
this[kWebSocket].emit('pong', data);
}

/**
* Resume a readable stream
*
* @param {Readable} stream The readable stream
* @private
*/
function resume(stream) {
stream.resume();
}

/**
* The listener of the `net.Socket` `'close'` event.
*
Expand Down
56 changes: 55 additions & 1 deletion test/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3308,7 +3308,7 @@ describe('WebSocket', () => {
});
});

describe('Connection close edge cases', () => {
describe('Connection close', () => {
it('closes cleanly after simultaneous errors (1/2)', (done) => {
let clientCloseEventEmitted = false;
let serverClientCloseEventEmitted = false;
Expand Down Expand Up @@ -3420,5 +3420,59 @@ describe('WebSocket', () => {
});
});
});

it('resumes the socket when an error occurs', (done) => {
const maxPayload = 16 * 1024;
const wss = new WebSocket.Server({ maxPayload, port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
const list = [
...Sender.frame(Buffer.alloc(maxPayload + 1), {
fin: true,
opcode: 0x02,
mask: true,
readOnly: false
})
];

ws.on('error', (err) => {
assert.ok(err instanceof RangeError);
assert.strictEqual(err.code, 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH');
assert.strictEqual(err.message, 'Max payload size exceeded');

ws.on('close', (code, reason) => {
assert.strictEqual(code, 1006);
assert.strictEqual(reason, EMPTY_BUFFER);
wss.close(done);
});
});

ws._socket.push(Buffer.concat(list));
});
});

it('resumes the socket when the close frame is received', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
const opts = { fin: true, mask: true, readOnly: false };
const list = [
...Sender.frame(Buffer.alloc(16 * 1024), { opcode: 0x02, ...opts }),
...Sender.frame(EMPTY_BUFFER, { opcode: 0x08, ...opts })
];

ws.on('close', (code, reason) => {
assert.strictEqual(code, 1005);
assert.strictEqual(reason, EMPTY_BUFFER);
wss.close(done);
});

ws._socket.push(Buffer.concat(list));
});
});
});
});

0 comments on commit 869c989

Please sign in to comment.