Skip to content

Commit

Permalink
fix: limit stream concurrency (#77)
Browse files Browse the repository at this point in the history
Pass `maxInboundStreams` and `maxOutboundStreams` options to registrar
  • Loading branch information
achingbrain authored Jun 17, 2022
1 parent a0ef679 commit d4f1779
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@
"dependencies": {
"@libp2p/components": "^2.0.0",
"@libp2p/crypto": "^1.0.0",
"@libp2p/interface-connection": "^2.0.0",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-pubsub": "^1.0.3",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/logger": "^2.0.0",
"@libp2p/peer-collections": "^1.0.0",
Expand All @@ -191,10 +195,6 @@
"uint8arrays": "^3.0.0"
},
"devDependencies": {
"@libp2p/interface-connection": "^2.0.0",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-pubsub": "^1.0.1",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/peer-id-factory": "^1.0.0",
"aegir": "^37.2.0",
"delay": "^5.0.0",
Expand Down
13 changes: 11 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi

private _registrarTopologyIds: string[] | undefined
protected enabled: boolean
private readonly maxInboundStreams: number
private readonly maxOutboundStreams: number

constructor (props: PubSubInit) {
super()
Expand All @@ -74,7 +76,9 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
globalSignaturePolicy = 'StrictSign',
canRelayMessage = false,
emitSelf = false,
messageProcessingConcurrency = 10
messageProcessingConcurrency = 10,
maxInboundStreams = 1,
maxOutboundStreams = 1
} = props

this.multicodecs = ensureArray(multicodecs)
Expand All @@ -88,6 +92,8 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
this.emitSelf = emitSelf
this.topicValidators = new Map()
this.queue = new Queue({ concurrency: messageProcessingConcurrency })
this.maxInboundStreams = maxInboundStreams
this.maxOutboundStreams = maxOutboundStreams

this._onIncomingStream = this._onIncomingStream.bind(this)
this._onPeerConnected = this._onPeerConnected.bind(this)
Expand Down Expand Up @@ -115,7 +121,10 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
const registrar = this.components.getRegistrar()
// Incoming streams
// Called after a peer dials us
await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream)))
await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream, {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams
})))

// register protocol with topology
// Topology callbacks called on connection manager changes
Expand Down

0 comments on commit d4f1779

Please sign in to comment.