diff --git a/packages/interface-compliance-tests/src/mocks/connection.ts b/packages/interface-compliance-tests/src/mocks/connection.ts index cbd99530da..8c88ba057b 100644 --- a/packages/interface-compliance-tests/src/mocks/connection.ts +++ b/packages/interface-compliance-tests/src/mocks/connection.ts @@ -43,6 +43,7 @@ class MockConnection implements Connection { public status: ConnectionStatus public streams: Stream[] public tags: string[] + public transient: boolean private readonly muxer: StreamMuxer private readonly maConn: MultiaddrConnection @@ -63,6 +64,7 @@ class MockConnection implements Connection { this.tags = [] this.muxer = muxer this.maConn = maConn + this.transient = false } async newStream (protocols: string | string[], options?: AbortOptions): Promise { diff --git a/packages/interface-compliance-tests/src/mocks/upgrader.ts b/packages/interface-compliance-tests/src/mocks/upgrader.ts index 816b0532bb..b4ef88002d 100644 --- a/packages/interface-compliance-tests/src/mocks/upgrader.ts +++ b/packages/interface-compliance-tests/src/mocks/upgrader.ts @@ -2,8 +2,8 @@ import { mockConnection } from './connection.js' import type { Libp2pEvents } from '@libp2p/interface' import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection' import type { EventEmitter } from '@libp2p/interface/events' +import type { Upgrader, UpgraderOptions } from '@libp2p/interface/transport' import type { Registrar } from '@libp2p/interface-internal/registrar' -import type { Upgrader, UpgraderOptions } from '@libp2p/interface-internal/upgrader' export interface MockUpgraderInit { registrar?: Registrar diff --git a/packages/interface-internal/src/registrar/index.ts b/packages/interface-internal/src/registrar/index.ts index 2c3712d04e..a833cf7940 100644 --- a/packages/interface-internal/src/registrar/index.ts +++ b/packages/interface-internal/src/registrar/index.ts @@ -20,6 +20,13 @@ export interface StreamHandlerOptions { * How many outgoing streams can be open for this protocol at the same time on each connection (default: 64) */ maxOutboundStreams?: number + + /** + * If true, allow this protocol to run on limited connections (e.g. + * connections with data or duration limits such as circuit relay + * connections) (default: false) + */ + runOnTransientConnection?: boolean } export interface StreamHandlerRecord { diff --git a/packages/interface-internal/src/upgrader/index.ts b/packages/interface-internal/src/upgrader/index.ts deleted file mode 100644 index 3f7c49436f..0000000000 --- a/packages/interface-internal/src/upgrader/index.ts +++ /dev/null @@ -1,20 +0,0 @@ -import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection' -import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer' - -export interface UpgraderOptions { - skipEncryption?: boolean - skipProtection?: boolean - muxerFactory?: StreamMuxerFactory -} - -export interface Upgrader { - /** - * Upgrades an outbound connection on `transport.dial`. - */ - upgradeOutbound: (maConn: MultiaddrConnection, opts?: UpgraderOptions) => Promise - - /** - * Upgrades an inbound connection on transport listener. - */ - upgradeInbound: (maConn: MultiaddrConnection, opts?: UpgraderOptions) => Promise -} diff --git a/packages/interface/src/connection/index.ts b/packages/interface/src/connection/index.ts index 07ffa2b7ac..6a2100f75d 100644 --- a/packages/interface/src/connection/index.ts +++ b/packages/interface/src/connection/index.ts @@ -178,6 +178,12 @@ export interface NewStreamOptions extends AbortOptions { * for the protocol */ maxOutboundStreams?: number + + /** + * Opt-in to running over a transient connection - one that has time/data limits + * placed on it. + */ + runOnTransientConnection?: boolean } export type ConnectionStatus = 'open' | 'closing' | 'closed' @@ -239,6 +245,14 @@ export interface Connection { */ status: ConnectionStatus + /** + * A transient connection is one that is not expected to be open for very long + * or one that cannot transfer very much data, such as one being used as a + * circuit relay connection. Protocols need to explicitly opt-in to being run + * over transient connections. + */ + transient: boolean + /** * Create a new stream on this connection and negotiate one of the passed protocols */ diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index 91860040b0..198493a297 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -14,7 +14,7 @@ * ``` */ -import type { Connection, Stream } from './connection/index.js' +import type { Connection, NewStreamOptions, Stream } from './connection/index.js' import type { ContentRouting } from './content-routing/index.js' import type { EventEmitter } from './events.js' import type { KeyChain } from './keychain/index.js' @@ -503,7 +503,7 @@ export interface Libp2p extends Startable, Ev * pipe([1, 2, 3], stream, consume) * ``` */ - dialProtocol: (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options?: AbortOptions) => Promise + dialProtocol: (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options?: NewStreamOptions) => Promise /** * Attempts to gracefully close an open connection to the given peer. If the diff --git a/packages/interface/src/stream-handler/index.ts b/packages/interface/src/stream-handler/index.ts index f5ab77038a..e4ca91c6b3 100644 --- a/packages/interface/src/stream-handler/index.ts +++ b/packages/interface/src/stream-handler/index.ts @@ -19,6 +19,12 @@ export interface StreamHandlerOptions { * How many outgoing streams can be open for this protocol at the same time on each connection (default: 64) */ maxOutboundStreams?: number + + /** + * Opt-in to running over a transient connection - one that has time/data limits + * placed on it. + */ + runOnTransientConnection?: boolean } export interface StreamHandlerRecord { diff --git a/packages/interface/src/transport/index.ts b/packages/interface/src/transport/index.ts index 5b84a60e1d..203b1bcf5a 100644 --- a/packages/interface/src/transport/index.ts +++ b/packages/interface/src/transport/index.ts @@ -96,6 +96,12 @@ export interface UpgraderOptions { skipEncryption?: boolean skipProtection?: boolean muxerFactory?: StreamMuxerFactory + + /** + * The passed MultiaddrConnection has limits place on duration and/or data + * transfer amounts so is not expected to be open for very long. + */ + transient?: boolean } export interface Upgrader { diff --git a/packages/libp2p-daemon-server/src/index.ts b/packages/libp2p-daemon-server/src/index.ts index c9599172e2..acc9d26067 100644 --- a/packages/libp2p-daemon-server/src/index.ts +++ b/packages/libp2p-daemon-server/src/index.ts @@ -103,7 +103,9 @@ export class Server implements Libp2pServer { const { peer, proto } = request.streamOpen const peerId = peerIdFromBytes(peer) const connection = await this.libp2p.dial(peerId) - const stream = await connection.newStream(proto) + const stream = await connection.newStream(proto, { + runOnTransientConnection: true + }) return { streamInfo: { @@ -178,6 +180,8 @@ export class Server implements Libp2pServer { }) } }) + }, { + runOnTransientConnection: true }) } diff --git a/packages/libp2p/src/circuit-relay/server/index.ts b/packages/libp2p/src/circuit-relay/server/index.ts index 67f74b74f8..42f931ca16 100644 --- a/packages/libp2p/src/circuit-relay/server/index.ts +++ b/packages/libp2p/src/circuit-relay/server/index.ts @@ -10,8 +10,9 @@ import { MAX_CONNECTIONS } from '../../connection-manager/constants.js' import { CIRCUIT_PROTO_CODE, DEFAULT_HOP_TIMEOUT, - RELAY_SOURCE_TAG - , RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC + RELAY_SOURCE_TAG, + RELAY_V2_HOP_CODEC, + RELAY_V2_STOP_CODEC } from '../constants.js' import { HopMessage, type Reservation, Status, StopMessage } from '../pb/index.js' import { createLimitedRelay } from '../utils.js' @@ -172,7 +173,8 @@ class CircuitRelayServer extends EventEmitter implements Star }) }, { maxInboundStreams: this.maxInboundHopStreams, - maxOutboundStreams: this.maxOutboundHopStreams + maxOutboundStreams: this.maxOutboundHopStreams, + runOnTransientConnection: true }) this.reservationStore.start() @@ -404,7 +406,8 @@ class CircuitRelayServer extends EventEmitter implements Star }: StopOptions): Promise { log('starting circuit relay v2 stop request to %s', connection.remotePeer) const stream = await connection.newStream([RELAY_V2_STOP_CODEC], { - maxOutboundStreams: this.maxOutboundStopStreams + maxOutboundStreams: this.maxOutboundStopStreams, + runOnTransientConnection: true }) const pbstr = pbStream(stream) const stopstr = pbstr.pb(StopMessage) diff --git a/packages/libp2p/src/circuit-relay/transport/index.ts b/packages/libp2p/src/circuit-relay/transport/index.ts index 42f504cfc1..139fb69d7c 100644 --- a/packages/libp2p/src/circuit-relay/transport/index.ts +++ b/packages/libp2p/src/circuit-relay/transport/index.ts @@ -169,7 +169,8 @@ class CircuitRelayTransport implements Transport { }) }, { maxInboundStreams: this.maxInboundStopStreams, - maxOutboundStreams: this.maxOutboundStopStreams + maxOutboundStreams: this.maxOutboundStopStreams, + runOnTransientConnection: true }) this.started = true @@ -275,16 +276,16 @@ class CircuitRelayTransport implements Transport { throw new CodeError(`failed to connect via relay with status ${status?.status?.toString() ?? 'undefined'}`, codes.ERR_HOP_REQUEST_FAILED) } - // TODO: do something with limit and transient connection - const maConn = streamToMaConnection({ stream: pbstr.unwrap(), remoteAddr: ma, localAddr: relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toString()}`) }) - log('new outbound connection %a', maConn.remoteAddr) - return await this.upgrader.upgradeOutbound(maConn) + log('new outbound transient connection %a', maConn.remoteAddr) + return await this.upgrader.upgradeOutbound(maConn, { + transient: true + }) } catch (err) { log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${connection.remotePeer.toString()} failed`, err) disconnectOnFailure && await connection.close() @@ -380,8 +381,10 @@ class CircuitRelayTransport implements Transport { localAddr }) - log('new inbound connection %s', maConn.remoteAddr) - await this.upgrader.upgradeInbound(maConn) + log('new inbound transient connection %a', maConn.remoteAddr) + await this.upgrader.upgradeInbound(maConn, { + transient: true + }) log('%s connection %a upgraded', 'inbound', maConn.remoteAddr) } } diff --git a/packages/libp2p/src/connection/index.ts b/packages/libp2p/src/connection/index.ts index 94241bd49c..53b1a704b3 100644 --- a/packages/libp2p/src/connection/index.ts +++ b/packages/libp2p/src/connection/index.ts @@ -1,8 +1,9 @@ import { setMaxListeners } from 'events' -import { type Direction, symbol, type Connection, type Stream, type ConnectionTimeline, type ConnectionStatus } from '@libp2p/interface/connection' +import { symbol } from '@libp2p/interface/connection' import { CodeError } from '@libp2p/interface/errors' import { logger } from '@libp2p/logger' import type { AbortOptions } from '@libp2p/interface' +import type { Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions } from '@libp2p/interface/connection' import type { PeerId } from '@libp2p/interface/peer-id' import type { Multiaddr } from '@multiformats/multiaddr' @@ -22,6 +23,7 @@ interface ConnectionInit { timeline: ConnectionTimeline multiplexer?: string encryption?: string + transient?: boolean } /** @@ -49,6 +51,7 @@ export class ConnectionImpl implements Connection { public multiplexer?: string public encryption?: string public status: ConnectionStatus + public transient: boolean /** * User provided tags @@ -59,7 +62,7 @@ export class ConnectionImpl implements Connection { /** * Reference to the new stream function of the multiplexer */ - private readonly _newStream: (protocols: string[], options?: AbortOptions) => Promise + private readonly _newStream: (protocols: string[], options?: NewStreamOptions) => Promise /** * Reference to the close function of the raw connection @@ -88,6 +91,7 @@ export class ConnectionImpl implements Connection { this.timeline = init.timeline this.multiplexer = init.multiplexer this.encryption = init.encryption + this.transient = init.transient ?? false this._newStream = newStream this._close = close @@ -110,7 +114,7 @@ export class ConnectionImpl implements Connection { /** * Create a new stream from this connection */ - async newStream (protocols: string | string[], options?: AbortOptions): Promise { + async newStream (protocols: string | string[], options?: NewStreamOptions): Promise { if (this.status === 'closing') { throw new CodeError('the connection is being closed', 'ERR_CONNECTION_BEING_CLOSED') } @@ -123,6 +127,10 @@ export class ConnectionImpl implements Connection { protocols = [protocols] } + if (this.transient && options?.runOnTransientConnection !== true) { + throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION') + } + const stream = await this._newStream(protocols, options) stream.direction = 'outbound' diff --git a/packages/libp2p/src/identify/identify.ts b/packages/libp2p/src/identify/identify.ts index 351074786b..caa5248b45 100644 --- a/packages/libp2p/src/identify/identify.ts +++ b/packages/libp2p/src/identify/identify.ts @@ -45,7 +45,8 @@ const defaultValues = { maxPushOutgoingStreams: 1, maxObservedAddresses: 10, maxIdentifyMessageSize: 8192, - runOnConnectionOpen: true + runOnConnectionOpen: true, + runOnTransientConnection: true } export class DefaultIdentifyService implements Startable, IdentifyService { @@ -70,6 +71,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { private readonly maxIdentifyMessageSize: number private readonly maxObservedAddresses: number private readonly events: EventEmitter + private readonly runOnTransientConnection: boolean constructor (components: IdentifyServiceComponents, init: IdentifyServiceInit) { this.started = false @@ -89,6 +91,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { this.maxPushOutgoingStreams = init.maxPushOutgoingStreams ?? defaultValues.maxPushOutgoingStreams this.maxIdentifyMessageSize = init.maxIdentifyMessageSize ?? defaultValues.maxIdentifyMessageSize this.maxObservedAddresses = init.maxObservedAddresses ?? defaultValues.maxObservedAddresses + this.runOnTransientConnection = init.runOnTransientConnection ?? defaultValues.runOnTransientConnection // Store self host metadata this.host = { @@ -141,7 +144,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService { }) }, { maxInboundStreams: this.maxInboundStreams, - maxOutboundStreams: this.maxOutboundStreams + maxOutboundStreams: this.maxOutboundStreams, + runOnTransientConnection: this.runOnTransientConnection }) await this.registrar.handle(this.identifyPushProtocolStr, (data) => { void this._handlePush(data).catch(err => { @@ -149,7 +153,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService { }) }, { maxInboundStreams: this.maxPushIncomingStreams, - maxOutboundStreams: this.maxPushOutgoingStreams + maxOutboundStreams: this.maxPushOutgoingStreams, + runOnTransientConnection: this.runOnTransientConnection }) this.started = true @@ -189,7 +194,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService { try { stream = await connection.newStream([this.identifyPushProtocolStr], { - signal + signal, + runOnTransientConnection: this.runOnTransientConnection }) const pb = pbStream(stream, { @@ -257,7 +263,10 @@ export class DefaultIdentifyService implements Startable, IdentifyService { options.signal = options.signal ?? AbortSignal.timeout(this.timeout) try { - stream = await connection.newStream([this.identifyProtocolStr], options) + stream = await connection.newStream([this.identifyProtocolStr], { + ...options, + runOnTransientConnection: this.runOnTransientConnection + }) const pb = pbStream(stream, { maxDataLength: this.maxIdentifyMessageSize ?? MAX_IDENTIFY_MESSAGE_SIZE diff --git a/packages/libp2p/src/identify/index.ts b/packages/libp2p/src/identify/index.ts index 95855d61b6..a61309b3d3 100644 --- a/packages/libp2p/src/identify/index.ts +++ b/packages/libp2p/src/identify/index.ts @@ -45,6 +45,11 @@ export interface IdentifyServiceInit { * Whether to automatically dial identify on newly opened connections (default: true) */ runOnConnectionOpen?: boolean + + /** + * Whether to run on connections with data or duration limits (default: true) + */ + runOnTransientConnection?: boolean } export interface IdentifyServiceComponents { diff --git a/packages/libp2p/src/libp2p.ts b/packages/libp2p/src/libp2p.ts index 206c0c45d7..d81099a184 100644 --- a/packages/libp2p/src/libp2p.ts +++ b/packages/libp2p/src/libp2p.ts @@ -30,7 +30,7 @@ import { DefaultUpgrader } from './upgrader.js' import type { Components } from './components.js' import type { Libp2p, Libp2pInit, Libp2pOptions } from './index.js' import type { Libp2pEvents, PendingDial, ServiceMap, AbortOptions } from '@libp2p/interface' -import type { Connection, Stream } from '@libp2p/interface/connection' +import type { Connection, NewStreamOptions, Stream } from '@libp2p/interface/connection' import type { KeyChain } from '@libp2p/interface/keychain' import type { Metrics } from '@libp2p/interface/metrics' import type { PeerId } from '@libp2p/interface/peer-id' @@ -283,7 +283,7 @@ export class Libp2pNode> extends return this.components.connectionManager.openConnection(peer, options) } - async dialProtocol (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options: AbortOptions = {}): Promise { + async dialProtocol (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options: NewStreamOptions = {}): Promise { if (protocols == null) { throw new CodeError('no protocols were provided to open a stream', codes.ERR_INVALID_PROTOCOLS_FOR_STREAM) } diff --git a/packages/libp2p/src/ping/index.ts b/packages/libp2p/src/ping/index.ts index d189d41202..82c36c3aab 100644 --- a/packages/libp2p/src/ping/index.ts +++ b/packages/libp2p/src/ping/index.ts @@ -25,6 +25,7 @@ export interface PingServiceInit { protocolPrefix?: string maxInboundStreams?: number maxOutboundStreams?: number + runOnTransientConnection?: boolean /** * How long we should wait for a ping response @@ -44,6 +45,7 @@ class DefaultPingService implements Startable, PingService { private readonly timeout: number private readonly maxInboundStreams: number private readonly maxOutboundStreams: number + private readonly runOnTransientConnection: boolean constructor (components: PingServiceComponents, init: PingServiceInit) { this.components = components @@ -52,12 +54,14 @@ class DefaultPingService implements Startable, PingService { this.timeout = init.timeout ?? TIMEOUT this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS + this.runOnTransientConnection = init.runOnTransientConnection ?? true } async start (): Promise { await this.components.registrar.handle(this.protocol, this.handleMessage, { maxInboundStreams: this.maxInboundStreams, - maxOutboundStreams: this.maxOutboundStreams + maxOutboundStreams: this.maxOutboundStreams, + runOnTransientConnection: this.runOnTransientConnection }) this.started = true } @@ -108,7 +112,10 @@ class DefaultPingService implements Startable, PingService { options.signal = options.signal ?? AbortSignal.timeout(this.timeout) try { - stream = await connection.newStream([this.protocol], options) + stream = await connection.newStream(this.protocol, { + ...options, + runOnTransientConnection: this.runOnTransientConnection + }) // make stream abortable const source = abortableDuplex(stream, options.signal) diff --git a/packages/libp2p/src/upgrader.ts b/packages/libp2p/src/upgrader.ts index f40e1cdd52..f77dcf6319 100644 --- a/packages/libp2p/src/upgrader.ts +++ b/packages/libp2p/src/upgrader.ts @@ -18,9 +18,9 @@ import type { Metrics } from '@libp2p/interface/metrics' import type { PeerId } from '@libp2p/interface/peer-id' import type { PeerStore } from '@libp2p/interface/peer-store' import type { StreamMuxer, StreamMuxerFactory } from '@libp2p/interface/stream-muxer' +import type { Upgrader, UpgraderOptions } from '@libp2p/interface/transport' import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' import type { Registrar } from '@libp2p/interface-internal/registrar' -import type { Upgrader, UpgraderOptions } from '@libp2p/interface-internal/upgrader' import type { Duplex, Source } from 'it-stream-types' const log = logger('libp2p:upgrader') @@ -32,6 +32,7 @@ interface CreateConnectionOptions { upgradedConn: Duplex, Source, Promise> remotePeer: PeerId muxerFactory?: StreamMuxerFactory + transient?: boolean } interface OnStreamOptions { @@ -251,7 +252,8 @@ export class DefaultUpgrader implements Upgrader { maConn, upgradedConn, muxerFactory, - remotePeer + remotePeer, + transient: opts?.transient }) } finally { this.components.connectionManager.afterUpgradeInbound() @@ -348,7 +350,8 @@ export class DefaultUpgrader implements Upgrader { maConn, upgradedConn, muxerFactory, - remotePeer + remotePeer, + transient: opts?.transient }) } @@ -362,7 +365,8 @@ export class DefaultUpgrader implements Upgrader { maConn, upgradedConn, remotePeer, - muxerFactory + muxerFactory, + transient } = opts let muxer: StreamMuxer | undefined @@ -541,6 +545,7 @@ export class DefaultUpgrader implements Upgrader { timeline: maConn.timeline, multiplexer: muxer?.protocol, encryption: cryptoProtocol, + transient, newStream: newStream ?? errConnectionNotMultiplexed, getStreams: () => { if (muxer != null) { return muxer.streams } else { return [] } }, close: async (options?: AbortOptions) => { @@ -571,7 +576,11 @@ export class DefaultUpgrader implements Upgrader { */ _onStream (opts: OnStreamOptions): void { const { connection, stream, protocol } = opts - const { handler } = this.components.registrar.getHandler(protocol) + const { handler, options } = this.components.registrar.getHandler(protocol) + + if (connection.transient && options.runOnTransientConnection !== true) { + throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION') + } handler({ connection, stream }) } diff --git a/packages/libp2p/test/circuit-relay/relay.node.ts b/packages/libp2p/test/circuit-relay/relay.node.ts index ae2786fea9..24b843bd3d 100644 --- a/packages/libp2p/test/circuit-relay/relay.node.ts +++ b/packages/libp2p/test/circuit-relay/relay.node.ts @@ -8,6 +8,7 @@ import { Circuit } from '@multiformats/mafmt' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import delay from 'delay' +import { pipe } from 'it-pipe' import { pbStream } from 'it-protobuf-stream' import defer from 'p-defer' import pWaitFor from 'p-wait-for' @@ -624,7 +625,9 @@ describe('circuit-relay', () => { const ma = getRelayAddress(relay1) // open hop stream and try to connect to remote - const stream = await local.dialProtocol(ma, RELAY_V2_HOP_CODEC) + const stream = await local.dialProtocol(ma, RELAY_V2_HOP_CODEC, { + runOnTransientConnection: true + }) const hopStream = pbStream(stream).pb(HopMessage) @@ -671,6 +674,93 @@ describe('circuit-relay', () => { expect(events[0].detail.remotePeer.toString()).to.equal(remote.peerId.toString()) expect(events[1].detail.remotePeer.toString()).to.equal(relay1.peerId.toString()) }) + + it('should mark a relayed connection as transient', async () => { + // discover relay and make reservation + const connectionToRelay = await remote.dial(relay1.getMultiaddrs()[0]) + + // connection to relay should not be marked transient + expect(connectionToRelay).to.have.property('transient', false) + + await usingAsRelay(remote, relay1) + + // dial the remote through the relay + const ma = getRelayAddress(remote) + const connection = await local.dial(ma) + + // connection to remote through relay should be marked transient + expect(connection).to.have.property('transient', true) + }) + + it('should not open streams on a transient connection', async () => { + // discover relay and make reservation + await remote.dial(relay1.getMultiaddrs()[0]) + await usingAsRelay(remote, relay1) + + // dial the remote through the relay + const ma = getRelayAddress(remote) + const connection = await local.dial(ma) + + // connection should be marked transient + expect(connection).to.have.property('transient', true) + + await expect(connection.newStream('/my-protocol/1.0.0')) + .to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION') + }) + + it('should not allow incoming streams on a transient connection', async () => { + const protocol = '/my-protocol/1.0.0' + + // remote registers handler, disallow running over transient streams + await remote.handle(protocol, ({ stream }) => { + void pipe(stream, stream) + }, { + runOnTransientConnection: false + }) + + // discover relay and make reservation + await remote.dial(relay1.getMultiaddrs()[0]) + await usingAsRelay(remote, relay1) + + // dial the remote through the relay + const ma = getRelayAddress(remote) + const connection = await local.dial(ma) + + // connection should be marked transient + expect(connection).to.have.property('transient', true) + + await expect(connection.newStream('/my-protocol/1.0.0', { + runOnTransientConnection: false + })) + .to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION') + }) + + it('should open streams on a transient connection when told to do so', async () => { + const protocol = '/my-protocol/1.0.0' + + // remote registers handler, allow running over transient streams + await remote.handle(protocol, ({ stream }) => { + void pipe(stream, stream) + }, { + runOnTransientConnection: true + }) + + // discover relay and make reservation + await remote.dial(relay1.getMultiaddrs()[0]) + await usingAsRelay(remote, relay1) + + // dial the remote through the relay + const ma = getRelayAddress(remote) + const connection = await local.dial(ma) + + // connection should be marked transient + expect(connection).to.have.property('transient', true) + + await expect(connection.newStream('/my-protocol/1.0.0', { + runOnTransientConnection: true + })) + .to.eventually.be.ok() + }) }) describe('flows with data limit', () => { @@ -913,13 +1003,17 @@ describe('circuit-relay', () => { } } catch {} }) + }, { + runOnTransientConnection: true }) // dial the remote from the local through the relay const ma = getRelayAddress(remote) try { - const stream = await local.dialProtocol(ma, protocol) + const stream = await local.dialProtocol(ma, protocol, { + runOnTransientConnection: true + }) await stream.sink(async function * () { while (true) { diff --git a/packages/libp2p/test/ping/ping.node.ts b/packages/libp2p/test/ping/ping.node.ts index d5c1d18840..aba6537aa7 100644 --- a/packages/libp2p/test/ping/ping.node.ts +++ b/packages/libp2p/test/ping/ping.node.ts @@ -90,6 +90,8 @@ describe('ping', () => { }, stream ) + }, { + runOnTransientConnection: true }) const latency = await nodes[0].services.ping.ping(nodes[1].peerId) diff --git a/packages/transport-webrtc/src/private-to-private/transport.ts b/packages/transport-webrtc/src/private-to-private/transport.ts index 67b5b38abf..e9d2f83bdb 100644 --- a/packages/transport-webrtc/src/private-to-private/transport.ts +++ b/packages/transport-webrtc/src/private-to-private/transport.ts @@ -49,6 +49,8 @@ export class WebRTCTransport implements Transport, Startable { async start (): Promise { await this.components.registrar.handle(SIGNALING_PROTO_ID, (data: IncomingStreamData) => { this._onProtocol(data).catch(err => { log.error('failed to handle incoming connect from %p', data.connection.remotePeer, err) }) + }, { + runOnTransientConnection: true }) this._started = true } @@ -90,7 +92,10 @@ export class WebRTCTransport implements Transport, Startable { } const connection = await this.components.transportManager.dial(baseAddr, options) - const signalingStream = await connection.newStream([SIGNALING_PROTO_ID], options) + const signalingStream = await connection.newStream(SIGNALING_PROTO_ID, { + ...options, + runOnTransientConnection: true + }) try { const { pc, muxerFactory, remoteAddress } = await initiateConnection({ diff --git a/packages/transport-webrtc/test/basics.spec.ts b/packages/transport-webrtc/test/basics.spec.ts index a241d15c54..03d89a1c8b 100644 --- a/packages/transport-webrtc/test/basics.spec.ts +++ b/packages/transport-webrtc/test/basics.spec.ts @@ -69,6 +69,8 @@ describe('basics', () => { stream, stream ) + }, { + runOnTransientConnection: true }) const connection = await localNode.dial(remoteAddr) @@ -99,7 +101,9 @@ describe('basics', () => { const connection = await connectNodes() // open a stream on the echo protocol - const stream = await connection.newStream(echo) + const stream = await connection.newStream(echo, { + runOnTransientConnection: true + }) // send and receive some data const input = new Array(5).fill(0).map(() => new Uint8Array(10)) @@ -118,7 +122,9 @@ describe('basics', () => { const connection = await connectNodes() // open a stream on the echo protocol - const stream = await connection.newStream(echo) + const stream = await connection.newStream(echo, { + runOnTransientConnection: true + }) // send and receive some data const input = new Array(5).fill(0).map(() => new Uint8Array(1024 * 1024))