Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

feat!: add stream muxer close #254

Merged
merged 1 commit into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions packages/interface-mocks/src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class MockMuxer implements StreamMuxer {
public name: string
public protocol: string = '/mock-muxer/1.0.0'

private readonly closeController: AbortController
private readonly registryInitiatorStreams: Map<string, MuxedStream>
private readonly registryRecipientStreams: Map<string, MuxedStream>
private readonly options: StreamMuxerInit
Expand All @@ -279,17 +280,11 @@ class MockMuxer implements StreamMuxer {
this.registryRecipientStreams = new Map()
this.log('create muxer')
this.options = init ?? { direction: 'inbound' }
this.closeController = new AbortController()
// receives data from the muxer at the other end of the stream
this.source = this.input = pushable({
onEnd: (err) => {
this.log('closing muxed streams')
for (const stream of this.streams) {
if (err == null) {
stream.close()
} else {
stream.abort(err)
}
}
this.close(err)
}
})

Expand All @@ -303,7 +298,7 @@ class MockMuxer implements StreamMuxer {
async sink (source: Source<Uint8Array>) {
try {
await pipe(
source,
abortableSource(source, this.closeController.signal),
(source) => map(source, buf => uint8ArrayToString(buf)),
ndjson.parse,
async (source) => {
Expand Down Expand Up @@ -366,6 +361,9 @@ class MockMuxer implements StreamMuxer {
}

newStream (name?: string) {
if (this.closeController.signal.aborted) {
throw new Error('Muxer already closed')
}
this.log('newStream %s', name)
const storedStream = this.createStream(name, 'initiator')
this.registryInitiatorStreams.set(storedStream.stream.id, storedStream)
Expand Down Expand Up @@ -399,6 +397,19 @@ class MockMuxer implements StreamMuxer {

return muxedStream
}

close (err?: Error): void {
if (this.closeController.signal.aborted) return
this.log('closing muxed streams')

if (err == null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (err) instead to catch undefined ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a linter rule against implicitly coersing to a boolean.
foo == null does the same tho, will match on both null and undefined.

this.streams.forEach(s => s.close())
} else {
this.streams.forEach(s => s.abort(err))
}
this.closeController.abort()
this.input.end(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling this.input.end(err) may not be necessary here.

I think what will happen is calling this.closeController.abort() will cause the pipe in sink to throw, the error will be caught and passed to this.input.end on line 316.

Ending the input should then cause the connection to close? It's not clear what use a muxed connection with a closed muxer is, so that's probably good.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I think the only big difference is that by calling this.input.end(err) explicitly, the error is passed to the source.
If we rely on the pipe in sink to throw, the error that the source will get will be some generic "aborted" error.

Not sure which is the desired behavior, or that either behavior matters to the interface.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this.input.end(err) is better - it's not async and the received error is more useful than 'aborted'.

}
}

class MockMuxerFactory implements StreamMuxerFactory {
Expand Down
114 changes: 114 additions & 0 deletions packages/interface-stream-muxer-compliance-tests/src/close-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,120 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
expect(dialer.streams).to.have.lengthOf(0)
})

it('calling close closes streams', async () => {
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
let openedStreams = 0
const expectedStreams = 5
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })

// Listener is echo server :)
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
direction: 'inbound',
onIncomingStream: (stream) => {
openedStreams++
void pipe(stream, stream)
}
})

const p = duplexPair<Uint8Array>()
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())

void Promise.all(
streams.map(async stream => {
return await pipe(
infiniteRandom,
stream,
drain
)
})
)

expect(dialer.streams, 'dialer - number of opened streams should match number of calls to newStream').to.have.lengthOf(expectedStreams)

// Pause, and then close the dialer
await delay(50)

dialer.close()

expect(openedStreams, 'listener - number of opened streams should match number of calls to newStream').to.have.equal(expectedStreams)
expect(dialer.streams, 'all tracked streams should be deleted after the muxer has called close').to.have.lengthOf(0)
})

it('calling close with an error aborts streams', async () => {
let openedStreams = 0
const expectedStreams = 5
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })

// Listener is echo server :)
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
direction: 'inbound',
onIncomingStream: (stream) => {
openedStreams++
void pipe(stream, stream)
}
})

const p = duplexPair<Uint8Array>()
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())

const streamPipes = streams.map(async stream => {
return await pipe(
infiniteRandom,
stream,
drain
)
})

expect(dialer.streams, 'dialer - number of opened streams should match number of calls to newStream').to.have.lengthOf(expectedStreams)

// Pause, and then close the dialer
await delay(50)

// close _with an error_
dialer.close(new Error())

const timeoutError = new Error('timeout')
for (const pipe of streamPipes) {
try {
await Promise.race([
pipe,
new Promise((_resolve, reject) => setTimeout(() => reject(timeoutError), 20))
])
expect.fail('stream pipe with infinite source should never return')
} catch (e) {
if (e === timeoutError) {
expect.fail('expected stream pipe to throw an error after muxer closed with error')
}
}
}

expect(openedStreams, 'listener - number of opened streams should match number of calls to newStream').to.have.equal(expectedStreams)
expect(dialer.streams, 'all tracked streams should be deleted after the muxer has called close').to.have.lengthOf(0)
})

it('calling newStream after close throws an error', async () => {
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })

dialer.close()

try {
dialer.newStream()
expect.fail('newStream should throw if called after close')
} catch (e) {
expect(dialer.streams, 'closed muxer should have no streams').to.have.lengthOf(0)
}
})

it('closing one of the muxed streams doesn\'t close others', async () => {
const p = duplexPair<Uint8Array>()
const dialerFactory = await common.setup()
Expand Down
5 changes: 5 additions & 0 deletions packages/interface-stream-muxer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ export interface StreamMuxer extends Duplex<Uint8Array> {
* provided, the id of the stream will be used.
*/
newStream: (name?: string) => Stream

/**
* Close or abort all tracked streams and stop the muxer
*/
close: (err?: Error) => void
}

export interface StreamMuxerInit extends AbortOptions {
Expand Down