Skip to content

Commit

Permalink
fix: fix types and linting (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain authored Mar 11, 2024
1 parent afedf8b commit ceae18f
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 45 deletions.
6 changes: 3 additions & 3 deletions src/duplex.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { sink } from './sink.js'
import { source } from './source.js'
import type { Duplex } from 'node:stream'
import type { Source, Duplex as ItDuplex } from 'it-stream-types'
import type { Duplex } from 'node:stream'

/**
* Convert a Node.js [`Duplex`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_duplex_and_transform_streams)
* Convert a Node.js [`Duplex`](https://nodejs.org/dist/latest/docs/api/stream.html#class-streamduplex)
* stream to a [duplex iterable](https://achingbrain.github.io/it-stream-types/interfaces/Duplex.html).
*/
export function duplex <TSource = unknown, TSink = TSource> (duplex: Duplex): ItDuplex<AsyncGenerator<TSource>, Source<TSink>, Promise<void>> {
export function duplex <TSource = Uint8Array, TSink = TSource> (duplex: Duplex): ItDuplex<AsyncGenerator<TSource>, Source<TSink>, Promise<void>> {
return {
sink: sink(duplex),
source: source(duplex)
Expand Down
47 changes: 30 additions & 17 deletions src/sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,55 @@ import type { Sink, Source } from 'it-stream-types'
import type { Writable } from 'node:stream'

/**
* Convert a Node.js [`Writable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_writable_streams)
* Convert a Node.js [`Writable`](https://nodejs.org/dist/latest/docs/api/stream.html#class-streamwritable)
* stream to a [sink](https://achingbrain.github.io/it-stream-types/interfaces/Sink.html).
*/
export function sink <T> (writable: Writable): Sink<Source<T>, Promise<void>> {
return async (source: Source<T>): Promise<void> => {
const maybeEndSource = () => {
const maybeEndSource = async (): Promise<void> => {
if (isAsyncGenerator(source)) {
// @ts-expect-error return method expects an argument
return source.return()
await source.return(undefined)
}
}

let error: Error | undefined
let errCb: ((err: Error) => void) | undefined
const errorHandler = (err: Error): void => {
error = err
errCb?.(err)

// When the writable errors, try to end the source to exit iteration early
maybeEndSource()
.catch(err => {
err = new AggregateError([
error,
err
], 'The Writable emitted an error, additionally an error occurred while ending the Source')
})
.finally(() => {
errCb?.(err)
})
}

let closeCb: (() => void) | undefined
let closed = false
const closeHandler = () => {
const closeHandler = (): void => {
closed = true
closeCb?.()
}

let finishCb: (() => void) | undefined
let finished = false
const finishHandler = () => {
const finishHandler = (): void => {
finished = true
finishCb?.()
}

let drainCb: (() => void) | undefined
const drainHandler = () => {
const drainHandler = (): void => {
drainCb?.()
}

const waitForDrainOrClose = () => {
const waitForDrainOrClose = async (): Promise<void> => {
return new Promise<void>((resolve, reject) => {
closeCb = drainCb = resolve
errCb = reject
Expand All @@ -51,17 +59,22 @@ export function sink <T> (writable: Writable): Sink<Source<T>, Promise<void>> {
})
}

const waitForDone = () => {
const waitForDone = async (): Promise<void> => {
// Immediately try to end the source
maybeEndSource()
await maybeEndSource()

return new Promise<void>((resolve, reject) => {
if (closed || finished || error) return resolve()
if (closed || finished || (error != null)) {
resolve()
return
}

finishCb = closeCb = resolve
errCb = reject
})
}

const cleanup = () => {
const cleanup = (): void => {
writable.removeListener('error', errorHandler)
writable.removeListener('close', closeHandler)
writable.removeListener('finish', finishHandler)
Expand All @@ -74,18 +87,18 @@ export function sink <T> (writable: Writable): Sink<Source<T>, Promise<void>> {

try {
for await (const value of source) {
if (!writable.writable || writable.destroyed || error) {
if (!writable.writable || writable.destroyed || (error != null)) {
break
}

if (writable.write(value as any) === false) {
if (!writable.write(value as any)) {
await waitForDrainOrClose()
}
}
} catch (err: any) {
// error is set by stream error handler so only destroy stream if source
// threw
if (!error) {
if (error == null) {
writable.destroy(err)
}

Expand All @@ -104,7 +117,7 @@ export function sink <T> (writable: Writable): Sink<Source<T>, Promise<void>> {
await waitForDone()

// Notify the user an error occurred
if (error) throw error
if (error != null) throw error
} finally {
// Clean up listeners
cleanup()
Expand Down
4 changes: 2 additions & 2 deletions src/source.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type { Readable } from 'node:stream'

/**
* Convert a Node.js [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams)
* Convert a Node.js [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#class-streamreadable)
* stream or a browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream)
* to an [iterable source](https://achingbrain.github.io/it-stream-types/types/Source.html).
*/
export function source <T = unknown> (readable: Readable | ReadableStream<T>): AsyncGenerator<T> {
export function source <T = Uint8Array> (readable: Readable | ReadableStream<T>): AsyncGenerator<T> {
// Browser ReadableStream
if (isReadableStream(readable)) {
return (async function * () {
Expand Down
2 changes: 1 addition & 1 deletion src/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Source, Transform } from 'it-stream-types'
import type { Duplex } from 'node:stream'

/**
* Convert a [`Transform`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_duplex_and_transform_streams)
* Convert a [`Transform`](https://nodejs.org/dist/latest/docs/api/stream.html#class-streamtransform)
* stream to an [iterable transform](https://achingbrain.github.io/it-stream-types/interfaces/Transform.html).
*/
export function transform <Input = unknown, Output = Input> (transform: Duplex): Transform<Source<Input>, AsyncGenerator<Output>> {
Expand Down
8 changes: 4 additions & 4 deletions test/duplex.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'aegir/chai'
import { Duplex } from 'node:stream'
import { pipe } from 'it-pipe'
import { expect } from 'aegir/chai'
import all from 'it-all'
import { pipe } from 'it-pipe'
import Fifo from 'p-fifo'
import * as toIterable from '../src/index.js'
import { randomInt, randomBytes } from './helpers/random.js'
Expand All @@ -16,10 +16,10 @@ describe('duplex', () => {
toIterable.duplex(new Duplex({
objectMode: true,
write (chunk, enc, cb) {
fifo.push(chunk).then(() => cb())
void fifo.push(chunk).then(() => { cb() })
},
final (cb) {
fifo.push(null).then(() => cb())
void fifo.push(null).then(() => { cb() })
},
async read (size) {
while (true) {
Expand Down
2 changes: 1 addition & 1 deletion test/helpers/random.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ export function randomInt (min: number, max: number): number {
return Math.floor(Math.random() * (max - min)) + min
}

export function randomBytes (min: number, max: number) {
export function randomBytes (min: number, max: number): Uint8Array {
return Crypto.randomBytes(randomInt(min, max))
}
20 changes: 10 additions & 10 deletions test/sink.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { expect } from 'aegir/chai'
import { Writable } from 'node:stream'
import * as toIterable from '../src/index.js'
import { expect } from 'aegir/chai'
import delay from 'delay'
import { pipe } from 'it-pipe'
import * as toIterable from '../src/index.js'
import { randomInt, randomBytes } from './helpers/random.js'
import delay from 'delay'

const slowIterator = async function * (values: Uint8Array[]) {
const slowIterator = async function * (values: Uint8Array[]): AsyncGenerator<Uint8Array> {
for (const value of values) {
await delay(1)
yield value
Expand Down Expand Up @@ -42,7 +42,7 @@ describe('sink', () => {
},
next () {
const value = input[i++]
return { done: !value, value }
return { done: value == null, value }
},
return () {
returnCalled = true
Expand Down Expand Up @@ -207,7 +207,7 @@ describe('sink', () => {
autoDestroy: false,
write (chunk, enc, cb) {
stream.emit('error', new Error('boom'))
setImmediate(() => cb())
setImmediate(() => { cb() })
}
})

Expand All @@ -233,7 +233,7 @@ describe('sink', () => {
next: () => {
return {
value: input.pop(),
done: !input.length
done: input.length === 0
}
}
}
Expand All @@ -242,7 +242,7 @@ describe('sink', () => {
autoDestroy: false,
write (chunk, enc, cb) {
stream.emit('error', new Error('boom'))
setImmediate(() => cb())
setImmediate(() => { cb() })
}
})

Expand All @@ -268,7 +268,7 @@ describe('sink', () => {
next: () => {
return {
value: input.pop(),
done: !input.length
done: input.length === 0
}
}
}
Expand All @@ -277,7 +277,7 @@ describe('sink', () => {
autoDestroy: false,
highWaterMark: 0, // cause sink to wait for drain event
write (chunk, enc, cb) {
setImmediate(() => cb(new Error('boom')))
setImmediate(() => { cb(new Error('boom')) })
}
})

Expand Down
6 changes: 3 additions & 3 deletions test/source.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'aegir/chai'
import { Readable } from 'node:stream'
import * as toIterable from '../src/index.js'
import { expect } from 'aegir/chai'
import all from 'it-all'
import * as toIterable from '../src/index.js'
import { randomInt, randomBytes } from './helpers/random.js'

describe('source', () => {
Expand All @@ -12,7 +12,7 @@ describe('source', () => {
objectMode: true,
read () {
while (true) {
const data = input[i++] || null
const data = input[i++] ?? null
if (!this.push(data)) break
}
}
Expand Down
11 changes: 7 additions & 4 deletions test/transform.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { expect } from 'aegir/chai'
import { Transform } from 'node:stream'
import { expect } from 'aegir/chai'
import bl from 'bl'
import all from 'it-all'
import { pipe } from 'it-pipe'
import bl from 'bl'
import * as toIterable from '../src/index.js'
import { randomInt, randomBytes } from './helpers/random.js'

Expand Down Expand Up @@ -81,7 +81,10 @@ describe('transform', () => {
toIterable.transform(new Transform({
transform (chunk, enc, cb) {
i++
if (i > 2) return cb(new Error('boom'))
if (i > 2) {
cb(new Error('boom'))
return
}
cb(null, chunk)
}
})),
Expand All @@ -92,7 +95,7 @@ describe('transform', () => {
})

it('should destroy transform stream and pass through errors from source', async () => {
async function * input () {
async function * input (): AsyncGenerator<string> {
yield 'hello,'
yield 'world,'
throw new Error('test error')
Expand Down

0 comments on commit ceae18f

Please sign in to comment.