Skip to content

Commit

Permalink
streams: implement TransformStream cleanup using "transformer.cancel"
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Oct 23, 2023
1 parent 8e814e3 commit aac37a2
Showing 1 changed file with 105 additions and 10 deletions.
115 changes: 105 additions & 10 deletions lib/internal/webstreams/transformstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const {
nonOpFlush,
kType,
kState,
nonOpCancel,
} = require('internal/webstreams/util');

const {
Expand Down Expand Up @@ -384,8 +385,7 @@ function initializeTransformStream(
return transformStreamDefaultSourcePullAlgorithm(stream);
},
cancel(reason) {
transformStreamErrorWritableAndUnblockWrite(stream, reason);
return PromiseResolve();
return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
},
}, {
highWaterMark: readableHighWaterMark,
Expand Down Expand Up @@ -427,6 +427,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error) {
writableStreamDefaultControllerErrorIfNeeded(
writable[kState].controller,
error);
transformStreamUnblockWrite(stream);
}

function transformStreamUnblockWrite(stream) {
if (stream[kState].backpressure)
transformStreamSetBackpressure(stream, false);
}
Expand All @@ -443,13 +447,15 @@ function setupTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm) {
flushAlgorithm,
cancelAlgorithm) {
assert(isTransformStream(stream));
assert(stream[kState].controller === undefined);
controller[kState] = {
stream,
transformAlgorithm,
flushAlgorithm,
cancelAlgorithm,
};
stream[kState].controller = controller;
}
Expand All @@ -460,21 +466,26 @@ function setupTransformStreamDefaultControllerFromTransformer(
const controller = new TransformStreamDefaultController(kSkipThrow);
const transform = transformer?.transform || defaultTransformAlgorithm;
const flush = transformer?.flush || nonOpFlush;
const cancel = transformer?.cancel || nonOpCancel;
const transformAlgorithm =
FunctionPrototypeBind(transform, transformer);
const flushAlgorithm =
FunctionPrototypeBind(flush, transformer);
const cancelAlgorithm =
FunctionPrototypeBind(cancel, transformer);

setupTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm);
flushAlgorithm,
cancelAlgorithm);
}

function transformStreamDefaultControllerClearAlgorithms(controller) {
controller[kState].transformAlgorithm = undefined;
controller[kState].flushAlgorithm = undefined;
controller[kState].cancelAlgorithm = undefined;
}

function transformStreamDefaultControllerEnqueue(controller, chunk) {
Expand Down Expand Up @@ -563,7 +574,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
}

async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
transformStreamError(stream, reason);
const {
controller,
readable,
} = stream[kState];

if (controller[kState].finishPromise !== undefined) {
return controller[kState].finishPromise
}

const { promise, resolve, reject } = createDeferredPromise();
controller[kState].finishPromise = promise;
const cancelPromise = ensureIsPromise(
controller[kState].cancelAlgorithm,
controller,
reason);
transformStreamDefaultControllerClearAlgorithms(controller);

PromisePrototypeThen(
cancelPromise,
() => {
if (readable[kState].state === 'errored')
reject(readable[kState].storedError);
else {
readableStreamDefaultControllerError(readable[kState].controller, reason);
resolve();
}
},
(error) => {
readableStreamDefaultControllerError(readable[kState].controller, error);
reject(error);
}
);

return controller[kState].finishPromise;
}

function transformStreamDefaultSinkCloseAlgorithm(stream) {
Expand All @@ -572,23 +616,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {
controller,
} = stream[kState];

if (controller[kState].finishPromise !== undefined) {
return controller[kState].finishPromise
}
const { promise, resolve, reject } = createDeferredPromise();
controller[kState].finishPromise = promise;
const flushPromise =
ensureIsPromise(
controller[kState].flushAlgorithm,
controller,
controller);
transformStreamDefaultControllerClearAlgorithms(controller);
return PromisePrototypeThen(
PromisePrototypeThen(
flushPromise,
() => {
if (readable[kState].state === 'errored')
throw readable[kState].storedError;
readableStreamDefaultControllerClose(readable[kState].controller);
reject(readable[kState].storedError);
else {
readableStreamDefaultControllerClose(readable[kState].controller);
resolve();
}
},
(error) => {
transformStreamError(stream, error);
throw readable[kState].storedError;
readableStreamDefaultControllerError(readable[kState].controller, error);
reject(error);
});
return controller[kState].finishPromise;
}

function transformStreamDefaultSourcePullAlgorithm(stream) {
Expand All @@ -598,6 +651,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream) {
return stream[kState].backpressureChange.promise;
}

function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
const {
controller,
writable,
} = stream[kState];

if (controller[kState].finishPromise !== undefined) {
return controller[kState].finishPromise;
}

const { promise, resolve, reject } = createDeferredPromise();
controller[kState].finishPromise = promise;
const cancelPromise = ensureIsPromise(
controller[kState].cancelAlgorithm,
controller,
reason);
transformStreamDefaultControllerClearAlgorithms(controller);

PromisePrototypeThen(cancelPromise,
() => {
if (writable[kState].state === 'errored')
reject(writable[kState].storedError);
else {
writableStreamDefaultControllerErrorIfNeeded(
writable[kState].controller,
reason);
transformStreamUnblockWrite(stream);
resolve();
}
},
(error) => {
writableStreamDefaultControllerErrorIfNeeded(
writable[kState].controller,
error);
transformStreamUnblockWrite(stream);
reject(error);
}
);

return controller[kState].finishPromise
}

module.exports = {
TransformStream,
TransformStreamDefaultController,
Expand Down

0 comments on commit aac37a2

Please sign in to comment.