Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: enable usage of webstreams on compose() #46675

Merged
merged 12 commits into from
Feb 27, 2023
148 changes: 118 additions & 30 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ const {
isNodeStream,
isReadable,
isWritable,
isWebStream,
isTransformStream,
isWritableStream,
isReadableStream,
} = require('internal/streams/utils');
const {
AbortError,
Expand All @@ -15,6 +19,7 @@ const {
ERR_MISSING_ARGS,
},
} = require('internal/errors');
const eos = require('internal/streams/end-of-stream');

module.exports = function compose(...streams) {
if (streams.length === 0) {
Expand Down Expand Up @@ -57,9 +62,8 @@ module.exports = function compose(...streams) {
}
}

let ondrain;
let onfinish;
let onreadable;
let writableEndDestructor;
let readableEndDestructor;
let onclose;
let d;

Expand All @@ -79,8 +83,8 @@ module.exports = function compose(...streams) {
const head = streams[0];
const tail = pipeline(streams, onfinished);

const writable = !!isWritable(head);
const readable = !!isReadable(tail);
const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head));
const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail));

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
Expand All @@ -94,15 +98,53 @@ module.exports = function compose(...streams) {
});

if (writable) {
d._write = function(chunk, encoding, callback) {
writableEndDestructor = makeWritableEnd(d, head, tail);
}

if (readable) {
readableEndDestructor = makeReadableEnd(d, head, tail);
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}

if (readableEndDestructor) {
readableEndDestructor();
}

if (writableEndDestructor) {
writableEndDestructor();
}

if (onclose === null) {
callback(err);
} else {
onclose = callback;
if (isNodeStream(tail)) {
destroyer(tail, err);
Copy link
Member Author

@debadree25 debadree25 Feb 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some help is needed here: how could we destroy web streams at this point? Or should we even do so?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on the question? (We can destroy web streams the question is what scenario do you specifically mean)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the pipeline encounters an error, it would call d.destroy, which in turn would destroy the last stream in the series (the tail stream). Should the same happen for web streams too? If so, I think we could do writableStream.abort() here.

Actually, I am a little confused about why destroying the last stream is necessary 😅

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one question remains — wdyt @benjamingr?

}
}
};

return d;
};

function makeWritableEnd(duplex, head, tail) {
let ondrain;
let onfinish;

if (isNodeStream(head)) {
duplex._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

d._final = function(callback) {
duplex._final = function(callback) {
head.end();
onfinish = callback;
};
Expand All @@ -114,17 +156,61 @@ module.exports = function compose(...streams) {
cb();
}
});
} else if (isWebStream(head)) {
const writable = isTransformStream(head) ? head.writable : head;
const writer = writable.getWriter();

duplex._write = async function(chunk, encoding, callback) {
try {
await writer.ready;
writer.write(chunk).catch(() => {});
callback();
} catch (err) {
callback(err);
}
};

duplex._final = async function(callback) {
try {
await writer.ready;
writer.close();
onfinish = callback;
} catch (err) {
callback(err);
}
};
}

if (isNodeStream(tail)) {
tail.on('finish', function() {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
eos(readable, () => {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}

if (readable) {
function destructor() {
ondrain = null;
onfinish = null;
}

return destructor;
}

function makeReadableEnd(duplex, head, tail) {
let onreadable;
if (isNodeStream(tail)) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
Expand All @@ -134,41 +220,43 @@ module.exports = function compose(...streams) {
});

tail.on('end', function() {
d.push(null);
duplex.push(null);
});

d._read = function() {
duplex._read = function() {
while (true) {
const buf = tail.read();

if (buf === null) {
onreadable = d._read;
onreadable = duplex._read;
return;
}

if (!d.push(buf)) {
if (!duplex.push(buf)) {
return;
}
}
};
}
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
const reader = readable.getReader();
duplex._read = async function() {
while (true) {
const { value, done } = await reader.read();
if (done) {
duplex.push(null);
return;
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}
if (!duplex.push(value)) {
return;
}
}
};
}

function destructor() {
onreadable = null;
ondrain = null;
onfinish = null;

if (onclose === null) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
}
};
}

return d;
};
return destructor;
}
2 changes: 1 addition & 1 deletion lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ function pipelineImpl(streams, callback, opts) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
ret = stream;
} else {
ret = Duplex.from(stream);
Expand Down