Skip to content

Commit

Permalink
stream: improve readable push performance
Browse files Browse the repository at this point in the history
  • Loading branch information
mscdex committed May 23, 2017
1 parent 485be99 commit b94a7e0
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 72 deletions.
12 changes: 7 additions & 5 deletions benchmark/streams/readable-boundaryread.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ const common = require('../common');
const Readable = require('stream').Readable;

const bench = common.createBenchmark(main, {
n: [200e1]
n: [200e1],
type: ['string', 'buffer']
});

function main(conf) {
const n = +conf.n;
const b = new Buffer(32);
const s = new Readable();
function noop() {}
s._read = noop;
var data = 'a'.repeat(32);
if (conf.type === 'buffer')
data = Buffer.from(data);
s._read = function() {};

bench.start();
for (var k = 0; k < n; ++k) {
for (var i = 0; i < 1e4; ++i)
s.push(b);
s.push(data);
while (s.read(32));
}
bench.end(n);
Expand Down
141 changes: 74 additions & 67 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,81 +177,97 @@ Readable.prototype._destroy = function(err, cb) {
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
var state = this._readableState;

if (!state.objectMode && typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
var skipChunkCheck;

if (!state.objectMode) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
skipChunkCheck = true;
}
} else {
skipChunkCheck = true;
}

return readableAddChunk(this, state, chunk, encoding, false);
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk) {
var state = this._readableState;
return readableAddChunk(this, state, chunk, '', true);
};

Readable.prototype.isPaused = function() {
return this._readableState.flowing === false;
return readableAddChunk(this, chunk, null, true, false);
};

function readableAddChunk(stream, state, chunk, encoding, addToFront) {
var er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
} else if (chunk === null) {
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
var state = stream._readableState;
if (chunk === null) {
state.reading = false;
onEofChunk(stream, state);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (state.ended && !addToFront) {
const e = new Error('stream.push() after EOF');
stream.emit('error', e);
} else if (state.endEmitted && addToFront) {
const e = new Error('stream.unshift() after end event');
stream.emit('error', e);
} else {
var skipAdd;
if (state.decoder && !addToFront && !encoding) {
chunk = state.decoder.write(chunk);
skipAdd = (!state.objectMode && chunk.length === 0);
}

if (!addToFront)
} else {
var er;
if (!skipChunkCheck)
er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (addToFront) {
if (state.endEmitted)
stream.emit('error', new Error('stream.unshift() after end event'));
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new Error('stream.push() after EOF'));
} else {
state.reading = false;

// Don't add to the buffer if we've decoded to an empty string chunk and
// we're not in object mode
if (!skipAdd) {
// if we want the data now, just emit it.
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
addChunk(stream, state, chunk, false);
else
state.buffer.push(chunk);

if (state.needReadable)
emitReadable(stream);
maybeReadMore(stream, state);
} else {
addChunk(stream, state, chunk, false);
}
}

maybeReadMore(stream, state);
} else if (!addToFront) {
state.reading = false;
}
} else if (!addToFront) {
state.reading = false;
}

return needMoreData(state);
}

function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
state.buffer.push(chunk);

if (state.needReadable)
emitReadable(stream);
}
maybeReadMore(stream, state);
}

function chunkInvalid(state, chunk) {
var er;
if (!(chunk instanceof Buffer) &&
typeof chunk !== 'string' &&
chunk !== undefined &&
!state.objectMode) {
er = new TypeError('Invalid non-string/buffer chunk');
}
return er;
}


// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
Expand All @@ -267,6 +283,10 @@ function needMoreData(state) {
state.length === 0);
}

Readable.prototype.isPaused = function() {
return this._readableState.flowing === false;
};

// backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
if (!StringDecoder)
Expand Down Expand Up @@ -438,19 +458,6 @@ Readable.prototype.read = function(n) {
return ret;
};

function chunkInvalid(state, chunk) {
var er = null;
if (!(chunk instanceof Buffer) &&
typeof chunk !== 'string' &&
chunk !== null &&
chunk !== undefined &&
!state.objectMode) {
er = new TypeError('Invalid non-string/buffer chunk');
}
return er;
}


function onEofChunk(stream, state) {
if (state.ended) return;
if (state.decoder) {
Expand Down

0 comments on commit b94a7e0

Please sign in to comment.