From de4983a3fa0ddf5bc8e0ccf34b14f0a0dbf8da30 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 1 Oct 2023 18:22:42 +0530 Subject: [PATCH] stream: implement TransformStream cleanup using "transformer.cancel" Fixes: https://github.com/nodejs/node/issues/49971 --- lib/internal/webstreams/transformstream.js | 115 +++++++++++++++++++-- 1 file changed, 105 insertions(+), 10 deletions(-) diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 4d47856a6a12de..597d38f5d8fed5 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -47,6 +47,7 @@ const { nonOpFlush, kType, kState, + nonOpCancel, } = require('internal/webstreams/util'); const { @@ -384,8 +385,7 @@ function initializeTransformStream( return transformStreamDefaultSourcePullAlgorithm(stream); }, cancel(reason) { - transformStreamErrorWritableAndUnblockWrite(stream, reason); - return PromiseResolve(); + return transformStreamDefaultSourceCancelAlgorithm(stream, reason); }, }, { highWaterMark: readableHighWaterMark, @@ -427,6 +427,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error) { writableStreamDefaultControllerErrorIfNeeded( writable[kState].controller, error); + transformStreamUnblockWrite(stream); +} + +function transformStreamUnblockWrite(stream) { if (stream[kState].backpressure) transformStreamSetBackpressure(stream, false); } @@ -443,13 +447,15 @@ function setupTransformStreamDefaultController( stream, controller, transformAlgorithm, - flushAlgorithm) { + flushAlgorithm, + cancelAlgorithm) { assert(isTransformStream(stream)); assert(stream[kState].controller === undefined); controller[kState] = { stream, transformAlgorithm, flushAlgorithm, + cancelAlgorithm, }; stream[kState].controller = controller; } @@ -460,21 +466,26 @@ function setupTransformStreamDefaultControllerFromTransformer( const controller = new TransformStreamDefaultController(kSkipThrow); const transform = transformer?.transform || defaultTransformAlgorithm; const flush = transformer?.flush || nonOpFlush; + const cancel = transformer?.cancel || nonOpCancel; const transformAlgorithm = FunctionPrototypeBind(transform, transformer); const flushAlgorithm = FunctionPrototypeBind(flush, transformer); + const cancelAlgorithm = + FunctionPrototypeBind(cancel, transformer); setupTransformStreamDefaultController( stream, controller, transformAlgorithm, - flushAlgorithm); + flushAlgorithm, + cancelAlgorithm); } function transformStreamDefaultControllerClearAlgorithms(controller) { controller[kState].transformAlgorithm = undefined; controller[kState].flushAlgorithm = undefined; + controller[kState].cancelAlgorithm = undefined; } function transformStreamDefaultControllerEnqueue(controller, chunk) { @@ -563,7 +574,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { } async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { - transformStreamError(stream, reason); + const { + controller, + readable, + } = stream[kState]; + + if (controller[kState].finishPromise !== undefined) { + return controller[kState].finishPromise + } + + const { promise, resolve, reject } = createDeferredPromise(); + controller[kState].finishPromise = promise; + const cancelPromise = ensureIsPromise( + controller[kState].cancelAlgorithm, + controller, + reason); + transformStreamDefaultControllerClearAlgorithms(controller); + + PromisePrototypeThen( + cancelPromise, + () => { + if (readable[kState].state === 'errored') + reject(readable[kState].storedError); + else { + readableStreamDefaultControllerError(readable[kState].controller, reason); + resolve(); + } + }, + (error) => { + readableStreamDefaultControllerError(readable[kState].controller, error); + reject(error); + } + ); + + return controller[kState].finishPromise; } function transformStreamDefaultSinkCloseAlgorithm(stream) { @@ -572,23 +616,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) { controller, } = stream[kState]; + if (controller[kState].finishPromise !== undefined) { + return controller[kState].finishPromise + } + const { promise, resolve, reject } = createDeferredPromise(); + controller[kState].finishPromise = promise; const flushPromise = ensureIsPromise( controller[kState].flushAlgorithm, controller, controller); transformStreamDefaultControllerClearAlgorithms(controller); - return PromisePrototypeThen( + PromisePrototypeThen( flushPromise, () => { if (readable[kState].state === 'errored') - throw readable[kState].storedError; - readableStreamDefaultControllerClose(readable[kState].controller); + reject(readable[kState].storedError); + else { + readableStreamDefaultControllerClose(readable[kState].controller); + resolve(); + } }, (error) => { - transformStreamError(stream, error); - throw readable[kState].storedError; + readableStreamDefaultControllerError(readable[kState].controller, error); + reject(error); }); + return controller[kState].finishPromise; } function transformStreamDefaultSourcePullAlgorithm(stream) { @@ -598,6 +651,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream) { return stream[kState].backpressureChange.promise; } +function transformStreamDefaultSourceCancelAlgorithm(stream, reason) { + const { + controller, + writable, + } = stream[kState]; + + if (controller[kState].finishPromise !== undefined) { + return controller[kState].finishPromise; + } + + const { promise, resolve, reject } = createDeferredPromise(); + controller[kState].finishPromise = promise; + const cancelPromise = ensureIsPromise( + controller[kState].cancelAlgorithm, + controller, + reason); + transformStreamDefaultControllerClearAlgorithms(controller); + + PromisePrototypeThen(cancelPromise, + () => { + if (writable[kState].state === 'errored') + reject(writable[kState].storedError); + else { + writableStreamDefaultControllerErrorIfNeeded( + writable[kState].controller, + reason); + transformStreamUnblockWrite(stream); + resolve(); + } + }, + (error) => { + writableStreamDefaultControllerErrorIfNeeded( + writable[kState].controller, + error); + transformStreamUnblockWrite(stream); + reject(error); + } + ); + + return controller[kState].finishPromise +} + module.exports = { TransformStream, TransformStreamDefaultController,