src: implement DataQueue and non-memory resident Blob #45258

Closed · wants to merge 3 commits
39 changes: 39 additions & 0 deletions doc/api/fs.md
@@ -3324,6 +3324,45 @@ a colon, Node.js will open a file system stream, as described by
Functions based on `fs.open()` exhibit this behavior as well:
`fs.writeFile()`, `fs.readFile()`, etc.

### `fs.openAsBlob(path[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `path` {string|Buffer|URL}
* `options` {Object}
  * `type` {string} An optional MIME type for the blob.
* Returns: {Promise} containing a {Blob}

Returns a {Blob} whose data is backed by the given file.

The file must not be modified after the {Blob} is created. Any modifications
will cause reading the {Blob} data to fail with a `DOMException` error.
Synchronous stat operations are performed on the file when the `Blob` is
created, and before each read, in order to detect whether the file data has
been modified on disk.

```mjs
import { openAsBlob } from 'node:fs';

const blob = await openAsBlob('the.file.txt');
const ab = await blob.arrayBuffer();
blob.stream();
```

```cjs
const { openAsBlob } = require('node:fs');

(async () => {
  const blob = await openAsBlob('the.file.txt');
  const ab = await blob.arrayBuffer();
  blob.stream();
})();
```
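
The following additional sketch is not part of the official examples; the file
name is a placeholder and the exact error surface is an assumption based on the
behavior described above, namely that modifying the file after the {Blob} is
created causes subsequent reads to fail with a `DOMException`.

```mjs
import { openAsBlob } from 'node:fs';
import { appendFile } from 'node:fs/promises';

// Create a file-backed Blob with an explicit MIME type.
const blob = await openAsBlob('the.file.txt', { type: 'text/plain' });

// Modifying the file after the Blob was created invalidates its data.
await appendFile('the.file.txt', 'more data');

try {
  await blob.arrayBuffer();
} catch (err) {
  // Expected to fail because the file changed on disk.
  console.error(err.name, err.message);
}
```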

### `fs.opendir(path[, options], callback)`

<!-- YAML
19 changes: 19 additions & 0 deletions lib/fs.js
@@ -32,6 +32,7 @@ const {
ObjectDefineProperties,
ObjectDefineProperty,
Promise,
PromiseResolve,
ReflectApply,
SafeMap,
SafeSet,
@@ -62,6 +63,9 @@ const { isArrayBufferView } = require('internal/util/types');
// it's re-initialized after deserialization.

const binding = internalBinding('fs');

const { createBlobFromFilePath } = require('internal/blob');

const { Buffer } = require('buffer');
const {
aggregateTwoErrors,
@@ -586,6 +590,20 @@ function openSync(path, flags, mode) {
return result;
}

/**
 * @param {string | Buffer | URL} path
 * @param {{ type?: string }} [options]
 * @returns {Promise<Blob>}
 */
function openAsBlob(path, options = kEmptyObject) {
  validateObject(options, 'options');
  const type = options.type || '';
  validateString(type, 'options.type');
  // The underlying implementation here returns the Blob synchronously for now.
  // To give ourselves flexibility to maybe return the Blob asynchronously,
  // this API returns a Promise.
  return PromiseResolve(createBlobFromFilePath(getValidatedPath(path), { type }));
}
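
As an aside (not part of the patch), the validation above runs synchronously,
so invalid options surface as a thrown error rather than a rejected promise.
A minimal sketch, assuming a Node.js build that includes this change; the path
is a placeholder and is never reached:

```js
const { openAsBlob } = require('node:fs');

try {
  // options.type must be a string; validateString() throws before the
  // path is validated or any Promise is created.
  openAsBlob('./some-file.txt', { type: 42 });
} catch (err) {
  console.log(err.code); // 'ERR_INVALID_ARG_TYPE'
}
```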

/**
* Reads file from the specified `fd` (file descriptor).
* @param {number} fd
@@ -3022,6 +3040,7 @@ module.exports = fs = {
mkdtempSync,
open,
openSync,
openAsBlob,
readdir,
readdirSync,
read,
158 changes: 102 additions & 56 deletions lib/internal/blob.js
@@ -1,14 +1,14 @@
'use strict';

const {
ArrayBuffer,
ArrayFrom,
MathMax,
MathMin,
ObjectDefineProperties,
ObjectDefineProperty,
PromiseResolve,
PromiseReject,
SafePromisePrototypeFinally,
PromiseResolve,
ReflectConstruct,
RegExpPrototypeExec,
RegExpPrototypeSymbolReplace,
@@ -22,7 +22,8 @@ const {

const {
createBlob: _createBlob,
FixedSizeBlobCopyJob,
createBlobFromFilePath: _createBlobFromFilePath,
concat,
getDataObject,
} = internalBinding('blob');

@@ -48,32 +49,31 @@ const {
customInspectSymbol: kInspect,
kEmptyObject,
kEnumerableProperty,
lazyDOMException,
} = require('internal/util');
const { inspect } = require('internal/util/inspect');

const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_THIS,
ERR_BUFFER_TOO_LARGE,
}
},
} = require('internal/errors');

const {
isUint32,
validateDictionary,
} = require('internal/validators');

const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const kHandle = Symbol('kHandle');
const kState = Symbol('kState');
const kIndex = Symbol('kIndex');
const kType = Symbol('kType');
const kLength = Symbol('kLength');
const kArrayBufferPromise = Symbol('kArrayBufferPromise');

const kMaxChunkSize = 65536;

const disallowedTypeCharacters = /[^\u{0020}-\u{007E}]/u;

@@ -266,40 +266,35 @@ class Blob {
if (!isBlob(this))
return PromiseReject(new ERR_INVALID_THIS('Blob'));

// If there's already a promise in flight for the content,
// reuse it, but only while it's in flight. After the cached
// promise resolves it will be cleared, allowing it to be
// garbage collected as soon as possible.
if (this[kArrayBufferPromise])
return this[kArrayBufferPromise];

const job = new FixedSizeBlobCopyJob(this[kHandle]);

const ret = job.run();

// If the job returns a value immediately, the ArrayBuffer
// was generated synchronously and should just be returned
// directly.
if (ret !== undefined)
return PromiseResolve(ret);
if (this.size === 0) {
return PromiseResolve(new ArrayBuffer(0));
}

const {
promise,
resolve,
reject,
} = createDeferredPromise();

job.ondone = (err, ab) => {
if (err !== undefined)
return reject(new AbortError(undefined, { cause: err }));
resolve(ab);
const { promise, resolve, reject } = createDeferredPromise();
const reader = this[kHandle].getReader();
const buffers = [];
const readNext = () => {
reader.pull((status, buffer) => {
if (status === 0) {
// EOS, concat & resolve
// buffer should be undefined here
resolve(concat(buffers));
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
reject(error);
return;
}
if (buffer !== undefined)
buffers.push(buffer);
readNext();
});
};
this[kArrayBufferPromise] =
SafePromisePrototypeFinally(
promise,
() => this[kArrayBufferPromise] = undefined);

return this[kArrayBufferPromise];
readNext();
return promise;
}
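
For context, the reader used above follows a `(status, buffer)` pull
convention that is inferred from this patch rather than documented: a status
of `0` signals end-of-stream, a negative status carries a system error code,
and otherwise a chunk is delivered. The standalone sketch below mirrors that
loop with a purely illustrative mock reader and `drain()` helper.

```js
// Drain a pull-style reader into a single Buffer, mirroring the loop in
// arrayBuffer() above.
function drain(reader) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    const readNext = () => {
      reader.pull((status, buffer) => {
        if (status === 0) return resolve(Buffer.concat(chunks)); // EOS
        if (status < 0) return reject(new Error(`read failed: ${status}`));
        if (buffer !== undefined) chunks.push(Buffer.from(buffer));
        readNext();
      });
    };
    readNext();
  });
}

// An in-memory mock reader used only for demonstration.
const mockReader = {
  chunks: [Buffer.from('hello '), Buffer.from('world')],
  pull(cb) {
    const next = this.chunks.shift();
    queueMicrotask(() => (next ? cb(next.length, next) : cb(0)));
  },
};

drain(mockReader).then((buf) => console.log(buf.toString())); // 'hello world'
```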

/**
@@ -321,24 +316,63 @@ if (!isBlob(this))
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');

const self = this;
if (this.size === 0) {
return new lazyReadableStream({
start(c) { c.close(); }
});
}

const reader = this[kHandle].getReader();
return new lazyReadableStream({
async start() {
this[kState] = await self.arrayBuffer();
this[kIndex] = 0;
start(c) {
// There really should only be one read at a time so using an
// array here is purely defensive.
this.pendingPulls = [];
},

pull(controller) {
if (this[kState].byteLength - this[kIndex] <= kMaxChunkSize) {
controller.enqueue(new Uint8Array(this[kState], this[kIndex]));
controller.close();
this[kState] = undefined;
} else {
controller.enqueue(new Uint8Array(this[kState], this[kIndex], kMaxChunkSize));
this[kIndex] += kMaxChunkSize;
pull(c) {
const { promise, resolve, reject } = createDeferredPromise();
this.pendingPulls.push({ resolve, reject });
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream had to have
// been canceled, and we don't really care about the result.
// we can simply exit.
if (this.pendingPulls.length === 0) {
return;
}
const pending = this.pendingPulls.shift();
if (status === 0) {
// EOS
c.close();
pending.resolve();
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');

c.error(error);
pending.reject(error);
return;
}
if (buffer !== undefined) {
c.enqueue(new Uint8Array(buffer));
}
pending.resolve();
});
return promise;
},
cancel(reason) {
// Reject any currently pending pulls here.
for (const pending of this.pendingPulls) {
pending.reject(reason);
}
this.pendingPulls = [];
}
});
// We set the highWaterMark to 0 because we do not want the stream to
// start reading immediately on creation. We want it to wait until read
// is called.
}, new CountQueuingStrategy({ highWaterMark: 0 }));
}
}
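
To make the queuing-strategy comment above concrete: with the default strategy
a `ReadableStream` calls `pull()` right after construction to fill its internal
queue, while a high-water mark of zero defers the first `pull()` until a
consumer actually reads. A minimal standalone sketch (not part of the patch),
using the global WHATWG `ReadableStream` and `CountQueuingStrategy`:

```mjs
const lazy = new ReadableStream({
  pull(controller) {
    console.log('pull() called on demand');
    controller.enqueue('chunk');
  },
}, new CountQueuingStrategy({ highWaterMark: 0 }));

// Nothing has been pulled yet; the first read() triggers pull().
const reader = lazy.getReader();
console.log(await reader.read()); // { value: 'chunk', done: false }
```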

@@ -406,10 +440,22 @@ function resolveObjectURL(url) {
}
}

// TODO(@jasnell): Now that the File class exists, we might consider having
// this return a `File` instead of a `Blob`.
function createBlobFromFilePath(path, options) {
  const maybeBlob = _createBlobFromFilePath(path);
  if (maybeBlob === undefined) {
    return lazyDOMException('The blob could not be read', 'NotReadableError');
  }
  const { 0: blob, 1: length } = maybeBlob;
  return createBlob(blob, length, options?.type);
}

module.exports = {
Blob,
ClonedBlob,
createBlob,
createBlobFromFilePath,
isBlob,
kHandle,
resolveObjectURL,
3 changes: 3 additions & 0 deletions node.gyp
@@ -477,6 +477,7 @@
'src/cleanup_queue.cc',
'src/connect_wrap.cc',
'src/connection_wrap.cc',
'src/dataqueue/queue.cc',
'src/debug_utils.cc',
'src/env.cc',
'src/fs_event_wrap.cc',
@@ -580,6 +581,7 @@
'src/cleanup_queue-inl.h',
'src/connect_wrap.h',
'src/connection_wrap.h',
'src/dataqueue/queue.h',
'src/debug_utils.h',
'src/debug_utils-inl.h',
'src/env_properties.h',
@@ -991,6 +993,7 @@
'test/cctest/test_sockaddr.cc',
'test/cctest/test_traced_value.cc',
'test/cctest/test_util.cc',
'test/cctest/test_dataqueue.cc',
],

'conditions': [