Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature]: zlib deflate concurrency limit #1204

Merged
merged 1 commit into from
Sep 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/ws.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ provided then that is extension parameters:
- `memLevel` {Number} The value of zlib's `memLevel` param (1-9, default 8).
- `threshold` {Number} Payloads smaller than this will not be compressed.
Defaults to 1024 bytes.
- `concurrencyLimit` {Number} The number of concurrent calls to zlib.
Calls above this limit will be queued. Default 10. You usually won't
need to touch this option. See [concurrency-limit][this issue] for more
details.

If a property is empty then either an offered configuration or a default value
is used.
Expand Down Expand Up @@ -425,4 +429,5 @@ Forcibly close the connection.

The URL of the WebSocket server. Server clients don't have this attribute.

[concurrency-limit]: https://github.com/websockets/ws/issues/1202
[permessage-deflate]: https://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-19
56 changes: 53 additions & 3 deletions lib/PerMessageDeflate.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const safeBuffer = require('safe-buffer');
const zlib = require('zlib');
const Limiter = require('async-limiter');

const bufferUtil = require('./BufferUtil');

Expand All @@ -10,6 +11,14 @@ const Buffer = safeBuffer.Buffer;
const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
const EMPTY_BLOCK = Buffer.from([0x00]);

// We limit zlib concurrency, which prevents severe memory fragmentation
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
// and https://github.com/websockets/ws/issues/1202
//
// Intentionally global; it's the global thread pool that's
// an issue.
let zlibLimiter;

/**
* Per-message Deflate implementation.
*/
Expand All @@ -25,6 +34,13 @@ class PerMessageDeflate {
this._inflate = null;

this.params = null;

if (!zlibLimiter) {
const concurrency = this._options.concurrencyLimit !== undefined
? this._options.concurrencyLimit
: 10;
zlibLimiter = new Limiter({ concurrency });
}
}

static get extensionName () {
Expand Down Expand Up @@ -249,14 +265,48 @@ class PerMessageDeflate {
}

/**
* Decompress data.
* Decompress data. Concurrency limited by async-limiter.
*
* @param {Buffer} data Compressed data
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @public
*/
decompress (data, fin, callback) {
zlibLimiter.push((done) => {
this._decompress(data, fin, (err, result) => {
done();
callback(err, result);
});
});
}

/**
* Compress data. Concurrency limited by async-limiter.
*
* @param {Buffer} data Data to compress
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @public
*/
compress (data, fin, callback) {
zlibLimiter.push((done) => {
this._compress(data, fin, (err, result) => {
done();
callback(err, result);
});
});
}

/**
* Decompress data.
*
* @param {Buffer} data Compressed data
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @private
*/
_decompress (data, fin, callback) {
const endpoint = this._isServer ? 'client' : 'server';

if (!this._inflate) {
Expand Down Expand Up @@ -322,9 +372,9 @@ class PerMessageDeflate {
* @param {Buffer} data Data to compress
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @public
* @private
*/
compress (data, fin, callback) {
_compress (data, fin, callback) {
if (!data || data.length === 0) {
process.nextTick(callback, null, EMPTY_BLOCK);
return;
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"lint": "eslint ."
},
"dependencies": {
"async-limiter": "~1.0.0",
"safe-buffer": "~5.1.0",
"ultron": "~1.1.0"
},
Expand Down