Skip to content

Commit

Permalink
buffer: add Blob.prototype.stream method and other cleanups
Browse files Browse the repository at this point in the history
Adds the `stream()` method to get a `ReadableStream` for the `Blob`.
Also makes some other improvements to get the implementation closer
to the API standard definition.

Signed-off-by: James M Snell <jasnell@gmail.com>
  • Loading branch information
jasnell committed Aug 7, 2021
1 parent c524107 commit 7eb128a
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 31 deletions.
13 changes: 11 additions & 2 deletions doc/api/buffer.md
Original file line number Diff line number Diff line change
Expand Up @@ -507,15 +507,24 @@ added: v15.7.0
Creates and returns a new `Blob` containing a subset of this `Blob` objects
data. The original `Blob` is not altered.

### `blob.stream()`
<!-- YAML
added: REPLACEME
-->

* Returns: {ReadableStream}

Returns a new `ReadableStream` that allows the content of the `Blob` to be read.

### `blob.text()`
<!-- YAML
added: v15.7.0
-->

* Returns: {Promise}

Returns a promise that resolves the contents of the `Blob` decoded as a UTF-8
string.
Returns a promise that fulfills with the contents of the `Blob` decoded as a
UTF-8 string.

### `blob.type`
<!-- YAML
Expand Down
145 changes: 116 additions & 29 deletions lib/internal/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ const {
MathMax,
MathMin,
ObjectDefineProperty,
ObjectSetPrototypeOf,
PromiseResolve,
PromiseReject,
PromisePrototypeFinally,
ReflectConstruct,
RegExpPrototypeTest,
StringPrototypeToLowerCase,
Symbol,
Expand All @@ -16,14 +18,14 @@ const {
} = primordials;

const {
createBlob,
createBlob: _createBlob,
FixedSizeBlobCopyJob,
} = internalBinding('buffer');

const { TextDecoder } = require('internal/encoding');

const {
JSTransferable,
makeTransferable,
kClone,
kDeserialize,
} = require('internal/worker/js_transferable');
Expand All @@ -44,6 +46,7 @@ const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_THIS,
ERR_BUFFER_TOO_LARGE,
}
} = require('internal/errors');
Expand All @@ -56,17 +59,27 @@ const {
const kHandle = Symbol('kHandle');
const kType = Symbol('kType');
const kLength = Symbol('kLength');
const kArrayBufferPromise = Symbol('kArrayBufferPromise');

const disallowedTypeCharacters = /[^\u{0020}-\u{007E}]/u;

let Buffer;
let ReadableStream;

function lazyBuffer() {
if (Buffer === undefined)
Buffer = require('buffer').Buffer;
return Buffer;
}

function lazyReadableStream(options) {
if (ReadableStream === undefined) {
ReadableStream =
require('internal/webstreams/readablestream').ReadableStream;
}
return new ReadableStream(options);
}

function isBlob(object) {
return object?.[kHandle] !== undefined;
}
Expand All @@ -89,16 +102,7 @@ function getSource(source, encoding) {
return [source.byteLength, source];
}

class InternalBlob extends JSTransferable {
constructor(handle, length, type = '') {
super();
this[kHandle] = handle;
this[kType] = type;
this[kLength] = length;
}
}

class Blob extends JSTransferable {
class Blob {
constructor(sources = [], options = {}) {
emitExperimentalWarning('buffer.Blob');
if (sources === null ||
Expand All @@ -120,13 +124,15 @@ class Blob extends JSTransferable {
if (!isUint32(length))
throw new ERR_BUFFER_TOO_LARGE(0xFFFFFFFF);

super();
this[kHandle] = createBlob(sources_, length);
this[kHandle] = _createBlob(sources_, length);
this[kLength] = length;

type = `${type}`;
this[kType] = RegExpPrototypeTest(disallowedTypeCharacters, type) ?
'' : StringPrototypeToLowerCase(type);

// eslint-disable-next-line no-constructor-return
return makeTransferable(this);
}

[kInspect](depth, options) {
Expand All @@ -150,7 +156,7 @@ class Blob extends JSTransferable {
const length = this[kLength];
return {
data: { handle, type, length },
deserializeInfo: 'internal/blob:InternalBlob'
deserializeInfo: 'internal/blob:ClonedBlob'
};
}

Expand All @@ -160,11 +166,35 @@ class Blob extends JSTransferable {
this[kLength] = length;
}

get type() { return this[kType]; }
/**
* @readonly
* @type {string}
*/
get type() {
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');
return this[kType];
}

get size() { return this[kLength]; }
/**
* @readonly
* @type {number}
*/
get size() {
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');
return this[kLength];
}

/**
* @param {number} [start]
* @param {number} [end]
* @param {string} [contentType]
* @returns {Blob}
*/
slice(start = 0, end = this[kLength], contentType = '') {
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');
if (start < 0) {
start = MathMax(this[kLength] + start, 0);
} else {
Expand All @@ -188,49 +218,106 @@ class Blob extends JSTransferable {

const span = MathMax(end - start, 0);

return new InternalBlob(
this[kHandle].slice(start, start + span), span, contentType);
return createBlob(
this[kHandle].slice(start, start + span),
span,
contentType);
}

async arrayBuffer() {
/**
* @returns {Promise<ArrayBuffer>}
*/
arrayBuffer() {
if (!isBlob(this))
return PromiseReject(new ERR_INVALID_THIS('Blob'));

// If there's already a promise in flight for the content,
// reuse it, but only once. After the cached promise resolves
// it will be cleared, allowing it to be garbage collected
// as soon as possible.
if (this[kArrayBufferPromise])
return this[kArrayBufferPromise];

const job = new FixedSizeBlobCopyJob(this[kHandle]);

const ret = job.run();

// If the job returns a value immediately, the ArrayBuffer
// was generated synchronously and should just be returned
// directly.
if (ret !== undefined)
return PromiseResolve(ret);

const {
promise,
resolve,
reject
reject,
} = createDeferredPromise();

job.ondone = (err, ab) => {
if (err !== undefined)
return reject(new AbortError());
resolve(ab);
};
this[kArrayBufferPromise] =
PromisePrototypeFinally(
promise,
() => this[kArrayBufferPromise] = undefined);

return promise;
return this[kArrayBufferPromise];
}

/**
*
* @returns {Promise<string>}
*/
async text() {
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');

const dec = new TextDecoder();
return dec.decode(await this.arrayBuffer());
}

/**
* @returns {ReadableStream}
*/
stream() {
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');

const self = this;
return new lazyReadableStream({
async start(controller) {
const ab = await self.arrayBuffer();
controller.enqueue(new Uint8Array(ab));
controller.close();
}
});
}
}

function ClonedBlob() {
return makeTransferable(ReflectConstruct(function() {}, [], Blob));
}
ClonedBlob.prototype[kDeserialize] = () => {};

function createBlob(handle, length, type = '') {
return makeTransferable(ReflectConstruct(function() {
this[kHandle] = handle;
this[kType] = type;
this[kLength] = length;
}, [], Blob));
}

ObjectDefineProperty(Blob.prototype, SymbolToStringTag, {
configurable: true,
value: 'Blob',
});

InternalBlob.prototype.constructor = Blob;
ObjectSetPrototypeOf(
InternalBlob.prototype,
Blob.prototype);

module.exports = {
Blob,
InternalBlob,
ClonedBlob,
createBlob,
isBlob,
};
18 changes: 18 additions & 0 deletions test/parallel/test-blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,21 @@ assert.throws(() => new Blob({}), {
'Blob { size: 0, type: \'\' }');
assert.strictEqual(inspect(b, { depth: -1 }), '[Blob]');
}

{
// The Blob has to be over a specific size for the data to
// be copied asynchronously..
const b = new Blob(['hello', 'there'.repeat(820)]);
assert.strictEqual(b.arrayBuffer(), b.arrayBuffer());
b.arrayBuffer().then(common.mustCall());
}

(async () => {
const b = new Blob(['hello']);
const reader = b.stream().getReader();
let res = await reader.read();
assert.strictEqual(res.value.byteLength, 5);
assert(!res.done);
res = await reader.read();
assert(res.done);
})().then(common.mustCall());

0 comments on commit 7eb128a

Please sign in to comment.