Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
streams2: Handle immediate synthetic transforms properly
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacs committed Dec 14, 2012
1 parent 06e321d commit 8acb416
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 48 deletions.
3 changes: 1 addition & 2 deletions lib/_stream_passthrough.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,5 @@ function PassThrough(options) {
}

PassThrough.prototype._transform = function(chunk, output, cb) {
output(chunk);
cb();
cb(null, chunk);
};
134 changes: 91 additions & 43 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var StringDecoder;

util.inherits(Readable, Stream);

function ReadableState(options, stream) {
function ReadableState(options) {
options = options || {};

this.bufferSize = options.bufferSize || 16 * 1024;
Expand All @@ -44,7 +44,6 @@ function ReadableState(options, stream) {
this.flowing = false;
this.ended = false;
this.endEmitted = false;
this.stream = stream;
this.reading = false;

// whenever we return null, then we set a flag to say
Expand All @@ -71,52 +70,76 @@ Readable.prototype.setEncoding = function(enc) {
this._readableState.decoder = new StringDecoder(enc);
};

// you can override either this method, or _read(n, cb) below.
Readable.prototype.read = function(n) {
var state = this._readableState;

if (state.length === 0 && state.ended) {
endReadable(this);
return null;
}
function howMuchToRead(n, state) {
if (state.length === 0 && state.ended)
return 0;

if (isNaN(n))
return state.length;

if (isNaN(n) || n <= 0)
n = state.length
if (n <= 0)
return 0;

// XXX: controversial.
// don't have that much. return null, unless we've ended.
// However, if the low water mark is lower than the number of bytes,
// then we still need to return what we have, or else it won't kick
// off another _read() call. For example,
// lwm=5
// len=9
// read(10)
// We don't have that many bytes, so it'd be tempting to return null,
// but then it won't ever cause _read to be called, so in that case,
// we just return what we have, and let the programmer deal with it.
if (n > state.length) {
if (!state.ended && state.length <= state.lowWaterMark) {
if (!state.ended) {
state.needReadable = true;
n = 0;
return 0;
} else
n = state.length;
return state.length;
}

return n;
}

var ret;
if (n > 0)
ret = fromList(n, state.buffer, state.length, !!state.decoder);
else
ret = null;
// you can override either this method, or _read(n, cb) below.
Readable.prototype.read = function(n) {
var state = this._readableState;
var nOrig = n;

if (ret === null || ret.length === 0)
state.needReadable = true;
n = howMuchToRead(n, state);

state.length -= n;
// if we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
endReadable(this);
return null;
}

if (!state.ended &&
state.length <= state.lowWaterMark &&
!state.reading) {
// All the actual chunk generation logic needs to be
// *below* the call to _read. The reason is that in certain
// synthetic stream cases, such as passthrough streams, _read
// may be a completely synchronous operation which may change
// the state of the read buffer, providing enough data when
// before there was *not* enough.
//
// So, the steps are:
// 1. Figure out what the state of things will be after we do
// a read from the buffer.
//
// 2. If that resulting state will trigger a _read, then call _read.
// Note that this may be asynchronous, or synchronous. Yes, it is
// deeply ugly to write APIs this way, but that still doesn't mean
// that the Readable class should behave improperly, as streams are
// designed to be sync/async agnostic.
// Take note if the _read call is sync or async (ie, if the read call
// has returned yet), so that we know whether or not it's safe to emit
// 'readable' etc.
//
// 3. Actually pull the requested chunks out of the buffer and return.

// if we need a readable event, then we need to do some reading.
var doRead = state.needReadable;
// if we currently have less than the lowWaterMark, then also read some
if (state.length - n <= state.lowWaterMark)
doRead = true;
// however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
if (state.ended || state.reading)
doRead = false;

if (doRead) {
var sync = true;
state.reading = true;
// call internal read method
this._read(state.bufferSize, function onread(er, chunk) {
Expand All @@ -125,21 +148,27 @@ Readable.prototype.read = function(n) {
return this.emit('error', er);

if (!chunk || !chunk.length) {
// eof
state.ended = true;
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (state.length > 0)
this.emit('readable');
else
endReadable(this);
if (!sync) {
if (state.length > 0)
this.emit('readable');
else
endReadable(this);
}
return;
}

if (state.decoder)
chunk = state.decoder.write(chunk);

state.length += chunk.length;
state.buffer.push(chunk);
// update the buffer info.
if (chunk) {
state.length += chunk.length;
state.buffer.push(chunk);
}

// if we haven't gotten enough to pass the lowWaterMark,
// and we haven't ended, then don't bother telling the user
Expand All @@ -152,14 +181,33 @@ Readable.prototype.read = function(n) {
return;
}

// now we have something to call this.read() to get.
if (state.needReadable) {
if (state.needReadable && !sync) {
state.needReadable = false;
this.emit('readable');
}
}.bind(this));
sync = false;
}

// If _read called its callback synchronously, then `reading`
// will be false, and we need to re-evaluate how much data we
// can return to the user.
if (doRead && !state.reading)
n = howMuchToRead(nOrig, state);

var ret;
if (n > 0)
ret = fromList(n, state.buffer, state.length, !!state.decoder);
else
ret = null;

if (ret === null || ret.length === 0) {
state.needReadable = true;
n = 0;
}

state.length -= n;

return ret;
};

Expand Down
7 changes: 4 additions & 3 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ Transform.prototype._write = function(chunk, cb) {
if (ts.pendingReadCb) {
var readcb = ts.pendingReadCb;
ts.pendingReadCb = null;
this._read(-1, readcb);
this._read(0, readcb);
}

// if we weren't waiting for it, but nothing is queued up, then
// still kick off a transform, just so it's there when the user asks.
if (rs.length === 0) {
var ret = this.read();
var doRead = rs.needReadable || rs.length <= rs.lowWaterMark;
if (doRead && !rs.reading) {
var ret = this.read(0);
if (ret !== null)
return cb(new Error('invalid stream transform state'));
}
Expand Down

0 comments on commit 8acb416

Please sign in to comment.