Skip to content

Commit

Permalink
streams: Support objects other than Buffers
Browse files Browse the repository at this point in the history
We detect for non-string and non-buffer values in onread and
turn the stream into an "objectMode" stream.

If we are in "objectMode" mode then howMuchToRead will
always return 1, state.length will always have 1 appended
to it when there is a new item and fromList always takes
the first value from the list.

This means that for object streams, the n in read(n) is
ignored and read() will always return a single value

Fixed a bug with unpipe where the pipe would break because
the flowing state was not reset to false.

Fixed a bug with sync cb(null, null) in _read which would
forget to end the readable stream
  • Loading branch information
Raynos authored and isaacs committed Jan 24, 2013
1 parent 193320a commit 444bbd4
Show file tree
Hide file tree
Showing 9 changed files with 757 additions and 42 deletions.
3 changes: 3 additions & 0 deletions doc/api/stream.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ method. (See below.)
resource. Default=16kb
* `encoding` {String} If specified, then buffers will be decoded to
strings using the specified encoding. Default=null
* `objectMode` {Boolean} Whether this stream should behave
as a stream of objects. Meaning that stream.read(n) returns
a single value instead of a Buffer of size n

In classes that extend the Readable class, make sure to call the
constructor so that the buffering settings can be properly
Expand Down
52 changes: 42 additions & 10 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ function ReadableState(options, stream) {
this.needReadable = false;
this.emittedReadable = false;


// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;

// when piping, we only care about 'readable' events that happen
// after read()ing all the bytes and not getting any pushback.
this.ranOut = false;
Expand Down Expand Up @@ -129,6 +134,9 @@ function howMuchToRead(n, state) {
if (state.length === 0 && state.ended)
return 0;

if (state.objectMode)
return n === 0 ? 0 : 1;

if (isNaN(n) || n === null)
return state.length;

Expand Down Expand Up @@ -217,11 +225,11 @@ Readable.prototype.read = function(n) {

var ret;
if (n > 0)
ret = fromList(n, state.buffer, state.length, !!state.decoder);
ret = fromList(n, state);
else
ret = null;

if (ret === null || ret.length === 0) {
if (ret === null || (!state.objectMode && ret.length === 0)) {
state.needReadable = true;
n = 0;
}
Expand All @@ -246,20 +254,36 @@ function onread(stream, er, chunk) {
var state = stream._readableState;
var sync = state.sync;

// If we get something that is not a buffer, string, null, or undefined,
// then switch into objectMode. Now stream chunks are all considered
// to be of length=1, and the watermarks determine how many objects to
// keep in the buffer, rather than how many bytes or characters.
if (!Buffer.isBuffer(chunk) &&
'string' !== typeof chunk &&
chunk !== null &&
chunk !== undefined) {
state.objectMode = true;
state.length = state.buffer.length;
state.decoder = null;
}

state.reading = false;
if (er)
return stream.emit('error', er);

if (!chunk || !chunk.length) {
if (chunk === null ||
chunk === undefined ||
(!state.objectMode && !chunk.length)) {
// eof
state.ended = true;
if (state.decoder) {
chunk = state.decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state.length += chunk.length;
state.length += state.objectMode ? 1 : chunk.length;
}
}

// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (!sync) {
Expand All @@ -271,16 +295,17 @@ function onread(stream, er, chunk) {
}
} else
endReadable(stream);
}
} else
endReadable(stream);
return;
}

if (state.decoder)
chunk = state.decoder.write(chunk);

// update the buffer info.
if (chunk) {
state.length += chunk.length;
if (chunk || (state.objectMode && chunk !== undefined && chunk !== null)) {
state.length += state.objectMode ? 1 : chunk.length;
state.buffer.push(chunk);
}

Expand Down Expand Up @@ -502,6 +527,7 @@ Readable.prototype.unpipe = function(dest) {
state.pipes = null;
state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
state.flowing = false;
if (dest)
dest.emit('unpipe', this);
return this;
Expand All @@ -516,6 +542,7 @@ Readable.prototype.unpipe = function(dest) {
state.pipes = null;
state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
state.flowing = false;

for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this);
Expand Down Expand Up @@ -680,16 +707,21 @@ Readable._fromList = fromList;

// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
function fromList(n, list, length, stringMode) {
function fromList(n, state) {
var list = state.buffer;
var length = state.length;
var stringMode = !!state.decoder;
var objectMode = !!state.objectMode;
var ret;

// nothing in the list, definitely empty.
if (list.length === 0) {
if (list.length === 0)
return null;
}

if (length === 0)
ret = null;
else if (objectMode)
ret = list.shift();
else if (!n || n >= length) {
// read it all, truncate the array.
if (stringMode)
Expand Down
48 changes: 34 additions & 14 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ function WritableState(options, stream) {
// default to pushing everything out as fast as possible.
this.lowWaterMark = options.lowWaterMark || 0;

// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;

// cast to ints.
this.lowWaterMark = ~~this.lowWaterMark;
this.highWaterMark = ~~this.highWaterMark;
Expand Down Expand Up @@ -130,15 +134,29 @@ Writable.prototype.write = function(chunk, encoding, cb) {
return;
}

var l = chunk.length;
if (false === state.decodeStrings)
chunk = [chunk, encoding || 'utf8'];
else if (typeof chunk === 'string' || encoding) {
chunk = new Buffer(chunk + '', encoding);
l = chunk.length;
// Writing something other than a string or buffer will switch
// the stream into objectMode.
if (!state.objectMode &&
typeof chunk !== 'string' &&
chunk !== null &&
chunk !== undefined &&
!Buffer.isBuffer(chunk))
state.objectMode = true;

var len;
if (state.objectMode)
len = 1;
else {
len = chunk.length;
if (false === state.decodeStrings)
chunk = [chunk, encoding || 'utf8'];
else if (typeof chunk === 'string' || encoding) {
chunk = new Buffer(chunk + '', encoding);
len = chunk.length;
}
}

state.length += l;
state.length += len;

var ret = state.length < state.highWaterMark;
if (ret === false)
Expand All @@ -153,7 +171,7 @@ Writable.prototype.write = function(chunk, encoding, cb) {

state.writing = true;
state.sync = true;
state.writelen = l;
state.writelen = len;
state.writecb = cb;
this._write(chunk, state.onwrite);
state.sync = false;
Expand All @@ -165,7 +183,7 @@ function onwrite(stream, er) {
var state = stream._writableState;
var sync = state.sync;
var cb = state.writecb;
var l = state.writelen;
var len = state.writelen;

state.writing = false;
state.writelen = null;
Expand All @@ -188,7 +206,7 @@ function onwrite(stream, er) {
stream.emit('error', er);
return;
}
state.length -= l;
state.length -= len;

if (cb) {
// Don't call the cb until the next tick if we're in sync mode.
Expand Down Expand Up @@ -232,12 +250,14 @@ function onwrite(stream, er) {
var chunk = chunkCb[0];
cb = chunkCb[1];

if (false === state.decodeStrings)
l = chunk[0].length;
if (state.objectMode)
len = 1;
else if (false === state.decodeStrings)
len = chunk[0].length;
else
l = chunk.length;
len = chunk.length;

state.writelen = l;
state.writelen = len;
state.writecb = cb;
state.writechunk = chunk;
state.writing = true;
Expand Down
134 changes: 133 additions & 1 deletion test/simple/test-stream2-basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ TestWriter.prototype.end = function(c) {

// tiny node-tap lookalike.
var tests = [];
var count = 0;

function test(name, fn) {
count++;
tests.push([name, fn]);
}

Expand All @@ -111,10 +114,18 @@ function run() {
fn({
same: assert.deepEqual,
equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}

// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});

process.nextTick(run);


Expand Down Expand Up @@ -319,6 +330,127 @@ test('multipipe', function(t) {
});
});

test('back pressure respected', function (t) {
function noop() {}

var r = new R();
var counter = 0;
r.push(["one"]);
r.push(["two"]);
r.push(["three"]);
r.push(["four"]);
r.push(null);
r._read = noop;

var w1 = new R();
w1.write = function (chunk) {
assert.equal(chunk[0], "one");
w1.emit("close");
process.nextTick(function () {
r.pipe(w2);
r.pipe(w3);
})
};
w1.end = noop;

r.pipe(w1);

var expected = ["two", "two", "three", "three", "four", "four"];

var w2 = new R();
w2.write = function (chunk) {
assert.equal(chunk[0], expected.shift());
assert.equal(counter, 0);

counter++;

if (chunk[0] === "four") {
return true;
}

setTimeout(function () {
counter--;
w2.emit("drain");
}, 10);

return false;
}
w2.end = noop;

var w3 = new R();
w3.write = function (chunk) {
assert.equal(chunk[0], expected.shift());
assert.equal(counter, 1);

counter++;

if (chunk[0] === "four") {
return true;
}

setTimeout(function () {
counter--;
w3.emit("drain");
}, 50);

return false;
};
w3.end = function () {
assert.equal(counter, 2);
assert.equal(expected.length, 0);
t.end();
};
});

test('read(0) for ended streams', function (t) {
var r = new R();
var written = false;
var ended = false;
r._read = function () {};

r.push(new Buffer("foo"));
r.push(null);

var v = r.read(0);

assert.equal(v, null);

var w = new R();

w.write = function (buffer) {
written = true;
assert.equal(ended, false);
assert.equal(buffer.toString(), "foo")
};

w.end = function () {
ended = true;
assert.equal(written, true);
t.end();
};

r.pipe(w);
})

test('sync _read ending', function (t) {
var r = new R();
var called = false;
r._read = function (n, cb) {
cb(null, null);
};

r.once('end', function () {
called = true;
})

r.read();

process.nextTick(function () {
assert.equal(called, true);
t.end();
})
});

assert.throws(function() {
var bad = new R({
highWaterMark: 10,
Expand Down
Loading

0 comments on commit 444bbd4

Please sign in to comment.