Skip to content

Commit

Permalink
Make pipe behave more like Readable#pipe. Fixes caolan#449.
Browse files Browse the repository at this point in the history
- `pipe` now emits the `pipe` event on the destination when piping.
- `pipe` has an optional pipe options argument that allows users to
  choose to not end the destination when the source ends.
  • Loading branch information
vqvu committed Feb 9, 2016
1 parent 24d1fad commit 8ba280e
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 95 deletions.
19 changes: 15 additions & 4 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ function pipeStream(src, dest, write, end, passAlongErrors) {
dest.removeListener('drain', onConsumerDrain);
});

dest.emit('pipe', src);

s.resume();
return dest;

Expand Down Expand Up @@ -955,17 +957,24 @@ Stream.prototype.end = function () {
* automatically managing flow so that the destination is not overwhelmed
* by a fast source.
*
* This function returns the destination so you can chain together pipe calls.
* Users may optionally pass an object that may contain any of these fields:
*
* - `end` - Ends the destination when this stream ends. Default: `true`. This
* option has no effect if the destination is either `process.stdout` or
* `process.stderr`. Those two streams are never neded.
*
* Like [Readable#pipe](https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options),
* this function will throw errors if there is no `error` handler installed on
* the stream. Use [through](#through) if you are piping to another Highland
* stream and want errors as well as values to be propagated.
*
* This function returns the destination so you can chain together pipe calls.
*
* @id pipe
* @section Consumption
* @name Stream.pipe(dest)
* @name Stream.pipe(dest, options)
* @param {Writable Stream} dest - the destination to write all data to
* @param {Object} options - (optional) an options object.
* @api public
*
* var source = _(generator);
Expand All @@ -976,9 +985,11 @@ Stream.prototype.end = function () {
* source.pipe(through).pipe(dest);
*/

Stream.prototype.pipe = function (dest) {
Stream.prototype.pipe = function (dest, options) {
options = options || {};

// stdout and stderr are special case writables that cannot be closed
var canClose = dest !== process.stdout && dest !== process.stderr;
var canClose = dest !== process.stdout && dest !== process.stderr && options.end !== false;

var end;
if (canClose) {
Expand Down
226 changes: 135 additions & 91 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1629,85 +1629,153 @@ exports['lazily evalute stream'] = function (test) {
test.done();
};


exports['pipe old-style node stream to highland stream'] = function (test) {
var xs = [];
var src = streamify([1,2,3,4]);
var s1 = _();
var s2 = s1.consume(function (err, x, push, next) {
xs.push(x);
next();
});
Stream.prototype.pipe.call(src, s1);
setTimeout(function () {
test.same(s1._incoming, [1]);
test.same(s2._incoming, []);
test.same(xs, []);
s2.resume();
exports['pipe'] = {
'old-style node stream to highland stream': function (test) {
var xs = [];
var src = streamify([1,2,3,4]);
var s1 = _();
var s2 = s1.consume(function (err, x, push, next) {
xs.push(x);
next();
});
Stream.prototype.pipe.call(src, s1);
setTimeout(function () {
test.same(s1._incoming, []);
test.same(s1._incoming, [1]);
test.same(s2._incoming, []);
test.same(xs, [1,2,3,4,_.nil]);
test.done();
test.same(xs, []);
s2.resume();
setTimeout(function () {
test.same(s1._incoming, []);
test.same(s2._incoming, []);
test.same(xs, [1,2,3,4,_.nil]);
test.done();
}, 100);
}, 100);
}, 100);
};

exports['pipe node stream to highland stream'] = function (test) {
var xs = [];
var src = streamify([1,2,3,4]);
var s1 = _();
var s2 = s1.consume(function (err, x, push, next) {
xs.push(x);
next();
});
src.pipe(s1);
setTimeout(function () {
test.same(s1._incoming, [1]);
test.same(s2._incoming, []);
test.same(xs, []);
s2.resume();
},
'node stream to highland stream': function (test) {
var xs = [];
var src = streamify([1,2,3,4]);
var s1 = _();
var s2 = s1.consume(function (err, x, push, next) {
xs.push(x);
next();
});
src.pipe(s1);
setTimeout(function () {
test.same(s1._incoming, []);
test.same(s1._incoming, [1]);
test.same(s2._incoming, []);
test.same(xs, [1,2,3,4,_.nil]);
test.done();
test.same(xs, []);
s2.resume();
setTimeout(function () {
test.same(s1._incoming, []);
test.same(s2._incoming, []);
test.same(xs, [1,2,3,4,_.nil]);
test.done();
}, 100);
}, 100);
}, 100);
};
},
'highland stream to node stream': function (test) {
var src = _(['a','b','c']);
var dest = concat(function (data) {
test.same(data, 'abc');
test.done();
});
src.pipe(dest);
},
'pipe to node stream with backpressure': function (test) {
test.expect(3);
var src = _([1,2,3,4]);
var xs = [];
var dest = new EventEmitter();
dest.writable = true;
dest.write = function (x) {
xs.push(x);
if (xs.length === 2) {
_.setImmediate(function () {
test.same(xs, [1,2]);
test.ok(src.paused);
dest.emit('drain');
});
return false;
}
};
dest.end = function () {
test.same(xs, [1,2,3,4]);
test.done();
};
src.pipe(dest);
},
'emits "pipe" event when piping (issue #449)': function (test) {
test.expect(1);

var src = _();
var dest = _();
dest.on('pipe', function (_src) {
test.strictEqual(_src, src);
test.done();
});
src.pipe(dest);
},
'pipe with {end:false} option should not end': function (test) {
test.expect(1);

exports['pipe highland stream to node stream'] = function (test) {
var src = _(['a','b','c']);
var dest = concat(function (data) {
test.same(data, 'abc');
var clock = sinon.useFakeTimers();
var dest = _();
var ended = false;
dest.end = function () {
ended = true;
};

_([1, 2, 3]).pipe(dest);

clock.tick(10000);
clock.restore();
test.ok(!ended, 'The destination should not have been ended.');
test.done();
});
src.pipe(dest);
}
};

exports['pipe to node stream with backpressure'] = function (test) {
test.expect(3);
var src = _([1,2,3,4]);
var xs = [];
var dest = new EventEmitter();
dest.writable = true;
dest.write = function (x) {
xs.push(x);
if (xs.length === 2) {
_.setImmediate(function () {
test.same(xs, [1,2]);
test.ok(src.paused);
dest.emit('drain');
});
return false;
}
// ignore these tests in non-node.js environments
if (typeof process !== 'undefined' && process.stdout) {
exports['pipe']['highland stream to stdout'] = function (test) {
test.expect(1)
var src = _(['']);
test.doesNotThrow(function () {
src.pipe(process.stdout);
})
test.done()
};
dest.end = function () {
test.same(xs, [1,2,3,4]);
test.done();

exports['pipe']['highland stream to stdout with {end:true}'] = function (test) {
test.expect(1)
var src = _(['']);
test.doesNotThrow(function () {
src.pipe(process.stdout, {end: true});
})
test.done()
};
}

// ignore these tests in non-node.js environments
if (typeof process !== 'undefined' && process.stderr) {
exports['pipe']['highland stream to stderr'] = function (test) {
test.expect(1)
var src = _(['']);
test.doesNotThrow(function () {
src.pipe(process.stderr);
})
test.done()
};
src.pipe(dest);
};

exports['pipe']['highland stream to stderr with {end:true}'] = function (test) {
test.expect(1)
var src = _(['']);
test.doesNotThrow(function () {
src.pipe(process.stderr, {end: true});
})
test.done()
};
}

exports['wrap node stream and pipe'] = function (test) {
test.expect(7);
Expand Down Expand Up @@ -1756,30 +1824,6 @@ exports['wrap node stream with error'] = function (test) {
}).each(function () {});
};

// ignore these tests in non-node.js environments
if (typeof process !== 'undefined' && process.stdout) {
exports['pipe highland stream to stdout'] = function (test) {
test.expect(1)
var src = _(['']);
test.doesNotThrow(function () {
src.pipe(process.stdout);
})
test.done()
}
}

// ignore these tests in non-node.js environments
if (typeof process !== 'undefined' && process.stderr) {
exports['pipe highland stream to stderr'] = function (test) {
test.expect(1)
var src = _(['']);
test.doesNotThrow(function () {
src.pipe(process.stderr);
})
test.done()
}
}

exports['attach data event handler'] = function (test) {
var s = _([1,2,3,4]);
var xs = [];
Expand Down

0 comments on commit 8ba280e

Please sign in to comment.