Skip to content

Commit

Permalink
feat(stream-extra): add an error handler to WrapReadableStream
Browse files Browse the repository at this point in the history
  • Loading branch information
yume-chan committed Nov 28, 2024
1 parent 3e28019 commit 6ae5f38
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
30 changes: 18 additions & 12 deletions libraries/stream-extra/src/wrap-readable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ export type WrapReadableStreamStart<T> = (

export interface ReadableStreamWrapper<T> {
start: WrapReadableStreamStart<T>;
cancel?(reason?: unknown): MaybePromiseLike<void>;
close?(): MaybePromiseLike<void>;
cancel?: (reason?: unknown) => MaybePromiseLike<void>;
close?: () => MaybePromiseLike<void>;
error?: (reason?: unknown) => MaybePromiseLike<void>;
}

function getWrappedReadableStream<T>(
Expand Down Expand Up @@ -57,27 +58,32 @@ export class WrapReadableStream<T> extends ReadableStream<T> {
super(
{
start: async (controller) => {
// `start` is invoked before `ReadableStream`'s constructor finish,
// so using `this` synchronously causes
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
// Queue a microtask to avoid this.
await Promise.resolve();

this.readable = await getWrappedReadableStream(
const readable = await getWrappedReadableStream(
wrapper,
controller,
);
// `start` is called in `super()`, so can't use `this` synchronously.
// but it's fine after the first `await`
this.readable = readable;
this.#reader = this.readable.getReader();
},
pull: async (controller) => {
const result = await this.#reader.read();
if (result.done) {
const { done, value } = await this.#reader
.read()
.catch((e) => {
if ("error" in wrapper) {
wrapper.error(e);
}
throw e;
});

if (done) {
controller.close();
if ("close" in wrapper) {
await wrapper.close?.();
}
} else {
controller.enqueue(result.value);
controller.enqueue(value);
}
},
cancel: async (reason) => {
Expand Down
11 changes: 4 additions & 7 deletions libraries/stream-extra/src/wrap-writable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,10 @@ export class WrapWritableStream<T> extends WritableStream<T> {
) {
super({
start: async () => {
// `start` is invoked before `ReadableStream`'s constructor finish,
// so using `this` synchronously causes
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
// Queue a microtask to avoid this.
await Promise.resolve();

this.writable = await getWrappedWritableStream(start);
const writable = await getWrappedWritableStream(start);
// `start` is called in `super()`, so can't use `this` synchronously.
// but it's fine after the first `await`
this.writable = writable;
this.#writer = this.writable.getWriter();
},
write: async (chunk) => {
Expand Down

0 comments on commit 6ae5f38

Please sign in to comment.