Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(util-stream): splitStream and headStream utilities #1336

Merged
merged 6 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/metal-snakes-remember.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@smithy/util-stream": minor
---

add splitStream and headStream utilities
10 changes: 8 additions & 2 deletions packages/util-stream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,19 @@
],
"browser": {
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser"
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser"
},
"react-native": {
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser",
"./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser",
"./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser"
"./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser",
"./dist-cjs/headStream": "./dist-cjs/headStream.browser",
"./dist-cjs/splitStream": "./dist-cjs/splitStream.browser"
},
"homepage": "https://github.com/awslabs/smithy-typescript/tree/main/packages/util-stream",
"repository": {
Expand Down
39 changes: 39 additions & 0 deletions packages/util-stream/src/headStream.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* @internal
* @param stream
* @param bytes - read head bytes from the stream and discard the rest of it.
*
* Caution: the input stream must be destroyed separately, this function does not do so.
*/
export async function headStream(stream: ReadableStream, bytes: number): Promise<Uint8Array> {
let byteLengthCounter = 0;
const chunks = [];
const reader = stream.getReader();
let isDone = false;

while (!isDone) {
const { done, value } = await reader.read();
if (value) {
chunks.push(value);
byteLengthCounter += value?.byteLength ?? 0;
}
if (byteLengthCounter >= bytes) {
break;
}
isDone = done;
}
reader.releaseLock();

const collected = new Uint8Array(Math.min(bytes, byteLengthCounter));
let offset = 0;
for (const chunk of chunks) {
if (chunk.byteLength > collected.byteLength - offset) {
collected.set(chunk.subarray(0, collected.byteLength - offset), offset);
break;
} else {
collected.set(chunk, offset);
}
offset += chunk.length;
}
return collected;
}
108 changes: 108 additions & 0 deletions packages/util-stream/src/headStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { Readable } from "stream";

import { headStream } from "./headStream";
import { headStream as headWebStream } from "./headStream.browser";
import { splitStream } from "./splitStream";
import { splitStream as splitWebStream } from "./splitStream.browser";

const CHUNK_SIZE = 4;
const a32 = "abcd".repeat(32_000 / CHUNK_SIZE);
const a16 = "abcd".repeat(16_000 / CHUNK_SIZE);
const a8 = "abcd".repeat(8);
const a4 = "abcd".repeat(4);
const a2 = "abcd".repeat(2);
const a1 = "abcd".repeat(1);

describe(headStream.name, () => {
it("should collect the head of a Node.js stream", async () => {
const data = Buffer.from(a32);
const myStream = Readable.from(data);

const head = await headStream(myStream, 16_000);

expect(Buffer.from(head).toString()).toEqual(a16);
});

it("should collect the head of a web stream", async () => {
if (typeof ReadableStream !== "undefined") {
const buffer = Buffer.from(a32);
const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength));

const myStream = new ReadableStream({
start(controller) {
for (const inputChunk of data) {
controller.enqueue(new Uint8Array([inputChunk]));
}
controller.close();
},
});

const head = await headWebStream(myStream, 16_000);
expect(Buffer.from(head).toString()).toEqual(a16);
}
});
});

describe("splitStream and headStream integration", () => {
it("should split and head streams for Node.js", async () => {
const data = Buffer.from(a32);
const myStream = Readable.from(data);

const [a, _1] = await splitStream(myStream);
const [b, _2] = await splitStream(_1);
const [c, _3] = await splitStream(_2);
const [d, _4] = await splitStream(_3);
const [e, f] = await splitStream(_4);

const byteArr1 = await headStream(a, Infinity);
const byteArr2 = await headStream(b, 16_000);
const byteArr3 = await headStream(c, 8 * CHUNK_SIZE);
const byteArr4 = await headStream(d, 4 * CHUNK_SIZE);
const byteArr5 = await headStream(e, 2 * CHUNK_SIZE);
const byteArr6 = await headStream(f, CHUNK_SIZE);

await Promise.all([a, b, c, d, e, f].map((stream) => stream.destroy()));

expect(Buffer.from(byteArr1).toString()).toEqual(a32);
expect(Buffer.from(byteArr2).toString()).toEqual(a16);
expect(Buffer.from(byteArr3).toString()).toEqual(a8);
expect(Buffer.from(byteArr4).toString()).toEqual(a4);
expect(Buffer.from(byteArr5).toString()).toEqual(a2);
expect(Buffer.from(byteArr6).toString()).toEqual(a1);
});

it("should split and head streams for web streams API", async () => {
if (typeof ReadableStream !== "undefined") {
const buffer = Buffer.from(a8);
const data = Array.from(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength));

const myStream = new ReadableStream({
start(controller) {
for (let i = 0; i < data.length; i += CHUNK_SIZE) {
controller.enqueue(new Uint8Array(data.slice(i, i + CHUNK_SIZE)));
}
controller.close();
},
});

const [a, _1] = await splitWebStream(myStream);
const [b, _2] = await splitWebStream(_1);
const [c, _3] = await splitWebStream(_2);
const [d, e] = await splitWebStream(_3);

const byteArr1 = await headWebStream(a, Infinity);
const byteArr2 = await headWebStream(b, 8 * CHUNK_SIZE);
const byteArr3 = await headWebStream(c, 4 * CHUNK_SIZE);
const byteArr4 = await headWebStream(d, 2 * CHUNK_SIZE);
const byteArr5 = await headWebStream(e, CHUNK_SIZE);

await Promise.all([a, b, c, d, e].map((stream) => stream.cancel()));

expect(Buffer.from(byteArr1).toString()).toEqual(a8);
expect(Buffer.from(byteArr2).toString()).toEqual(a8);
expect(Buffer.from(byteArr3).toString()).toEqual(a4);
expect(Buffer.from(byteArr4).toString()).toEqual(a2);
expect(Buffer.from(byteArr5).toString()).toEqual(a1);
}
});
});
49 changes: 49 additions & 0 deletions packages/util-stream/src/headStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Readable, Writable } from "stream";

import { headStream as headWebStream } from "./headStream.browser";
import { isReadableStream } from "./stream-type-check";

/**
* @internal
* @param stream
* @param bytes - read head bytes from the stream and discard the rest of it.
*
* Caution: the input stream must be destroyed separately, this function does not do so.
*/
export const headStream = (stream: Readable | ReadableStream, bytes: number): Promise<Uint8Array> => {
if (isReadableStream(stream)) {
return headWebStream(stream, bytes);
}
return new Promise((resolve, reject) => {
const collector = new Collector();
collector.limit = bytes;
stream.pipe(collector);
stream.on("error", (err) => {
collector.end();
reject(err);
});
collector.on("error", reject);
collector.on("finish", function (this: Collector) {
const bytes = new Uint8Array(Buffer.concat(this.buffers));
resolve(bytes);
});
});
};

class Collector extends Writable {
public readonly buffers: Buffer[] = [];
public limit = Infinity;
private bytesBuffered = 0;

_write(chunk: Buffer, encoding: string, callback: (err?: Error) => void) {
this.buffers.push(chunk);
this.bytesBuffered += chunk.byteLength ?? 0;
if (this.bytesBuffered >= this.limit) {
const excess = this.bytesBuffered - this.limit;
const tailBuffer = this.buffers[this.buffers.length - 1];
this.buffers[this.buffers.length - 1] = tailBuffer.subarray(0, tailBuffer.byteLength - excess);
this.emit("finish");
}
callback();
}
}
3 changes: 3 additions & 0 deletions packages/util-stream/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export * from "./blob/Uint8ArrayBlobAdapter";
export * from "./getAwsChunkedEncodingStream";
export * from "./sdk-stream-mixin";
export * from "./splitStream";
export * from "./headStream";
export * from "./stream-type-check";
9 changes: 4 additions & 5 deletions packages/util-stream/src/sdk-stream-mixin.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { toBase64 } from "@smithy/util-base64";
import { toHex } from "@smithy/util-hex-encoding";
import { toUtf8 } from "@smithy/util-utf8";

import { isReadableStream } from "./stream-type-check";

const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed.";

/**
Expand All @@ -12,7 +14,7 @@ const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transfo
* @internal
*/
export const sdkStreamMixin = (stream: unknown): SdkStream<ReadableStream | Blob> => {
if (!isBlobInstance(stream) && !isReadableStreamInstance(stream)) {
if (!isBlobInstance(stream) && !isReadableStream(stream)) {
//@ts-ignore
const name = stream?.__proto__?.constructor?.name || stream;
throw new Error(`Unexpected stream implementation, expect Blob or ReadableStream, got ${name}`);
Expand Down Expand Up @@ -64,7 +66,7 @@ export const sdkStreamMixin = (stream: unknown): SdkStream<ReadableStream | Blob
if (isBlobInstance(stream)) {
// ReadableStream is undefined in React Native
return blobToWebStream(stream);
} else if (isReadableStreamInstance(stream)) {
} else if (isReadableStream(stream)) {
return stream;
} else {
throw new Error(`Cannot transform payload to web stream, got ${stream}`);
Expand All @@ -74,6 +76,3 @@ export const sdkStreamMixin = (stream: unknown): SdkStream<ReadableStream | Blob
};

const isBlobInstance = (stream: unknown): stream is Blob => typeof Blob === "function" && stream instanceof Blob;

const isReadableStreamInstance = (stream: unknown): stream is ReadableStream =>
typeof ReadableStream === "function" && stream instanceof ReadableStream;
11 changes: 11 additions & 0 deletions packages/util-stream/src/splitStream.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* @param stream
* @returns stream split into two identical streams.
*/
export async function splitStream(stream: ReadableStream | Blob): Promise<[ReadableStream, ReadableStream]> {
if (typeof (stream as Blob).stream === "function") {
stream = (stream as Blob).stream();
}
const readableStream = stream as ReadableStream;
return readableStream.tee();
}
43 changes: 43 additions & 0 deletions packages/util-stream/src/splitStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { streamCollector as webStreamCollector } from "@smithy/fetch-http-handler";
import { streamCollector } from "@smithy/node-http-handler";
import { Readable } from "stream";

import { splitStream } from "./splitStream";
import { splitStream as splitWebStream } from "./splitStream.browser";

describe(splitStream.name, () => {
it("should split a node:Readable stream", async () => {
const data = Buffer.from("abcd");

const myStream = Readable.from(data);
const [a, b] = await splitStream(myStream);

const buffer1 = await streamCollector(a);
const buffer2 = await streamCollector(b);

expect(buffer1).toEqual(new Uint8Array([97, 98, 99, 100]));
expect(buffer1).toEqual(buffer2);
});
it("should split a web:ReadableStream stream", async () => {
if (typeof ReadableStream !== "undefined") {
const inputChunks = [97, 98, 99, 100];

const myStream = new ReadableStream({
start(controller) {
for (const inputChunk of inputChunks) {
controller.enqueue(new Uint8Array([inputChunk]));
}
controller.close();
},
});

const [a, b] = await splitWebStream(myStream);

const bytes1 = await webStreamCollector(a);
const bytes2 = await webStreamCollector(b);

expect(bytes1).toEqual(new Uint8Array([97, 98, 99, 100]));
expect(bytes1).toEqual(bytes2);
}
});
});
24 changes: 24 additions & 0 deletions packages/util-stream/src/splitStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import type { Readable } from "stream";
import { PassThrough } from "stream";

import { splitStream as splitWebStream } from "./splitStream.browser";
import { isReadableStream } from "./stream-type-check";

/**
* @param stream
* @returns stream split into two identical streams.
*/
export async function splitStream(stream: Readable): Promise<[Readable, Readable]>;
export async function splitStream(stream: ReadableStream): Promise<[ReadableStream, ReadableStream]>;
export async function splitStream(
stream: Readable | ReadableStream
): Promise<[Readable | ReadableStream, Readable | ReadableStream]> {
if (isReadableStream(stream)) {
return splitWebStream(stream);
}
const stream1 = new PassThrough();
const stream2 = new PassThrough();
stream.pipe(stream1);
stream.pipe(stream2);
return [stream1, stream2];
}
6 changes: 6 additions & 0 deletions packages/util-stream/src/stream-type-check.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
* @internal
*/
export const isReadableStream = (stream: unknown): stream is ReadableStream =>
typeof ReadableStream === "function" &&
(stream?.constructor?.name === ReadableStream.name || stream instanceof ReadableStream);
Loading