Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs, stream: add initial Symbol.dispose and Symbol.asyncDispose support #48518

Merged
merged 10 commits into from
Jun 25, 2023
8 changes: 8 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,14 @@ On Linux, positional writes don't work when the file is opened in append mode.
The kernel ignores the position argument and always appends the data to
the end of the file.

#### `filehandle[Symbol.asyncDispose]()`
MoLow marked this conversation as resolved.
Show resolved Hide resolved

<!-- YAML
added: REPLACEME
-->

An alias for `filehandle.close()`.

### `fsPromises.access(path[, mode])`

<!-- YAML
Expand Down
8 changes: 8 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1904,6 +1904,14 @@ option. In the code example above, data will be in a single chunk if the file
has less then 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable[Symbol.asyncDispose]()`
MoLow marked this conversation as resolved.
Show resolved Hide resolved

<!-- YAML
added: REPLACEME
-->

An alias for [`readable.destroy()`][readable-destroy] with an `AbortError`.
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
benjamingr marked this conversation as resolved.
Show resolved Hide resolved

##### `readable.compose(stream[, options])`

<!-- YAML
Expand Down
5 changes: 5 additions & 0 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const {
SafeArrayIterator,
SafePromisePrototypeFinally,
Symbol,
SymbolAsyncDispose,
Uint8Array,
FunctionPrototypeBind,
} = primordials;
Expand Down Expand Up @@ -246,6 +247,10 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
return this[kClosePromise];
};

async [SymbolAsyncDispose]() {
return this.close();
}

/**
* @typedef {import('../webstreams/readablestream').ReadableStream
* } ReadableStream
Expand Down
5 changes: 5 additions & 0 deletions lib/internal/per_context/primordials.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ function copyPrototype(src, dest, prefix) {
copyPrototype(original.prototype, primordials, `${name}Prototype`);
});

// Define Symbol.Disposed and Symbol.AsyncDispose
// Until these are defined by the environment.
MoLow marked this conversation as resolved.
Show resolved Hide resolved
primordials.SymbolDispose ??= primordials.SymbolFor('nodejs.dispose');
MoLow marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this mean Symbol.keyFor of these symbols won't properly return undefined?

primordials.SymbolAsyncDispose ??= primordials.SymbolFor('nodejs.asyncDispose');

// Create copies of intrinsic objects that require a valid `this` to call
// static methods.
// Refs: https://www.ecma-international.org/ecma-262/#sec-promise.all
Expand Down
12 changes: 12 additions & 0 deletions lib/internal/process/pre_execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ const {
ObjectGetOwnPropertyDescriptor,
SafeMap,
StringPrototypeStartsWith,
Symbol,
SymbolDispose,
SymbolAsyncDispose,
globalThis,
} = primordials;

Expand Down Expand Up @@ -82,6 +85,8 @@ function prepareExecution(options) {

require('internal/dns/utils').initializeDns();

setupSymbolDisposePolyfill();

if (isMainThread) {
assert(internalBinding('worker').isMainThread);
// Worker threads will get the manifest in the message handler.
Expand Down Expand Up @@ -119,6 +124,13 @@ function prepareExecution(options) {
}
}

function setupSymbolDisposePolyfill() {
MoLow marked this conversation as resolved.
Show resolved Hide resolved
// eslint-disable-next-line node-core/prefer-primordials
Symbol.dispose ??= SymbolDispose;
// eslint-disable-next-line node-core/prefer-primordials
Symbol.asyncDispose ??= SymbolAsyncDispose;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be a good thing to polyfill them? Wouldn't users assume certain behaviors from the runtime because they exists?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a way for typescript or bable to recognize these symbols without them being global.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina this is confirmed (with the TS team) to play nice. Users would be able to use them with the polyfill but not the syntactic sugar until v8 ships

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That breaks core-js.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

function setupUserModules(isLoaderWorker = false) {
initializeCJSLoader();
initializeESMLoader(isLoaderWorker);
Expand Down
11 changes: 11 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {
ObjectSetPrototypeOf,
Promise,
SafeSet,
SymbolAsyncDispose,
SymbolAsyncIterator,
Symbol,
} = primordials;
Expand Down Expand Up @@ -67,6 +68,7 @@ const {
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
ERR_UNKNOWN_ENCODING,
},
AbortError,
} = require('internal/errors');
const { validateObject } = require('internal/validators');

Expand Down Expand Up @@ -234,6 +236,15 @@ Readable.prototype[EE.captureRejectionSymbol] = function(err) {
this.destroy(err);
};

Readable.prototype[SymbolAsyncDispose] = function() {
let error;
if (!this.destroyed) {
error = this.readableEnded ? null : new AbortError();
this.destroy(error);
}
return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null))));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the outcome of the following?

await readable[Symbol.asyncDispose]();
await readable[Symbol.asyncDispose]();

Ideally, the second call should be a noop, but it seems like the second call could actually throw the AbortError recorded by the first call, since the AbortError is only ignored in the first call.

For reference, here is the guidance from the spec related to how a Symbol.asyncDispose method should behave:

If called more than once on the same object, the function should not throw an exception. However, this requirement is not enforced.

Copy link
Member

@benjamingr benjamingr Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second call would not call this.destroy since this.destroyed would be true and thus would not cause any side effects and reject/fulfill with the same way as the first one. So:

await readable[Symbol.asyncDispose](); // destroys the stream
await readable[Symbol.asyncDispose](); // a no-op

The only difference is if the stream itself was destroyed with an error (e.g. the first call was unable to destroy it correctly) the second call would also reject with the same error as the first one.

readable.destroy(ERR_COULD_NOT_RELEASE_CONNECTION); // or some such error
await readable[Symbol.asyncDispose](); // rejects with ERR_COULD_NOT_RELEASE_CONNECTION

The AbortError is filtered out but is still needed since someone may be iterating the stream while we are disposing it and from their point of view an error happened.

Another thing to consider is two concurrent calls too readable[Symbol.asyncDispose](); for example:

await Promise.all([readable[Symbol.asyncDispose](), readable[Symbol.asyncDispose]()]);

In that case I think it's possible the error is emitted twice which may be a bug, but I think it's fine since .destroy sets .destroyed synchronously IIRC - what do you think @ronag , should be fine ?

Copy link

@rbuckton rbuckton Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining. I wanted to verify because it seemed that, if eos could emit the AbortError from the call to this.destroy(), then it would potentially be possible to observe that AbortError out of band in either a sequential or concurrent call.

It might make sense to have tests for these two cases, at least, to verify these expectations:

// test 1: verify sequential calls to [Symbol.asyncDispose]()
await readable[Symbol.asyncDispose]();
await readable[Symbol.asyncDispose]();

// test 2: verify concurrent calls to [Symbol.asyncDispose]();
await Promise.all([readable[Symbol.asyncDispose](), readable[Symbol.asyncDispose]()]);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rbuckton wanna open a PR? If you do, the easiest would be to add it to test/parallel/test-stream-readable-dispose.js and you don't have to build node for it you can use a nightly since it's (hopefully) a passing test.

It could be aa good opportunity to practice contributing code to Ndoe :)

Otherwise I'll add the test when I tackle the next stream API. Would this sort of test make sense for any disposable? Is there a list of principles we should follow?

};

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
Expand Down
12 changes: 12 additions & 0 deletions test/parallel/test-fs-promises-file-handle-dispose.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict';

const common = require('../common');
const { promises: fs } = require('fs');

async function doOpen() {
const fh = await fs.open(__filename);
fh.on('close', common.mustCall());
await fh[Symbol.asyncDispose]();
}

doOpen().then(common.mustCall());
23 changes: 23 additions & 0 deletions test/parallel/test-stream-readable-dispose.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';

const common = require('../common');
const { Readable } = require('stream');
const assert = require('assert');

{
const read = new Readable({
read() {}
});
read.resume();

read.on('end', common.mustNotCall('no end event'));
read.on('close', common.mustCall());
read.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));

read[Symbol.asyncDispose]().then(common.mustCall(() => {
assert.strictEqual(read.errored.name, 'AbortError');
assert.strictEqual(read.destroyed, true);
}));
}
2 changes: 2 additions & 0 deletions typings/primordials.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ declare namespace primordials {
export const SymbolFor: typeof Symbol.for
export const SymbolKeyFor: typeof Symbol.keyFor
export const SymbolAsyncIterator: typeof Symbol.asyncIterator
export const SymbolDispose: typeof Symbol
export const SymbolAsyncDispose: typeof Symbol
MoLow marked this conversation as resolved.
Show resolved Hide resolved
export const SymbolHasInstance: typeof Symbol.hasInstance
export const SymbolIsConcatSpreadable: typeof Symbol.isConcatSpreadable
export const SymbolIterator: typeof Symbol.iterator
Expand Down