From 51b34e8e3f994347a907d941fefda90b788c4245 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sun, 30 Sep 2018 16:13:07 -0400 Subject: [PATCH] zlib: simplify flushing mechanism MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, flushing on zlib streams was implemented through stream 'drain' handlers. This has a number of downsides; in particular, it is complex, and could lead to unpredictable behaviour, since it meant that in a sequence like ```js compressor.write('abc'); compressor.flush(); waitForMoreDataAsynchronously(() => { compressor.write('def'); }); ``` it was not fully deterministic whether the flush happens after the second chunk is written or the first one. This commit replaces this mechanism by one that piggy-backs along the stream’s write queue, using a “special” `Buffer` instance that signals that a flush is currently due. --- lib/zlib.js | 48 ++++++++------------ test/parallel/test-zlib-flush-drain.js | 2 +- test/parallel/test-zlib-write-after-flush.js | 2 +- 3 files changed, 21 insertions(+), 31 deletions(-) diff --git a/lib/zlib.js b/lib/zlib.js index 68d06fa93fbaa2..5d5ef3e083ce9c 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -311,10 +311,9 @@ function Zlib(opts, mode) { this._level = level; this._strategy = strategy; this._chunkSize = chunkSize; - this._flushFlag = flush; - this._scheduledFlushFlag = Z_NO_FLUSH; - this._origFlushFlag = flush; + this._defaultFlushFlag = flush; this._finishFlushFlag = finishFlush; + this._nextFlush = -1; this._info = opts && opts.info; this.once('end', this.close); } @@ -398,6 +397,7 @@ function maxFlush(a, b) { return flushiness[a] > flushiness[b] ? a : b; } +const flushBuffer = Buffer.alloc(0); Zlib.prototype.flush = function flush(kind, callback) { var ws = this._writableState; @@ -412,21 +412,13 @@ Zlib.prototype.flush = function flush(kind, callback) { } else if (ws.ending) { if (callback) this.once('end', callback); - } else if (ws.needDrain) { - const alreadyHadFlushScheduled = this._scheduledFlushFlag !== Z_NO_FLUSH; - this._scheduledFlushFlag = maxFlush(kind, this._scheduledFlushFlag); - - // If a callback was passed, always register a new `drain` + flush handler, - // mostly because that's simpler and flush callbacks piling up is a rare - // thing anyway. - if (!alreadyHadFlushScheduled || callback) { - const drainHandler = () => this.flush(this._scheduledFlushFlag, callback); - this.once('drain', drainHandler); - } + } else if (this._nextFlush !== -1) { + // This means that there is a flush currently in the write queue. + // We currently coalesce this flush into the pending one. + this._nextFlush = maxFlush(this._nextFlush, kind); } else { - this._flushFlag = kind; - this.write(Buffer.alloc(0), '', callback); - this._scheduledFlushFlag = Z_NO_FLUSH; + this._nextFlush = kind; + this.write(flushBuffer, '', callback); } }; @@ -436,20 +428,18 @@ Zlib.prototype.close = function close(callback) { }; Zlib.prototype._transform = function _transform(chunk, encoding, cb) { - // If it's the last chunk, or a final flush, we use the Z_FINISH flush flag - // (or whatever flag was provided using opts.finishFlush). - // If it's explicitly flushing at some other time, then we use - // Z_FULL_FLUSH. Otherwise, use the original opts.flush flag. - var flushFlag; + var flushFlag = this._defaultFlushFlag; + // We use a 'fake' zero-length chunk to carry information about flushes from + // the public API to the actual stream implementation. + if (chunk === flushBuffer) { + flushFlag = this._nextFlush; + this._nextFlush = -1; + } + + // For the last chunk, also apply `_finishFlushFlag`. var ws = this._writableState; if ((ws.ending || ws.ended) && ws.length === chunk.byteLength) { - flushFlag = this._finishFlushFlag; - } else { - flushFlag = this._flushFlag; - // once we've flushed the last of the queue, stop flushing and - // go back to the normal behavior. - if (chunk.byteLength >= ws.length) - this._flushFlag = this._origFlushFlag; + flushFlag = maxFlush(flushFlag, this._finishFlushFlag); } processChunk(this, chunk, flushFlag, cb); }; diff --git a/test/parallel/test-zlib-flush-drain.js b/test/parallel/test-zlib-flush-drain.js index 0619eecf3ce669..a470e32090f084 100644 --- a/test/parallel/test-zlib-flush-drain.js +++ b/test/parallel/test-zlib-flush-drain.js @@ -44,5 +44,5 @@ process.once('exit', function() { assert.strictEqual( drainCount, 1); assert.strictEqual( - flushCount, 2); + flushCount, 1); }); diff --git a/test/parallel/test-zlib-write-after-flush.js b/test/parallel/test-zlib-write-after-flush.js index 2ba6ba4550f942..6d8d787343426f 100644 --- a/test/parallel/test-zlib-write-after-flush.js +++ b/test/parallel/test-zlib-write-after-flush.js @@ -35,7 +35,7 @@ gunz.setEncoding('utf8'); gunz.on('data', (c) => output += c); gunz.on('end', common.mustCall(() => { assert.strictEqual(output, input); - assert.strictEqual(gzip._flushFlag, zlib.constants.Z_NO_FLUSH); + assert.strictEqual(gzip._nextFlush, -1); })); // make sure that flush/write doesn't trigger an assert failure