forked from mafintosh/pump
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
85 lines (67 loc) · 1.96 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
var once = require('once');
/** Does nothing; stands in for a completion callback when the caller supplies none. */
function noop() {}
/**
 * Patches a 0.8-style stream, which does not emit 'finish', so that calling
 * `end()` on it notifies `callback` instead.
 *
 * The stream's `end` method is replaced in place by a wrapper that first
 * invokes `callback` (with no arguments) and then delegates to the original
 * `end` with the same `this` and arguments.
 *
 * @param {Object} stream - Stream whose `end` method is wrapped (mutated in place).
 * @param {Function} callback - Called with no arguments every time the wrapped `end` runs.
 * @returns {undefined}
 */
var patch = function(stream, callback) {
  var end = stream.end;

  stream.end = function() {
    callback();
    // Hand back whatever the original end() returned so the wrapper stays
    // transparent to callers that use the return value.
    return end.apply(this, arguments);
  };
};
/**
 * Watches a single stream in a pipeline and reports, at most once, when it is
 * done or has failed.
 *
 * `callback` is wrapped with `once`, so only the first outcome is reported:
 *   - 'error' forwards the emitted error;
 *   - 'finish' / 'end' report success once neither side is still pending;
 *   - 'close' while a side is still pending reports `Error('stream closed')`,
 *     unless the stream is being read and its `_readableState.ended` is set.
 *
 * @param {Object} stream - Event-emitting stream to observe.
 * @param {boolean} reading - True while the readable side is still expected to end.
 * @param {boolean} writing - True while the writable side is still expected to finish.
 * @param {Function} callback - Receives an error, or no argument on success.
 * @returns {Function} Teardown function: does nothing if the stream already
 *   closed, was already torn down, or has no pending side; otherwise calls
 *   `stream.destroy()`, else `stream.end()`, else reports
 *   `Error('stream was destroyed')`.
 */
var destroyer = function(stream, reading, writing, callback) {
  var done = once(callback);
  var isDestroyed = false;
  var isClosed = false;

  function handleFinish() {
    writing = false;
    if (reading) return;
    done();
  }

  function handleEnd() {
    reading = false;
    if (writing) return;
    done();
  }

  function handleClose() {
    isClosed = true;
    // Both sides already completed: an ordinary close, nothing to report.
    if (!reading && !writing) return;
    // The readable side has ended internally even though 'end' was not seen yet.
    if (reading && stream._readableState && stream._readableState.ended) return;
    done(new Error('stream closed'));
  }

  stream.on('error', done);
  stream.on('finish', handleFinish);
  stream.on('end', handleEnd);
  stream.on('close', handleClose);

  // Writable streams without _writableState get end() patched to signal finish.
  if (writing && stream.writable && !stream._writableState) {
    patch(stream, handleFinish);
  }

  return function() {
    if (isClosed || isDestroyed) return;
    if (!reading && !writing) return;

    isDestroyed = true;

    if (stream.destroy) return stream.destroy();
    if (stream.end) return stream.end();
    done(new Error('stream was destroyed'));
  };
};
/**
 * Invokes `fn` with no arguments and discards its result.
 * Used as an iterator callback, so any extra arguments are deliberately not forwarded.
 */
function call(fn) {
  fn();
}
/**
 * Connects `from` to `to` via `from.pipe(to)` and returns whatever `pipe` returns.
 * Takes exactly two operands so it can be used directly as a reducer.
 */
function pipe(from, to) {
  var piped = from.pipe(to);
  return piped;
}
/**
 * Reports whether `fn` can occupy the callback slot: true for any falsy value
 * (an omitted or placeholder callback) and for functions, false otherwise.
 */
function functionish(fn) {
  if (!fn) return true;
  return typeof fn === 'function';
}
/**
 * Pipes a sequence of streams together and tears every one of them down once
 * the pipeline completes or any stream fails.
 *
 * Accepts the streams either as separate arguments or as a single array,
 * optionally followed by a completion callback. The callback is invoked when
 * the last stream reports completion, with the first error seen (if any).
 *
 * @param {...Object|Object[]} streams - Two or more streams, or one array of them.
 * @param {Function} [callback] - Called with the first recorded error, or
 *   `undefined` when none was recorded.
 * @returns {*} The result of piping the streams left to right.
 * @throws {Error} If fewer than two streams are supplied.
 */
var pump = function() {
  var streams = Array.prototype.slice.call(arguments);

  // A trailing function (or falsy placeholder) is the callback, not a stream.
  var callback = noop;
  if (functionish(streams[streams.length - 1])) {
    callback = streams.pop() || noop;
  }

  // Also accept the streams as a single array argument.
  if (Array.isArray(streams[0])) streams = streams[0];
  if (streams.length < 2) throw new Error('pump requires two streams per minimum');

  var lastIndex = streams.length - 1;
  var error;
  var destroys;

  var destroyAll = function() {
    destroys.forEach(call);
  };

  destroys = streams.map(function(stream, index) {
    // Every stream but the last is read from; every stream but the first is written to.
    var reading = index < lastIndex;
    var writing = index > 0;

    return destroyer(stream, reading, writing, function(err) {
      if (!error) error = err;
      if (err) destroyAll();
      // Only the final stream's completion ends the whole pipeline.
      if (reading) return;
      destroyAll();
      callback(error);
    });
  });

  return streams.reduce(function(upstream, downstream) {
    return pipe(upstream, downstream);
  });
};
// Public API: the pump function is this module's sole export.
module.exports = pump;