From 8641d7d8ce8005f8ed80d17c67f97b3928729a46 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 14 Jun 2023 16:42:43 +0200 Subject: [PATCH] fix: allow specifiying maxOutboundStreams in connection.newStream To allow overriding the default maximum outbound streams for a protocol when it's not been specified in a handler, allow passing it as an option when opening a new outbound stream. --- packages/libp2p/package.json | 2 +- .../libp2p/src/circuit-relay/server/index.ts | 17 +++- .../src/circuit-relay/transport/index.ts | 9 +- packages/libp2p/src/upgrader.ts | 17 ++-- .../libp2p/test/upgrading/upgrader.spec.ts | 85 +++++++++++++++++++ 5 files changed, 117 insertions(+), 13 deletions(-) diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index b4e91966ee..f8b2476d38 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -121,7 +121,7 @@ "@achingbrain/nat-port-mapper": "^1.0.9", "@libp2p/crypto": "^1.0.17", "@libp2p/interface-address-manager": "^3.0.0", - "@libp2p/interface-connection": "^5.0.0", + "@libp2p/interface-connection": "^5.1.1", "@libp2p/interface-connection-encrypter": "^4.0.0", "@libp2p/interface-connection-gater": "^3.0.0", "@libp2p/interface-connection-manager": "^3.0.0", diff --git a/packages/libp2p/src/circuit-relay/server/index.ts b/packages/libp2p/src/circuit-relay/server/index.ts index 414bd350ad..401a74d1b9 100644 --- a/packages/libp2p/src/circuit-relay/server/index.ts +++ b/packages/libp2p/src/circuit-relay/server/index.ts @@ -6,6 +6,7 @@ import { RecordEnvelope } from '@libp2p/peer-record' import { type Multiaddr, multiaddr } from '@multiformats/multiaddr' import { pbStream, type ProtobufStream } from 'it-pb-stream' import pDefer from 'p-defer' +import { MAX_CONNECTIONS } from '../../connection-manager/constants.js' import { CIRCUIT_PROTO_CODE, DEFAULT_HOP_TIMEOUT, @@ -59,6 +60,12 @@ export interface CircuitRelayServerInit { * The maximum number of simultaneous HOP outbound streams that can be open at once */ maxOutboundHopStreams?: number + + /** + * The maximum number of simultaneous STOP outbound streams that can be open at + * once. (default: 300) + */ + maxOutboundStopStreams?: number } export interface HopProtocolOptions { @@ -87,6 +94,10 @@ export interface RelayServerEvents { 'relay:advert:error': CustomEvent } +const defaults = { + maxOutboundStopStreams: MAX_CONNECTIONS +} + class CircuitRelayServer extends EventEmitter implements Startable, CircuitRelayService { private readonly registrar: Registrar private readonly peerStore: PeerStore @@ -101,6 +112,7 @@ class CircuitRelayServer extends EventEmitter implements Star private readonly shutdownController: AbortController private readonly maxInboundHopStreams?: number private readonly maxOutboundHopStreams?: number + private readonly maxOutboundStopStreams: number /** * Creates an instance of Relay @@ -119,6 +131,7 @@ class CircuitRelayServer extends EventEmitter implements Star this.shutdownController = new AbortController() this.maxInboundHopStreams = init.maxInboundHopStreams this.maxOutboundHopStreams = init.maxOutboundHopStreams + this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams try { // fails on node < 15.4 @@ -390,7 +403,9 @@ class CircuitRelayServer extends EventEmitter implements Star request }: StopOptions): Promise { log('starting circuit relay v2 stop request to %s', connection.remotePeer) - const stream = await connection.newStream([RELAY_V2_STOP_CODEC]) + const stream = await connection.newStream([RELAY_V2_STOP_CODEC], { + maxOutboundStreams: this.maxOutboundStopStreams + }) const pbstr = pbStream(stream) const stopstr = pbstr.pb(StopMessage) stopstr.write(request) diff --git a/packages/libp2p/src/circuit-relay/transport/index.ts b/packages/libp2p/src/circuit-relay/transport/index.ts index 92043c7dff..504e53dcec 100644 --- a/packages/libp2p/src/circuit-relay/transport/index.ts +++ b/packages/libp2p/src/circuit-relay/transport/index.ts @@ -82,14 +82,15 @@ export interface CircuitRelayTransportInit extends RelayStoreInit { /** * The maximum number of simultaneous STOP outbound streams that can be open at - * once. STOP streams are opened by the relay server so this setting is - * effectively ignored. (default: 32) + * once. If this transport is used along with the relay server these settings + * should be set to the same value (default: 300) */ maxOutboundStopStreams?: number } const defaults = { - maxInboundStopStreams: MAX_CONNECTIONS + maxInboundStopStreams: MAX_CONNECTIONS, + maxOutboundStopStreams: MAX_CONNECTIONS } class CircuitRelayTransport implements Transport { @@ -115,7 +116,7 @@ class CircuitRelayTransport implements Transport { this.addressManager = components.addressManager this.connectionGater = components.connectionGater this.maxInboundStopStreams = init.maxInboundStopStreams ?? defaults.maxInboundStopStreams - this.maxOutboundStopStreams = init.maxOutboundStopStreams + this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams if (init.discoverRelays != null && init.discoverRelays > 0) { this.discovery = new RelayDiscovery(components) diff --git a/packages/libp2p/src/upgrader.ts b/packages/libp2p/src/upgrader.ts index 21d1da5b4f..c495be1edc 100644 --- a/packages/libp2p/src/upgrader.ts +++ b/packages/libp2p/src/upgrader.ts @@ -9,7 +9,7 @@ import { createConnection } from './connection/index.js' import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js' import { codes } from './errors.js' import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js' -import type { MultiaddrConnection, Connection, Stream, ConnectionProtector } from '@libp2p/interface-connection' +import type { MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions } from '@libp2p/interface-connection' import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter' import type { ConnectionGater } from '@libp2p/interface-connection-gater' import type { ConnectionManager } from '@libp2p/interface-connection-manager' @@ -70,17 +70,20 @@ function findIncomingStreamLimit (protocol: string, registrar: Registrar): numbe return DEFAULT_MAX_INBOUND_STREAMS } -function findOutgoingStreamLimit (protocol: string, registrar: Registrar): number | undefined { +function findOutgoingStreamLimit (protocol: string, registrar: Registrar, options: NewStreamOptions = {}): number { try { const { options } = registrar.getHandler(protocol) - return options.maxOutboundStreams + + if (options.maxOutboundStreams != null) { + return options.maxOutboundStreams + } } catch (err: any) { if (err.code !== codes.ERR_NO_HANDLER_FOR_PROTOCOL) { throw err } } - return DEFAULT_MAX_OUTBOUND_STREAMS + return options.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS } function countStreams (protocol: string, direction: 'inbound' | 'outbound', connection: Connection): number { @@ -428,7 +431,7 @@ export class DefaultUpgrader implements Upgrader { } }) - newStream = async (protocols: string[], options: AbortOptions = {}): Promise => { + newStream = async (protocols: string[], options: NewStreamOptions = {}): Promise => { if (muxer == null) { throw new CodeError('Stream is not multiplexed', codes.ERR_MUXER_UNAVAILABLE) } @@ -450,10 +453,10 @@ export class DefaultUpgrader implements Upgrader { const { stream, protocol } = await mss.select(muxedStream, protocols, options) - const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar) + const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar, options) const streamCount = countStreams(protocol, 'outbound', connection) - if (streamCount === outgoingLimit) { + if (streamCount >= outgoingLimit) { const err = new CodeError(`Too many outbound protocol streams for protocol "${protocol}" - limit ${outgoingLimit}`, codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS) muxedStream.abort(err) diff --git a/packages/libp2p/test/upgrading/upgrader.spec.ts b/packages/libp2p/test/upgrading/upgrader.spec.ts index b4bdc5ac3c..62ecbbfd87 100644 --- a/packages/libp2p/test/upgrading/upgrader.spec.ts +++ b/packages/libp2p/test/upgrading/upgrader.spec.ts @@ -27,6 +27,7 @@ import { codes } from '../../src/errors.js' import { createLibp2p } from '../../src/index.js' import { plaintext } from '../../src/insecure/index.js' import { preSharedKey } from '../../src/pnet/index.js' +import { DEFAULT_MAX_OUTBOUND_STREAMS } from '../../src/registrar.js' import { DefaultUpgrader } from '../../src/upgrader.js' import swarmKey from '../fixtures/swarm.key.js' import type { Connection, ConnectionProtector, Stream } from '@libp2p/interface-connection' @@ -890,4 +891,88 @@ describe('libp2p.upgrader', () => { await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected() .with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS) }) + + it('should allow overriding the number of outgoing streams that can be opened using a protocol without a handler', async () => { + const localDeferred = pDefer() + const remoteDeferred = pDefer() + const protocol = '/a-test-protocol/1.0.0' + const remotePeer = peers[1] + libp2p = await createLibp2p({ + peerId: peers[0], + transports: [ + webSockets() + ], + streamMuxers: [ + yamux() + ], + connectionEncryption: [ + plaintext() + ], + services: { + test: (components: any) => { + localDeferred.resolve(components) + } + }, + connectionGater: mockConnectionGater() + }) + + remoteLibp2p = await createLibp2p({ + peerId: remotePeer, + transports: [ + webSockets() + ], + streamMuxers: [ + yamux() + ], + connectionEncryption: [ + plaintext() + ], + services: { + test: (components: any) => { + remoteDeferred.resolve(components) + } + } + }) + + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const localComponents = await localDeferred.promise + const remoteComponents = await remoteDeferred.promise + + const [localToRemote] = await Promise.all([ + localComponents.upgrader.upgradeOutbound(outbound), + remoteComponents.upgrader.upgradeInbound(inbound) + ]) + + let streamCount = 0 + + const limit = DEFAULT_MAX_OUTBOUND_STREAMS + 1 + + await remoteLibp2p.handle(protocol, (data) => { + streamCount++ + }, { + maxInboundStreams: limit + 1, + maxOutboundStreams: 10 + }) + + expect(streamCount).to.equal(0) + + for (let i = 0; i < limit; i++) { + await localToRemote.newStream(protocol, { + maxOutboundStreams: limit + }) + } + + expect(streamCount).to.equal(limit) + + // should reject without overriding limit + await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected() + .with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS) + + // should reject even with overriding limit + await expect(localToRemote.newStream(protocol, { + maxOutboundStreams: limit + })).to.eventually.be.rejected() + .with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS) + }) })