Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement blind relay support #135

Merged
merged 26 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ef128b6
Implement bridging relay support
kasperisager Aug 29, 2023
2e6bc66
Add missing dependency
kasperisager Sep 8, 2023
ec85c84
Make sure relay examples bypass local connections
kasperisager Sep 12, 2023
2f4b06a
Accept `relayThrough` function option
kasperisager Sep 14, 2023
406bbdd
Update `protomux-bridging-relay`
kasperisager Sep 20, 2023
6e5e529
Don't throw if hole punching succeeds before pairing
kasperisager Sep 21, 2023
44edce7
Unpair if hole punching succeeds before pairing
kasperisager Sep 21, 2023
12acf3a
Continue with normal hole punching
kasperisager Sep 21, 2023
5894ab8
Guard against multiple sockets succeeding
kasperisager Sep 21, 2023
fd12755
Continue on relayed connection even if hole punching is aborted or fails
kasperisager Sep 22, 2023
1fcff53
Close relay sockets when raw stream closes
kasperisager Sep 22, 2023
3750392
Review from @mafintosh
kasperisager Sep 22, 2023
a5c447e
Merge branch 'main' into connection-relaying
kasperisager Sep 22, 2023
eeefdb3
Version relay handshake information
kasperisager Sep 22, 2023
dd00ae7
Review from @mafintosh
kasperisager Sep 22, 2023
d85edc5
Update `udx-native`
kasperisager Sep 22, 2023
9740438
Update `udx-native`
kasperisager Sep 26, 2023
a8d3fbc
Merge branch 'main' into connection-relaying
mafintosh Sep 26, 2023
7308c08
Handle relay data races between client and server
kasperisager Sep 27, 2023
ad8dd52
Don't bail before hole punching even starts
kasperisager Sep 27, 2023
9c22d61
Move relay setup to helper functions
kasperisager Sep 29, 2023
720cb47
Switch to `protomux-blind-relay`
kasperisager Sep 29, 2023
66e22c4
Switch to `blind-relay`
kasperisager Sep 29, 2023
c16a66b
Update `udx-native`
kasperisager Sep 29, 2023
22fc148
Remove unused dependencies
kasperisager Sep 29, 2023
36d2648
Kill timeout when relaying connection
kasperisager Sep 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions examples/connection-relaying/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const DHT = require('../..')

const [relay, server] = process.argv.slice(2)

const dht = new DHT()

const socket = dht.connect(Buffer.from(server, 'hex'), {
localConnection: false,
relayThrough: Buffer.from(relay, 'hex')
})

console.log('Client connecting from', socket.publicKey.toString('hex'))

socket.end('Hello!')
23 changes: 23 additions & 0 deletions examples/connection-relaying/relay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
const RelayServer = require('blind-relay').Server
const DHT = require('../..')

const dht = new DHT()

const relay = new RelayServer({
createStream (opts) {
return dht.createRawStream({ ...opts, framed: true })
}
})

const server = dht.createServer({ shareLocalAddress: false }, (socket) => {
console.log('Connection from', socket.remotePublicKey.toString('hex'))
const session = relay.accept(socket, { id: socket.remotePublicKey })
session
.on('pair', (isInitiator, token, stream, remoteId) => {
console.log('Pair isInitiator =', isInitiator, 'token =', token.toString('hex'))
})
})

server
.listen()
.then(() => console.log('Relay listening on', server.publicKey.toString('hex')))
14 changes: 14 additions & 0 deletions examples/connection-relaying/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const DHT = require('../..')

const dht = new DHT()

const server = dht.createServer({ shareLocalAddress: false }, (socket) => {
console.log('Connection from', socket.remotePublicKey.toString('hex'))
socket
.on('data', (data) => console.log(data.toString()))
.end()
})

server
.listen()
.then(() => console.log('Server listening on', server.publicKey.toString('hex')))
5 changes: 5 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const connect = require('./lib/connect')
const { FIREWALL, BOOTSTRAP_NODES, COMMANDS } = require('./lib/constants')
const { hash, createKeyPair } = require('./lib/crypto')
const RawStreamSet = require('./lib/raw-stream-set')
const ConnectionPool = require('./lib/connection-pool')
const { STREAM_NOT_CONNECTED } = require('./lib/errors')

const maxSize = 65536
Expand Down Expand Up @@ -60,6 +61,10 @@ class HyperDHT extends DHT {
return s
}

pool () {
return new ConnectionPool(this)
}

async destroy ({ force } = {}) {
if (!force) {
const closing = []
Expand Down
111 changes: 98 additions & 13 deletions lib/connect.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const NoiseSecretStream = require('@hyperswarm/secret-stream')
const b4a = require('b4a')
const relay = require('blind-relay')
const DebuggingStream = require('debugging-stream')
const { isPrivate } = require('bogon')
const Semaphore = require('./semaphore')
Expand All @@ -26,16 +27,23 @@ const {
} = require('./errors')

module.exports = function connect (dht, publicKey, opts = {}) {
const pool = opts.pool || null

if (pool && pool.has(publicKey)) return pool.get(publicKey)

const keyPair = opts.keyPair || dht.defaultKeyPair
const encryptedSocket = (opts.createSecretStream || defaultCreateSecretStream)(true, null, {
publicKey: keyPair.publicKey,
remotePublicKey: publicKey,
autoStart: false
})

if (pool) pool._attachStream(encryptedSocket, false)

const c = {
dht,
session: dht.session(),
pool,
round: 0,
target: hash(publicKey),
remotePublicKey: publicKey,
Expand All @@ -45,7 +53,7 @@ module.exports = function connect (dht, publicKey, opts = {}) {
requesting: false,
lan: opts.localConnection !== false,
firewall: FIREWALL.UNKNOWN,
rawStream: dht._rawStreams.add({ framed: true, firewall }),
rawStream: dht.createRawStream({ framed: true, firewall }),
connect: null,
query: null,
puncher: null,
Expand All @@ -55,7 +63,14 @@ module.exports = function connect (dht, publicKey, opts = {}) {
serverAddress: null,
onsocket: null,
sleeper: new Sleeper(),
encryptedSocket
encryptedSocket,

// Relay state
relayThrough: opts.relayThrough || null,
relayToken: opts.relayThrough ? relay.token() : null,
relaySocket: null,
relayClient: null,
relayPaired: false
}

// If the raw stream receives an error signal pre connect (ie from the firewall hook), make sure
Expand Down Expand Up @@ -86,6 +101,12 @@ module.exports = function connect (dht, publicKey, opts = {}) {
}

function firewall (socket, port, host) {
// Check if the traffic originated from the socket on which we're expecting relay traffic. If so,
// we haven't hole punched yet and the other side is just sending us traffic through the relay.
if (c.relaySocket && c.relaySocket.rawStream && c.relaySocket.rawStream.socket === socket) {
return false
}

if (c.onsocket) {
c.onsocket(socket, port, host)
} else {
Expand Down Expand Up @@ -156,7 +177,9 @@ async function holepunch (c, opts) {
// TODO: check all addresses also obvs
}

const onabort = () => destroyEncryptedSocket(c, HOLEPUNCH_ABORTED())
const onabort = () => {
if (c.relayToken === null) destroyEncryptedSocket(c, HOLEPUNCH_ABORTED())
}

if (c.firewall === FIREWALL.OPEN) {
c.passiveConnectTimeout = setTimeout(onabort, 10000)
Expand Down Expand Up @@ -294,7 +317,10 @@ async function connectThroughNode (c, address, socket) {
id: c.rawStream.id,
seq: 0
},
secretStream: {}
secretStream: {},
relayThrough: c.relayThrough
? { publicKey: c.relayThrough, token: c.relayToken }
: null
})
if (isDone(c)) return
}
Expand Down Expand Up @@ -336,12 +362,17 @@ async function connectThroughNode (c, address, socket) {
c.onsocket = function (socket, port, host) {
if (c.rawStream === null) return // Already hole punched

const rawStream = c.dht._debugStream !== null
? new DebuggingStream(c.rawStream, c.dht._debugStream)
: c.rawStream
if (c.rawStream.connected) {
c.rawStream.changeRemote(socket, c.connect.payload.udx.id, port, host)
} else {
c.rawStream.connect(socket, c.connect.payload.udx.id, port, host)

const rawStream = c.dht._debugStream !== null
? new DebuggingStream(c.rawStream, c.dht._debugStream)
: c.rawStream

c.rawStream.connect(socket, c.connect.payload.udx.id, port, host)
c.encryptedSocket.start(rawStream, { handshake: hs })
c.encryptedSocket.start(rawStream, { handshake: hs })
}

if (c.reusableSocket && payload.udx.reusableSocket) {
c.dht._socketPool.routes.add(c.remotePublicKey, c.rawStream)
Expand All @@ -359,6 +390,10 @@ async function connectThroughNode (c, address, socket) {
c.rawStream = null
}

if (payload.relayThrough || c.relayThrough) {
relayConnection(c, c.relayThrough, payload, hs)
}

if (c.serverSocket) {
c.onsocket(c.serverSocket, c.serverAddress.port, c.serverAddress.host)
return
Expand Down Expand Up @@ -387,7 +422,8 @@ async function updateHolepunch (c, peerAddress, relayAddr, payload) {

const { error, firewall, punching, addresses, remoteToken } = remotePayload
if (error !== ERROR.NONE) {
throw REMOTE_ABORTED('Remote aborted with error code ' + error)
// Don't throw if we're being relayed
if (!c.relayToken) throw REMOTE_ABORTED('Remote aborted with error code ' + error)
}

const echoed = !!(remoteToken && payload.token && b4a.equals(remoteToken, payload.token))
Expand Down Expand Up @@ -485,11 +521,13 @@ async function roundPunch (c, serverAddress, remoteToken, clientRelay) {
})

if (!c.puncher.remoteHolepunching) {
throw REMOTE_NOT_HOLEPUNCHING()
// Don't throw if we're being relayed
if (!c.relayToken) throw REMOTE_NOT_HOLEPUNCHING()
}

if (!await c.puncher.punch()) {
throw REMOTE_NOT_HOLEPUNCHABLE()
// Don't throw if we're being relayed
if (!c.relayToken) throw REMOTE_NOT_HOLEPUNCHABLE()
}
}

Expand All @@ -512,7 +550,54 @@ async function abort (c, { peerAddress, relayAddress }, err) {
remoteToken: null
})
} catch {}
destroyEncryptedSocket(c, err)

if (c.relayToken === null) destroyEncryptedSocket(c, err)
}

function relayConnection (c, relayThrough, payload, hs) {
if (c.passiveConnectTimeout) clearPassiveConnectTimeout(c)

let isInitiator
let publicKey
let token

if (payload.relayThrough) {
isInitiator = false
publicKey = payload.relayThrough.publicKey
token = payload.relayThrough.token
} else {
isInitiator = true
publicKey = relayThrough
token = c.relayToken
}

c.relayToken = token
c.relaySocket = c.dht.connect(publicKey)
c.relayClient = relay.Client.from(c.relaySocket, { id: c.relaySocket.publicKey })

c.relayClient
.pair(isInitiator, token, c.rawStream)
.on('error', () => c.relaySocket.destroy())
.on('data', (remoteId) => {
if (c.rawStream === null) return
c.relayPaired = true

const {
remotePort,
remoteHost,
socket
} = c.relaySocket.rawStream

c.rawStream
.on('close', () => c.relaySocket.destroy())
.connect(socket, remoteId, remotePort, remoteHost)

const rawStream = c.dht._debugStream !== null
? new DebuggingStream(c.rawStream, c.dht._debugStream)
: c.rawStream

c.encryptedSocket.start(rawStream, { handshake: hs })
})
}

function clearPassiveConnectTimeout (c) {
Expand Down
Loading
Loading