Skip to content

Commit

Permalink
stream: implement TransformStream cleanup using "transformer.cancel"
Browse files Browse the repository at this point in the history
Fixes: nodejs#49971
PR-URL: nodejs#50126
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
debadree25 authored and MattiasBuelens committed May 1, 2024
1 parent c530520 commit 10a0718
Show file tree
Hide file tree
Showing 72 changed files with 418 additions and 94 deletions.
5 changes: 3 additions & 2 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const {
ObjectCreate,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
PromisePrototypeThen,
PromiseResolve,
PromiseReject,
Expand Down Expand Up @@ -2441,7 +2442,7 @@ function setupReadableStreamDefaultController(
const startResult = startAlgorithm();

PromisePrototypeThen(
PromiseResolve(startResult),
new Promise((r) => r(startResult)),
() => {
controller[kState].started = true;
assert(!controller[kState].pulling);
Expand Down Expand Up @@ -3240,7 +3241,7 @@ function setupReadableByteStreamController(
const startResult = startAlgorithm();

PromisePrototypeThen(
PromiseResolve(startResult),
new Promise((r) => r(startResult)),
() => {
controller[kState].started = true;
assert(!controller[kState].pulling);
Expand Down
116 changes: 105 additions & 11 deletions lib/internal/webstreams/transformstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const {
FunctionPrototypeCall,
ObjectDefineProperties,
PromisePrototypeThen,
PromiseResolve,
ReflectConstruct,
SymbolToStringTag,
Symbol,
Expand Down Expand Up @@ -47,6 +46,7 @@ const {
nonOpFlush,
kType,
kState,
nonOpCancel,
} = require('internal/webstreams/util');

const {
Expand Down Expand Up @@ -377,8 +377,7 @@ function initializeTransformStream(
return transformStreamDefaultSourcePullAlgorithm(stream);
},
cancel(reason) {
transformStreamErrorWritableAndUnblockWrite(stream, reason);
return PromiseResolve();
return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
},
}, {
highWaterMark: readableHighWaterMark,
Expand Down Expand Up @@ -420,6 +419,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error) {
writableStreamDefaultControllerErrorIfNeeded(
writable[kState].controller,
error);
transformStreamUnblockWrite(stream);
}

function transformStreamUnblockWrite(stream) {
if (stream[kState].backpressure)
transformStreamSetBackpressure(stream, false);
}
Expand All @@ -436,13 +439,15 @@ function setupTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm) {
flushAlgorithm,
cancelAlgorithm) {
assert(isTransformStream(stream));
assert(stream[kState].controller === undefined);
controller[kState] = {
stream,
transformAlgorithm,
flushAlgorithm,
cancelAlgorithm,
};
stream[kState].controller = controller;
}
Expand All @@ -453,21 +458,26 @@ function setupTransformStreamDefaultControllerFromTransformer(
const controller = new TransformStreamDefaultController(kSkipThrow);
const transform = transformer?.transform || defaultTransformAlgorithm;
const flush = transformer?.flush || nonOpFlush;
const cancel = transformer?.cancel || nonOpCancel;
const transformAlgorithm =
FunctionPrototypeBind(transform, transformer);
const flushAlgorithm =
FunctionPrototypeBind(flush, transformer);
const cancelAlgorithm =
FunctionPrototypeBind(cancel, transformer);

setupTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm);
flushAlgorithm,
cancelAlgorithm);
}

function transformStreamDefaultControllerClearAlgorithms(controller) {
controller[kState].transformAlgorithm = undefined;
controller[kState].flushAlgorithm = undefined;
controller[kState].cancelAlgorithm = undefined;
}

function transformStreamDefaultControllerEnqueue(controller, chunk) {
Expand Down Expand Up @@ -556,7 +566,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
}

async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
transformStreamError(stream, reason);
const {
controller,
readable,
} = stream[kState];

if (controller[kState].finishPromise !== undefined) {
return controller[kState].finishPromise;
}

const { promise, resolve, reject } = createDeferredPromise();
controller[kState].finishPromise = promise;
const cancelPromise = ensureIsPromise(
controller[kState].cancelAlgorithm,
controller,
reason);
transformStreamDefaultControllerClearAlgorithms(controller);

PromisePrototypeThen(
cancelPromise,
() => {
if (readable[kState].state === 'errored')
reject(readable[kState].storedError);
else {
readableStreamDefaultControllerError(readable[kState].controller, reason);
resolve();
}
},
(error) => {
readableStreamDefaultControllerError(readable[kState].controller, error);
reject(error);
},
);

return controller[kState].finishPromise;
}

function transformStreamDefaultSinkCloseAlgorithm(stream) {
Expand All @@ -565,23 +608,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {
controller,
} = stream[kState];

if (controller[kState].finishPromise !== undefined) {
return controller[kState].finishPromise;
}
const { promise, resolve, reject } = createDeferredPromise();
controller[kState].finishPromise = promise;
const flushPromise =
ensureIsPromise(
controller[kState].flushAlgorithm,
controller,
controller);
transformStreamDefaultControllerClearAlgorithms(controller);
return PromisePrototypeThen(
PromisePrototypeThen(
flushPromise,
() => {
if (readable[kState].state === 'errored')
throw readable[kState].storedError;
readableStreamDefaultControllerClose(readable[kState].controller);
reject(readable[kState].storedError);
else {
readableStreamDefaultControllerClose(readable[kState].controller);
resolve();
}
},
(error) => {
transformStreamError(stream, error);
throw readable[kState].storedError;
readableStreamDefaultControllerError(readable[kState].controller, error);
reject(error);
});
return controller[kState].finishPromise;
}

function transformStreamDefaultSourcePullAlgorithm(stream) {
Expand All @@ -591,6 +643,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream) {
return stream[kState].backpressureChange.promise;
}

function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
const {
controller,
writable,
} = stream[kState];

if (controller[kState].finishPromise !== undefined) {
return controller[kState].finishPromise;
}

const { promise, resolve, reject } = createDeferredPromise();
controller[kState].finishPromise = promise;
const cancelPromise = ensureIsPromise(
controller[kState].cancelAlgorithm,
controller,
reason);
transformStreamDefaultControllerClearAlgorithms(controller);

PromisePrototypeThen(cancelPromise,
() => {
if (writable[kState].state === 'errored')
reject(writable[kState].storedError);
else {
writableStreamDefaultControllerErrorIfNeeded(
writable[kState].controller,
reason);
transformStreamUnblockWrite(stream);
resolve();
}
},
(error) => {
writableStreamDefaultControllerErrorIfNeeded(
writable[kState].controller,
error);
transformStreamUnblockWrite(stream);
reject(error);
},
);

return controller[kState].finishPromise;
}

module.exports = {
TransformStream,
TransformStreamDefaultController,
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
FunctionPrototypeBind,
FunctionPrototypeCall,
ObjectDefineProperties,
Promise,
PromisePrototypeThen,
PromiseResolve,
PromiseReject,
Expand Down Expand Up @@ -1290,7 +1291,7 @@ function setupWritableStreamDefaultController(
const startResult = startAlgorithm();

PromisePrototypeThen(
PromiseResolve(startResult),
new Promise((r) => r(startResult)),
() => {
assert(stream[kState].state === 'writable' ||
stream[kState].state === 'erroring');
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Last update:
- performance-timeline: https://github.com/web-platform-tests/wpt/tree/17ebc3aea0/performance-timeline
- resource-timing: https://github.com/web-platform-tests/wpt/tree/22d38586d0/resource-timing
- resources: https://github.com/web-platform-tests/wpt/tree/919874f84f/resources
- streams: https://github.com/web-platform-tests/wpt/tree/517e945bbf/streams
- streams: https://github.com/web-platform-tests/wpt/tree/a8872d92b1/streams
- url: https://github.com/web-platform-tests/wpt/tree/c2d7e70b52/url
- user-timing: https://github.com/web-platform-tests/wpt/tree/5ae85bf826/user-timing
- wasm/jsapi: https://github.com/web-platform-tests/wpt/tree/cde25e7e3c/wasm/jsapi
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/piping/abort.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/recording-streams.js
// META: script=../resources/test-utils.js
'use strict';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/recording-streams.js
'use strict';

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
Expand Down
12 changes: 12 additions & 0 deletions test/fixtures/wpt/streams/piping/crashtests/cross-piping.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<!DOCTYPE html>
<script type="module">
let a = new ReadableStream();
let b = self.open()
let f = new b.WritableStream();
a.pipeThrough(
{ "readable": a, "writable": f },
{ "signal": AbortSignal.abort() }
)
await new Promise(setTimeout);
structuredClone(undefined, { "transfer": [f] })
</script>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/piping/flow-control.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/rs-utils.js
// META: script=../resources/recording-streams.js
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/piping/general-addition.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
'use strict';

promise_test(async t => {
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/piping/general.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/piping/pipe-through.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/rs-utils.js
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/piping/then-interception.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/piping/throwing-options.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
'use strict';

class ThrowingOptions {
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/piping/transform-streams.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
'use strict';

promise_test(() => {
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/queuing-strategies.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
'use strict';

const highWaterMarkConversions = new Map([
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
'use strict';

promise_test(() => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/rs-utils.js
'use strict';

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm

promise_test(async t => {
const error = new Error('cannot proceed');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
// META: script=../resources/rs-utils.js
// META: script=../resources/test-utils.js
'use strict';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm
'use strict';

promise_test(async t => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm

'use strict';

Expand Down
Loading

0 comments on commit 10a0718

Please sign in to comment.