From 2bc17a5c4a623e1eb088f3d2249410a45a8bca7c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 20 Jun 2020 00:01:09 +0200 Subject: [PATCH] fs: remove custom Buffer pool for streams The performance benefit of using a custom pool is negligible. Furthermore, it causes problems with Workers and transferable. Rather than further adding complexity for compat with Workers, just remove the pooling logic. Refs: https://github.com/nodejs/node/issues/33880#issuecomment-644430693 Fixes: https://github.com/nodejs/node/issues/31733 --- lib/internal/fs/streams.js | 100 ++++++------------ .../test-crypto-authenticated-stream.js | 2 - 2 files changed, 30 insertions(+), 72 deletions(-) rename test/{known_issues => parallel}/test-crypto-authenticated-stream.js (99%) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 9e6050139dc79a..be79f57577c6f0 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -26,29 +26,8 @@ const { toPathIfFileURL } = require('internal/url'); const kIoDone = Symbol('kIoDone'); const kIsPerformingIO = Symbol('kIsPerformingIO'); -const kMinPoolSpace = 128; const kFs = Symbol('kFs'); -let pool; -// It can happen that we expect to read a large chunk of data, and reserve -// a large chunk of the pool accordingly, but the read() call only filled -// a portion of it. If a concurrently executing read() then uses the same pool, -// the "reserved" portion cannot be used, so we allow it to be re-used as a -// new pool later. -const poolFragments = []; - -function allocNewPool(poolSize) { - if (poolFragments.length > 0) - pool = poolFragments.pop(); - else - pool = Buffer.allocUnsafe(poolSize); - pool.used = 0; -} - -function roundUpToMultipleOf8(n) { - return (n + 7) & ~7; // Align to 8 byte boundary. 
-} - function _construct(callback) { const stream = this; if (typeof stream.fd === 'number') { @@ -188,70 +167,51 @@ ReadStream.prototype.open = openReadFs; ReadStream.prototype._construct = _construct; ReadStream.prototype._read = function(n) { - if (!pool || pool.length - pool.used < kMinPoolSpace) { - // Discard the old pool. - allocNewPool(this.readableHighWaterMark); - } + n = this.pos !== undefined ? + MathMin(this.end - this.pos + 1, n) : + MathMin(this.end - this.bytesRead + 1, n); - // Grab another reference to the pool in the case that while we're - // in the thread pool another read() finishes up the pool, and - // allocates a new one. - const thisPool = pool; - let toRead = MathMin(pool.length - pool.used, n); - const start = pool.used; - - if (this.pos !== undefined) - toRead = MathMin(this.end - this.pos + 1, toRead); - else - toRead = MathMin(this.end - this.bytesRead + 1, toRead); + if (n <= 0) { + this.push(null); + return; + } - // Already read everything we were supposed to read! - // treat as EOF. - if (toRead <= 0) - return this.push(null); + const buf = Buffer.allocUnsafeSlow(n); - // the actual read. this[kIsPerformingIO] = true; this[kFs] - .read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { + .read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => { this[kIsPerformingIO] = false; + // Tell ._destroy() that it's safe to close the fd now. - if (this.destroyed) return this.emit(kIoDone, er); + if (this.destroyed) { + this.emit(kIoDone, er); + return; + } if (er) { errorOrDestroy(this, er); - } else { - let b = null; - // Now that we know how much data we have actually read, re-wind the - // 'used' field if we can, and otherwise allow the remainder of our - // reservation to be used as a new pool later. 
- if (start + toRead === thisPool.used && thisPool === pool) { - const newUsed = thisPool.used + bytesRead - toRead; - thisPool.used = roundUpToMultipleOf8(newUsed); - } else { - // Round down to the next lowest multiple of 8 to ensure the new pool - // fragment start and end positions are aligned to an 8 byte boundary. - const alignedEnd = (start + toRead) & ~7; - const alignedStart = roundUpToMultipleOf8(start + bytesRead); - if (alignedEnd - alignedStart >= kMinPoolSpace) { - poolFragments.push(thisPool.slice(alignedStart, alignedEnd)); - } - } - - if (bytesRead > 0) { - this.bytesRead += bytesRead; - b = thisPool.slice(start, start + bytesRead); + } else if (bytesRead > 0) { + this.bytesRead += bytesRead; + + if (bytesRead !== buf.length) { + // Slow path. Shrink to fit. + // Copy instead of slice so that we don't retain + // large backing buffer for small reads. + const dst = Buffer.allocUnsafeSlow(bytesRead); + buf.copy(dst, 0, 0, bytesRead); + buf = dst; } - this.push(b); + this.push(buf); + } else { + this.push(null); } }); - // Move the pool positions, and internal position for reading. - if (this.pos !== undefined) - this.pos += toRead; - - pool.used = roundUpToMultipleOf8(pool.used + toRead); + if (this.pos !== undefined) { + this.pos += n; + } }; ReadStream.prototype._destroy = function(err, cb) { diff --git a/test/known_issues/test-crypto-authenticated-stream.js b/test/parallel/test-crypto-authenticated-stream.js similarity index 99% rename from test/known_issues/test-crypto-authenticated-stream.js rename to test/parallel/test-crypto-authenticated-stream.js index 1e2a9edd7b0a6e..7e119f221daf71 100644 --- a/test/known_issues/test-crypto-authenticated-stream.js +++ b/test/parallel/test-crypto-authenticated-stream.js @@ -121,7 +121,6 @@ function test(config) { tmpdir.refresh(); -// OK test({ cipher: 'aes-128-ccm', aad: Buffer.alloc(1), @@ -131,7 +130,6 @@ test({ plaintextLength: 32768, }); -// Fails the fstream test. 
test({ cipher: 'aes-128-ccm', aad: Buffer.alloc(1),