Skip to content

Commit

Permalink
perf(experimental/web_streams_connection): use `ReadableStreamBYOBRea…
Browse files Browse the repository at this point in the history
…der` (#414)
  • Loading branch information
uki00a authored Dec 19, 2023
1 parent c49df9b commit bef3f38
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 36 deletions.
45 changes: 33 additions & 12 deletions internal/buffered_readable_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ const LF = "\n".charCodeAt(0);
* {@link https://github.com/denoland/deno_std/blob/0.204.0/io/buf_reader.ts}
*/
export class BufferedReadableStream {
#reader: ReadableStreamDefaultReader<Uint8Array>;
#reader: ReadableStreamBYOBReader;
#buffer: Uint8Array;
constructor(readable: ReadableStream<Uint8Array>) {
// TODO: This class could probably be optimized with a BYOB reader.
this.#reader = readable.getReader();
this.#reader = readable.getReader({ mode: "byob" });
this.#buffer = new Uint8Array(0);
}

Expand All @@ -27,16 +26,38 @@ export class BufferedReadableStream {
}
}

async readFull(buffer: Uint8Array): Promise<void> {
if (buffer.length <= this.#buffer.length) {
buffer.set(this.#consume(buffer.length));
return;
async readN(n: number): Promise<Uint8Array> {
if (n <= this.#buffer.length) {
return this.#consume(n);
}
for (;;) {
await this.#fill();
if (this.#buffer.length >= buffer.length) break;

if (n === 0) {
return new Uint8Array(0);
}

if (this.#buffer.length === 0) {
const buffer = new Uint8Array(n);
const { done, value } = await this.#reader.read(buffer, {
min: buffer.length,
});
if (done) {
throw new Deno.errors.BadResource();
}
return value;
} else {
const remaining = n - this.#buffer.length;
const buffer = new Uint8Array(remaining);
const { value, done } = await this.#reader.read(buffer, {
min: remaining,
});
if (done) {
throw new Deno.errors.BadResource();
}

const result = concateBytes(this.#buffer, value);
this.#buffer = new Uint8Array();
return result;
}
return this.readFull(buffer);
}

#consume(n: number): Uint8Array {
Expand All @@ -46,7 +67,7 @@ export class BufferedReadableStream {
}

async #fill() {
const chunk = await this.#reader.read();
const chunk = await this.#reader.read(new Uint8Array(1024));
if (chunk.done) {
throw new Deno.errors.BadResource();
}
Expand Down
40 changes: 18 additions & 22 deletions internal/buffered_readable_stream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,70 +22,65 @@ Deno.test({
await assertRejects(() => buffered.readLine(), Deno.errors.BadResource);
});

await t.step("readFull", async () => {
await t.step("readN", async () => {
const readable = createReadableStreamFromString(
"$12\r\nhello_world!\r\n",
);
const buffered = new BufferedReadableStream(readable);

await buffered.readFull(new Uint8Array(0));
await buffered.readN(0);
assertEquals(decoder.decode(await buffered.readLine()), "$12\r\n");

{
const buf = new Uint8Array(5);
await buffered.readFull(buf);
const buf = await buffered.readN(5);
assertEquals(decoder.decode(buf), "hello");
}

await buffered.readFull(new Uint8Array(0));
await buffered.readN(0);

{
const buf = new Uint8Array(7);
await buffered.readFull(buf);
const buf = await buffered.readN(7);
assertEquals(decoder.decode(buf), "_world!");
}

await buffered.readFull(new Uint8Array(0));
await buffered.readN(0);

{
const buf = new Uint8Array(2);
await buffered.readFull(buf);
const buf = await buffered.readN(2);
assertEquals(decoder.decode(buf), "\r\n");
}

await buffered.readFull(new Uint8Array(0));
await buffered.readN(0);
await assertRejects(
() => buffered.readFull(new Uint8Array(1)),
() => buffered.readN(1),
Deno.errors.BadResource,
);
});

await t.step(
"`readFull` should not throw `RangeError: offset is out of bounds` error",
"`readN` should not throw `RangeError: offset is out of bounds` error",
async () => {
const readable = new ReadableStream<Uint8Array>({
const readable = new ReadableStream({
type: "bytes",
start(controller) {
controller.enqueue(encoder.encode("foobar"));
controller.close();
},
});
const buffered = new BufferedReadableStream(readable);
{
const buf = new Uint8Array(3);
await buffered.readFull(buf);
const buf = await buffered.readN(3);
assertEquals(decoder.decode(buf), "foo");
}

{
const buf = new Uint8Array(1);
await buffered.readFull(buf);
const buf = await buffered.readN(1);
assertEquals(decoder.decode(buf), "b");
}

await buffered.readFull(new Uint8Array(0));
await buffered.readN(0);
{
const buf = new Uint8Array(2);
await buffered.readFull(buf);
const buf = await buffered.readN(2);
assertEquals(decoder.decode(buf), "ar");
}
},
Expand All @@ -96,7 +91,8 @@ Deno.test({
function createReadableStreamFromString(s: string): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
let numRead = 0;
return new ReadableStream<Uint8Array>({
return new ReadableStream({
type: "bytes",
pull(controller) {
controller.enqueue(encoder.encode(s[numRead]));
numRead++;
Expand Down
3 changes: 1 addition & 2 deletions protocol/web_streams/reply.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ export async function readReply(
// nil bulk reply
return null;
}
const buf = new Uint8Array(size + 2);
await readable.readFull(buf);
const buf = await readable.readN(size + 2);
const body = buf.subarray(0, size); // Strip CR and LF.
return returnUint8Arrays ? body : decoder.decode(body);
}
Expand Down

0 comments on commit bef3f38

Please sign in to comment.