Skip to content

Commit

Permalink
fs, stream: initial Symbol.dispose and Symbol.asyncDispose support
Browse files Browse the repository at this point in the history
Co-authored-by: Benjamin Gruenbaum <benjamingr@gmail.com>
PR-URL: #48518
Backport-PR-URL: #49598
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Erick Wendel <erick.workspace@gmail.com>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
2 people authored and ruyadorno committed Sep 11, 2023
1 parent 404f3ab commit 6747721
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 0 deletions.
10 changes: 10 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,16 @@ On Linux, positional writes don't work when the file is opened in append mode.
The kernel ignores the position argument and always appends the data to
the end of the file.
#### `filehandle[Symbol.asyncDispose]()`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
An alias for `filehandle.close()`.
### `fsPromises.access(path[, mode])`
<!-- YAML
Expand Down
11 changes: 11 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,17 @@ option. In the code example above, data will be in a single chunk if the file
has less then 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable[Symbol.asyncDispose]()`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
Calls [`readable.destroy()`][readable-destroy] with an `AbortError` and returns
a promise that fulfills when the stream is finished.

##### `readable.compose(stream[, options])`

<!-- YAML
Expand Down
5 changes: 5 additions & 0 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const {
SafeArrayIterator,
SafePromisePrototypeFinally,
Symbol,
SymbolAsyncDispose,
Uint8Array,
FunctionPrototypeBind,
} = primordials;
Expand Down Expand Up @@ -241,6 +242,10 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
return this[kClosePromise];
};

async [SymbolAsyncDispose]() {
return this.close();
}

/**
* @typedef {import('../webstreams/readablestream').ReadableStream
* } ReadableStream
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/per_context/primordials.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ function copyPrototype(src, dest, prefix) {
copyPrototype(original.prototype, primordials, `${name}Prototype`);
});

// Define Symbol.Dispose and Symbol.AsyncDispose
// Until these are defined by the environment.
// TODO(MoLow): Remove this polyfill once Symbol.dispose and Symbol.asyncDispose are available in V8.
primordials.SymbolDispose ??= primordials.SymbolFor('nodejs.dispose');
primordials.SymbolAsyncDispose ??= primordials.SymbolFor('nodejs.asyncDispose');

// Create copies of intrinsic objects that require a valid `this` to call
// static methods.
// Refs: https://www.ecma-international.org/ecma-262/#sec-promise.all
Expand Down
13 changes: 13 additions & 0 deletions lib/internal/process/pre_execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ const {
SafeMap,
SafeWeakMap,
StringPrototypeStartsWith,
Symbol,
SymbolDispose,
SymbolAsyncDispose,
globalThis,
} = primordials;

Expand Down Expand Up @@ -72,6 +75,8 @@ function prepareExecution(options) {
initializeDeprecations();
require('internal/dns/utils').initializeDns();

setupSymbolDisposePolyfill();

if (isMainThread) {
assert(internalBinding('worker').isMainThread);
// Worker threads will get the manifest in the message handler.
Expand Down Expand Up @@ -109,6 +114,14 @@ function prepareExecution(options) {
}
}

function setupSymbolDisposePolyfill() {
// TODO(MoLow): Remove this polyfill once Symbol.dispose and Symbol.asyncDispose are available in V8.
// eslint-disable-next-line node-core/prefer-primordials
Symbol.dispose ??= SymbolDispose;
// eslint-disable-next-line node-core/prefer-primordials
Symbol.asyncDispose ??= SymbolAsyncDispose;
}

function setupUserModules() {
initializeCJSLoader();
initializeESMLoader();
Expand Down
11 changes: 11 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {
ObjectSetPrototypeOf,
Promise,
SafeSet,
SymbolAsyncDispose,
SymbolAsyncIterator,
Symbol,
} = primordials;
Expand Down Expand Up @@ -66,6 +67,7 @@ const {
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
},
AbortError,
} = require('internal/errors');
const { validateObject } = require('internal/validators');

Expand Down Expand Up @@ -226,6 +228,15 @@ Readable.prototype[EE.captureRejectionSymbol] = function(err) {
this.destroy(err);
};

Readable.prototype[SymbolAsyncDispose] = function() {
let error;
if (!this.destroyed) {
error = this.readableEnded ? null : new AbortError();
this.destroy(error);
}
return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null))));
};

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
Expand Down
12 changes: 12 additions & 0 deletions test/parallel/test-fs-promises-file-handle-dispose.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict';

const common = require('../common');
const { promises: fs } = require('fs');

async function doOpen() {
const fh = await fs.open(__filename);
fh.on('close', common.mustCall());
await fh[Symbol.asyncDispose]();
}

doOpen().then(common.mustCall());
23 changes: 23 additions & 0 deletions test/parallel/test-stream-readable-dispose.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';

const common = require('../common');
const { Readable } = require('stream');
const assert = require('assert');

{
const read = new Readable({
read() {}
});
read.resume();

read.on('end', common.mustNotCall('no end event'));
read.on('close', common.mustCall());
read.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));

read[Symbol.asyncDispose]().then(common.mustCall(() => {
assert.strictEqual(read.errored.name, 'AbortError');
assert.strictEqual(read.destroyed, true);
}));
}
2 changes: 2 additions & 0 deletions typings/primordials.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ declare namespace primordials {
export const SymbolFor: typeof Symbol.for
export const SymbolKeyFor: typeof Symbol.keyFor
export const SymbolAsyncIterator: typeof Symbol.asyncIterator
export const SymbolDispose: typeof Symbol // TODO(MoLow): use typeof Symbol.dispose when it's available
export const SymbolAsyncDispose: typeof Symbol // TODO(MoLow): use typeof Symbol.asyncDispose when it's available
export const SymbolHasInstance: typeof Symbol.hasInstance
export const SymbolIsConcatSpreadable: typeof Symbol.isConcatSpreadable
export const SymbolIterator: typeof Symbol.iterator
Expand Down

0 comments on commit 6747721

Please sign in to comment.