From add4b0ab8cc0ec663cd4623e9032c14830873760 Mon Sep 17 00:00:00 2001 From: Brian White Date: Tue, 30 May 2017 12:56:09 -0400 Subject: [PATCH] zlib: improve performance PR-URL: https://github.com/nodejs/node/pull/13322 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- benchmark/zlib/creation.js | 32 + benchmark/zlib/deflate.js | 54 ++ lib/zlib.js | 613 +++++++++--------- src/node_constants.cc | 16 - src/node_internals.h | 17 + src/node_zlib.cc | 106 +-- .../test-zlib.zlib-binding.deflate.js | 4 +- 7 files changed, 482 insertions(+), 360 deletions(-) create mode 100644 benchmark/zlib/creation.js create mode 100644 benchmark/zlib/deflate.js diff --git a/benchmark/zlib/creation.js b/benchmark/zlib/creation.js new file mode 100644 index 00000000000000..a207665bd90fba --- /dev/null +++ b/benchmark/zlib/creation.js @@ -0,0 +1,32 @@ +'use strict'; +var common = require('../common.js'); +var zlib = require('zlib'); + +var bench = common.createBenchmark(main, { + type: [ + 'Deflate', 'DeflateRaw', 'Inflate', 'InflateRaw', 'Gzip', 'Gunzip', 'Unzip' + ], + options: ['true', 'false'], + n: [5e5] +}); + +function main(conf) { + var n = +conf.n; + var fn = zlib['create' + conf.type]; + if (typeof fn !== 'function') + throw new Error('Invalid zlib type'); + var i = 0; + + if (conf.options === 'true') { + var opts = {}; + bench.start(); + for (; i < n; ++i) + fn(opts); + bench.end(n); + } else { + bench.start(); + for (; i < n; ++i) + fn(); + bench.end(n); + } +} diff --git a/benchmark/zlib/deflate.js b/benchmark/zlib/deflate.js new file mode 100644 index 00000000000000..0874884387666d --- /dev/null +++ b/benchmark/zlib/deflate.js @@ -0,0 +1,54 @@ +'use strict'; +var common = require('../common.js'); +var zlib = require('zlib'); + +var bench = common.createBenchmark(main, { + method: ['createDeflate', 'deflate', 'deflateSync'], + inputLen: [1024], + n: [4e5] +}); + +function main(conf) { + var n = +conf.n; + var method = conf.method; + var chunk = Buffer.alloc(+conf.inputLen, 'a'); + + var i = 0; + switch (method) { + // Performs `n` writes for a single deflate stream + case 'createDeflate': + var deflater = zlib.createDeflate(); + deflater.resume(); + deflater.on('finish', () => { + bench.end(n); + }); + + bench.start(); + (function next() { + if (i++ === n) + return deflater.end(); + deflater.write(chunk, next); + })(); + break; + // Performs `n` single deflate operations + case 'deflate': + var deflate = zlib.deflate; + bench.start(); + (function next(err, result) { + if (i++ === n) + return bench.end(n); + deflate(chunk, next); + })(); + break; + // Performs `n` single deflateSync operations + case 'deflateSync': + var deflateSync = zlib.deflateSync; + bench.start(); + for (; i < n; ++i) + deflateSync(chunk); + bench.end(n); + break; + default: + throw new Error('Unsupported deflate method'); + } +} diff --git a/lib/zlib.js b/lib/zlib.js index a25901ac6ef16d..c57e83661bfa9b 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -23,6 +23,7 @@ const Buffer = require('buffer').Buffer; const Transform = require('_stream_transform'); +const { _extend } = require('util'); const binding = process.binding('zlib'); const assert = require('assert').ok; const kMaxLength = require('buffer').kMaxLength; @@ -30,6 +31,13 @@ const kRangeErrorMessage = 'Cannot create final Buffer. It would be larger ' + `than 0x${kMaxLength.toString(16)} bytes`; const constants = process.binding('constants').zlib; +const { + Z_NO_FLUSH, Z_BLOCK, Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH, Z_MIN_CHUNK, + Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, Z_MIN_LEVEL, Z_MAX_LEVEL, Z_MIN_MEMLEVEL, + Z_MAX_MEMLEVEL, Z_DEFAULT_CHUNK, Z_DEFAULT_COMPRESSION, Z_DEFAULT_STRATEGY, + Z_DEFAULT_WINDOWBITS, Z_DEFAULT_MEMLEVEL, Z_FIXED, DEFLATE, DEFLATERAW, + INFLATE, INFLATERAW, GZIP, GUNZIP, UNZIP +} = constants; const { inherits } = require('util'); // translation table for return codes. @@ -51,38 +59,6 @@ for (var ck = 0; ck < ckeys.length; ck++) { codes[codes[ckey]] = ckey; } -function isInvalidFlushFlag(flag) { - return typeof flag !== 'number' || - flag < constants.Z_NO_FLUSH || - flag > constants.Z_BLOCK; - - // Covers: constants.Z_NO_FLUSH (0), - // constants.Z_PARTIAL_FLUSH (1), - // constants.Z_SYNC_FLUSH (2), - // constants.Z_FULL_FLUSH (3), - // constants.Z_FINISH (4), and - // constants.Z_BLOCK (5) -} - -function isInvalidStrategy(strategy) { - return typeof strategy !== 'number' || - strategy < constants.Z_DEFAULT_STRATEGY || - strategy > constants.Z_FIXED; - - // Covers: constants.Z_FILTERED, (1) - // constants.Z_HUFFMAN_ONLY (2), - // constants.Z_RLE (3), - // constants.Z_FIXED (4), and - // constants.Z_DEFAULT_STRATEGY (0) -} - -function responseData(engine, buffer) { - if (engine._opts.info) { - return { buffer, engine }; - } - return buffer; -} - function zlibBuffer(engine, buffer, callback) { // Streams do not support non-Buffer ArrayBufferViews yet. Convert it to a // Buffer without copying. @@ -90,73 +66,76 @@ function zlibBuffer(engine, buffer, callback) { Object.getPrototypeOf(buffer) !== Buffer.prototype) { buffer = Buffer.from(buffer.buffer, buffer.byteOffset, buffer.byteLength); } - - var buffers = []; - var nread = 0; - - engine.on('error', onError); - engine.on('end', onEnd); - + engine.buffers = null; + engine.nread = 0; + engine.cb = callback; + engine.on('data', zlibBufferOnData); + engine.on('error', zlibBufferOnError); + engine.on('end', zlibBufferOnEnd); engine.end(buffer); - flow(); - - function flow() { - var chunk; - while (null !== (chunk = engine.read())) { - buffers.push(chunk); - nread += chunk.byteLength; - } - engine.once('readable', flow); - } - - function onError(err) { - engine.removeListener('end', onEnd); - engine.removeListener('readable', flow); - callback(err); - } +} - function onEnd() { - var buf; - var err = null; +function zlibBufferOnData(chunk) { + if (!this.buffers) + this.buffers = [chunk]; + else + this.buffers.push(chunk); + this.nread += chunk.length; +} - if (nread >= kMaxLength) { - err = new RangeError(kRangeErrorMessage); - } else { - buf = Buffer.concat(buffers, nread); - } +function zlibBufferOnError(err) { + this.removeAllListeners('end'); + this.cb(err); +} - buffers = []; - engine.close(); - callback(err, responseData(engine, buf)); +function zlibBufferOnEnd() { + var buf; + var err; + if (this.nread >= kMaxLength) { + err = new RangeError(kRangeErrorMessage); + } else { + var bufs = this.buffers; + buf = (bufs.length === 1 ? bufs[0] : Buffer.concat(bufs, this.nread)); } + this.close(); + if (err) + this.cb(err); + else if (this._info) + this.cb(null, { buffer: buf, engine: this }); + else + this.cb(null, buf); } function zlibBufferSync(engine, buffer) { - if (typeof buffer === 'string') + if (typeof buffer === 'string') { buffer = Buffer.from(buffer); - else if (!ArrayBuffer.isView(buffer)) + } else if (!ArrayBuffer.isView(buffer)) { throw new TypeError('"buffer" argument must be a string, Buffer, ' + 'TypedArray, or DataView'); - - var flushFlag = engine._finishFlushFlag; - - return responseData(engine, engine._processChunk(buffer, flushFlag)); + } + buffer = processChunkSync(engine, buffer, engine._finishFlushFlag); + if (engine._info) + return { buffer, engine }; + else + return buffer; } function zlibOnError(message, errno) { + var self = this.jsref; // there is no way to cleanly recover. // continuing only obscures problems. - _close(this); - this._hadError = true; + _close(self); + self._hadError = true; var error = new Error(message); error.errno = errno; error.code = codes[errno]; - this.emit('error', error); + self.emit('error', error); } function flushCallback(level, strategy, callback) { - assert(this._handle, 'zlib binding closed'); + if (!this._handle) + assert(false, 'zlib binding closed'); this._handle.params(level, strategy); if (!this._hadError) { this._level = level; @@ -170,97 +149,114 @@ function flushCallback(level, strategy, callback) { // true or false if there is anything in the queue when // you call the .write() method. function Zlib(opts, mode) { - opts = opts || {}; Transform.call(this, opts); + var chunkSize = Z_DEFAULT_CHUNK; + var flush = Z_NO_FLUSH; + var finishFlush = Z_FINISH; + var windowBits = Z_DEFAULT_WINDOWBITS; + var level = Z_DEFAULT_COMPRESSION; + var memLevel = Z_DEFAULT_MEMLEVEL; + var strategy = Z_DEFAULT_STRATEGY; + var dictionary; + if (opts) { + chunkSize = opts.chunkSize; + if (chunkSize !== undefined && chunkSize === chunkSize) { + if (chunkSize < Z_MIN_CHUNK || !Number.isFinite(chunkSize)) + throw new RangeError('Invalid chunk size: ' + chunkSize); + } else { + chunkSize = Z_DEFAULT_CHUNK; + } - this.bytesRead = 0; - - this._opts = opts; - this._chunkSize = opts.chunkSize || constants.Z_DEFAULT_CHUNK; - - if (opts.flush && isInvalidFlushFlag(opts.flush)) { - throw new RangeError('Invalid flush flag: ' + opts.flush); - } - if (opts.finishFlush && isInvalidFlushFlag(opts.finishFlush)) { - throw new RangeError('Invalid flush flag: ' + opts.finishFlush); - } - - this._flushFlag = opts.flush || constants.Z_NO_FLUSH; - this._finishFlushFlag = opts.finishFlush !== undefined ? - opts.finishFlush : constants.Z_FINISH; + flush = opts.flush; + if (flush !== undefined && flush === flush) { + if (flush < Z_NO_FLUSH || flush > Z_BLOCK || !Number.isFinite(flush)) + throw new RangeError('Invalid flush flag: ' + flush); + } else { + flush = Z_NO_FLUSH; + } - if (opts.chunkSize !== undefined) { - if (opts.chunkSize < constants.Z_MIN_CHUNK) { - throw new RangeError('Invalid chunk size: ' + opts.chunkSize); + finishFlush = opts.finishFlush; + if (finishFlush !== undefined && finishFlush === finishFlush) { + if (finishFlush < Z_NO_FLUSH || finishFlush > Z_BLOCK || + !Number.isFinite(finishFlush)) { + throw new RangeError('Invalid flush flag: ' + finishFlush); + } + } else { + finishFlush = Z_FINISH; } - } - if (opts.windowBits !== undefined) { - if (opts.windowBits < constants.Z_MIN_WINDOWBITS || - opts.windowBits > constants.Z_MAX_WINDOWBITS) { - throw new RangeError('Invalid windowBits: ' + opts.windowBits); + windowBits = opts.windowBits; + if (windowBits !== undefined && windowBits === windowBits) { + if (windowBits < Z_MIN_WINDOWBITS || windowBits > Z_MAX_WINDOWBITS || + !Number.isFinite(windowBits)) { + throw new RangeError('Invalid windowBits: ' + windowBits); + } + } else { + windowBits = Z_DEFAULT_WINDOWBITS; } - } - if (opts.level !== undefined) { - if (opts.level < constants.Z_MIN_LEVEL || - opts.level > constants.Z_MAX_LEVEL) { - throw new RangeError('Invalid compression level: ' + opts.level); + level = opts.level; + if (level !== undefined && level === level) { + if (level < Z_MIN_LEVEL || level > Z_MAX_LEVEL || + !Number.isFinite(level)) { + throw new RangeError('Invalid compression level: ' + level); + } + } else { + level = Z_DEFAULT_COMPRESSION; } - } - if (opts.memLevel !== undefined) { - if (opts.memLevel < constants.Z_MIN_MEMLEVEL || - opts.memLevel > constants.Z_MAX_MEMLEVEL) { - throw new RangeError('Invalid memLevel: ' + opts.memLevel); + memLevel = opts.memLevel; + if (memLevel !== undefined && memLevel === memLevel) { + if (memLevel < Z_MIN_MEMLEVEL || memLevel > Z_MAX_MEMLEVEL || + !Number.isFinite(memLevel)) { + throw new RangeError('Invalid memLevel: ' + memLevel); + } + } else { + memLevel = Z_DEFAULT_MEMLEVEL; } - } - if (opts.strategy !== undefined && isInvalidStrategy(opts.strategy)) - throw new TypeError('Invalid strategy: ' + opts.strategy); + strategy = opts.strategy; + if (strategy !== undefined && strategy === strategy) { + if (strategy < Z_DEFAULT_STRATEGY || strategy > Z_FIXED || + !Number.isFinite(strategy)) { + throw new TypeError('Invalid strategy: ' + strategy); + } + } else { + strategy = Z_DEFAULT_STRATEGY; + } - if (opts.dictionary !== undefined) { - if (!ArrayBuffer.isView(opts.dictionary)) { + dictionary = opts.dictionary; + if (dictionary !== undefined && !ArrayBuffer.isView(dictionary)) { throw new TypeError( 'Invalid dictionary: it should be a Buffer, TypedArray, or DataView'); } - } + if (opts.encoding || opts.objectMode || opts.writableObjectMode) { + opts = _extend({}, opts); + opts.encoding = null; + opts.objectMode = false; + opts.writableObjectMode = false; + } + } + this.bytesRead = 0; this._handle = new binding.Zlib(mode); - this._handle.onerror = zlibOnError.bind(this); + this._handle.jsref = this; // Used by processCallback() and zlibOnError() + this._handle.onerror = zlibOnError; this._hadError = false; + this._writeState = new Uint32Array(2); - var level = constants.Z_DEFAULT_COMPRESSION; - if (Number.isFinite(opts.level)) { - level = opts.level; - } - - var strategy = constants.Z_DEFAULT_STRATEGY; - if (Number.isFinite(opts.strategy)) { - strategy = opts.strategy; - } - - var windowBits = constants.Z_DEFAULT_WINDOWBITS; - if (Number.isFinite(opts.windowBits)) { - windowBits = opts.windowBits; - } - - var memLevel = constants.Z_DEFAULT_MEMLEVEL; - if (Number.isFinite(opts.memLevel)) { - memLevel = opts.memLevel; - } + this._handle.init(windowBits, level, memLevel, strategy, this._writeState, + processCallback, dictionary); - this._handle.init(windowBits, - level, - memLevel, - strategy, - opts.dictionary); - - this._buffer = Buffer.allocUnsafe(this._chunkSize); - this._offset = 0; + this._outBuffer = Buffer.allocUnsafe(chunkSize); + this._outOffset = 0; this._level = level; this._strategy = strategy; - + this._chunkSize = chunkSize; + this._flushFlag = flush; + this._origFlushFlag = flush; + this._finishFlushFlag = finishFlush; + this._info = opts && opts.info; this.once('end', this.close); } inherits(Zlib, Transform); @@ -274,15 +270,17 @@ Object.defineProperty(Zlib.prototype, '_closed', { }); Zlib.prototype.params = function params(level, strategy, callback) { - if (level < constants.Z_MIN_LEVEL || - level > constants.Z_MAX_LEVEL) { + if (level < Z_MIN_LEVEL || level > Z_MAX_LEVEL) throw new RangeError('Invalid compression level: ' + level); - } - if (isInvalidStrategy(strategy)) + + if (strategy !== undefined && + (strategy < Z_DEFAULT_STRATEGY || strategy > Z_FIXED || + !Number.isFinite(strategy))) { throw new TypeError('Invalid strategy: ' + strategy); + } if (this._level !== level || this._strategy !== strategy) { - this.flush(constants.Z_SYNC_FLUSH, + this.flush(Z_SYNC_FLUSH, flushCallback.bind(this, level, strategy, callback)); } else { process.nextTick(callback); @@ -290,7 +288,8 @@ Zlib.prototype.params = function params(level, strategy, callback) { }; Zlib.prototype.reset = function reset() { - assert(this._handle, 'zlib binding closed'); + if (!this._handle) + assert(false, 'zlib binding closed'); return this._handle.reset(); }; @@ -305,7 +304,7 @@ Zlib.prototype.flush = function flush(kind, callback) { if (typeof kind === 'function' || (kind === undefined && !callback)) { callback = kind; - kind = constants.Z_FULL_FLUSH; + kind = Z_FULL_FLUSH; } if (ws.ended) { @@ -331,160 +330,192 @@ Zlib.prototype.close = function close(callback) { }; Zlib.prototype._transform = function _transform(chunk, encoding, cb) { - var flushFlag; - var ws = this._writableState; - var ending = ws.ending || ws.ended; - var last = ending && (!chunk || ws.length === chunk.byteLength); - - if (chunk !== null && !ArrayBuffer.isView(chunk)) - return cb(new TypeError('invalid input')); - - if (!this._handle) - return cb(new Error('zlib binding closed')); - // If it's the last chunk, or a final flush, we use the Z_FINISH flush flag // (or whatever flag was provided using opts.finishFlush). // If it's explicitly flushing at some other time, then we use - // Z_FULL_FLUSH. Otherwise, use Z_NO_FLUSH for maximum compression - // goodness. - if (last) + // Z_FULL_FLUSH. Otherwise, use the original opts.flush flag. + var flushFlag; + var ws = this._writableState; + if ((ws.ending || ws.ended) && ws.length === chunk.byteLength) { flushFlag = this._finishFlushFlag; - else { + } else { flushFlag = this._flushFlag; // once we've flushed the last of the queue, stop flushing and // go back to the normal behavior. - if (chunk.byteLength >= ws.length) { - this._flushFlag = this._opts.flush || constants.Z_NO_FLUSH; - } + if (chunk.byteLength >= ws.length) + this._flushFlag = this._origFlushFlag; } - - this._processChunk(chunk, flushFlag, cb); + processChunk(this, chunk, flushFlag, cb); }; Zlib.prototype._processChunk = function _processChunk(chunk, flushFlag, cb) { - var availInBefore = chunk && chunk.byteLength; - var availOutBefore = this._chunkSize - this._offset; - var inOff = 0; - - var self = this; - - var async = typeof cb === 'function'; - - if (!async) { - var buffers = []; - var nread = 0; + // _processChunk() is left for backwards compatibility + if (typeof cb === 'function') + processChunk(this, chunk, flushFlag, cb); + else + return processChunkSync(this, chunk, flushFlag); +}; - var error; - this.on('error', function(er) { - error = er; - }); +function processChunkSync(self, chunk, flushFlag) { + var availInBefore = chunk.byteLength; + var availOutBefore = self._chunkSize - self._outOffset; + var inOff = 0; + var availOutAfter; + var availInAfter; - assert(this._handle, 'zlib binding closed'); - do { - var res = this._handle.writeSync(flushFlag, - chunk, // in - inOff, // in_off - availInBefore, // in_len - this._buffer, // out - this._offset, //out_off - availOutBefore); // out_len - } while (!this._hadError && callback(res[0], res[1])); + var buffers = null; + var nread = 0; + var inputRead = 0; + var state = self._writeState; + var handle = self._handle; + var buffer = self._outBuffer; + var offset = self._outOffset; + var chunkSize = self._chunkSize; + + var error; + self.on('error', function(er) { + error = er; + }); - if (this._hadError) { + while (true) { + handle.writeSync(flushFlag, + chunk, // in + inOff, // in_off + availInBefore, // in_len + buffer, // out + offset, // out_off + availOutBefore); // out_len + if (error) throw error; - } - - if (nread >= kMaxLength) { - _close(this); - throw new RangeError(kRangeErrorMessage); - } - var buf = Buffer.concat(buffers, nread); - _close(this); + availOutAfter = state[0]; + availInAfter = state[1]; - return buf; - } - - assert(this._handle, 'zlib binding closed'); - var req = this._handle.write(flushFlag, - chunk, // in - inOff, // in_off - availInBefore, // in_len - this._buffer, // out - this._offset, //out_off - availOutBefore); // out_len - - req.buffer = chunk; - req.callback = callback; - - function callback(availInAfter, availOutAfter) { - // When the callback is used in an async write, the callback's - // context is the `req` object that was created. The req object - // is === this._handle, and that's why it's important to null - // out the values after they are done being used. `this._handle` - // can stay in memory longer than the callback and buffer are needed. - if (this) { - this.buffer = null; - this.callback = null; - } - - if (self._hadError) - return; + var inDelta = (availInBefore - availInAfter); + inputRead += inDelta; var have = availOutBefore - availOutAfter; - assert(have >= 0, 'have should not go down'); - - self.bytesRead += availInBefore - availInAfter; - if (have > 0) { - var out = self._buffer.slice(self._offset, self._offset + have); - self._offset += have; - // serve some output to the consumer. - if (async) { - self.push(out); - } else { + var out = buffer.slice(offset, offset + have); + offset += have; + if (!buffers) + buffers = [out]; + else buffers.push(out); - nread += out.byteLength; - } + nread += out.byteLength; + } else if (have < 0) { + assert(false, 'have should not go down'); } // exhausted the output buffer, or used all the input create a new one. - if (availOutAfter === 0 || self._offset >= self._chunkSize) { - availOutBefore = self._chunkSize; - self._offset = 0; - self._buffer = Buffer.allocUnsafe(self._chunkSize); + if (availOutAfter === 0 || offset >= chunkSize) { + availOutBefore = chunkSize; + offset = 0; + buffer = Buffer.allocUnsafe(chunkSize); } if (availOutAfter === 0) { - // Not actually done. Need to reprocess. + // Not actually done. Need to reprocess. // Also, update the availInBefore to the availInAfter value, // so that if we have to hit it a third (fourth, etc.) time, // it'll have the correct byte counts. - inOff += (availInBefore - availInAfter); + inOff += inDelta; availInBefore = availInAfter; - - if (!async) - return true; - - var newReq = self._handle.write(flushFlag, - chunk, - inOff, - availInBefore, - self._buffer, - self._offset, - self._chunkSize); - newReq.callback = callback; // this same function - newReq.buffer = chunk; - return; + } else { + break; } + } - if (!async) - return false; + self.bytesRead = inputRead; - // finished with the chunk. - cb(); + if (nread >= kMaxLength) { + _close(self); + throw new RangeError(kRangeErrorMessage); + } + + _close(self); + + return (buffers.length === 1 ? buffers[0] : Buffer.concat(buffers, nread)); +} + +function processChunk(self, chunk, flushFlag, cb) { + var handle = self._handle; + if (!handle) + return cb(new Error('zlib binding closed')); + + handle.buffer = chunk; + handle.cb = cb; + handle.availOutBefore = self._chunkSize - self._outOffset; + handle.availInBefore = chunk.byteLength; + handle.inOff = 0; + handle.flushFlag = flushFlag; + + handle.write(flushFlag, + chunk, // in + 0, // in_off + handle.availInBefore, // in_len + self._outBuffer, // out + self._outOffset, // out_off + handle.availOutBefore); // out_len +} + +function processCallback() { + // This callback's context (`this`) is the `_handle` (ZCtx) object. It is + // important to null out the values once they are no longer needed since + // `_handle` can stay in memory long after the buffer is needed. + var handle = this; + var self = this.jsref; + var state = self._writeState; + + if (self._hadError) { + this.buffer = null; + return; + } + + var availOutAfter = state[0]; + var availInAfter = state[1]; + + var inDelta = (handle.availInBefore - availInAfter); + self.bytesRead += inDelta; + + var have = handle.availOutBefore - availOutAfter; + if (have > 0) { + var out = self._outBuffer.slice(self._outOffset, self._outOffset + have); + self._outOffset += have; + self.push(out); + } else if (have < 0) { + assert(false, 'have should not go down'); } -}; + + // exhausted the output buffer, or used all the input create a new one. + if (availOutAfter === 0 || self._outOffset >= self._chunkSize) { + handle.availOutBefore = self._chunkSize; + self._outOffset = 0; + self._outBuffer = Buffer.allocUnsafe(self._chunkSize); + } + + if (availOutAfter === 0) { + // Not actually done. Need to reprocess. + // Also, update the availInBefore to the availInAfter value, + // so that if we have to hit it a third (fourth, etc.) time, + // it'll have the correct byte counts. + handle.inOff += inDelta; + handle.availInBefore = availInAfter; + + this.write(handle.flushFlag, + this.buffer, // in + handle.inOff, // in_off + handle.availInBefore, // in_len + self._outBuffer, // out + self._outOffset, // out_off + self._chunkSize); // out_len + return; + } + + // finished with the chunk. + this.buffer = null; + this.cb(); +} function _close(engine, callback) { if (callback) @@ -507,56 +538,56 @@ function emitCloseNT(self) { function Deflate(opts) { if (!(this instanceof Deflate)) return new Deflate(opts); - Zlib.call(this, opts, constants.DEFLATE); + Zlib.call(this, opts, DEFLATE); } inherits(Deflate, Zlib); function Inflate(opts) { if (!(this instanceof Inflate)) return new Inflate(opts); - Zlib.call(this, opts, constants.INFLATE); + Zlib.call(this, opts, INFLATE); } inherits(Inflate, Zlib); function Gzip(opts) { if (!(this instanceof Gzip)) return new Gzip(opts); - Zlib.call(this, opts, constants.GZIP); + Zlib.call(this, opts, GZIP); } inherits(Gzip, Zlib); function Gunzip(opts) { if (!(this instanceof Gunzip)) return new Gunzip(opts); - Zlib.call(this, opts, constants.GUNZIP); + Zlib.call(this, opts, GUNZIP); } inherits(Gunzip, Zlib); function DeflateRaw(opts) { if (!(this instanceof DeflateRaw)) return new DeflateRaw(opts); - Zlib.call(this, opts, constants.DEFLATERAW); + Zlib.call(this, opts, DEFLATERAW); } inherits(DeflateRaw, Zlib); function InflateRaw(opts) { if (!(this instanceof InflateRaw)) return new InflateRaw(opts); - Zlib.call(this, opts, constants.INFLATERAW); + Zlib.call(this, opts, INFLATERAW); } inherits(InflateRaw, Zlib); function Unzip(opts) { if (!(this instanceof Unzip)) return new Unzip(opts); - Zlib.call(this, opts, constants.UNZIP); + Zlib.call(this, opts, UNZIP); } inherits(Unzip, Zlib); -function createConvenienceMethod(type, sync) { +function createConvenienceMethod(ctor, sync) { if (sync) { return function(buffer, opts) { - return zlibBufferSync(new type(opts), buffer); + return zlibBufferSync(new ctor(opts), buffer); }; } else { return function(buffer, opts, callback) { @@ -564,16 +595,18 @@ function createConvenienceMethod(type, sync) { callback = opts; opts = {}; } - return zlibBuffer(new type(opts), buffer, callback); + return zlibBuffer(new ctor(opts), buffer, callback); }; } } -function createProperty(type) { +function createProperty(ctor) { return { configurable: true, enumerable: true, - value: type + value: function(options) { + return new ctor(options); + } }; } @@ -605,13 +638,13 @@ module.exports = { }; Object.defineProperties(module.exports, { - createDeflate: createProperty(module.exports.Deflate), - createInflate: createProperty(module.exports.Inflate), - createDeflateRaw: createProperty(module.exports.DeflateRaw), - createInflateRaw: createProperty(module.exports.InflateRaw), - createGzip: createProperty(module.exports.Gzip), - createGunzip: createProperty(module.exports.Gunzip), - createUnzip: createProperty(module.exports.Unzip), + createDeflate: createProperty(Deflate), + createInflate: createProperty(Inflate), + createDeflateRaw: createProperty(DeflateRaw), + createInflateRaw: createProperty(InflateRaw), + createGzip: createProperty(Gzip), + createGunzip: createProperty(Gunzip), + createUnzip: createProperty(Unzip), constants: { configurable: false, enumerable: true, diff --git a/src/node_constants.cc b/src/node_constants.cc index 0e178602b73742..263909f8cd343b 100644 --- a/src/node_constants.cc +++ b/src/node_constants.cc @@ -1228,22 +1228,6 @@ void DefineZlibConstants(Local target) { NODE_DEFINE_CONSTANT(target, INFLATERAW); NODE_DEFINE_CONSTANT(target, UNZIP); -#define Z_MIN_WINDOWBITS 8 -#define Z_MAX_WINDOWBITS 15 -#define Z_DEFAULT_WINDOWBITS 15 -// Fewer than 64 bytes per chunk is not recommended. -// Technically it could work with as few as 8, but even 64 bytes -// is low. Usually a MB or more is best. -#define Z_MIN_CHUNK 64 -#define Z_MAX_CHUNK std::numeric_limits::infinity() -#define Z_DEFAULT_CHUNK (16 * 1024) -#define Z_MIN_MEMLEVEL 1 -#define Z_MAX_MEMLEVEL 9 -#define Z_DEFAULT_MEMLEVEL 8 -#define Z_MIN_LEVEL -1 -#define Z_MAX_LEVEL 9 -#define Z_DEFAULT_LEVEL Z_DEFAULT_COMPRESSION - NODE_DEFINE_CONSTANT(target, Z_MIN_WINDOWBITS); NODE_DEFINE_CONSTANT(target, Z_MAX_WINDOWBITS); NODE_DEFINE_CONSTANT(target, Z_DEFAULT_WINDOWBITS); diff --git a/src/node_internals.h b/src/node_internals.h index a08ab45affe96e..d857f3d4a3d5af 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -36,6 +36,23 @@ #include +// Custom constants used by both node_constants.cc and node_zlib.cc +#define Z_MIN_WINDOWBITS 8 +#define Z_MAX_WINDOWBITS 15 +#define Z_DEFAULT_WINDOWBITS 15 +// Fewer than 64 bytes per chunk is not recommended. +// Technically it could work with as few as 8, but even 64 bytes +// is low. Usually a MB or more is best. +#define Z_MIN_CHUNK 64 +#define Z_MAX_CHUNK std::numeric_limits::infinity() +#define Z_DEFAULT_CHUNK (16 * 1024) +#define Z_MIN_MEMLEVEL 1 +#define Z_MAX_MEMLEVEL 9 +#define Z_DEFAULT_MEMLEVEL 8 +#define Z_MIN_LEVEL -1 +#define Z_MAX_LEVEL 9 +#define Z_DEFAULT_LEVEL Z_DEFAULT_COMPRESSION + struct sockaddr; // Variation on NODE_DEFINE_CONSTANT that sets a String value. diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 4495eb2bcad255..07cd526b1ea8b9 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -40,14 +40,17 @@ namespace node { using v8::Array; +using v8::ArrayBuffer; using v8::Context; +using v8::Function; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; using v8::HandleScope; -using v8::Integer; using v8::Local; using v8::Number; using v8::Object; +using v8::Persistent; +using v8::Uint32Array; using v8::Value; namespace { @@ -86,7 +89,8 @@ class ZCtx : public AsyncWrap { write_in_progress_(false), pending_close_(false), refs_(0), - gzip_id_bytes_read_(0) { + gzip_id_bytes_read_(0), + write_result_(nullptr) { MakeWeak(this); Wrap(wrap, this); } @@ -200,38 +204,19 @@ class ZCtx : public AsyncWrap { if (!async) { // sync version - ctx->env()->PrintSyncTrace(); + env->PrintSyncTrace(); Process(work_req); - if (CheckError(ctx)) - AfterSync(ctx, args); + if (CheckError(ctx)) { + ctx->write_result_[0] = ctx->strm_.avail_out; + ctx->write_result_[1] = ctx->strm_.avail_in; + ctx->write_in_progress_ = false; + ctx->Unref(); + } return; } // async version - uv_queue_work(ctx->env()->event_loop(), - work_req, - ZCtx::Process, - ZCtx::After); - - args.GetReturnValue().Set(ctx->object()); - } - - - static void AfterSync(ZCtx* ctx, const FunctionCallbackInfo& args) { - Environment* env = ctx->env(); - Local avail_out = Integer::New(env->isolate(), - ctx->strm_.avail_out); - Local avail_in = Integer::New(env->isolate(), - ctx->strm_.avail_in); - - ctx->write_in_progress_ = false; - - Local result = Array::New(env->isolate(), 2); - result->Set(0, avail_in); - result->Set(1, avail_out); - args.GetReturnValue().Set(result); - - ctx->Unref(); + uv_queue_work(env->event_loop(), work_req, ZCtx::Process, ZCtx::After); } @@ -389,16 +374,14 @@ class ZCtx : public AsyncWrap { if (!CheckError(ctx)) return; - Local avail_out = Integer::New(env->isolate(), - ctx->strm_.avail_out); - Local avail_in = Integer::New(env->isolate(), - ctx->strm_.avail_in); - + ctx->write_result_[0] = ctx->strm_.avail_out; + ctx->write_result_[1] = ctx->strm_.avail_in; ctx->write_in_progress_ = false; // call the write() cb - Local args[2] = { avail_in, avail_out }; - ctx->MakeCallback(env->callback_string(), arraysize(args), args); + Local cb = PersistentToLocal(env->isolate(), + ctx->write_js_callback_); + ctx->MakeCallback(cb, 0, nullptr); ctx->Unref(); if (ctx->pending_close_) @@ -447,41 +430,51 @@ class ZCtx : public AsyncWrap { // just pull the ints out of the args and call the other Init static void Init(const FunctionCallbackInfo& args) { - CHECK((args.Length() == 4 || args.Length() == 5) && - "init(windowBits, level, memLevel, strategy, [dictionary])"); + CHECK(args.Length() == 7 && + "init(windowBits, level, memLevel, strategy, writeResult, writeCallback," + " dictionary)"); ZCtx* ctx; ASSIGN_OR_RETURN_UNWRAP(&ctx, args.Holder()); int windowBits = args[0]->Uint32Value(); - CHECK((windowBits >= 8 && windowBits <= 15) && "invalid windowBits"); + CHECK((windowBits >= Z_MIN_WINDOWBITS && windowBits <= Z_MAX_WINDOWBITS) && + "invalid windowBits"); int level = args[1]->Int32Value(); - CHECK((level >= -1 && level <= 9) && "invalid compression level"); + CHECK((level >= Z_MIN_LEVEL && level <= Z_MAX_LEVEL) && + "invalid compression level"); int memLevel = args[2]->Uint32Value(); - CHECK((memLevel >= 1 && memLevel <= 9) && "invalid memlevel"); + CHECK((memLevel >= Z_MIN_MEMLEVEL && memLevel <= Z_MAX_MEMLEVEL) && + "invalid memlevel"); int strategy = args[3]->Uint32Value(); CHECK((strategy == Z_FILTERED || - strategy == Z_HUFFMAN_ONLY || - strategy == Z_RLE || - strategy == Z_FIXED || - strategy == Z_DEFAULT_STRATEGY) && "invalid strategy"); + strategy == Z_HUFFMAN_ONLY || + strategy == Z_RLE || + strategy == Z_FIXED || + strategy == Z_DEFAULT_STRATEGY) && "invalid strategy"); + + CHECK(args[4]->IsUint32Array()); + Local array = args[4].As(); + Local ab = array->Buffer(); + uint32_t* write_result = static_cast(ab->GetContents().Data()); + + Local write_js_callback = args[5].As(); char* dictionary = nullptr; size_t dictionary_len = 0; - if (args.Length() >= 5 && Buffer::HasInstance(args[4])) { - Local dictionary_ = args[4]->ToObject(args.GetIsolate()); + if (Buffer::HasInstance(args[6])) { + const char* dictionary_ = Buffer::Data(args[6]); + dictionary_len = Buffer::Length(args[6]); - dictionary_len = Buffer::Length(dictionary_); dictionary = new char[dictionary_len]; - - memcpy(dictionary, Buffer::Data(dictionary_), dictionary_len); + memcpy(dictionary, dictionary_, dictionary_len); } - Init(ctx, level, windowBits, memLevel, strategy, - dictionary, dictionary_len); + Init(ctx, level, windowBits, memLevel, strategy, write_result, + write_js_callback, dictionary, dictionary_len); SetDictionary(ctx); } @@ -500,7 +493,9 @@ class ZCtx : public AsyncWrap { } static void Init(ZCtx *ctx, int level, int windowBits, int memLevel, - int strategy, char* dictionary, size_t dictionary_len) { + int strategy, uint32_t* write_result, + Local write_js_callback, char* dictionary, + size_t dictionary_len) { ctx->level_ = level; ctx->windowBits_ = windowBits; ctx->memLevel_ = memLevel; @@ -564,6 +559,9 @@ class ZCtx : public AsyncWrap { } ctx->env()->ThrowError("Init error"); } + + ctx->write_result_ = write_result; + ctx->write_js_callback_.Reset(ctx->env()->isolate(), write_js_callback); } static void SetDictionary(ZCtx* ctx) { @@ -670,6 +668,8 @@ class ZCtx : public AsyncWrap { bool pending_close_; unsigned int refs_; unsigned int gzip_id_bytes_read_; + uint32_t* write_result_; + Persistent write_js_callback_; }; diff --git a/test/async-hooks/test-zlib.zlib-binding.deflate.js b/test/async-hooks/test-zlib.zlib-binding.deflate.js index bf991cfbabb73f..306e3fac084e1f 100644 --- a/test/async-hooks/test-zlib.zlib-binding.deflate.js +++ b/test/async-hooks/test-zlib.zlib-binding.deflate.js @@ -26,6 +26,8 @@ handle.init( constants.Z_MIN_LEVEL, constants.Z_DEFAULT_MEMLEVEL, constants.Z_DEFAULT_STRATEGY, + new Uint32Array(2), + function processCallback() { this.cb(); }, Buffer.from('') ); checkInvocations(hdl, { init: 1 }, 'when initialized handle'); @@ -34,7 +36,7 @@ const inBuf = Buffer.from('x'); const outBuf = Buffer.allocUnsafe(1); let count = 2; -handle.callback = common.mustCall(onwritten, 2); +handle.cb = common.mustCall(onwritten, 2); handle.write(true, inBuf, 0, 1, outBuf, 0, 1); checkInvocations(hdl, { init: 1 }, 'when invoked write() on handle');