From 505d884ff06174b7f2b5817d382eac9ae7e4e113 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 16 Feb 2023 00:51:31 +0530 Subject: [PATCH 01/12] stream: enable usage of webstreams on compose() Refs: https://github.com/nodejs/node/issues/39316 --- lib/internal/streams/compose.js | 146 ++++++++++++++++++++++++------- lib/internal/streams/pipeline.js | 2 +- 2 files changed, 117 insertions(+), 31 deletions(-) diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 03bfa39e9f6f1b..e1ba470650515b 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -7,6 +7,10 @@ const { isNodeStream, isReadable, isWritable, + isWebStream, + isTransformStream, + isWritableStream, + isReadableStream, } = require('internal/streams/utils'); const { AbortError, @@ -15,6 +19,7 @@ const { ERR_MISSING_ARGS, }, } = require('internal/errors'); +const eos = require('internal/streams/end-of-stream'); module.exports = function compose(...streams) { if (streams.length === 0) { @@ -57,9 +62,8 @@ module.exports = function compose(...streams) { } } - let ondrain; - let onfinish; - let onreadable; + let writableEndDestructor; + let readableEndDestructor; let onclose; let d; @@ -79,8 +83,8 @@ module.exports = function compose(...streams) { const head = streams[0]; const tail = pipeline(streams, onfinished); - const writable = !!isWritable(head); - const readable = !!isReadable(tail); + const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head)); + const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail)); // TODO(ronag): Avoid double buffering. // Implement Writable/Readable/Duplex traits. @@ -94,7 +98,43 @@ module.exports = function compose(...streams) { }); if (writable) { - d._write = function(chunk, encoding, callback) { + writableEndDestructor = makeWritableEnd(d, head, tail); + } + + if (readable) { + readableEndDestructor = makeReadableEnd(d, head, tail); + } + + d._destroy = function(err, callback) { + if (!err && onclose !== null) { + err = new AbortError(); + } + + if (readableEndDestructor) { + readableEndDestructor(); + } + + if (writableEndDestructor) { + writableEndDestructor(); + } + + if (onclose === null) { + callback(err); + } else { + onclose = callback; + destroyer(tail, err); + } + }; + + return d; +}; + +function makeWritableEnd(duplex, head, tail) { + let ondrain; + let onfinish; + + if (isNodeStream(head)) { + duplex._write = function(chunk, encoding, callback) { if (head.write(chunk, encoding)) { callback(); } else { @@ -102,7 +142,7 @@ module.exports = function compose(...streams) { } }; - d._final = function(callback) { + duplex._final = function(callback) { head.end(); onfinish = callback; }; @@ -114,7 +154,32 @@ module.exports = function compose(...streams) { cb(); } }); + } else if (isWebStream(head)) { + const writable = isTransformStream(head) ? 
head.writable : head; + const writer = writable.getWriter(); + + duplex._write = async function(chunk, encoding, callback) { + try { + await writer.ready; + writer.write(chunk).catch(() => {}); + callback(); + } catch (err) { + callback(err); + } + }; + duplex._final = async function(callback) { + try { + await writer.ready; + writer.close(); + onfinish = callback; + } catch (err) { + callback(err); + } + }; + } + + if (isNodeStream(tail)) { tail.on('finish', function() { if (onfinish) { const cb = onfinish; @@ -122,9 +187,28 @@ module.exports = function compose(...streams) { cb(); } }); + } else if (isWebStream(tail)) { + const readable = isTransformStream(tail) ? tail.readable : tail; + eos(readable, () => { + if (onfinish) { + const cb = onfinish; + onfinish = null; + cb(); + } + }); } - if (readable) { + function destructor() { + ondrain = null; + onfinish = null; + } + + return destructor; +} + +function makeReadableEnd(duplex, head, tail) { + let onreadable; + if (isNodeStream(tail)) { tail.on('readable', function() { if (onreadable) { const cb = onreadable; @@ -134,41 +218,43 @@ module.exports = function compose(...streams) { }); tail.on('end', function() { - d.push(null); + duplex.push(null); }); - d._read = function() { + duplex._read = function() { while (true) { const buf = tail.read(); - if (buf === null) { - onreadable = d._read; + onreadable = duplex._read; return; } - if (!d.push(buf)) { + if (!duplex.push(buf)) { return; } } }; - } + } else if (isWebStream(tail)) { + const readable = isTransformStream(tail) ? tail.readable : tail; + const reader = readable.getReader(); + duplex._read = async function() { + while (true) { + const { value, done } = await reader.read(); + if (done) { + duplex.push(null); + return; + } - d._destroy = function(err, callback) { - if (!err && onclose !== null) { - err = new AbortError(); - } + if (!duplex.push(value)) { + return; + } + } + }; + } + function destructor() { onreadable = null; - ondrain = null; - onfinish = null; - - if (onclose === null) { - callback(err); - } else { - onclose = callback; - destroyer(tail, err); - } - }; + } - return d; -}; + return destructor; +} diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 44c0e06ee30557..d6471d10e06a4b 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -285,7 +285,7 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadableNodeStream(stream)) { + } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { ret = stream; } else { ret = Duplex.from(stream); From a1cca78bad1e24d943d4738f8606da49988143d3 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 16 Feb 2023 12:55:43 +0530 Subject: [PATCH 02/12] fixup! 
destroyer --- lib/internal/streams/compose.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index e1ba470650515b..9e6604123dbc01 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -122,7 +122,9 @@ module.exports = function compose(...streams) { callback(err); } else { onclose = callback; - destroyer(tail, err); + if (isNodeStream(tail)) { + destroyer(tail, err); + } } }; From bcd122a620e8a8f658e2e96b5bce3324f841bc27 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 16 Feb 2023 22:27:15 +0530 Subject: [PATCH 03/12] fixup! add tests --- lib/internal/streams/compose.js | 32 +- test/parallel/test-webstreams-compose.js | 365 +++++++++++++++++++++++ 2 files changed, 392 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-webstreams-compose.js diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 9e6604123dbc01..f56ea88a1d28d5 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -42,18 +42,32 @@ module.exports = function compose(...streams) { } for (let n = 0; n < streams.length; ++n) { - if (!isNodeStream(streams[n])) { + if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) { // TODO(ronag): Add checks for non streams. continue; } - if (n < streams.length - 1 && !isReadable(streams[n])) { + if ( + n < streams.length - 1 && + !( + isReadable(streams[n]) || + isReadableStream(streams[n]) || + isTransformStream(streams[n]) + ) + ) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, orgStreams[n], 'must be readable' ); } - if (n > 0 && !isWritable(streams[n])) { + if ( + n > 0 && + !( + isWritable(streams[n]) || + isWritableStream(streams[n]) || + isTransformStream(streams[n]) + ) + ) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, orgStreams[n], @@ -83,8 +97,16 @@ module.exports = function compose(...streams) { const head = streams[0]; const tail = pipeline(streams, onfinished); - const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head)); - const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail)); + const writable = !!( + isWritable(head) || + isWritableStream(head) || + isTransformStream(head) + ); + const readable = !!( + isReadable(tail) || + isReadableStream(tail) || + isTransformStream(tail) + ); // TODO(ronag): Avoid double buffering. // Implement Writable/Readable/Duplex traits. 
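(Editor's illustrative sketch, not part of the patch series: the checks above let `compose()` accept WHATWG web streams alongside Node streams. A minimal usage sketch, assuming a Node.js build that includes these commits, follows; the new test file added next exercises the same paths in much more depth.)

'use strict';
// Sketch only: compose a web TransformStream with a Node Transform into one Duplex.
const { compose, Transform } = require('stream');
const { TransformStream } = require('stream/web');

const upper = new TransformStream({
  transform(chunk, controller) {
    // Uppercase whatever flows through the web stream half.
    controller.enqueue(String(chunk).toUpperCase());
  },
});

const exclaim = new Transform({
  transform(chunk, encoding, callback) {
    // Append a marker on the Node stream half.
    callback(null, chunk + '!');
  },
});

const duplex = compose(upper, exclaim);
duplex.on('data', (chunk) => console.log(String(chunk))); // prints "HI!"
duplex.end('hi');
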
diff --git a/test/parallel/test-webstreams-compose.js b/test/parallel/test-webstreams-compose.js new file mode 100644 index 00000000000000..43938f25d687a9 --- /dev/null +++ b/test/parallel/test-webstreams-compose.js @@ -0,0 +1,365 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); + +const { + Transform, + Readable, + Writable, + compose +} = require('stream'); + +const { + TransformStream, + ReadableStream, + WritableStream, +} = require('stream/web'); + +{ + let res = ''; + + const d = compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.replace(' ', '_')); + }) + }), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.toUpperCase()); + }) + }) + ); + + d.on('data', common.mustCall((chunk) => { + res += chunk; + })); + + d.on('end', common.mustCall(() => { + assert.strictEqual(res, 'HELLO_WORLD'); + })); + + d.end('hello world'); +} + +{ + let res = ''; + + compose( + new Transform({ + transform: common.mustCall((chunk, encoding, callback) => { + callback(null, chunk + chunk); + }) + }), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }) + ) + .end('asd') + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASDASD'); + })); +} + +{ + let res = ''; + + compose( + async function*(source) { + for await (const chunk of source) { + yield chunk + chunk; + } + }, + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }), + }) + ) + .end('asd') + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASDASD'); + })); +} + +{ + let res = ''; + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }), + }), + async function*(source) { + for await (const chunk of source) { + yield chunk + chunk; + } + }, + new Transform({ + transform: common.mustCall((chunk, enc, clb) => { + clb(null, chunk?.toString()?.replaceAll('A', 'B')); + }) + }) + ) + .end('asd') + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'BSDBSD'); + })); +} + +{ + let res = ''; + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }), + }), + async function*(source) { + for await (const chunk of source) { + yield chunk + chunk; + } + }, + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.replaceAll('A', 'B')); + }) + }) + ) + .end('asd') + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'BSDBSD'); + })); +} + +{ + let res = ''; + compose( + new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.toUpperCase()); + }) + }) + ) + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + 
new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }), + new Transform({ + transform: common.mustCall((chunk, enc, clb) => { + clb(null, chunk?.toString()?.toUpperCase()); + }) + }) + ) + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + Readable.from(['asd']), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.toUpperCase()); + }) + }) + ) + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + new Writable({ + write: common.mustCall((chunk, encoding, callback) => { + res += chunk; + callback(null); + }) + }) + ) + .end('asd') + .on('finish', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + new Transform({ + transform: common.mustCall((chunk, encoding, callback) => { + callback(null, chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + new WritableStream({ + write: common.mustCall((chunk) => { + res += chunk; + }) + }) + ) + .end('asd') + .on('finish', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + new WritableStream({ + write: common.mustCall((chunk) => { + res += chunk; + }) + }) + ) + .end('asd') + .on('finish', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + async function(source) { + for await (const chunk of source) { + res += chunk; + } + } + ) + .end('asd') + .on('finish', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + const _err = new Error('asd'); + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.error(_err); + }) + }), + new TransformStream({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err, _err); + }) + .end('xyz'); +} + +{ + const _err = new Error('asd'); + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk); + }) + }), + async function*(source) { // eslint-disable-line require-yield + let tmp = ''; + for await (const chunk of source) { + tmp += chunk; + throw _err; + } + return tmp; + }, + new TransformStream({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err, _err); + }) + .end('xyz'); +} From 
62f537220079b0e19321389b5c78bf72a508bde3 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 16 Feb 2023 22:41:06 +0530 Subject: [PATCH 04/12] fixup! update doc --- doc/api/stream.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index ac9ea0afd8f331..03d505142d9f62 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2802,7 +2802,8 @@ added: v16.9.0 > Stability: 1 - `stream.compose` is experimental. -* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} +* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]| + ReadableStream\[]|WritableStream\[]|TransformStream\[]} * Returns: {stream.Duplex} Combines two or more streams into a `Duplex` stream that writes to the From 20fd566537fbb6f59c07d505efa05877b74ea3bc Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 16 Feb 2023 23:47:33 +0530 Subject: [PATCH 05/12] fixup! pipeline finishcount --- lib/internal/streams/pipeline.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index d6471d10e06a4b..4a78e1c4698874 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -384,6 +384,7 @@ function pipelineImpl(streams, callback, opts) { finishCount++; pumpToWeb(ret, stream, finish, { end }); } else if (isTransformStream(ret)) { + finishCount++; pumpToWeb(ret.readable, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( From 4e1d499cf6c7b5f6aa8e5c936e92c25318f16154 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 16 Feb 2023 23:48:00 +0530 Subject: [PATCH 06/12] fixup! add rejection handle to writer close --- lib/internal/streams/compose.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index f56ea88a1d28d5..6177c4e398dfe1 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -195,7 +195,7 @@ function makeWritableEnd(duplex, head, tail) { duplex._final = async function(callback) { try { await writer.ready; - writer.close(); + writer.close().catch(() => {}); onfinish = callback; } catch (err) { callback(err); From f3c62141a2e9395161b98fd9908fccfd541de4ca Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 16 Feb 2023 23:52:23 +0530 Subject: [PATCH 07/12] fixup! 
tests some issue --- test/parallel/test-webstreams-compose.js | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/test/parallel/test-webstreams-compose.js b/test/parallel/test-webstreams-compose.js index 43938f25d687a9..af1507f01312e3 100644 --- a/test/parallel/test-webstreams-compose.js +++ b/test/parallel/test-webstreams-compose.js @@ -315,22 +315,21 @@ const { } { - const _err = new Error('asd'); compose( new TransformStream({ transform: common.mustCall((chunk, controller) => { - controller.error(_err); + controller.error(new Error('asd')); }) }), new TransformStream({ transform: common.mustNotCall() }) ) - .on('data', common.mustNotCall()) + // .on('data', common.mustNotCall()) .on('end', common.mustNotCall()) .on('error', (err) => { - assert.strictEqual(err, _err); + assert.strictEqual(err?.message, 'asd'); }) .end('xyz'); } @@ -356,7 +355,7 @@ const { transform: common.mustNotCall() }) ) - .on('data', common.mustNotCall()) + // .on('data', common.mustNotCall()) .on('end', common.mustNotCall()) .on('error', (err) => { assert.strictEqual(err, _err); From 8ce94be9ab8e40442eebf000321d297dc821b341 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 17 Feb 2023 12:52:19 +0530 Subject: [PATCH 08/12] fixup! fix unhandled reject, add more tests --- lib/internal/streams/compose.js | 18 ++-- test/parallel/test-webstreams-compose.js | 129 ++++++++++++++++++++++- 2 files changed, 135 insertions(+), 12 deletions(-) diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 6177c4e398dfe1..96e518054fdac8 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -263,13 +263,17 @@ function makeReadableEnd(duplex, head, tail) { const reader = readable.getReader(); duplex._read = async function() { while (true) { - const { value, done } = await reader.read(); - if (done) { - duplex.push(null); - return; - } - - if (!duplex.push(value)) { + try { + const { value, done } = await reader.read(); + if (done) { + duplex.push(null); + return; + } + + if (!duplex.push(value)) { + return; + } + } catch { return; } } diff --git a/test/parallel/test-webstreams-compose.js b/test/parallel/test-webstreams-compose.js index af1507f01312e3..5514d12bd02eb0 100644 --- a/test/parallel/test-webstreams-compose.js +++ b/test/parallel/test-webstreams-compose.js @@ -326,7 +326,29 @@ const { transform: common.mustNotCall() }) ) - // .on('data', common.mustNotCall()) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk); + }) + }), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.error(new Error('asd')); + }) + }) + ) + .on('data', common.mustNotCall()) .on('end', common.mustNotCall()) .on('error', (err) => { assert.strictEqual(err?.message, 'asd'); @@ -335,7 +357,6 @@ const { } { - const _err = new Error('asd'); compose( new TransformStream({ @@ -347,7 +368,7 @@ const { let tmp = ''; for await (const chunk of source) { tmp += chunk; - throw _err; + throw new Error('asd'); } return tmp; }, @@ -355,10 +376,108 @@ const { transform: common.mustNotCall() }) ) - // .on('data', common.mustNotCall()) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + + compose( 
+ new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.error(new Error('asd')); + }) + }), + new Transform({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) .on('end', common.mustNotCall()) .on('error', (err) => { - assert.strictEqual(err, _err); + assert.strictEqual(err?.message, 'asd'); }) .end('xyz'); } + +{ + + compose( + new Transform({ + transform: common.mustCall((chunk, enc, clb) => { + clb(new Error('asd')); + }) + }), + new TransformStream({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + compose( + new ReadableStream({ + start(controller) { + controller.enqueue(new Error('asd')); + } + }), + new TransformStream({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + new WritableStream({ + write: common.mustCall((chunk, controller) => { + controller.error(new Error('asd')); + }) + }) + ) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + async function(source) { + throw new Error('asd'); + } + ).on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }).end('xyz'); +} From 838f5ef3a7ab921d195337634d1202a056d34a02 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 17 Feb 2023 13:23:13 +0530 Subject: [PATCH 09/12] fixup! simplify --- lib/internal/streams/compose.js | 261 ++++++++++++++------------------ 1 file changed, 116 insertions(+), 145 deletions(-) diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 96e518054fdac8..082a870533c988 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -76,8 +76,9 @@ module.exports = function compose(...streams) { } } - let writableEndDestructor; - let readableEndDestructor; + let ondrain; + let onfinish; + let onreadable; let onclose; let d; @@ -120,11 +121,119 @@ module.exports = function compose(...streams) { }); if (writable) { - writableEndDestructor = makeWritableEnd(d, head, tail); + if (isNodeStream(head)) { + d._write = function(chunk, encoding, callback) { + if (head.write(chunk, encoding)) { + callback(); + } else { + ondrain = callback; + } + }; + + d._final = function(callback) { + head.end(); + onfinish = callback; + }; + + head.on('drain', function() { + if (ondrain) { + const cb = ondrain; + ondrain = null; + cb(); + } + }); + } else if (isWebStream(head)) { + const writable = isTransformStream(head) ? 
head.writable : head; + const writer = writable.getWriter(); + + d._write = async function(chunk, encoding, callback) { + try { + await writer.ready; + writer.write(chunk).catch(() => {}); + callback(); + } catch (err) { + callback(err); + } + }; + + d._final = async function(callback) { + try { + await writer.ready; + writer.close().catch(() => {}); + onfinish = callback; + } catch (err) { + callback(err); + } + }; + } + if (isNodeStream(tail)) { + tail.on('finish', function() { + if (onfinish) { + const cb = onfinish; + onfinish = null; + cb(); + } + }); + } else if (isWebStream(tail)) { + const readable = isTransformStream(tail) ? tail.readable : tail; + eos(readable, () => { + if (onfinish) { + const cb = onfinish; + onfinish = null; + cb(); + } + }); + } } if (readable) { - readableEndDestructor = makeReadableEnd(d, head, tail); + if (isNodeStream(tail)) { + tail.on('readable', function() { + if (onreadable) { + const cb = onreadable; + onreadable = null; + cb(); + } + }); + + tail.on('end', function() { + d.push(null); + }); + + d._read = function() { + while (true) { + const buf = tail.read(); + if (buf === null) { + onreadable = d._read; + return; + } + + if (!d.push(buf)) { + return; + } + } + }; + } else if (isWebStream(tail)) { + const readable = isTransformStream(tail) ? tail.readable : tail; + const reader = readable.getReader(); + d._read = async function() { + while (true) { + try { + const { value, done } = await reader.read(); + if (done) { + d.push(null); + return; + } + + if (!d.push(value)) { + return; + } + } catch { + return; + } + } + }; + } } d._destroy = function(err, callback) { @@ -132,13 +241,9 @@ module.exports = function compose(...streams) { err = new AbortError(); } - if (readableEndDestructor) { - readableEndDestructor(); - } - - if (writableEndDestructor) { - writableEndDestructor(); - } + onreadable = null; + ondrain = null; + onfinish = null; if (onclose === null) { callback(err); @@ -152,137 +257,3 @@ module.exports = function compose(...streams) { return d; }; - -function makeWritableEnd(duplex, head, tail) { - let ondrain; - let onfinish; - - if (isNodeStream(head)) { - duplex._write = function(chunk, encoding, callback) { - if (head.write(chunk, encoding)) { - callback(); - } else { - ondrain = callback; - } - }; - - duplex._final = function(callback) { - head.end(); - onfinish = callback; - }; - - head.on('drain', function() { - if (ondrain) { - const cb = ondrain; - ondrain = null; - cb(); - } - }); - } else if (isWebStream(head)) { - const writable = isTransformStream(head) ? head.writable : head; - const writer = writable.getWriter(); - - duplex._write = async function(chunk, encoding, callback) { - try { - await writer.ready; - writer.write(chunk).catch(() => {}); - callback(); - } catch (err) { - callback(err); - } - }; - - duplex._final = async function(callback) { - try { - await writer.ready; - writer.close().catch(() => {}); - onfinish = callback; - } catch (err) { - callback(err); - } - }; - } - - if (isNodeStream(tail)) { - tail.on('finish', function() { - if (onfinish) { - const cb = onfinish; - onfinish = null; - cb(); - } - }); - } else if (isWebStream(tail)) { - const readable = isTransformStream(tail) ? 
tail.readable : tail; - eos(readable, () => { - if (onfinish) { - const cb = onfinish; - onfinish = null; - cb(); - } - }); - } - - function destructor() { - ondrain = null; - onfinish = null; - } - - return destructor; -} - -function makeReadableEnd(duplex, head, tail) { - let onreadable; - if (isNodeStream(tail)) { - tail.on('readable', function() { - if (onreadable) { - const cb = onreadable; - onreadable = null; - cb(); - } - }); - - tail.on('end', function() { - duplex.push(null); - }); - - duplex._read = function() { - while (true) { - const buf = tail.read(); - if (buf === null) { - onreadable = duplex._read; - return; - } - - if (!duplex.push(buf)) { - return; - } - } - }; - } else if (isWebStream(tail)) { - const readable = isTransformStream(tail) ? tail.readable : tail; - const reader = readable.getReader(); - duplex._read = async function() { - while (true) { - try { - const { value, done } = await reader.read(); - if (done) { - duplex.push(null); - return; - } - - if (!duplex.push(value)) { - return; - } - } catch { - return; - } - } - }; - } - - function destructor() { - onreadable = null; - } - - return destructor; -} From 183f47517885c9981517718d12eb2b763d270c49 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 17 Feb 2023 15:44:43 +0530 Subject: [PATCH 10/12] fixup! unify conditions --- lib/internal/streams/compose.js | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 082a870533c988..4fff0e199404a7 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -166,24 +166,16 @@ module.exports = function compose(...streams) { } }; } - if (isNodeStream(tail)) { - tail.on('finish', function() { - if (onfinish) { - const cb = onfinish; - onfinish = null; - cb(); - } - }); - } else if (isWebStream(tail)) { - const readable = isTransformStream(tail) ? tail.readable : tail; - eos(readable, () => { - if (onfinish) { - const cb = onfinish; - onfinish = null; - cb(); - } - }); - } + + const toRead = isTransformStream(tail) ? tail.readable : tail; + + eos(toRead, () => { + if (onfinish) { + const cb = onfinish; + onfinish = null; + cb(); + } + }); } if (readable) { From 233115d384f7c85afcda89c9cfad3fd9f4e38552 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 17 Feb 2023 23:03:48 +0530 Subject: [PATCH 11/12] fixup! add update to doc --- doc/api/stream.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 03d505142d9f62..082679ce80728d 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2798,6 +2798,10 @@ const server = http.createServer((req, res) => { > Stability: 1 - `stream.compose` is experimental. From 002eed44160f431fd4d2b075db96cfea8211d522 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 19 Feb 2023 17:43:17 +0530 Subject: [PATCH 12/12] fixup! update code flow while reading --- lib/internal/streams/compose.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 4fff0e199404a7..06d6883ce62fc3 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -212,12 +212,13 @@ module.exports = function compose(...streams) { while (true) { try { const { value, done } = await reader.read(); - if (done) { - d.push(null); + + if (!d.push(value)) { return; } - if (!d.push(value)) { + if (done) { + d.push(null); return; } } catch {