Skip to content

Commit

Permalink
feat(util-stream): splitStream and headStream utilities (#1336)
Browse files Browse the repository at this point in the history
* feat(util-stream): add stream splitting function

* variable naming

* DRY isReadableStream typeguard

* update type guard

* rename type guard file

* lint
  • Loading branch information
kuhe committed Jul 15, 2024
1 parent ae8bf5c commit 7cd258f
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .changeset/metal-snakes-remember.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@smithy/util-stream": minor
---

add splitStream and headStream utilities
10 changes: 8 additions & 2 deletions packages/util-stream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,19 @@
],
"browser": {
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser"
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser"
},
"react-native": {
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser",
"./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser",
"./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser"
"./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser",
"./dist-cjs/headStream": "./dist-cjs/headStream.browser",
"./dist-cjs/splitStream": "./dist-cjs/splitStream.browser"
},
"homepage": "https://github.com/awslabs/smithy-typescript/tree/main/packages/util-stream",
"repository": {
Expand Down
39 changes: 39 additions & 0 deletions packages/util-stream/src/headStream.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* @internal
* @param stream
* @param bytes - read head bytes from the stream and discard the rest of it.
*
* Caution: the input stream must be destroyed separately, this function does not do so.
*/
export async function headStream(stream: ReadableStream, bytes: number): Promise<Uint8Array> {
let byteLengthCounter = 0;
const chunks = [];
const reader = stream.getReader();
let isDone = false;

while (!isDone) {
const { done, value } = await reader.read();
if (value) {
chunks.push(value);
byteLengthCounter += value?.byteLength ?? 0;
}
if (byteLengthCounter >= bytes) {
break;
}
isDone = done;
}
reader.releaseLock();

const collected = new Uint8Array(Math.min(bytes, byteLengthCounter));
let offset = 0;
for (const chunk of chunks) {
if (chunk.byteLength > collected.byteLength - offset) {
collected.set(chunk.subarray(0, collected.byteLength - offset), offset);
break;
} else {
collected.set(chunk, offset);
}
offset += chunk.length;
}
return collected;
}
108 changes: 108 additions & 0 deletions packages/util-stream/src/headStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { Readable } from "stream";

import { headStream } from "./headStream";
import { headStream as headWebStream } from "./headStream.browser";
import { splitStream } from "./splitStream";
import { splitStream as splitWebStream } from "./splitStream.browser";

const CHUNK_SIZE = 4;
const a32 = "abcd".repeat(32_000 / CHUNK_SIZE);
const a16 = "abcd".repeat(16_000 / CHUNK_SIZE);
const a8 = "abcd".repeat(8);
const a4 = "abcd".repeat(4);
const a2 = "abcd".repeat(2);
const a1 = "abcd".repeat(1);

describe(headStream.name, () => {
it("should collect the head of a Node.js stream", async () => {
const data = Buffer.from(a32);
const myStream = Readable.from(data);

const head = await headStream(myStream, 16_000);

expect(Buffer.from(head).toString()).toEqual(a16);
});

it("should collect the head of a web stream", async () => {
if (typeof ReadableStream !== "undefined") {
const buffer = Buffer.from(a32);
const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength));

const myStream = new ReadableStream({
start(controller) {
for (const inputChunk of data) {
controller.enqueue(new Uint8Array([inputChunk]));
}
controller.close();
},
});

const head = await headWebStream(myStream, 16_000);
expect(Buffer.from(head).toString()).toEqual(a16);
}
});
});

describe("splitStream and headStream integration", () => {
it("should split and head streams for Node.js", async () => {
const data = Buffer.from(a32);
const myStream = Readable.from(data);

const [a, _1] = await splitStream(myStream);
const [b, _2] = await splitStream(_1);
const [c, _3] = await splitStream(_2);
const [d, _4] = await splitStream(_3);
const [e, f] = await splitStream(_4);

const byteArr1 = await headStream(a, Infinity);
const byteArr2 = await headStream(b, 16_000);
const byteArr3 = await headStream(c, 8 * CHUNK_SIZE);
const byteArr4 = await headStream(d, 4 * CHUNK_SIZE);
const byteArr5 = await headStream(e, 2 * CHUNK_SIZE);
const byteArr6 = await headStream(f, CHUNK_SIZE);

await Promise.all([a, b, c, d, e, f].map((stream) => stream.destroy()));

expect(Buffer.from(byteArr1).toString()).toEqual(a32);
expect(Buffer.from(byteArr2).toString()).toEqual(a16);
expect(Buffer.from(byteArr3).toString()).toEqual(a8);
expect(Buffer.from(byteArr4).toString()).toEqual(a4);
expect(Buffer.from(byteArr5).toString()).toEqual(a2);
expect(Buffer.from(byteArr6).toString()).toEqual(a1);
});

it("should split and head streams for web streams API", async () => {
if (typeof ReadableStream !== "undefined") {
const buffer = Buffer.from(a8);
const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength));

const myStream = new ReadableStream({
start(controller) {
for (let i = 0; i < data.length; i += CHUNK_SIZE) {
controller.enqueue(new Uint8Array(data.slice(i, i + CHUNK_SIZE)));
}
controller.close();
},
});

const [a, _1] = await splitWebStream(myStream);
const [b, _2] = await splitWebStream(_1);
const [c, _3] = await splitWebStream(_2);
const [d, e] = await splitWebStream(_3);

const byteArr1 = await headWebStream(a, Infinity);
const byteArr2 = await headWebStream(b, 8 * CHUNK_SIZE);
const byteArr3 = await headWebStream(c, 4 * CHUNK_SIZE);
const byteArr4 = await headWebStream(d, 2 * CHUNK_SIZE);
const byteArr5 = await headWebStream(e, CHUNK_SIZE);

await Promise.all([a, b, c, d, e].map((stream) => stream.cancel()));

expect(Buffer.from(byteArr1).toString()).toEqual(a8);
expect(Buffer.from(byteArr2).toString()).toEqual(a8);
expect(Buffer.from(byteArr3).toString()).toEqual(a4);
expect(Buffer.from(byteArr4).toString()).toEqual(a2);
expect(Buffer.from(byteArr5).toString()).toEqual(a1);
}
});
});
49 changes: 49 additions & 0 deletions packages/util-stream/src/headStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Readable, Writable } from "stream";

import { headStream as headWebStream } from "./headStream.browser";
import { isReadableStream } from "./stream-type-check";

/**
* @internal
* @param stream
* @param bytes - read head bytes from the stream and discard the rest of it.
*
* Caution: the input stream must be destroyed separately, this function does not do so.
*/
export const headStream = (stream: Readable | ReadableStream, bytes: number): Promise<Uint8Array> => {
if (isReadableStream(stream)) {
return headWebStream(stream, bytes);
}
return new Promise((resolve, reject) => {
const collector = new Collector();
collector.limit = bytes;
stream.pipe(collector);
stream.on("error", (err) => {
collector.end();
reject(err);
});
collector.on("error", reject);
collector.on("finish", function (this: Collector) {
const bytes = new Uint8Array(Buffer.concat(this.buffers));
resolve(bytes);
});
});
};

class Collector extends Writable {
public readonly buffers: Buffer[] = [];
public limit = Infinity;
private bytesBuffered = 0;

_write(chunk: Buffer, encoding: string, callback: (err?: Error) => void) {
this.buffers.push(chunk);
this.bytesBuffered += chunk.byteLength ?? 0;
if (this.bytesBuffered >= this.limit) {
const excess = this.bytesBuffered - this.limit;
const tailBuffer = this.buffers[this.buffers.length - 1];
this.buffers[this.buffers.length - 1] = tailBuffer.subarray(0, tailBuffer.byteLength - excess);
this.emit("finish");
}
callback();
}
}
3 changes: 3 additions & 0 deletions packages/util-stream/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export * from "./blob/Uint8ArrayBlobAdapter";
export * from "./getAwsChunkedEncodingStream";
export * from "./sdk-stream-mixin";
export * from "./splitStream";
export * from "./headStream";
export * from "./stream-type-check";
9 changes: 4 additions & 5 deletions packages/util-stream/src/sdk-stream-mixin.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { toBase64 } from "@smithy/util-base64";
import { toHex } from "@smithy/util-hex-encoding";
import { toUtf8 } from "@smithy/util-utf8";

import { isReadableStream } from "./stream-type-check";

const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed.";

/**
Expand All @@ -12,7 +14,7 @@ const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transfo
* @internal
*/
export const sdkStreamMixin = (stream: unknown): SdkStream<ReadableStream | Blob> => {
if (!isBlobInstance(stream) && !isReadableStreamInstance(stream)) {
if (!isBlobInstance(stream) && !isReadableStream(stream)) {
//@ts-ignore
const name = stream?.__proto__?.constructor?.name || stream;
throw new Error(`Unexpected stream implementation, expect Blob or ReadableStream, got ${name}`);
Expand Down Expand Up @@ -64,7 +66,7 @@ export const sdkStreamMixin = (stream: unknown): SdkStream<ReadableStream | Blob
if (isBlobInstance(stream)) {
// ReadableStream is undefined in React Native
return blobToWebStream(stream);
} else if (isReadableStreamInstance(stream)) {
} else if (isReadableStream(stream)) {
return stream;
} else {
throw new Error(`Cannot transform payload to web stream, got ${stream}`);
Expand All @@ -74,6 +76,3 @@ export const sdkStreamMixin = (stream: unknown): SdkStream<ReadableStream | Blob
};

const isBlobInstance = (stream: unknown): stream is Blob => typeof Blob === "function" && stream instanceof Blob;

const isReadableStreamInstance = (stream: unknown): stream is ReadableStream =>
typeof ReadableStream === "function" && stream instanceof ReadableStream;
11 changes: 11 additions & 0 deletions packages/util-stream/src/splitStream.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* @param stream
* @returns stream split into two identical streams.
*/
export async function splitStream(stream: ReadableStream | Blob): Promise<[ReadableStream, ReadableStream]> {
if (typeof (stream as Blob).stream === "function") {
stream = (stream as Blob).stream();
}
const readableStream = stream as ReadableStream;
return readableStream.tee();
}
43 changes: 43 additions & 0 deletions packages/util-stream/src/splitStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { streamCollector as webStreamCollector } from "@smithy/fetch-http-handler";
import { streamCollector } from "@smithy/node-http-handler";
import { Readable } from "stream";

import { splitStream } from "./splitStream";
import { splitStream as splitWebStream } from "./splitStream.browser";

describe(splitStream.name, () => {
it("should split a node:Readable stream", async () => {
const data = Buffer.from("abcd");

const myStream = Readable.from(data);
const [a, b] = await splitStream(myStream);

const buffer1 = await streamCollector(a);
const buffer2 = await streamCollector(b);

expect(buffer1).toEqual(new Uint8Array([97, 98, 99, 100]));
expect(buffer1).toEqual(buffer2);
});
it("should split a web:ReadableStream stream", async () => {
if (typeof ReadableStream !== "undefined") {
const inputChunks = [97, 98, 99, 100];

const myStream = new ReadableStream({
start(controller) {
for (const inputChunk of inputChunks) {
controller.enqueue(new Uint8Array([inputChunk]));
}
controller.close();
},
});

const [a, b] = await splitWebStream(myStream);

const bytes1 = await webStreamCollector(a);
const bytes2 = await webStreamCollector(b);

expect(bytes1).toEqual(new Uint8Array([97, 98, 99, 100]));
expect(bytes1).toEqual(bytes2);
}
});
});
24 changes: 24 additions & 0 deletions packages/util-stream/src/splitStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import type { Readable } from "stream";
import { PassThrough } from "stream";

import { splitStream as splitWebStream } from "./splitStream.browser";
import { isReadableStream } from "./stream-type-check";

/**
* @param stream
* @returns stream split into two identical streams.
*/
export async function splitStream(stream: Readable): Promise<[Readable, Readable]>;
export async function splitStream(stream: ReadableStream): Promise<[ReadableStream, ReadableStream]>;
export async function splitStream(
stream: Readable | ReadableStream
): Promise<[Readable | ReadableStream, Readable | ReadableStream]> {
if (isReadableStream(stream)) {
return splitWebStream(stream);
}
const stream1 = new PassThrough();
const stream2 = new PassThrough();
stream.pipe(stream1);
stream.pipe(stream2);
return [stream1, stream2];
}
6 changes: 6 additions & 0 deletions packages/util-stream/src/stream-type-check.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
* @internal
*/
export const isReadableStream = (stream: unknown): stream is ReadableStream =>
typeof ReadableStream === "function" &&
(stream?.constructor?.name === ReadableStream.name || stream instanceof ReadableStream);

0 comments on commit 7cd258f

Please sign in to comment.