Skip to content

Commit

Permalink
feat: mark connections with limits as transient (#1890)
Browse files Browse the repository at this point in the history
Some connections have resources limits imposed on them, such as circuit relay connections.  If these limits are breached the connection will be closed by the remote.

When this is the case, the connection will have a `.transient` boolean property set to true.

By default any attempt to run a protocol over a transient connection will throw (outgoing) or be reset (incoming), this is to prevent, for example, bitswap exceeding the connection transfer limit and causing the connection to be closed by the relay server when it should be reserved for the WebRTC SDP exchange to allow incoming dials.

Protocols can opt-in to being run over transient connections by specifying a `runOnTransientConnection` flag during `libp2p.handle` (incoming) and `connection.openStream`/`libp2p.dialProtocol` (outgoing).

Closes #1611
  • Loading branch information
achingbrain authored Jul 26, 2023
1 parent 7debe03 commit a1ec46b
Show file tree
Hide file tree
Showing 21 changed files with 227 additions and 57 deletions.
2 changes: 2 additions & 0 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class MockConnection implements Connection {
public status: ConnectionStatus
public streams: Stream[]
public tags: string[]
public transient: boolean

private readonly muxer: StreamMuxer
private readonly maConn: MultiaddrConnection
Expand All @@ -63,6 +64,7 @@ class MockConnection implements Connection {
this.tags = []
this.muxer = muxer
this.maConn = maConn
this.transient = false
}

async newStream (protocols: string | string[], options?: AbortOptions): Promise<Stream> {
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-compliance-tests/src/mocks/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { mockConnection } from './connection.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection'
import type { EventEmitter } from '@libp2p/interface/events'
import type { Upgrader, UpgraderOptions } from '@libp2p/interface/transport'
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { Upgrader, UpgraderOptions } from '@libp2p/interface-internal/upgrader'

export interface MockUpgraderInit {
registrar?: Registrar
Expand Down
7 changes: 7 additions & 0 deletions packages/interface-internal/src/registrar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ export interface StreamHandlerOptions {
* How many outgoing streams can be open for this protocol at the same time on each connection (default: 64)
*/
maxOutboundStreams?: number

/**
* If true, allow this protocol to run on limited connections (e.g.
* connections with data or duration limits such as circuit relay
* connections) (default: false)
*/
runOnTransientConnection?: boolean
}

export interface StreamHandlerRecord {
Expand Down
20 changes: 0 additions & 20 deletions packages/interface-internal/src/upgrader/index.ts

This file was deleted.

14 changes: 14 additions & 0 deletions packages/interface/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ export interface NewStreamOptions extends AbortOptions {
* for the protocol
*/
maxOutboundStreams?: number

/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
*/
runOnTransientConnection?: boolean
}

export type ConnectionStatus = 'open' | 'closing' | 'closed'
Expand Down Expand Up @@ -239,6 +245,14 @@ export interface Connection {
*/
status: ConnectionStatus

/**
* A transient connection is one that is not expected to be open for very long
* or one that cannot transfer very much data, such as one being used as a
* circuit relay connection. Protocols need to explicitly opt-in to being run
* over transient connections.
*/
transient: boolean

/**
* Create a new stream on this connection and negotiate one of the passed protocols
*/
Expand Down
4 changes: 2 additions & 2 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* ```
*/

import type { Connection, Stream } from './connection/index.js'
import type { Connection, NewStreamOptions, Stream } from './connection/index.js'
import type { ContentRouting } from './content-routing/index.js'
import type { EventEmitter } from './events.js'
import type { KeyChain } from './keychain/index.js'
Expand Down Expand Up @@ -503,7 +503,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ev
* pipe([1, 2, 3], stream, consume)
* ```
*/
dialProtocol: (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options?: AbortOptions) => Promise<Stream>
dialProtocol: (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options?: NewStreamOptions) => Promise<Stream>

/**
* Attempts to gracefully close an open connection to the given peer. If the
Expand Down
6 changes: 6 additions & 0 deletions packages/interface/src/stream-handler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ export interface StreamHandlerOptions {
* How many outgoing streams can be open for this protocol at the same time on each connection (default: 64)
*/
maxOutboundStreams?: number

/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
*/
runOnTransientConnection?: boolean
}

export interface StreamHandlerRecord {
Expand Down
6 changes: 6 additions & 0 deletions packages/interface/src/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ export interface UpgraderOptions {
skipEncryption?: boolean
skipProtection?: boolean
muxerFactory?: StreamMuxerFactory

/**
* The passed MultiaddrConnection has limits place on duration and/or data
* transfer amounts so is not expected to be open for very long.
*/
transient?: boolean
}

export interface Upgrader {
Expand Down
6 changes: 5 additions & 1 deletion packages/libp2p-daemon-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ export class Server implements Libp2pServer {
const { peer, proto } = request.streamOpen
const peerId = peerIdFromBytes(peer)
const connection = await this.libp2p.dial(peerId)
const stream = await connection.newStream(proto)
const stream = await connection.newStream(proto, {
runOnTransientConnection: true
})

return {
streamInfo: {
Expand Down Expand Up @@ -178,6 +180,8 @@ export class Server implements Libp2pServer {
})
}
})
}, {
runOnTransientConnection: true
})
}

Expand Down
11 changes: 7 additions & 4 deletions packages/libp2p/src/circuit-relay/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import { MAX_CONNECTIONS } from '../../connection-manager/constants.js'
import {
CIRCUIT_PROTO_CODE,
DEFAULT_HOP_TIMEOUT,
RELAY_SOURCE_TAG
, RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC
RELAY_SOURCE_TAG,
RELAY_V2_HOP_CODEC,
RELAY_V2_STOP_CODEC
} from '../constants.js'
import { HopMessage, type Reservation, Status, StopMessage } from '../pb/index.js'
import { createLimitedRelay } from '../utils.js'
Expand Down Expand Up @@ -172,7 +173,8 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
})
}, {
maxInboundStreams: this.maxInboundHopStreams,
maxOutboundStreams: this.maxOutboundHopStreams
maxOutboundStreams: this.maxOutboundHopStreams,
runOnTransientConnection: true
})

this.reservationStore.start()
Expand Down Expand Up @@ -404,7 +406,8 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
}: StopOptions): Promise<Stream | undefined> {
log('starting circuit relay v2 stop request to %s', connection.remotePeer)
const stream = await connection.newStream([RELAY_V2_STOP_CODEC], {
maxOutboundStreams: this.maxOutboundStopStreams
maxOutboundStreams: this.maxOutboundStopStreams,
runOnTransientConnection: true
})
const pbstr = pbStream(stream)
const stopstr = pbstr.pb(StopMessage)
Expand Down
17 changes: 10 additions & 7 deletions packages/libp2p/src/circuit-relay/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ class CircuitRelayTransport implements Transport {
})
}, {
maxInboundStreams: this.maxInboundStopStreams,
maxOutboundStreams: this.maxOutboundStopStreams
maxOutboundStreams: this.maxOutboundStopStreams,
runOnTransientConnection: true
})

this.started = true
Expand Down Expand Up @@ -275,16 +276,16 @@ class CircuitRelayTransport implements Transport {
throw new CodeError(`failed to connect via relay with status ${status?.status?.toString() ?? 'undefined'}`, codes.ERR_HOP_REQUEST_FAILED)
}

// TODO: do something with limit and transient connection

const maConn = streamToMaConnection({
stream: pbstr.unwrap(),
remoteAddr: ma,
localAddr: relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toString()}`)
})

log('new outbound connection %a', maConn.remoteAddr)
return await this.upgrader.upgradeOutbound(maConn)
log('new outbound transient connection %a', maConn.remoteAddr)
return await this.upgrader.upgradeOutbound(maConn, {
transient: true
})
} catch (err) {
log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${connection.remotePeer.toString()} failed`, err)
disconnectOnFailure && await connection.close()
Expand Down Expand Up @@ -380,8 +381,10 @@ class CircuitRelayTransport implements Transport {
localAddr
})

log('new inbound connection %s', maConn.remoteAddr)
await this.upgrader.upgradeInbound(maConn)
log('new inbound transient connection %a', maConn.remoteAddr)
await this.upgrader.upgradeInbound(maConn, {
transient: true
})
log('%s connection %a upgraded', 'inbound', maConn.remoteAddr)
}
}
Expand Down
14 changes: 11 additions & 3 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { setMaxListeners } from 'events'
import { type Direction, symbol, type Connection, type Stream, type ConnectionTimeline, type ConnectionStatus } from '@libp2p/interface/connection'
import { symbol } from '@libp2p/interface/connection'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import type { AbortOptions } from '@libp2p/interface'
import type { Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'

Expand All @@ -22,6 +23,7 @@ interface ConnectionInit {
timeline: ConnectionTimeline
multiplexer?: string
encryption?: string
transient?: boolean
}

/**
Expand Down Expand Up @@ -49,6 +51,7 @@ export class ConnectionImpl implements Connection {
public multiplexer?: string
public encryption?: string
public status: ConnectionStatus
public transient: boolean

/**
* User provided tags
Expand All @@ -59,7 +62,7 @@ export class ConnectionImpl implements Connection {
/**
* Reference to the new stream function of the multiplexer
*/
private readonly _newStream: (protocols: string[], options?: AbortOptions) => Promise<Stream>
private readonly _newStream: (protocols: string[], options?: NewStreamOptions) => Promise<Stream>

/**
* Reference to the close function of the raw connection
Expand Down Expand Up @@ -88,6 +91,7 @@ export class ConnectionImpl implements Connection {
this.timeline = init.timeline
this.multiplexer = init.multiplexer
this.encryption = init.encryption
this.transient = init.transient ?? false

this._newStream = newStream
this._close = close
Expand All @@ -110,7 +114,7 @@ export class ConnectionImpl implements Connection {
/**
* Create a new stream from this connection
*/
async newStream (protocols: string | string[], options?: AbortOptions): Promise<Stream> {
async newStream (protocols: string | string[], options?: NewStreamOptions): Promise<Stream> {
if (this.status === 'closing') {
throw new CodeError('the connection is being closed', 'ERR_CONNECTION_BEING_CLOSED')
}
Expand All @@ -123,6 +127,10 @@ export class ConnectionImpl implements Connection {
protocols = [protocols]
}

if (this.transient && options?.runOnTransientConnection !== true) {
throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION')
}

const stream = await this._newStream(protocols, options)

stream.direction = 'outbound'
Expand Down
19 changes: 14 additions & 5 deletions packages/libp2p/src/identify/identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ const defaultValues = {
maxPushOutgoingStreams: 1,
maxObservedAddresses: 10,
maxIdentifyMessageSize: 8192,
runOnConnectionOpen: true
runOnConnectionOpen: true,
runOnTransientConnection: true
}

export class DefaultIdentifyService implements Startable, IdentifyService {
Expand All @@ -70,6 +71,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
private readonly maxIdentifyMessageSize: number
private readonly maxObservedAddresses: number
private readonly events: EventEmitter<Libp2pEvents>
private readonly runOnTransientConnection: boolean

constructor (components: IdentifyServiceComponents, init: IdentifyServiceInit) {
this.started = false
Expand All @@ -89,6 +91,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
this.maxPushOutgoingStreams = init.maxPushOutgoingStreams ?? defaultValues.maxPushOutgoingStreams
this.maxIdentifyMessageSize = init.maxIdentifyMessageSize ?? defaultValues.maxIdentifyMessageSize
this.maxObservedAddresses = init.maxObservedAddresses ?? defaultValues.maxObservedAddresses
this.runOnTransientConnection = init.runOnTransientConnection ?? defaultValues.runOnTransientConnection

// Store self host metadata
this.host = {
Expand Down Expand Up @@ -141,15 +144,17 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
})
}, {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams
maxOutboundStreams: this.maxOutboundStreams,
runOnTransientConnection: this.runOnTransientConnection
})
await this.registrar.handle(this.identifyPushProtocolStr, (data) => {
void this._handlePush(data).catch(err => {
log.error(err)
})
}, {
maxInboundStreams: this.maxPushIncomingStreams,
maxOutboundStreams: this.maxPushOutgoingStreams
maxOutboundStreams: this.maxPushOutgoingStreams,
runOnTransientConnection: this.runOnTransientConnection
})

this.started = true
Expand Down Expand Up @@ -189,7 +194,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService {

try {
stream = await connection.newStream([this.identifyPushProtocolStr], {
signal
signal,
runOnTransientConnection: this.runOnTransientConnection
})

const pb = pbStream(stream, {
Expand Down Expand Up @@ -257,7 +263,10 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
options.signal = options.signal ?? AbortSignal.timeout(this.timeout)

try {
stream = await connection.newStream([this.identifyProtocolStr], options)
stream = await connection.newStream([this.identifyProtocolStr], {
...options,
runOnTransientConnection: this.runOnTransientConnection
})

const pb = pbStream(stream, {
maxDataLength: this.maxIdentifyMessageSize ?? MAX_IDENTIFY_MESSAGE_SIZE
Expand Down
5 changes: 5 additions & 0 deletions packages/libp2p/src/identify/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ export interface IdentifyServiceInit {
* Whether to automatically dial identify on newly opened connections (default: true)
*/
runOnConnectionOpen?: boolean

/**
* Whether to run on connections with data or duration limits (default: true)
*/
runOnTransientConnection?: boolean
}

export interface IdentifyServiceComponents {
Expand Down
4 changes: 2 additions & 2 deletions packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { DefaultUpgrader } from './upgrader.js'
import type { Components } from './components.js'
import type { Libp2p, Libp2pInit, Libp2pOptions } from './index.js'
import type { Libp2pEvents, PendingDial, ServiceMap, AbortOptions } from '@libp2p/interface'
import type { Connection, Stream } from '@libp2p/interface/connection'
import type { Connection, NewStreamOptions, Stream } from '@libp2p/interface/connection'
import type { KeyChain } from '@libp2p/interface/keychain'
import type { Metrics } from '@libp2p/interface/metrics'
import type { PeerId } from '@libp2p/interface/peer-id'
Expand Down Expand Up @@ -283,7 +283,7 @@ export class Libp2pNode<T extends ServiceMap = Record<string, unknown>> extends
return this.components.connectionManager.openConnection(peer, options)
}

async dialProtocol (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options: AbortOptions = {}): Promise<Stream> {
async dialProtocol (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options: NewStreamOptions = {}): Promise<Stream> {
if (protocols == null) {
throw new CodeError('no protocols were provided to open a stream', codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
}
Expand Down
Loading

0 comments on commit a1ec46b

Please sign in to comment.