Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
deps: update any-signal to 4.x.x (#270)
Browse files Browse the repository at this point in the history
`any-signal` can now remove the event listeners it installs preventing
a source of memory leaks.
  • Loading branch information
achingbrain authored Apr 13, 2023
1 parent 0b0df94 commit 2820884
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 10 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
"@libp2p/interfaces": "^3.2.0",
"@libp2p/logger": "^2.0.0",
"abortable-iterator": "^4.0.2",
"any-signal": "^3.0.0",
"any-signal": "^4.0.1",
"benchmark": "^2.1.4",
"it-batched-bytes": "^1.0.0",
"it-pushable": "^3.1.0",
Expand Down
13 changes: 6 additions & 7 deletions src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type { Sink } from 'it-stream-types'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import type { Stream } from '@libp2p/interface-connection'
import type { MplexInit } from './index.js'
import anySignal from 'any-signal'
import { anySignal } from 'any-signal'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:mplex')
Expand Down Expand Up @@ -192,14 +192,11 @@ export class MplexStreamMuxer implements StreamMuxer {
*/
_createSink (): Sink<Uint8Array> {
const sink: Sink<Uint8Array> = async source => {
// see: https://github.com/jacobheun/any-signal/pull/18
const abortSignals = [this.closeController.signal]
if (this._init.signal != null) {
abortSignals.push(this._init.signal)
}
source = abortableSource(source, anySignal(abortSignals))
const signal = anySignal([this.closeController.signal, this._init.signal])

try {
source = abortableSource(source, signal)

const decoder = new Decoder(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize)

for await (const chunk of source) {
Expand All @@ -212,6 +209,8 @@ export class MplexStreamMuxer implements StreamMuxer {
} catch (err: any) {
log('error in sink', err)
this._source.end(err) // End the source with an error
} finally {
signal.clear()
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,15 @@ export function createStream (options: Options): MplexStream {
throw new CodeError('stream closed for writing', ERR_SINK_ENDED)
}

source = abortableSource(source, anySignal([
const signal = anySignal([
abortController.signal,
resetController.signal,
closeController.signal
]))
])

try {
source = abortableSource(source, signal)

if (type === 'initiator') { // If initiator, open a new stream
send({ id, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(streamName)) })
}
Expand Down Expand Up @@ -214,6 +216,8 @@ export function createStream (options: Options): MplexStream {
streamSource.end(err)
onSinkEnd(err)
return
} finally {
signal.clear()
}

try {
Expand Down

0 comments on commit 2820884

Please sign in to comment.