diff --git a/package.json b/package.json index 70bf617..87a9e82 100644 --- a/package.json +++ b/package.json @@ -152,7 +152,7 @@ "@libp2p/interfaces": "^3.2.0", "@libp2p/logger": "^2.0.0", "abortable-iterator": "^4.0.2", - "any-signal": "^3.0.0", + "any-signal": "^4.0.1", "benchmark": "^2.1.4", "it-batched-bytes": "^1.0.0", "it-pushable": "^3.1.0", diff --git a/src/mplex.ts b/src/mplex.ts index c2e8c44..69bcfd7 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -12,7 +12,7 @@ import type { Sink } from 'it-stream-types' import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface-stream-muxer' import type { Stream } from '@libp2p/interface-connection' import type { MplexInit } from './index.js' -import anySignal from 'any-signal' +import { anySignal } from 'any-signal' import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:mplex') @@ -192,14 +192,11 @@ export class MplexStreamMuxer implements StreamMuxer { */ _createSink (): Sink { const sink: Sink = async source => { - // see: https://github.com/jacobheun/any-signal/pull/18 - const abortSignals = [this.closeController.signal] - if (this._init.signal != null) { - abortSignals.push(this._init.signal) - } - source = abortableSource(source, anySignal(abortSignals)) + const signal = anySignal([this.closeController.signal, this._init.signal]) try { + source = abortableSource(source, signal) + const decoder = new Decoder(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize) for await (const chunk of source) { @@ -212,6 +209,8 @@ export class MplexStreamMuxer implements StreamMuxer { } catch (err: any) { log('error in sink', err) this._source.end(err) // End the source with an error + } finally { + signal.clear() } } diff --git a/src/stream.ts b/src/stream.ts index 949e574..924b0a8 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -160,13 +160,15 @@ export function createStream (options: Options): MplexStream { throw new CodeError('stream closed for writing', ERR_SINK_ENDED) } - source = abortableSource(source, anySignal([ + const signal = anySignal([ abortController.signal, resetController.signal, closeController.signal - ])) + ]) try { + source = abortableSource(source, signal) + if (type === 'initiator') { // If initiator, open a new stream send({ id, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(streamName)) }) } @@ -214,6 +216,8 @@ export function createStream (options: Options): MplexStream { streamSource.end(err) onSinkEnd(err) return + } finally { + signal.clear() } try {