Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow specifiying maxOutboundStreams in connection.newStream #1817

Merged
merged 1 commit into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
"@achingbrain/nat-port-mapper": "^1.0.9",
"@libp2p/crypto": "^1.0.17",
"@libp2p/interface-address-manager": "^3.0.0",
"@libp2p/interface-connection": "^5.0.0",
"@libp2p/interface-connection": "^5.1.1",
"@libp2p/interface-connection-encrypter": "^4.0.0",
"@libp2p/interface-connection-gater": "^3.0.0",
"@libp2p/interface-connection-manager": "^3.0.0",
Expand Down
17 changes: 16 additions & 1 deletion packages/libp2p/src/circuit-relay/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { RecordEnvelope } from '@libp2p/peer-record'
import { type Multiaddr, multiaddr } from '@multiformats/multiaddr'
import { pbStream, type ProtobufStream } from 'it-pb-stream'
import pDefer from 'p-defer'
import { MAX_CONNECTIONS } from '../../connection-manager/constants.js'
import {
CIRCUIT_PROTO_CODE,
DEFAULT_HOP_TIMEOUT,
Expand Down Expand Up @@ -59,6 +60,12 @@ export interface CircuitRelayServerInit {
* The maximum number of simultaneous HOP outbound streams that can be open at once
*/
maxOutboundHopStreams?: number

/**
* The maximum number of simultaneous STOP outbound streams that can be open at
* once. (default: 300)
*/
maxOutboundStopStreams?: number
}

export interface HopProtocolOptions {
Expand Down Expand Up @@ -87,6 +94,10 @@ export interface RelayServerEvents {
'relay:advert:error': CustomEvent<Error>
}

const defaults = {
maxOutboundStopStreams: MAX_CONNECTIONS
}

class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Startable, CircuitRelayService {
private readonly registrar: Registrar
private readonly peerStore: PeerStore
Expand All @@ -101,6 +112,7 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
private readonly shutdownController: AbortController
private readonly maxInboundHopStreams?: number
private readonly maxOutboundHopStreams?: number
private readonly maxOutboundStopStreams: number

/**
* Creates an instance of Relay
Expand All @@ -119,6 +131,7 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
this.shutdownController = new AbortController()
this.maxInboundHopStreams = init.maxInboundHopStreams
this.maxOutboundHopStreams = init.maxOutboundHopStreams
this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams

try {
// fails on node < 15.4
Expand Down Expand Up @@ -390,7 +403,9 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
request
}: StopOptions): Promise<Stream | undefined> {
log('starting circuit relay v2 stop request to %s', connection.remotePeer)
const stream = await connection.newStream([RELAY_V2_STOP_CODEC])
const stream = await connection.newStream([RELAY_V2_STOP_CODEC], {
maxOutboundStreams: this.maxOutboundStopStreams
})
const pbstr = pbStream(stream)
const stopstr = pbstr.pb(StopMessage)
stopstr.write(request)
Expand Down
9 changes: 5 additions & 4 deletions packages/libp2p/src/circuit-relay/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,15 @@ export interface CircuitRelayTransportInit extends RelayStoreInit {

/**
* The maximum number of simultaneous STOP outbound streams that can be open at
* once. STOP streams are opened by the relay server so this setting is
* effectively ignored. (default: 32)
* once. If this transport is used along with the relay server these settings
* should be set to the same value (default: 300)
*/
maxOutboundStopStreams?: number
}

const defaults = {
maxInboundStopStreams: MAX_CONNECTIONS
maxInboundStopStreams: MAX_CONNECTIONS,
maxOutboundStopStreams: MAX_CONNECTIONS
}

class CircuitRelayTransport implements Transport {
Expand All @@ -115,7 +116,7 @@ class CircuitRelayTransport implements Transport {
this.addressManager = components.addressManager
this.connectionGater = components.connectionGater
this.maxInboundStopStreams = init.maxInboundStopStreams ?? defaults.maxInboundStopStreams
this.maxOutboundStopStreams = init.maxOutboundStopStreams
this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams

if (init.discoverRelays != null && init.discoverRelays > 0) {
this.discovery = new RelayDiscovery(components)
Expand Down
17 changes: 10 additions & 7 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { createConnection } from './connection/index.js'
import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js'
import { codes } from './errors.js'
import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js'
import type { MultiaddrConnection, Connection, Stream, ConnectionProtector } from '@libp2p/interface-connection'
import type { MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions } from '@libp2p/interface-connection'
import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter'
import type { ConnectionGater } from '@libp2p/interface-connection-gater'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
Expand Down Expand Up @@ -70,17 +70,20 @@ function findIncomingStreamLimit (protocol: string, registrar: Registrar): numbe
return DEFAULT_MAX_INBOUND_STREAMS
}

function findOutgoingStreamLimit (protocol: string, registrar: Registrar): number | undefined {
function findOutgoingStreamLimit (protocol: string, registrar: Registrar, options: NewStreamOptions = {}): number {
try {
const { options } = registrar.getHandler(protocol)
return options.maxOutboundStreams

if (options.maxOutboundStreams != null) {
return options.maxOutboundStreams
}
} catch (err: any) {
if (err.code !== codes.ERR_NO_HANDLER_FOR_PROTOCOL) {
throw err
}
}

return DEFAULT_MAX_OUTBOUND_STREAMS
return options.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
}

function countStreams (protocol: string, direction: 'inbound' | 'outbound', connection: Connection): number {
Expand Down Expand Up @@ -428,7 +431,7 @@ export class DefaultUpgrader implements Upgrader {
}
})

newStream = async (protocols: string[], options: AbortOptions = {}): Promise<Stream> => {
newStream = async (protocols: string[], options: NewStreamOptions = {}): Promise<Stream> => {
if (muxer == null) {
throw new CodeError('Stream is not multiplexed', codes.ERR_MUXER_UNAVAILABLE)
}
Expand All @@ -450,10 +453,10 @@ export class DefaultUpgrader implements Upgrader {

const { stream, protocol } = await mss.select(muxedStream, protocols, options)

const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar)
const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar, options)
const streamCount = countStreams(protocol, 'outbound', connection)

if (streamCount === outgoingLimit) {
if (streamCount >= outgoingLimit) {
const err = new CodeError(`Too many outbound protocol streams for protocol "${protocol}" - limit ${outgoingLimit}`, codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
muxedStream.abort(err)

Expand Down
85 changes: 85 additions & 0 deletions packages/libp2p/test/upgrading/upgrader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { codes } from '../../src/errors.js'
import { createLibp2p } from '../../src/index.js'
import { plaintext } from '../../src/insecure/index.js'
import { preSharedKey } from '../../src/pnet/index.js'
import { DEFAULT_MAX_OUTBOUND_STREAMS } from '../../src/registrar.js'
import { DefaultUpgrader } from '../../src/upgrader.js'
import swarmKey from '../fixtures/swarm.key.js'
import type { Connection, ConnectionProtector, Stream } from '@libp2p/interface-connection'
Expand Down Expand Up @@ -890,4 +891,88 @@ describe('libp2p.upgrader', () => {
await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected()
.with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
})

it('should allow overriding the number of outgoing streams that can be opened using a protocol without a handler', async () => {
const localDeferred = pDefer<Components>()
const remoteDeferred = pDefer<Components>()
const protocol = '/a-test-protocol/1.0.0'
const remotePeer = peers[1]
libp2p = await createLibp2p({
peerId: peers[0],
transports: [
webSockets()
],
streamMuxers: [
yamux()
],
connectionEncryption: [
plaintext()
],
services: {
test: (components: any) => {
localDeferred.resolve(components)
}
},
connectionGater: mockConnectionGater()
})

remoteLibp2p = await createLibp2p({
peerId: remotePeer,
transports: [
webSockets()
],
streamMuxers: [
yamux()
],
connectionEncryption: [
plaintext()
],
services: {
test: (components: any) => {
remoteDeferred.resolve(components)
}
}
})

const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

const localComponents = await localDeferred.promise
const remoteComponents = await remoteDeferred.promise

const [localToRemote] = await Promise.all([
localComponents.upgrader.upgradeOutbound(outbound),
remoteComponents.upgrader.upgradeInbound(inbound)
])

let streamCount = 0

const limit = DEFAULT_MAX_OUTBOUND_STREAMS + 1

await remoteLibp2p.handle(protocol, (data) => {
streamCount++
}, {
maxInboundStreams: limit + 1,
maxOutboundStreams: 10
})

expect(streamCount).to.equal(0)

for (let i = 0; i < limit; i++) {
await localToRemote.newStream(protocol, {
maxOutboundStreams: limit
})
}

expect(streamCount).to.equal(limit)

// should reject without overriding limit
await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected()
.with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)

// should reject even with overriding limit
await expect(localToRemote.newStream(protocol, {
maxOutboundStreams: limit
})).to.eventually.be.rejected()
.with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
})
})