Skip to content

Commit

Permalink
Test lack of relay support on chosen relay server
Browse files Browse the repository at this point in the history
  • Loading branch information
kasperisager committed Sep 6, 2023
1 parent 881819d commit f5544aa
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 46 deletions.
31 changes: 19 additions & 12 deletions lib/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module.exports = function connect (dht, publicKey, opts = {}) {
autoStart: false
})

if (pool) pool._attachSocket(encryptedSocket, false)
if (pool) pool._attachStream(encryptedSocket, false)

const c = {
dht,
Expand Down Expand Up @@ -377,21 +377,28 @@ async function connectThroughNode (c, address, socket) {
c.rawStream = null
}

if (payload.relayThrough) {
const { publicKey, token } = payload.relayThrough
if (payload.relayThrough || c.relayThrough) {
let isInitiator, publicKey, token

const relay = ConnectionRelayClient.attachTo(c.dht.connect(publicKey, { pool: c.pool }))

relay.rendezvous(false, token, c.rawStream, c.onsocket)
if (payload.relayThrough) {
isInitiator = false
publicKey = payload.relayThrough.publicKey
token = payload.relayThrough.token
} else {
isInitiator = true
publicKey = c.relayThrough
token = c.relayToken
}

// TODO
return
}
const request = ConnectionRelayClient
.attachTo(c.dht.connect(publicKey, { pool: c.pool }))
.rendezvous(isInitiator, token, c.rawStream, c.onsocket)

if (c.relayThrough) {
const relay = ConnectionRelayClient.attachTo(c.dht.connect(c.relayThrough, { pool: c.pool }))
const onerror = () => request.destroy()

relay.rendezvous(true, c.relayToken, c.rawStream, c.onsocket)
c.encryptedSocket
.on('error', onerror)
.on('open', () => c.encryptedSocket.off('error', onerror))

// TODO
return
Expand Down
30 changes: 15 additions & 15 deletions lib/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ module.exports = class ConnectionPool extends EventEmitter {
this._servers.delete(keyString)
})
.on('connection', (socket) => {
this._attachSocket(socket, true)
this._attachStream(socket, true)
})
}

_attachSocket (socket, opened) {
const existing = this.get(socket.remotePublicKey)
_attachStream (stream, opened) {
const existing = this.get(stream.remotePublicKey)

if (existing) {
const keepNew = socket.isInitiator === existing.isInitiator || b4a.compare(socket.publicKey, socket.remotePublicKey) > 0
const keepNew = stream.isInitiator === existing.isInitiator || b4a.compare(stream.publicKey, stream.remotePublicKey) > 0

if (keepNew) {
let closed = false
Expand All @@ -44,42 +44,42 @@ module.exports = class ConnectionPool extends EventEmitter {
.on('close', () => {
if (closed) return

socket
stream
.off('error', noop)
.off('close', onclose)

this._attachSocket(socket, opened)
this._attachStream(stream, opened)
})
.destroy(errors.DUPLICATE_CONNECTION())

socket
stream
.on('error', noop)
.on('close', onclose)
} else {
socket
stream
.on('error', noop)
.destroy(errors.DUPLICATE_CONNECTION())
}

return
}

const session = new ConnectionSession(this, socket)
const session = new ConnectionSession(this, stream)

const keyString = socket.remotePublicKey.toString('hex')
const keyString = stream.remotePublicKey.toString('hex')

if (opened) {
this._connections.set(keyString, session)

socket.on('close', () => {
stream.on('close', () => {
this._connections.delete(keyString)
})

this.emit('connection', socket, session)
this.emit('connection', stream, session)
} else {
this._connecting.set(keyString, session)

socket
stream
.on('error', noop)
.on('close', () => {
if (opened) this._connections.delete(keyString)
Expand All @@ -91,9 +91,9 @@ module.exports = class ConnectionPool extends EventEmitter {
this._connecting.delete(keyString)
this._connections.set(keyString, session)

socket.off('error', noop)
stream.off('error', noop)

this.emit('connection', socket, session)
this.emit('connection', stream, session)
})
}

Expand Down
22 changes: 15 additions & 7 deletions lib/connection-relay.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,16 @@ class ConnectionRelaySessionPair {
}

exports.ConnectionRelayClient = class ConnectionRelayClient {
static clients = new WeakMap()

constructor (stream) {
this.mux = Protomux.from(stream)
this.requests = new Map()

this.channel = this.mux.createChannel({
protocol: 'hyperdht/relay',
id: stream.publicKey
id: stream.publicKey,
onclose: this._onclose.bind(this)
})

this.messages = {
Expand All @@ -143,6 +146,10 @@ exports.ConnectionRelayClient = class ConnectionRelayClient {
return this.mux.stream
}

_onclose () {
ConnectionRelayClient.clients.delete(this.stream)
}

_onrendezvous ({ isInitiator, token, id: remoteId }) {
const keyString = token.toString('hex')

Expand All @@ -160,7 +167,7 @@ exports.ConnectionRelayClient = class ConnectionRelayClient {

request.cb(socket, remotePort, remoteHost)

this.requests.delete(keyString)
request.destroy()
}

rendezvous (isInitiator, token, rawStream, cb) {
Expand All @@ -178,12 +185,14 @@ exports.ConnectionRelayClient = class ConnectionRelayClient {
return request
}

_closeMaybe () {
if (this.requests.size === 0) this.close()
}

close () {
this.channel.close()
}

static clients = new WeakMap()

static attachTo (stream) {
let client = this.clients.get(stream)
if (client) return client
Expand All @@ -206,12 +215,11 @@ class ConnectionRelayRequest {
this.token = token
this.rawStream = rawStream
this.cb = cb
this.destroyed = false
}

destroy () {
if (this.destroyed) return
this.destroyed = true
this.client.requests.delete(this.token.toString('hex'))
this.client._closeMaybe()
}
}

Expand Down
31 changes: 19 additions & 12 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -277,21 +277,28 @@ module.exports = class Server extends EventEmitter {
return hs
}

if (this.relayThrough) {
const relay = ConnectionRelayClient.attachTo(this.dht.connect(this.relayThrough, { pool: this.pool }))

relay.rendezvous(true, hs.relayToken, hs.rawStream, hs.onsocket)

// TODO
return hs
}
if (this.relayThrough || remotePayload.relayThrough) {
let isInitiator, publicKey, token

if (this.relayThrough) {
isInitiator = true
publicKey = this.relayThrough
token = hs.relayToken
} else {
isInitiator = false
publicKey = remotePayload.relayThrough.publicKey
token = remotePayload.relayThrough.token
}

if (remotePayload.relayThrough) {
const { publicKey, token } = remotePayload.relayThrough
const request = ConnectionRelayClient
.attachTo(this.dht.connect(publicKey, { pool: this.pool }))
.rendezvous(isInitiator, token, hs.rawStream, hs.onsocket)

const relay = ConnectionRelayClient.attachTo(this.dht.connect(publicKey, { pool: this.pool }))
const onerror = () => request.destroy()

relay.rendezvous(false, token, hs.rawStream, hs.onsocket)
hs.rawStream
.on('error', onerror)
.on('open', () => hs.rawStream.off('error', onerror))

// TODO
return hs
Expand Down
46 changes: 46 additions & 0 deletions test/relaying.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,49 @@ test('relay connections through node, client and server side', async function (t
await b.destroy()
await c.destroy()
})

test('server does not support connection relaying', async function (t) {
const { bootstrap } = await swarm(t)

const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })
const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(3)

const aServer = a.createServer({
shareLocalAddress: false
}, function () {
t.fail()
})

await aServer.listen()

const bServer = b.createServer({
shareLocalAddress: false
}, function (socket) {
lc.pass('server socket opened')
socket.on('close', () => {
t.pass('server socket closed') // TODO Should close before `server.destroy()`
})
})

await bServer.listen()

const aSocket = c.connect(aServer.publicKey, {
fastOpen: false,
localConnection: false,
relayThrough: bServer.publicKey
})

aSocket.on('error', (err) => {
lc.pass('client socket timed out')
})

await lc

await a.destroy()
await b.destroy()
await c.destroy()
})

0 comments on commit f5544aa

Please sign in to comment.