Skip to content

Commit

Permalink
fixup! fs: add FileHandle.prototype.readableWebStream()
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Jul 12, 2021
1 parent ab0107f commit f40780c
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 2 deletions.
8 changes: 8 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ const file = await open('./some/file/to/read');

for await (const chunk of file.readableWebStream())
console.log(chunk);

await file.close();
```
```cjs
Expand All @@ -337,9 +339,15 @@ const {

for await (const chunk of file.readableWebStream())
console.log(chunk);

await file.close();
})();
```
While the `ReadableStream` will read the file to completion, it will not
close the `FileHandle` automatically. User code must still call the
`fileHandle.close()` method.
#### `filehandle.readFile(options)`
<!-- YAML
added: v10.0.0
Expand Down
17 changes: 16 additions & 1 deletion lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ const {
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');

const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');

const getDirectoryEntriesPromise = promisify(getDirents);
const validateRmOptionsPromise = promisify(validateRmOptions);

Expand Down Expand Up @@ -228,7 +232,18 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
if (this[kLocked])
throw new ERR_INVALID_STATE('The FileHandle is locked');
this[kLocked] = true;
return newReadableStreamFromStreamBase(this[kHandle]);

const readable = newReadableStreamFromStreamBase(
this[kHandle],
undefined,
{ ondone: () => this[kUnref]() });

this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});

return readable;
}

[kTransfer]() {
Expand Down
21 changes: 20 additions & 1 deletion lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -859,12 +859,20 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
* @param {QueuingStrategy} strategy
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamBase(streamBase, strategy) {
function newReadableStreamFromStreamBase(streamBase, strategy, options = {}) {
validateObject(streamBase, 'streamBase');
validateObject(options, 'options');

const {
ondone = () => {},
} = options;

if (typeof streamBase.onread === 'function')
throw new ERR_INVALID_STATE('StreamBase already has a consumer');

if (typeof ondone !== 'function')
throw new ERR_INVALID_ARG_TYPE('options.ondone', 'Function', ondone);

let controller;

streamBase.onread = (arrayBuffer) => {
Expand All @@ -877,6 +885,11 @@ function newReadableStreamFromStreamBase(streamBase, strategy) {
if (nread === UV_EOF) {
controller.close();
streamBase.readStop();
try {
ondone();
} catch (error) {
controller.error(error);
}
return;
}

Expand All @@ -899,6 +912,12 @@ function newReadableStreamFromStreamBase(streamBase, strategy) {

cancel() {
const promise = createDeferredPromise();
try {
ondone();
} catch (error) {
promise.reject(error);
return promise.promise;
}
const req = new ShutdownWrap();
req.oncomplete = () => promise.resolve();
const err = streamBase.shutdown(req);
Expand Down
39 changes: 39 additions & 0 deletions test/parallel/test-filehandle-readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const {

const check = readFileSync(__filename, { encoding: 'utf8' });

// Make sure the ReadableStream works...
(async () => {
const dec = new TextDecoder();
const file = await open(__filename);
Expand All @@ -29,6 +30,8 @@ const check = readFileSync(__filename, { encoding: 'utf8' });
await file.close();
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream fails if the
// FileHandle is already closed.
(async () => {
const file = await open(__filename);
await file.close();
Expand All @@ -38,6 +41,8 @@ const check = readFileSync(__filename, { encoding: 'utf8' });
});
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream fails if the
// FileHandle is already closing.
(async () => {
const file = await open(__filename);
file.close();
Expand All @@ -46,3 +51,37 @@ const check = readFileSync(__filename, { encoding: 'utf8' });
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure the ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream();
const reader = readable.getReader();
file.close();
await reader.closed;
})().then(common.mustCall());

// Make sure the ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream();
file.close();
const reader = readable.getReader();
await reader.closed;
})().then(common.mustCall());

// Make sure that the FileHandle is properly marked "in use"
// when a ReadableStream has been acquired for it.
(async () => {
const file = await open(__filename);
file.readableWebStream();
const mc = new MessageChannel();
mc.port1.onmessage = common.mustNotCall();
assert.throws(() => mc.port2.postMessage(file, [file]), {
code: 25 // DataCloneError
});
mc.port1.close();
await file.close();
})().then(common.mustCall());

0 comments on commit f40780c

Please sign in to comment.