Skip to content

Commit

Permalink
fix: allow specifiying maxOutboundStreams in connection.newStream (#1817
Browse files Browse the repository at this point in the history
)

To allow overriding the default maximum outbound streams for a
protocol when it's not been specified in a handler, allow passing
it as an option when opening a new outbound stream.
  • Loading branch information
achingbrain authored Jun 14, 2023
1 parent 0d11c24 commit b348fba
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 13 deletions.
2 changes: 1 addition & 1 deletion packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
"@achingbrain/nat-port-mapper": "^1.0.9",
"@libp2p/crypto": "^1.0.17",
"@libp2p/interface-address-manager": "^3.0.0",
"@libp2p/interface-connection": "^5.0.0",
"@libp2p/interface-connection": "^5.1.1",
"@libp2p/interface-connection-encrypter": "^4.0.0",
"@libp2p/interface-connection-gater": "^3.0.0",
"@libp2p/interface-connection-manager": "^3.0.0",
Expand Down
17 changes: 16 additions & 1 deletion packages/libp2p/src/circuit-relay/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { RecordEnvelope } from '@libp2p/peer-record'
import { type Multiaddr, multiaddr } from '@multiformats/multiaddr'
import { pbStream, type ProtobufStream } from 'it-pb-stream'
import pDefer from 'p-defer'
import { MAX_CONNECTIONS } from '../../connection-manager/constants.js'
import {
CIRCUIT_PROTO_CODE,
DEFAULT_HOP_TIMEOUT,
Expand Down Expand Up @@ -59,6 +60,12 @@ export interface CircuitRelayServerInit {
* The maximum number of simultaneous HOP outbound streams that can be open at once
*/
maxOutboundHopStreams?: number

/**
* The maximum number of simultaneous STOP outbound streams that can be open at
* once. (default: 300)
*/
maxOutboundStopStreams?: number
}

export interface HopProtocolOptions {
Expand Down Expand Up @@ -87,6 +94,10 @@ export interface RelayServerEvents {
'relay:advert:error': CustomEvent<Error>
}

const defaults = {
maxOutboundStopStreams: MAX_CONNECTIONS
}

class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Startable, CircuitRelayService {
private readonly registrar: Registrar
private readonly peerStore: PeerStore
Expand All @@ -101,6 +112,7 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
private readonly shutdownController: AbortController
private readonly maxInboundHopStreams?: number
private readonly maxOutboundHopStreams?: number
private readonly maxOutboundStopStreams: number

/**
* Creates an instance of Relay
Expand All @@ -119,6 +131,7 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
this.shutdownController = new AbortController()
this.maxInboundHopStreams = init.maxInboundHopStreams
this.maxOutboundHopStreams = init.maxOutboundHopStreams
this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams

try {
// fails on node < 15.4
Expand Down Expand Up @@ -390,7 +403,9 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
request
}: StopOptions): Promise<Stream | undefined> {
log('starting circuit relay v2 stop request to %s', connection.remotePeer)
const stream = await connection.newStream([RELAY_V2_STOP_CODEC])
const stream = await connection.newStream([RELAY_V2_STOP_CODEC], {
maxOutboundStreams: this.maxOutboundStopStreams
})
const pbstr = pbStream(stream)
const stopstr = pbstr.pb(StopMessage)
stopstr.write(request)
Expand Down
9 changes: 5 additions & 4 deletions packages/libp2p/src/circuit-relay/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,15 @@ export interface CircuitRelayTransportInit extends RelayStoreInit {

/**
* The maximum number of simultaneous STOP outbound streams that can be open at
* once. STOP streams are opened by the relay server so this setting is
* effectively ignored. (default: 32)
* once. If this transport is used along with the relay server these settings
* should be set to the same value (default: 300)
*/
maxOutboundStopStreams?: number
}

const defaults = {
maxInboundStopStreams: MAX_CONNECTIONS
maxInboundStopStreams: MAX_CONNECTIONS,
maxOutboundStopStreams: MAX_CONNECTIONS
}

class CircuitRelayTransport implements Transport {
Expand All @@ -115,7 +116,7 @@ class CircuitRelayTransport implements Transport {
this.addressManager = components.addressManager
this.connectionGater = components.connectionGater
this.maxInboundStopStreams = init.maxInboundStopStreams ?? defaults.maxInboundStopStreams
this.maxOutboundStopStreams = init.maxOutboundStopStreams
this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams

if (init.discoverRelays != null && init.discoverRelays > 0) {
this.discovery = new RelayDiscovery(components)
Expand Down
17 changes: 10 additions & 7 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { createConnection } from './connection/index.js'
import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js'
import { codes } from './errors.js'
import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js'
import type { MultiaddrConnection, Connection, Stream, ConnectionProtector } from '@libp2p/interface-connection'
import type { MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions } from '@libp2p/interface-connection'
import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter'
import type { ConnectionGater } from '@libp2p/interface-connection-gater'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
Expand Down Expand Up @@ -70,17 +70,20 @@ function findIncomingStreamLimit (protocol: string, registrar: Registrar): numbe
return DEFAULT_MAX_INBOUND_STREAMS
}

function findOutgoingStreamLimit (protocol: string, registrar: Registrar): number | undefined {
function findOutgoingStreamLimit (protocol: string, registrar: Registrar, options: NewStreamOptions = {}): number {
try {
const { options } = registrar.getHandler(protocol)
return options.maxOutboundStreams

if (options.maxOutboundStreams != null) {
return options.maxOutboundStreams
}
} catch (err: any) {
if (err.code !== codes.ERR_NO_HANDLER_FOR_PROTOCOL) {
throw err
}
}

return DEFAULT_MAX_OUTBOUND_STREAMS
return options.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
}

function countStreams (protocol: string, direction: 'inbound' | 'outbound', connection: Connection): number {
Expand Down Expand Up @@ -428,7 +431,7 @@ export class DefaultUpgrader implements Upgrader {
}
})

newStream = async (protocols: string[], options: AbortOptions = {}): Promise<Stream> => {
newStream = async (protocols: string[], options: NewStreamOptions = {}): Promise<Stream> => {
if (muxer == null) {
throw new CodeError('Stream is not multiplexed', codes.ERR_MUXER_UNAVAILABLE)
}
Expand All @@ -450,10 +453,10 @@ export class DefaultUpgrader implements Upgrader {

const { stream, protocol } = await mss.select(muxedStream, protocols, options)

const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar)
const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar, options)
const streamCount = countStreams(protocol, 'outbound', connection)

if (streamCount === outgoingLimit) {
if (streamCount >= outgoingLimit) {
const err = new CodeError(`Too many outbound protocol streams for protocol "${protocol}" - limit ${outgoingLimit}`, codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
muxedStream.abort(err)

Expand Down
85 changes: 85 additions & 0 deletions packages/libp2p/test/upgrading/upgrader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { codes } from '../../src/errors.js'
import { createLibp2p } from '../../src/index.js'
import { plaintext } from '../../src/insecure/index.js'
import { preSharedKey } from '../../src/pnet/index.js'
import { DEFAULT_MAX_OUTBOUND_STREAMS } from '../../src/registrar.js'
import { DefaultUpgrader } from '../../src/upgrader.js'
import swarmKey from '../fixtures/swarm.key.js'
import type { Connection, ConnectionProtector, Stream } from '@libp2p/interface-connection'
Expand Down Expand Up @@ -890,4 +891,88 @@ describe('libp2p.upgrader', () => {
await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected()
.with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
})

it('should allow overriding the number of outgoing streams that can be opened using a protocol without a handler', async () => {
const localDeferred = pDefer<Components>()
const remoteDeferred = pDefer<Components>()
const protocol = '/a-test-protocol/1.0.0'
const remotePeer = peers[1]
libp2p = await createLibp2p({
peerId: peers[0],
transports: [
webSockets()
],
streamMuxers: [
yamux()
],
connectionEncryption: [
plaintext()
],
services: {
test: (components: any) => {
localDeferred.resolve(components)
}
},
connectionGater: mockConnectionGater()
})

remoteLibp2p = await createLibp2p({
peerId: remotePeer,
transports: [
webSockets()
],
streamMuxers: [
yamux()
],
connectionEncryption: [
plaintext()
],
services: {
test: (components: any) => {
remoteDeferred.resolve(components)
}
}
})

const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

const localComponents = await localDeferred.promise
const remoteComponents = await remoteDeferred.promise

const [localToRemote] = await Promise.all([
localComponents.upgrader.upgradeOutbound(outbound),
remoteComponents.upgrader.upgradeInbound(inbound)
])

let streamCount = 0

const limit = DEFAULT_MAX_OUTBOUND_STREAMS + 1

await remoteLibp2p.handle(protocol, (data) => {
streamCount++
}, {
maxInboundStreams: limit + 1,
maxOutboundStreams: 10
})

expect(streamCount).to.equal(0)

for (let i = 0; i < limit; i++) {
await localToRemote.newStream(protocol, {
maxOutboundStreams: limit
})
}

expect(streamCount).to.equal(limit)

// should reject without overriding limit
await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected()
.with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)

// should reject even with overriding limit
await expect(localToRemote.newStream(protocol, {
maxOutboundStreams: limit
})).to.eventually.be.rejected()
.with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
})
})

0 comments on commit b348fba

Please sign in to comment.