Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: dial relay when we are dialed via it but have no reservation #2252

Merged
merged 2 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/transport-circuit-relay-v2/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"p-defer": "^4.0.0",
"p-retry": "^6.1.0",
"protons-runtime": "^5.0.0",
"race-signal": "^1.0.2",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.6"
},
Expand Down
341 changes: 6 additions & 335 deletions packages/transport-circuit-relay-v2/src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,16 @@
import { CodeError } from '@libp2p/interface/errors'
import { symbol, type Transport, type CreateListenerOptions, type Listener, type Upgrader } from '@libp2p/interface/transport'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
import { streamToMaConnection } from '@libp2p/utils/stream-to-ma-conn'
import * as mafmt from '@multiformats/mafmt'
import { multiaddr } from '@multiformats/multiaddr'
import { pbStream } from 'it-protobuf-stream'
import { CIRCUIT_PROTO_CODE, ERR_HOP_REQUEST_FAILED, ERR_RELAYED_DIAL, MAX_CONNECTIONS, RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC } from '../constants.js'
import { StopMessage, HopMessage, Status } from '../pb/index.js'
import { RelayDiscovery, type RelayDiscoveryComponents } from './discovery.js'
import { createListener } from './listener.js'
import { type RelayStoreInit, ReservationStore } from './reservation-store.js'
import type { Libp2pEvents, AbortOptions, ComponentLogger, Logger } from '@libp2p/interface'
import type { Connection, Stream } from '@libp2p/interface/connection'
import { type Transport, type Upgrader } from '@libp2p/interface/transport'
import { type RelayDiscoveryComponents } from './discovery.js'
import { type RelayStoreInit } from './reservation-store.js'
import { CircuitRelayTransport } from './transport.js'
import type { Libp2pEvents, ComponentLogger } from '@libp2p/interface'
import type { ConnectionGater } from '@libp2p/interface/connection-gater'
import type { ContentRouting } from '@libp2p/interface/content-routing'
import type { TypedEventTarget } from '@libp2p/interface/events'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerStore } from '@libp2p/interface/peer-store'
import type { AddressManager } from '@libp2p/interface-internal/address-manager'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'
import type { Multiaddr } from '@multiformats/multiaddr'

const isValidStop = (request: StopMessage): request is Required<StopMessage> => {
if (request.peer == null) {
return false
}

try {
request.peer.addrs.forEach(multiaddr)
} catch {
return false
}

return true
}
import type { Registrar } from '@libp2p/interface-internal/registrar'

export interface CircuitRelayTransportComponents extends RelayDiscoveryComponents {
peerId: PeerId
Expand All @@ -49,16 +25,6 @@ export interface CircuitRelayTransportComponents extends RelayDiscoveryComponent
logger: ComponentLogger
}

interface ConnectOptions {
stream: Stream
connection: Connection
destinationPeer: PeerId
destinationAddr: Multiaddr
relayAddr: Multiaddr
ma: Multiaddr
disconnectOnFailure: boolean
}

/**
* RelayConfig configures the circuit v2 relay transport.
*/
Expand Down Expand Up @@ -96,301 +62,6 @@ export interface CircuitRelayTransportInit extends RelayStoreInit {
reservationCompletionTimeout?: number
}

const defaults = {
maxInboundStopStreams: MAX_CONNECTIONS,
maxOutboundStopStreams: MAX_CONNECTIONS,
stopTimeout: 30000
}

class CircuitRelayTransport implements Transport {
private readonly discovery?: RelayDiscovery
private readonly registrar: Registrar
private readonly peerStore: PeerStore
private readonly connectionManager: ConnectionManager
private readonly peerId: PeerId
private readonly upgrader: Upgrader
private readonly addressManager: AddressManager
private readonly connectionGater: ConnectionGater
private readonly reservationStore: ReservationStore
private readonly logger: ComponentLogger
private readonly maxInboundStopStreams: number
private readonly maxOutboundStopStreams?: number
private readonly stopTimeout: number
private started: boolean
private readonly log: Logger

constructor (components: CircuitRelayTransportComponents, init: CircuitRelayTransportInit) {
this.log = components.logger.forComponent('libp2p:circuit-relay:transport')
this.registrar = components.registrar
this.peerStore = components.peerStore
this.connectionManager = components.connectionManager
this.logger = components.logger
this.peerId = components.peerId
this.upgrader = components.upgrader
this.addressManager = components.addressManager
this.connectionGater = components.connectionGater
this.maxInboundStopStreams = init.maxInboundStopStreams ?? defaults.maxInboundStopStreams
this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams
this.stopTimeout = init.stopTimeout ?? defaults.stopTimeout

if (init.discoverRelays != null && init.discoverRelays > 0) {
this.discovery = new RelayDiscovery(components)
this.discovery.addEventListener('relay:discover', (evt) => {
this.reservationStore.addRelay(evt.detail, 'discovered')
.catch(err => {
this.log.error('could not add discovered relay %p', evt.detail, err)
})
})
}

this.reservationStore = new ReservationStore(components, init)
this.reservationStore.addEventListener('relay:not-enough-relays', () => {
this.discovery?.discover()
.catch(err => {
this.log.error('could not discover relays', err)
})
})

this.started = false
}

isStarted (): boolean {
return this.started
}

async start (): Promise<void> {
await this.reservationStore.start()
await this.discovery?.start()

await this.registrar.handle(RELAY_V2_STOP_CODEC, (data) => {
void this.onStop(data).catch(err => {
this.log.error('error while handling STOP protocol', err)
data.stream.abort(err)
})
}, {
maxInboundStreams: this.maxInboundStopStreams,
maxOutboundStreams: this.maxOutboundStopStreams,
runOnTransientConnection: true
})

this.started = true
}

async stop (): Promise<void> {
this.discovery?.stop()
await this.reservationStore.stop()
await this.registrar.unhandle(RELAY_V2_STOP_CODEC)

this.started = false
}

readonly [symbol] = true

readonly [Symbol.toStringTag] = 'libp2p/circuit-relay-v2'

/**
* Dial a peer over a relay
*/
async dial (ma: Multiaddr, options: AbortOptions = {}): Promise<Connection> {
if (ma.protoCodes().filter(code => code === CIRCUIT_PROTO_CODE).length !== 1) {
const errMsg = 'Invalid circuit relay address'
this.log.error(errMsg, ma)
throw new CodeError(errMsg, ERR_RELAYED_DIAL)
}

// Check the multiaddr to see if it contains a relay and a destination peer
const addrs = ma.toString().split('/p2p-circuit')
const relayAddr = multiaddr(addrs[0])
const destinationAddr = multiaddr(addrs[addrs.length - 1])
const relayId = relayAddr.getPeerId()
const destinationId = destinationAddr.getPeerId()

if (relayId == null || destinationId == null) {
const errMsg = `Circuit relay dial to ${ma.toString()} failed as address did not have peer ids`
this.log.error(errMsg)
throw new CodeError(errMsg, ERR_RELAYED_DIAL)
}

const relayPeer = peerIdFromString(relayId)
const destinationPeer = peerIdFromString(destinationId)

let disconnectOnFailure = false
const relayConnections = this.connectionManager.getConnections(relayPeer)
let relayConnection = relayConnections[0]

if (relayConnection == null) {
await this.peerStore.merge(relayPeer, {
multiaddrs: [relayAddr]
})
relayConnection = await this.connectionManager.openConnection(relayPeer, options)
disconnectOnFailure = true
}

let stream: Stream | undefined

try {
stream = await relayConnection.newStream(RELAY_V2_HOP_CODEC)

return await this.connectV2({
stream,
connection: relayConnection,
destinationPeer,
destinationAddr,
relayAddr,
ma,
disconnectOnFailure
})
} catch (err: any) {
this.log.error('circuit relay dial to destination %p via relay %p failed', destinationPeer, relayPeer, err)

if (stream != null) {
stream.abort(err)
}
disconnectOnFailure && await relayConnection.close()
throw err
}
}

async connectV2 (
{
stream, connection, destinationPeer,
destinationAddr, relayAddr, ma,
disconnectOnFailure
}: ConnectOptions
): Promise<Connection> {
try {
const pbstr = pbStream(stream)
const hopstr = pbstr.pb(HopMessage)
await hopstr.write({
type: HopMessage.Type.CONNECT,
peer: {
id: destinationPeer.toBytes(),
addrs: [multiaddr(destinationAddr).bytes]
}
})

const status = await hopstr.read()

if (status.status !== Status.OK) {
throw new CodeError(`failed to connect via relay with status ${status?.status?.toString() ?? 'undefined'}`, ERR_HOP_REQUEST_FAILED)
}

const maConn = streamToMaConnection({
stream: pbstr.unwrap(),
remoteAddr: ma,
localAddr: relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toString()}`),
logger: this.logger
})

this.log('new outbound transient connection %a', maConn.remoteAddr)
return await this.upgrader.upgradeOutbound(maConn, {
transient: true
})
} catch (err: any) {
this.log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${connection.remotePeer.toString()} failed`, err)
disconnectOnFailure && await connection.close()
throw err
}
}

/**
* Create a listener
*/
createListener (options: CreateListenerOptions): Listener {
return createListener({
connectionManager: this.connectionManager,
relayStore: this.reservationStore,
logger: this.logger
})
}

/**
* Filter check for all Multiaddrs that this transport can dial on
*
* @param {Multiaddr[]} multiaddrs
* @returns {Multiaddr[]}
*/
filter (multiaddrs: Multiaddr[]): Multiaddr[] {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]

return multiaddrs.filter((ma) => {
return mafmt.Circuit.matches(ma)
})
}

/**
* An incoming STOP request means a remote peer wants to dial us via a relay
*/
async onStop ({ connection, stream }: IncomingStreamData): Promise<void> {
const signal = AbortSignal.timeout(this.stopTimeout)
const pbstr = pbStream(stream).pb(StopMessage)
const request = await pbstr.read({
signal
})

this.log('new circuit relay v2 stop stream from %p with type %s', connection.remotePeer, request.type)

if (request?.type === undefined) {
this.log.error('type was missing from circuit v2 stop protocol request from %s', connection.remotePeer)
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.MALFORMED_MESSAGE }, {
signal
})
await stream.close()
return
}

// Validate the STOP request has the required input
if (request.type !== StopMessage.Type.CONNECT) {
this.log.error('invalid stop connect request via peer %p', connection.remotePeer)
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.UNEXPECTED_MESSAGE }, {
signal
})
await stream.close()
return
}

if (!isValidStop(request)) {
this.log.error('invalid stop connect request via peer %p', connection.remotePeer)
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.MALFORMED_MESSAGE }, {
signal
})
await stream.close()
return
}

const remotePeerId = peerIdFromBytes(request.peer.id)

if ((await this.connectionGater.denyInboundRelayedConnection?.(connection.remotePeer, remotePeerId)) === true) {
this.log.error('connection gater denied inbound relayed connection from %p', connection.remotePeer)
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }, {
signal
})
await stream.close()
return
}

this.log.trace('sending success response to %p', connection.remotePeer)
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.OK }, {
signal
})

const remoteAddr = connection.remoteAddr.encapsulate(`/p2p-circuit/p2p/${remotePeerId.toString()}`)
const localAddr = this.addressManager.getAddresses()[0]
const maConn = streamToMaConnection({
stream: pbstr.unwrap().unwrap(),
remoteAddr,
localAddr,
logger: this.logger
})

this.log('new inbound transient connection %a', maConn.remoteAddr)
await this.upgrader.upgradeInbound(maConn, {
transient: true
})
this.log('%s connection %a upgraded', 'inbound', maConn.remoteAddr)
}
}

export function circuitRelayTransport (init: CircuitRelayTransportInit = {}): (components: CircuitRelayTransportComponents) => Transport {
return (components) => {
return new CircuitRelayTransport(components, init)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class CircuitRelayTransportListener extends TypedEventEmitter<ListenerEvents> im
const relayConn = await this.connectionManager.openConnection(relayAddr)

if (!this.relayStore.hasReservation(relayConn.remotePeer)) {
this.log('making reservation on peer %p', relayConn.remotePeer)
// addRelay calls transportManager.listen which calls this listen method
await this.relayStore.addRelay(relayConn.remotePeer, 'configured')
return
Expand Down
Loading
Loading