forked from mafintosh/pump
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
128 additions
and
127 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,79 +1,79 @@ | ||
var once = require('once'); | ||
var eos = require('end-of-stream'); | ||
var fs = require('fs'); // we only need fs to get the ReadStream and WriteStream prototypes | ||
|
||
var noop = function() {}; | ||
|
||
var isFn = function(fn) { | ||
return typeof fn === 'function'; | ||
}; | ||
|
||
var isFS = function(stream) { | ||
return (stream instanceof (fs.ReadStream || noop) || stream instanceof (fs.WriteStream || noop)) && isFn(stream.close); | ||
}; | ||
|
||
var isRequest = function(stream) { | ||
return stream.setHeader && isFn(stream.abort); | ||
}; | ||
|
||
var destroyer = function(stream, reading, writing, callback) { | ||
callback = once(callback); | ||
|
||
var closed = false; | ||
stream.on('close', function() { | ||
closed = true; | ||
}); | ||
|
||
eos(stream, {readable:reading, writable:writing}, function(err) { | ||
if (err) return callback(err); | ||
closed = true; | ||
callback(); | ||
}); | ||
|
||
var destroyed = false; | ||
return function(err) { | ||
if (closed) return; | ||
if (destroyed) return; | ||
destroyed = true; | ||
|
||
if (isFS(stream)) return stream.close(); // use close for fs streams to avoid fd leaks | ||
if (isRequest(stream)) return stream.abort(); // request.destroy just do .end - .abort is what we want | ||
|
||
if (isFn(stream.destroy)) return stream.destroy(); | ||
|
||
callback(err || new Error('stream was destroyed')); | ||
}; | ||
}; | ||
|
||
var call = function(fn) { | ||
fn(); | ||
}; | ||
|
||
var pipe = function(from, to) { | ||
return from.pipe(to); | ||
}; | ||
|
||
var pump = function() { | ||
var streams = Array.prototype.slice.call(arguments); | ||
var callback = isFn(streams[streams.length-1] || noop) && streams.pop() || noop; | ||
|
||
if (Array.isArray(streams[0])) streams = streams[0]; | ||
if (streams.length < 2) throw new Error('pump requires two streams per minimum'); | ||
|
||
var error; | ||
var destroys = streams.map(function(stream, i) { | ||
var reading = i < streams.length-1; | ||
var writing = i > 0; | ||
return destroyer(stream, reading, writing, function(err) { | ||
if (!error) error = err; | ||
if (err) destroys.forEach(call); | ||
if (reading) return; | ||
destroys.forEach(call); | ||
callback(error); | ||
}); | ||
}); | ||
|
||
return streams.reduce(pipe); | ||
}; | ||
|
||
module.exports = pump; | ||
var once = require('once') | ||
var eos = require('end-of-stream') | ||
var fs = require('fs') // we only need fs to get the ReadStream and WriteStream prototypes | ||
|
||
var noop = function () {} | ||
|
||
var isFn = function (fn) { | ||
return typeof fn === 'function' | ||
} | ||
|
||
var isFS = function (stream) { | ||
return (stream instanceof (fs.ReadStream || noop) || stream instanceof (fs.WriteStream || noop)) && isFn(stream.close) | ||
} | ||
|
||
var isRequest = function (stream) { | ||
return stream.setHeader && isFn(stream.abort) | ||
} | ||
|
||
var destroyer = function (stream, reading, writing, callback) { | ||
callback = once(callback) | ||
|
||
var closed = false | ||
stream.on('close', function () { | ||
closed = true | ||
}) | ||
|
||
eos(stream, {readable: reading, writable: writing}, function (err) { | ||
if (err) return callback(err) | ||
closed = true | ||
callback() | ||
}) | ||
|
||
var destroyed = false | ||
return function (err) { | ||
if (closed) return | ||
if (destroyed) return | ||
destroyed = true | ||
|
||
if (isFS(stream)) return stream.close() // use close for fs streams to avoid fd leaks | ||
if (isRequest(stream)) return stream.abort() // request.destroy just do .end - .abort is what we want | ||
|
||
if (isFn(stream.destroy)) return stream.destroy() | ||
|
||
callback(err || new Error('stream was destroyed')) | ||
} | ||
} | ||
|
||
var call = function (fn) { | ||
fn() | ||
} | ||
|
||
var pipe = function (from, to) { | ||
return from.pipe(to) | ||
} | ||
|
||
var pump = function () { | ||
var streams = Array.prototype.slice.call(arguments) | ||
var callback = isFn(streams[streams.length - 1] || noop) && streams.pop() || noop | ||
|
||
if (Array.isArray(streams[0])) streams = streams[0] | ||
if (streams.length < 2) throw new Error('pump requires two streams per minimum') | ||
|
||
var error | ||
var destroys = streams.map(function (stream, i) { | ||
var reading = i < streams.length - 1 | ||
var writing = i > 0 | ||
return destroyer(stream, reading, writing, function (err) { | ||
if (!error) error = err | ||
if (err) destroys.forEach(call) | ||
if (reading) return | ||
destroys.forEach(call) | ||
callback(error) | ||
}) | ||
}) | ||
|
||
return streams.reduce(pipe) | ||
} | ||
|
||
module.exports = pump |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,47 +1,46 @@ | ||
var assert = require('assert'); | ||
var pump = require('./index'); | ||
var pump = require('./index') | ||
|
||
var rs = require('fs').createReadStream('/dev/random'); | ||
var ws = require('fs').createWriteStream('/dev/null'); | ||
var rs = require('fs').createReadStream('/dev/random') | ||
var ws = require('fs').createWriteStream('/dev/null') | ||
|
||
var toHex = function() { | ||
var reverse = new (require('stream').Transform)(); | ||
var toHex = function () { | ||
var reverse = new (require('stream').Transform)() | ||
|
||
reverse._transform = function(chunk, enc, callback) { | ||
reverse.push(chunk.toString('hex')); | ||
callback(); | ||
}; | ||
reverse._transform = function (chunk, enc, callback) { | ||
reverse.push(chunk.toString('hex')) | ||
callback() | ||
} | ||
|
||
return reverse; | ||
}; | ||
return reverse | ||
} | ||
|
||
var wsClosed = false; | ||
var rsClosed = false; | ||
var callbackCalled = false; | ||
var wsClosed = false | ||
var rsClosed = false | ||
var callbackCalled = false | ||
|
||
var check = function() { | ||
if (wsClosed && rsClosed && callbackCalled) process.exit(0); | ||
}; | ||
var check = function () { | ||
if (wsClosed && rsClosed && callbackCalled) process.exit(0) | ||
} | ||
|
||
ws.on('close', function() { | ||
wsClosed = true; | ||
check(); | ||
}); | ||
ws.on('close', function () { | ||
wsClosed = true | ||
check() | ||
}) | ||
|
||
rs.on('close', function() { | ||
rsClosed = true; | ||
check(); | ||
}); | ||
rs.on('close', function () { | ||
rsClosed = true | ||
check() | ||
}) | ||
|
||
pump(rs, toHex(), toHex(), toHex(), ws, function(err) { | ||
callbackCalled = true; | ||
check(); | ||
}); | ||
pump(rs, toHex(), toHex(), toHex(), ws, function () { | ||
callbackCalled = true | ||
check() | ||
}) | ||
|
||
setTimeout(function() { | ||
rs.destroy(); | ||
}, 1000); | ||
setTimeout(function () { | ||
rs.destroy() | ||
}, 1000) | ||
|
||
setTimeout(function() { | ||
throw new Error('timeout'); | ||
}, 5000); | ||
setTimeout(function () { | ||
throw new Error('timeout') | ||
}, 5000) |