Skip to content

Commit 987c738

Browse files
authored
feat: allow skipping upgrade steps for incoming connections (#1502)
We already allow skipping upgrade steps for outgoing connections, this PR adds the same capability of incoming connections. This is to support listeners for new transports like webtransport and webrtc that manage their own encryption and multiplexing.
1 parent d01c37e commit 987c738

File tree

2 files changed

+125
-26
lines changed

2 files changed

+125
-26
lines changed

src/upgrader.ts

+37-19
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
131131
/**
132132
* Upgrades an inbound connection
133133
*/
134-
async upgradeInbound (maConn: MultiaddrConnection): Promise<Connection> {
134+
async upgradeInbound (maConn: MultiaddrConnection, opts?: UpgraderOptions): Promise<Connection> {
135135
const accept = await this.components.connectionManager.acceptIncomingConnection(maConn)
136136

137137
if (!accept) {
@@ -166,38 +166,56 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
166166

167167
// Protect
168168
let protectedConn = maConn
169-
const protector = this.components.connectionProtector
170169

171-
if (protector != null) {
172-
log('protecting the inbound connection')
173-
protectedConn = await protector.protect(maConn)
170+
if (opts?.skipProtection !== true) {
171+
const protector = this.components.connectionProtector
172+
173+
if (protector != null) {
174+
log('protecting the inbound connection')
175+
protectedConn = await protector.protect(maConn)
176+
}
174177
}
175178

176179
try {
177180
// Encrypt the connection
178-
({
179-
conn: encryptedConn,
180-
remotePeer,
181-
protocol: cryptoProtocol
182-
} = await this._encryptInbound(protectedConn))
181+
encryptedConn = protectedConn
182+
if (opts?.skipEncryption !== true) {
183+
({
184+
conn: encryptedConn,
185+
remotePeer,
186+
protocol: cryptoProtocol
187+
} = await this._encryptInbound(protectedConn))
188+
189+
if (await this.components.connectionGater.denyInboundEncryptedConnection(remotePeer, {
190+
...protectedConn,
191+
...encryptedConn
192+
})) {
193+
throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED)
194+
}
195+
} else {
196+
const idStr = maConn.remoteAddr.getPeerId()
183197

184-
if (await this.components.connectionGater.denyInboundEncryptedConnection(remotePeer, {
185-
...protectedConn,
186-
...encryptedConn
187-
})) {
188-
throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED)
198+
if (idStr == null) {
199+
throw errCode(new Error('inbound connection that skipped encryption must have a peer id'), codes.ERR_INVALID_MULTIADDR)
200+
}
201+
202+
const remotePeerId = peerIdFromString(idStr)
203+
204+
cryptoProtocol = 'native'
205+
remotePeer = remotePeerId
189206
}
190207

191-
// Multiplex the connection
192-
if (this.muxers.size > 0) {
208+
upgradedConn = encryptedConn
209+
if (opts?.muxerFactory != null) {
210+
muxerFactory = opts.muxerFactory
211+
} else if (this.muxers.size > 0) {
212+
// Multiplex the connection
193213
const multiplexed = await this._multiplexInbound({
194214
...protectedConn,
195215
...encryptedConn
196216
}, this.muxers)
197217
muxerFactory = multiplexed.muxerFactory
198218
upgradedConn = multiplexed.stream
199-
} else {
200-
upgradedConn = encryptedConn
201219
}
202220
} catch (err: any) {
203221
log.error('Failed to upgrade inbound connection', err)

test/upgrading/upgrader.spec.ts

+88-7
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
1313
import swarmKey from '../fixtures/swarm.key.js'
1414
import { DefaultUpgrader } from '../../src/upgrader.js'
1515
import { codes } from '../../src/errors.js'
16-
import { mockConnectionGater, mockConnectionManager, mockMultiaddrConnPair, mockRegistrar, mockStream } from '@libp2p/interface-mocks'
16+
import { mockConnectionGater, mockConnectionManager, mockMultiaddrConnPair, mockRegistrar, mockStream, mockMuxer } from '@libp2p/interface-mocks'
1717
import Peers from '../fixtures/peers.js'
1818
import type { Upgrader } from '@libp2p/interface-transport'
1919
import type { PeerId } from '@libp2p/interface-peer-id'
2020
import { createFromJSON } from '@libp2p/peer-id-factory'
2121
import { plaintext } from '../../src/insecure/index.js'
2222
import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter'
2323
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
24-
import type { Stream } from '@libp2p/interface-connection'
24+
import type { ConnectionProtector, Stream } from '@libp2p/interface-connection'
2525
import pDefer from 'p-defer'
2626
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
2727
import { pEvent } from 'p-event'
@@ -32,6 +32,7 @@ import { Uint8ArrayList } from 'uint8arraylist'
3232
import { PersistentPeerStore } from '@libp2p/peer-store'
3333
import { MemoryDatastore } from 'datastore-core'
3434
import { DefaultComponents } from '../../src/components.js'
35+
import { StubbedInstance, stubInterface } from 'sinon-ts'
3536

3637
const addrs = [
3738
multiaddr('/ip4/127.0.0.1/tcp/0'),
@@ -41,7 +42,12 @@ const addrs = [
4142
describe('Upgrader', () => {
4243
let localUpgrader: Upgrader
4344
let localMuxerFactory: StreamMuxerFactory
45+
let localConnectionEncrypter: ConnectionEncrypter
46+
let localConnectionProtector: StubbedInstance<ConnectionProtector>
4447
let remoteUpgrader: Upgrader
48+
let remoteMuxerFactory: StreamMuxerFactory
49+
let remoteConnectionEncrypter: ConnectionEncrypter
50+
let remoteConnectionProtector: StubbedInstance<ConnectionProtector>
4551
let localPeer: PeerId
4652
let remotePeer: PeerId
4753
let localComponents: DefaultComponents
@@ -56,39 +62,50 @@ describe('Upgrader', () => {
5662
createFromJSON(Peers[1])
5763
]))
5864

65+
localConnectionProtector = stubInterface<ConnectionProtector>()
66+
localConnectionProtector.protect.resolvesArg(0)
67+
5968
localComponents = new DefaultComponents({
6069
peerId: localPeer,
6170
connectionGater: mockConnectionGater(),
6271
registrar: mockRegistrar(),
63-
datastore: new MemoryDatastore()
72+
datastore: new MemoryDatastore(),
73+
connectionProtector: localConnectionProtector
6474
})
6575
localComponents.peerStore = new PersistentPeerStore(localComponents)
6676
localComponents.connectionManager = mockConnectionManager(localComponents)
6777
localMuxerFactory = mplex()()
78+
localConnectionEncrypter = plaintext()()
6879
localUpgrader = new DefaultUpgrader(localComponents, {
6980
connectionEncryption: [
70-
plaintext()()
81+
localConnectionEncrypter
7182
],
7283
muxers: [
7384
localMuxerFactory
7485
],
7586
inboundUpgradeTimeout: 1000
7687
})
7788

89+
remoteConnectionProtector = stubInterface<ConnectionProtector>()
90+
remoteConnectionProtector.protect.resolvesArg(0)
91+
7892
remoteComponents = new DefaultComponents({
7993
peerId: remotePeer,
8094
connectionGater: mockConnectionGater(),
8195
registrar: mockRegistrar(),
82-
datastore: new MemoryDatastore()
96+
datastore: new MemoryDatastore(),
97+
connectionProtector: remoteConnectionProtector
8398
})
8499
remoteComponents.peerStore = new PersistentPeerStore(remoteComponents)
85100
remoteComponents.connectionManager = mockConnectionManager(remoteComponents)
101+
remoteMuxerFactory = mplex()()
102+
remoteConnectionEncrypter = plaintext()()
86103
remoteUpgrader = new DefaultUpgrader(remoteComponents, {
87104
connectionEncryption: [
88-
plaintext()()
105+
remoteConnectionEncrypter
89106
],
90107
muxers: [
91-
mplex()()
108+
remoteMuxerFactory
92109
],
93110
inboundUpgradeTimeout: 1000
94111
})
@@ -451,6 +468,70 @@ describe('Upgrader', () => {
451468
expect(connections[0].streams).to.have.lengthOf(0)
452469
expect(connections[1].streams).to.have.lengthOf(0)
453470
})
471+
472+
it('should allow skipping encryption, protection and muxing', async () => {
473+
const localStreamMuxerFactorySpy = sinon.spy(localMuxerFactory, 'createStreamMuxer')
474+
const localMuxerFactoryOverride = mockMuxer()
475+
const localStreamMuxerFactoryOverrideSpy = sinon.spy(localMuxerFactoryOverride, 'createStreamMuxer')
476+
const localConnectionEncrypterSpy = sinon.spy(localConnectionEncrypter, 'secureOutbound')
477+
478+
const remoteStreamMuxerFactorySpy = sinon.spy(remoteMuxerFactory, 'createStreamMuxer')
479+
const remoteMuxerFactoryOverride = mockMuxer()
480+
const remoteStreamMuxerFactoryOverrideSpy = sinon.spy(remoteMuxerFactoryOverride, 'createStreamMuxer')
481+
const remoteConnectionEncrypterSpy = sinon.spy(remoteConnectionEncrypter, 'secureInbound')
482+
483+
const { inbound, outbound } = mockMultiaddrConnPair({
484+
addrs: [
485+
multiaddr('/ip4/127.0.0.1/tcp/0').encapsulate(`/p2p/${remotePeer.toString()}`),
486+
multiaddr('/ip4/127.0.0.1/tcp/0')
487+
],
488+
remotePeer
489+
})
490+
491+
const connections = await Promise.all([
492+
localUpgrader.upgradeOutbound(outbound, {
493+
skipEncryption: true,
494+
skipProtection: true,
495+
muxerFactory: localMuxerFactoryOverride
496+
}),
497+
remoteUpgrader.upgradeInbound(inbound, {
498+
skipEncryption: true,
499+
skipProtection: true,
500+
muxerFactory: remoteMuxerFactoryOverride
501+
})
502+
])
503+
504+
expect(connections).to.have.length(2)
505+
506+
const stream = await connections[0].newStream('/echo/1.0.0')
507+
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
508+
509+
const hello = uint8ArrayFromString('hello there!')
510+
const result = await pipe(
511+
[hello],
512+
stream,
513+
function toBuffer (source) {
514+
return (async function * () {
515+
for await (const val of source) yield val.slice()
516+
})()
517+
},
518+
async (source) => await all(source)
519+
)
520+
521+
expect(result).to.eql([hello])
522+
523+
expect(localStreamMuxerFactorySpy.callCount).to.equal(0, 'did not use passed stream muxer factory')
524+
expect(localStreamMuxerFactoryOverrideSpy.callCount).to.equal(1, 'did not use passed stream muxer factory')
525+
526+
expect(remoteStreamMuxerFactorySpy.callCount).to.equal(0, 'did not use passed stream muxer factory')
527+
expect(remoteStreamMuxerFactoryOverrideSpy.callCount).to.equal(1, 'did not use passed stream muxer factory')
528+
529+
expect(localConnectionEncrypterSpy.callCount).to.equal(0, 'used local connection encrypter')
530+
expect(remoteConnectionEncrypterSpy.callCount).to.equal(0, 'used remote connection encrypter')
531+
532+
expect(localConnectionProtector.protect.callCount).to.equal(0, 'used local connection protector')
533+
expect(remoteConnectionProtector.protect.callCount).to.equal(0, 'used remote connection protector')
534+
})
454535
})
455536

456537
describe('libp2p.upgrader', () => {

0 commit comments

Comments
 (0)