From d30283497a1d08bc0484739a5dd48e68cca29dd5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 23 Oct 2023 17:03:00 +0200 Subject: [PATCH] streams: use Array for Readable buffer --- benchmark/streams/readable-bigread.js | 2 +- benchmark/streams/readable-readall.js | 2 +- lib/internal/streams/readable.js | 138 +++++++++++++++--- .../test-stream2-readable-from-list.js | 101 ------------- 4 files changed, 118 insertions(+), 125 deletions(-) delete mode 100644 test/parallel/test-stream2-readable-from-list.js diff --git a/benchmark/streams/readable-bigread.js b/benchmark/streams/readable-bigread.js index 0d963c6803299e..8b36f49d7ffa53 100644 --- a/benchmark/streams/readable-bigread.js +++ b/benchmark/streams/readable-bigread.js @@ -15,7 +15,7 @@ function main({ n }) { bench.start(); for (let k = 0; k < n; ++k) { - for (let i = 0; i < 1e4; ++i) + for (let i = 0; i < 1e3; ++i) s.push(b); while (s.read(128)); } diff --git a/benchmark/streams/readable-readall.js b/benchmark/streams/readable-readall.js index b62b1d05b13876..c1a015a4d02716 100644 --- a/benchmark/streams/readable-readall.js +++ b/benchmark/streams/readable-readall.js @@ -15,7 +15,7 @@ function main({ n }) { bench.start(); for (let k = 0; k < n; ++k) { - for (let i = 0; i < 1e4; ++i) + for (let i = 0; i < 1e3; ++i) s.push(b); while (s.read()); } diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index a129b1b6f4b75d..ca08a4b4aeb3ea 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -34,6 +34,7 @@ const { SymbolAsyncDispose, SymbolAsyncIterator, Symbol, + TypedArrayPrototypeSet, } = primordials; module.exports = Readable; @@ -73,6 +74,7 @@ const { const { validateObject } = require('internal/validators'); const kState = Symbol('kState'); +const FastBuffer = Buffer[Symbol.species]; const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); @@ -278,7 +280,8 @@ function ReadableState(options, stream, isDuplex) { // A linked list is used to store data chunks instead of an array because the // linked list can remove elements from the beginning faster than // array.shift(). - this.buffer = new BufferList(); + this.buffer = []; + this.bufferIndex = 0; this.length = 0; this.pipes = []; @@ -546,10 +549,19 @@ function addChunk(stream, state, chunk, addToFront) { } else { // Update the buffer info. state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length; - if (addToFront) + if (addToFront) { + state.buffer.splice(0, state.bufferIndex); + state.bufferIndex = 0; state.buffer.unshift(chunk); - else + + // if (state.bufferIndex > 0) { + // state.buffer[--state.bufferIndex] = chunk; + // } else { + // state.buffer.unshift(chunk); // Slow path + // } + } else { state.buffer.push(chunk); + } if ((state[kState] & kNeedReadable) !== 0) emitReadable(stream); @@ -564,21 +576,24 @@ Readable.prototype.isPaused = function() { // Backwards compatibility. Readable.prototype.setEncoding = function(enc) { + const state = this._readableState; + const decoder = new StringDecoder(enc); - this._readableState.decoder = decoder; + state.decoder = decoder; // If setEncoding(null), decoder.encoding equals utf8. - this._readableState.encoding = this._readableState.decoder.encoding; + state.encoding = state.decoder.encoding; - const buffer = this._readableState.buffer; // Iterate over current buffer to convert already stored Buffers: let content = ''; - for (const data of buffer) { + for (const data of state.buffer.slice(state.bufferIndex)) { content += decoder.write(data); } - buffer.clear(); + state.buffer.length = 0; + state.bufferIndex = 0; + if (content !== '') - buffer.push(content); - this._readableState.length = content.length; + state.buffer.push(content); + state.length = content.length; return this; }; @@ -611,7 +626,7 @@ function howMuchToRead(n, state) { if (NumberIsNaN(n)) { // Only flow one buffer at a time. if ((state[kState] & kFlowing) !== 0 && state.length) - return state.buffer.first().length; + return state.buffer[state.bufferIndex].length; return state.length; } if (n <= state.length) @@ -1549,21 +1564,100 @@ function fromList(n, state) { if (state.length === 0) return null; + let idx = state.bufferIndex; let ret; - if (state.objectMode) - ret = state.buffer.shift(); - else if (!n || n >= state.length) { + + const buf = state.buffer; + const len = buf.length; + + if ((state[kState] & kObjectMode) !== 0) { + ret = buf[idx]; + buf[idx++] = null; + } else if (!n || n >= state.length) { // Read it all, truncate the list. - if (state.decoder) - ret = state.buffer.join(''); - else if (state.buffer.length === 1) - ret = state.buffer.first(); - else - ret = state.buffer.concat(state.length); - state.buffer.clear(); + if ((state[kState] & kDecoder) !== 0) { + ret = '' + while (idx < len) { + ret += buf[idx]; + buf[idx++] = null; + } + } else if (len - idx === 0) { + ret = Buffer.alloc(0) + } else if (len - idx === 1) { + ret = buf[idx]; + buf[idx++] = null; + } else { + ret = Buffer.allocUnsafe(state.length); + + let i = 0; + while (idx < len) { + TypedArrayPrototypeSet(ret, buf[idx], i); + i += buf[idx].length; + buf[idx++] = null; + } + } } else { // read part of list. - ret = state.buffer.consume(n, state.decoder); + + if (n < buf[idx].length) { + // `slice` is the same for buffers and strings. + ret = buf[idx].slice(0, n); + buf[idx] = buf[idx].slice(n); + } else if (n === buf[idx].length) { + // First chunk is a perfect match. + ret = buf[idx]; + buf[idx++] = null; + } else if ((state[kState] & kDecoder) !== 0) { + ret = ''; + while (idx < len) { + const str = buf[idx]; + if (n > str.length) { + ret += str; + n -= str.length; + buf[idx++] = null; + } else { + if (n === buf.length) { + ret += str; + buf[idx++] = null; + } else { + ret += str.slice(0, n); + buf[idx] = str.slice(n); + } + break; + } + } + } else { + ret = Buffer.allocUnsafe(n); + + const retLen = n; + while (idx < len) { + const data = buf[idx]; + if (n > data.length) { + TypedArrayPrototypeSet(ret, data, retLen - n); + n -= data.length; + buf[idx++] = null; + } else { + if (n === data.length) { + TypedArrayPrototypeSet(ret, data, retLen - n); + buf[idx++] = null; + } else { + TypedArrayPrototypeSet(ret, new FastBuffer(data.buffer, data.byteOffset, n), retLen - n); + buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n, data.length - n); + } + break; + } + } + } + } + + if (idx === buf.length) { + state.buffer.length = 0; + state.bufferIndex = 0 + } else if (idx > 1024) { + state.buffer.splice(0, idx); + state.bufferIndex = 0; + } else { + state.bufferIndex = idx; } return ret; diff --git a/test/parallel/test-stream2-readable-from-list.js b/test/parallel/test-stream2-readable-from-list.js deleted file mode 100644 index d5d113304e4925..00000000000000 --- a/test/parallel/test-stream2-readable-from-list.js +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -// Flags: --expose-internals -'use strict'; -require('../common'); -const assert = require('assert'); -const fromList = require('stream').Readable._fromList; -const BufferList = require('internal/streams/buffer_list'); -const util = require('util'); - -function bufferListFromArray(arr) { - const bl = new BufferList(); - for (let i = 0; i < arr.length; ++i) - bl.push(arr[i]); - return bl; -} - -{ - // Verify behavior with buffers - let list = [ Buffer.from('foog'), - Buffer.from('bark'), - Buffer.from('bazy'), - Buffer.from('kuel') ]; - list = bufferListFromArray(list); - - assert.strictEqual( - util.inspect([ list ], { compact: false }), - `[ - BufferList { - head: [Object], - tail: [Object], - length: 4 - } -]`); - - // Read more than the first element. - let ret = fromList(6, { buffer: list, length: 16 }); - assert.strictEqual(ret.toString(), 'foogba'); - - // Read exactly the first element. - ret = fromList(2, { buffer: list, length: 10 }); - assert.strictEqual(ret.toString(), 'rk'); - - // Read less than the first element. - ret = fromList(2, { buffer: list, length: 8 }); - assert.strictEqual(ret.toString(), 'ba'); - - // Read more than we have. - ret = fromList(100, { buffer: list, length: 6 }); - assert.strictEqual(ret.toString(), 'zykuel'); - - // all consumed. - assert.deepStrictEqual(list, new BufferList()); -} - -{ - // Verify behavior with strings - let list = [ 'foog', - 'bark', - 'bazy', - 'kuel' ]; - list = bufferListFromArray(list); - - // Read more than the first element. - let ret = fromList(6, { buffer: list, length: 16, decoder: true }); - assert.strictEqual(ret, 'foogba'); - - // Read exactly the first element. - ret = fromList(2, { buffer: list, length: 10, decoder: true }); - assert.strictEqual(ret, 'rk'); - - // Read less than the first element. - ret = fromList(2, { buffer: list, length: 8, decoder: true }); - assert.strictEqual(ret, 'ba'); - - // Read more than we have. - ret = fromList(100, { buffer: list, length: 6, decoder: true }); - assert.strictEqual(ret, 'zykuel'); - - // all consumed. - assert.deepStrictEqual(list, new BufferList()); -}