Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
feat!: add stream muxer close
Browse files Browse the repository at this point in the history
BREAKING CHANGE: StreamMuxer now has a `close` method
  • Loading branch information
wemeetagain committed Jun 21, 2022
1 parent 2afe2cc commit f184744
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 9 deletions.
29 changes: 20 additions & 9 deletions packages/interface-mocks/src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class MockMuxer implements StreamMuxer {
public name: string
public protocol: string = '/mock-muxer/1.0.0'

private readonly closeController: AbortController
private readonly registryInitiatorStreams: Map<string, MuxedStream>
private readonly registryRecipientStreams: Map<string, MuxedStream>
private readonly options: StreamMuxerInit
Expand All @@ -279,17 +280,11 @@ class MockMuxer implements StreamMuxer {
this.registryRecipientStreams = new Map()
this.log('create muxer')
this.options = init ?? { direction: 'inbound' }
this.closeController = new AbortController()
// receives data from the muxer at the other end of the stream
this.source = this.input = pushable({
onEnd: (err) => {
this.log('closing muxed streams')
for (const stream of this.streams) {
if (err == null) {
stream.close()
} else {
stream.abort(err)
}
}
this.close(err)
}
})

Expand All @@ -303,7 +298,7 @@ class MockMuxer implements StreamMuxer {
async sink (source: Source<Uint8Array>) {
try {
await pipe(
source,
abortableSource(source, this.closeController.signal),
(source) => map(source, buf => uint8ArrayToString(buf)),
ndjson.parse,
async (source) => {
Expand Down Expand Up @@ -366,6 +361,9 @@ class MockMuxer implements StreamMuxer {
}

newStream (name?: string) {
if (this.closeController.signal.aborted) {
throw new Error('Muxer already closed')
}
this.log('newStream %s', name)
const storedStream = this.createStream(name, 'initiator')
this.registryInitiatorStreams.set(storedStream.stream.id, storedStream)
Expand Down Expand Up @@ -399,6 +397,19 @@ class MockMuxer implements StreamMuxer {

return muxedStream
}

close (err?: Error): void {
if (this.closeController.signal.aborted) return
this.log('closing muxed streams')

if (err == null) {
this.streams.forEach(s => s.close())
} else {
this.streams.forEach(s => s.abort(err))
}
this.closeController.abort()
this.input.end(err)
}
}

class MockMuxerFactory implements StreamMuxerFactory {
Expand Down
110 changes: 110 additions & 0 deletions packages/interface-stream-muxer-compliance-tests/src/close-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,116 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
expect(dialer.streams).to.have.lengthOf(0)
})

it('calling close closes streams', async () => {
let openedStreams = 0
const expectedStreams = 5
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer()

// Listener is echo server :)
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
onIncomingStream: (stream) => {
openedStreams++
void pipe(stream, stream)
}
})

const p = duplexPair<Uint8Array>()
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())

void Promise.all(
streams.map(async stream => {
return await pipe(
infiniteRandom,
stream,
drain
)
})
)

expect(dialer.streams, 'dialer - number of opened streams should match number of calls to newStream').to.have.lengthOf(expectedStreams)

// Pause, and then close the dialer
await delay(50)

dialer.close()

expect(openedStreams, 'listener - number of opened streams should match number of calls to newStream').to.have.equal(expectedStreams)
expect(dialer.streams, 'all tracked streams should be deleted after the muxer has called close').to.have.lengthOf(0)
})

it('calling close with an error aborts streams', async () => {
let openedStreams = 0
const expectedStreams = 5
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer()

// Listener is echo server :)
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
onIncomingStream: (stream) => {
openedStreams++
void pipe(stream, stream)
}
})

const p = duplexPair<Uint8Array>()
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())

void Promise.all(
streams.map(async stream => {
return await pipe(
infiniteRandom,
stream,
drain
)
})
)

expect(dialer.streams, 'dialer - number of opened streams should match number of calls to newStream').to.have.lengthOf(expectedStreams)

// Pause, and then close the dialer
await delay(50)

// close _with an error_
dialer.close(new Error())

expect(openedStreams, 'listener - number of opened streams should match number of calls to newStream').to.have.equal(expectedStreams)
expect(dialer.streams, 'all tracked streams should be deleted after the muxer has called close').to.have.lengthOf(0)
})

it('calling newStream after close throws an error', async () => {
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer()

// Listener is echo server :)
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
onIncomingStream: (stream) => void pipe(stream, stream)
})

const p = duplexPair<Uint8Array>()
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

// close the dialer
dialer.close()

try {
dialer.newStream()
expect.fail('newStream should throw if called after close')
} catch (e) {
expect(dialer.streams, 'closed muxer should have no streams').to.have.lengthOf(0)
}
})

it('closing one of the muxed streams doesn\'t close others', async () => {
const p = duplexPair<Uint8Array>()
const dialerFactory = await common.setup()
Expand Down
5 changes: 5 additions & 0 deletions packages/interface-stream-muxer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ export interface StreamMuxer extends Duplex<Uint8Array> {
* provided, the id of the stream will be used.
*/
newStream: (name?: string) => Stream

/**
* Close or abort all tracked streams and stop the muxer
*/
close: (err?: Error) => void
}

export interface StreamMuxerInit extends AbortOptions {
Expand Down

0 comments on commit f184744

Please sign in to comment.