From ceae18f30d7422b5b50d7ceb858ce38ab5aab81b Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Mon, 11 Mar 2024 08:19:09 +0100 Subject: [PATCH] fix: fix types and linting (#22) --- src/duplex.ts | 6 +++--- src/sink.ts | 47 +++++++++++++++++++++++++++--------------- src/source.ts | 4 ++-- src/transform.ts | 2 +- test/duplex.spec.ts | 8 +++---- test/helpers/random.ts | 2 +- test/sink.spec.ts | 20 +++++++++--------- test/source.spec.ts | 6 +++--- test/transform.spec.ts | 11 ++++++---- 9 files changed, 61 insertions(+), 45 deletions(-) diff --git a/src/duplex.ts b/src/duplex.ts index cebc471..6f0edeb 100644 --- a/src/duplex.ts +++ b/src/duplex.ts @@ -1,13 +1,13 @@ import { sink } from './sink.js' import { source } from './source.js' -import type { Duplex } from 'node:stream' import type { Source, Duplex as ItDuplex } from 'it-stream-types' +import type { Duplex } from 'node:stream' /** - * Convert a Node.js [`Duplex`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_duplex_and_transform_streams) + * Convert a Node.js [`Duplex`](https://nodejs.org/dist/latest/docs/api/stream.html#class-streamduplex) * stream to a [duplex iterable](https://achingbrain.github.io/it-stream-types/interfaces/Duplex.html). */ -export function duplex (duplex: Duplex): ItDuplex, Source, Promise> { +export function duplex (duplex: Duplex): ItDuplex, Source, Promise> { return { sink: sink(duplex), source: source(duplex) diff --git a/src/sink.ts b/src/sink.ts index 2861caa..69aa958 100644 --- a/src/sink.ts +++ b/src/sink.ts @@ -2,15 +2,14 @@ import type { Sink, Source } from 'it-stream-types' import type { Writable } from 'node:stream' /** - * Convert a Node.js [`Writable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_writable_streams) + * Convert a Node.js [`Writable`](https://nodejs.org/dist/latest/docs/api/stream.html#class-streamwritable) * stream to a [sink](https://achingbrain.github.io/it-stream-types/interfaces/Sink.html). */ export function sink (writable: Writable): Sink, Promise> { return async (source: Source): Promise => { - const maybeEndSource = () => { + const maybeEndSource = async (): Promise => { if (isAsyncGenerator(source)) { - // @ts-expect-error return method expects an argument - return source.return() + await source.return(undefined) } } @@ -18,31 +17,40 @@ export function sink (writable: Writable): Sink, Promise> { let errCb: ((err: Error) => void) | undefined const errorHandler = (err: Error): void => { error = err - errCb?.(err) + // When the writable errors, try to end the source to exit iteration early maybeEndSource() + .catch(err => { + err = new AggregateError([ + error, + err + ], 'The Writable emitted an error, additionally an error occurred while ending the Source') + }) + .finally(() => { + errCb?.(err) + }) } let closeCb: (() => void) | undefined let closed = false - const closeHandler = () => { + const closeHandler = (): void => { closed = true closeCb?.() } let finishCb: (() => void) | undefined let finished = false - const finishHandler = () => { + const finishHandler = (): void => { finished = true finishCb?.() } let drainCb: (() => void) | undefined - const drainHandler = () => { + const drainHandler = (): void => { drainCb?.() } - const waitForDrainOrClose = () => { + const waitForDrainOrClose = async (): Promise => { return new Promise((resolve, reject) => { closeCb = drainCb = resolve errCb = reject @@ -51,17 +59,22 @@ export function sink (writable: Writable): Sink, Promise> { }) } - const waitForDone = () => { + const waitForDone = async (): Promise => { // Immediately try to end the source - maybeEndSource() + await maybeEndSource() + return new Promise((resolve, reject) => { - if (closed || finished || error) return resolve() + if (closed || finished || (error != null)) { + resolve() + return + } + finishCb = closeCb = resolve errCb = reject }) } - const cleanup = () => { + const cleanup = (): void => { writable.removeListener('error', errorHandler) writable.removeListener('close', closeHandler) writable.removeListener('finish', finishHandler) @@ -74,18 +87,18 @@ export function sink (writable: Writable): Sink, Promise> { try { for await (const value of source) { - if (!writable.writable || writable.destroyed || error) { + if (!writable.writable || writable.destroyed || (error != null)) { break } - if (writable.write(value as any) === false) { + if (!writable.write(value as any)) { await waitForDrainOrClose() } } } catch (err: any) { // error is set by stream error handler so only destroy stream if source // threw - if (!error) { + if (error == null) { writable.destroy(err) } @@ -104,7 +117,7 @@ export function sink (writable: Writable): Sink, Promise> { await waitForDone() // Notify the user an error occurred - if (error) throw error + if (error != null) throw error } finally { // Clean up listeners cleanup() diff --git a/src/source.ts b/src/source.ts index b1f2912..659d5a3 100644 --- a/src/source.ts +++ b/src/source.ts @@ -1,11 +1,11 @@ import type { Readable } from 'node:stream' /** - * Convert a Node.js [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams) + * Convert a Node.js [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#class-streamreadable) * stream or a browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) * to an [iterable source](https://achingbrain.github.io/it-stream-types/types/Source.html). */ -export function source (readable: Readable | ReadableStream): AsyncGenerator { +export function source (readable: Readable | ReadableStream): AsyncGenerator { // Browser ReadableStream if (isReadableStream(readable)) { return (async function * () { diff --git a/src/transform.ts b/src/transform.ts index 6d35cf3..2a4063c 100644 --- a/src/transform.ts +++ b/src/transform.ts @@ -3,7 +3,7 @@ import type { Source, Transform } from 'it-stream-types' import type { Duplex } from 'node:stream' /** - * Convert a [`Transform`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_duplex_and_transform_streams) + * Convert a [`Transform`](https://nodejs.org/dist/latest/docs/api/stream.html#class-streamtransform) * stream to an [iterable transform](https://achingbrain.github.io/it-stream-types/interfaces/Transform.html). */ export function transform (transform: Duplex): Transform, AsyncGenerator> { diff --git a/test/duplex.spec.ts b/test/duplex.spec.ts index 827c846..380a230 100644 --- a/test/duplex.spec.ts +++ b/test/duplex.spec.ts @@ -1,7 +1,7 @@ -import { expect } from 'aegir/chai' import { Duplex } from 'node:stream' -import { pipe } from 'it-pipe' +import { expect } from 'aegir/chai' import all from 'it-all' +import { pipe } from 'it-pipe' import Fifo from 'p-fifo' import * as toIterable from '../src/index.js' import { randomInt, randomBytes } from './helpers/random.js' @@ -16,10 +16,10 @@ describe('duplex', () => { toIterable.duplex(new Duplex({ objectMode: true, write (chunk, enc, cb) { - fifo.push(chunk).then(() => cb()) + void fifo.push(chunk).then(() => { cb() }) }, final (cb) { - fifo.push(null).then(() => cb()) + void fifo.push(null).then(() => { cb() }) }, async read (size) { while (true) { diff --git a/test/helpers/random.ts b/test/helpers/random.ts index 907dc70..f8ffe78 100644 --- a/test/helpers/random.ts +++ b/test/helpers/random.ts @@ -9,6 +9,6 @@ export function randomInt (min: number, max: number): number { return Math.floor(Math.random() * (max - min)) + min } -export function randomBytes (min: number, max: number) { +export function randomBytes (min: number, max: number): Uint8Array { return Crypto.randomBytes(randomInt(min, max)) } diff --git a/test/sink.spec.ts b/test/sink.spec.ts index ef723e0..ec6d3b1 100644 --- a/test/sink.spec.ts +++ b/test/sink.spec.ts @@ -1,11 +1,11 @@ -import { expect } from 'aegir/chai' import { Writable } from 'node:stream' -import * as toIterable from '../src/index.js' +import { expect } from 'aegir/chai' +import delay from 'delay' import { pipe } from 'it-pipe' +import * as toIterable from '../src/index.js' import { randomInt, randomBytes } from './helpers/random.js' -import delay from 'delay' -const slowIterator = async function * (values: Uint8Array[]) { +const slowIterator = async function * (values: Uint8Array[]): AsyncGenerator { for (const value of values) { await delay(1) yield value @@ -42,7 +42,7 @@ describe('sink', () => { }, next () { const value = input[i++] - return { done: !value, value } + return { done: value == null, value } }, return () { returnCalled = true @@ -207,7 +207,7 @@ describe('sink', () => { autoDestroy: false, write (chunk, enc, cb) { stream.emit('error', new Error('boom')) - setImmediate(() => cb()) + setImmediate(() => { cb() }) } }) @@ -233,7 +233,7 @@ describe('sink', () => { next: () => { return { value: input.pop(), - done: !input.length + done: input.length === 0 } } } @@ -242,7 +242,7 @@ describe('sink', () => { autoDestroy: false, write (chunk, enc, cb) { stream.emit('error', new Error('boom')) - setImmediate(() => cb()) + setImmediate(() => { cb() }) } }) @@ -268,7 +268,7 @@ describe('sink', () => { next: () => { return { value: input.pop(), - done: !input.length + done: input.length === 0 } } } @@ -277,7 +277,7 @@ describe('sink', () => { autoDestroy: false, highWaterMark: 0, // cause sink to wait for drain event write (chunk, enc, cb) { - setImmediate(() => cb(new Error('boom'))) + setImmediate(() => { cb(new Error('boom')) }) } }) diff --git a/test/source.spec.ts b/test/source.spec.ts index 318c625..b43745e 100644 --- a/test/source.spec.ts +++ b/test/source.spec.ts @@ -1,7 +1,7 @@ -import { expect } from 'aegir/chai' import { Readable } from 'node:stream' -import * as toIterable from '../src/index.js' +import { expect } from 'aegir/chai' import all from 'it-all' +import * as toIterable from '../src/index.js' import { randomInt, randomBytes } from './helpers/random.js' describe('source', () => { @@ -12,7 +12,7 @@ describe('source', () => { objectMode: true, read () { while (true) { - const data = input[i++] || null + const data = input[i++] ?? null if (!this.push(data)) break } } diff --git a/test/transform.spec.ts b/test/transform.spec.ts index f00671c..40d04e0 100644 --- a/test/transform.spec.ts +++ b/test/transform.spec.ts @@ -1,8 +1,8 @@ -import { expect } from 'aegir/chai' import { Transform } from 'node:stream' +import { expect } from 'aegir/chai' +import bl from 'bl' import all from 'it-all' import { pipe } from 'it-pipe' -import bl from 'bl' import * as toIterable from '../src/index.js' import { randomInt, randomBytes } from './helpers/random.js' @@ -81,7 +81,10 @@ describe('transform', () => { toIterable.transform(new Transform({ transform (chunk, enc, cb) { i++ - if (i > 2) return cb(new Error('boom')) + if (i > 2) { + cb(new Error('boom')) + return + } cb(null, chunk) } })), @@ -92,7 +95,7 @@ describe('transform', () => { }) it('should destroy transform stream and pass through errors from source', async () => { - async function * input () { + async function * input (): AsyncGenerator { yield 'hello,' yield 'world,' throw new Error('test error')