Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Hold off on closing channels for now
Browse files Browse the repository at this point in the history
kasperisager committed Sep 7, 2023

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 2e4fe6f commit af335c1
Showing 4 changed files with 115 additions and 36 deletions.
10 changes: 5 additions & 5 deletions lib/connect.js
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ const {
SERVER_ERROR,
SERVER_INCOMPATIBLE
} = require('./errors')
const { ConnectionRelayClient } = require('./connection-relay')
const connectionRelay = require('./connection-relay')

module.exports = function connect (dht, publicKey, opts = {}) {
const pool = opts.pool || null
@@ -53,7 +53,7 @@ module.exports = function connect (dht, publicKey, opts = {}) {
requesting: false,
lan: opts.localConnection !== false,
firewall: FIREWALL.UNKNOWN,
rawStream: dht._rawStreams.add({ framed: true, firewall }),
rawStream: dht.createRawStream({ framed: true, firewall }),
connect: null,
query: null,
puncher: null,
@@ -64,7 +64,7 @@ module.exports = function connect (dht, publicKey, opts = {}) {
onsocket: null,
sleeper: new Sleeper(),
relayThrough: opts.relayThrough || null,
relayToken: opts.relayThrough ? ConnectionRelayClient.token() : null,
relayToken: opts.relayThrough ? connectionRelay.token() : null,
encryptedSocket
}

@@ -389,8 +389,8 @@ async function connectThroughNode (c, address, socket) {
token = c.relayToken
}

const request = ConnectionRelayClient
.attachTo(c.dht.connect(publicKey, { pool: c.pool }))
const request = connectionRelay.Client
.from(c.dht.connect(publicKey, { pool: c.pool }))
.rendezvous(isInitiator, token, c.rawStream, c.onsocket)

const onerror = () => request.destroy()
52 changes: 27 additions & 25 deletions lib/connection-relay.js
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ const sodium = require('sodium-universal')
const b4a = require('b4a')
const c = require('compact-encoding')

exports.ConnectionRelayServer = class ConnectionRelayServer {
exports.Server = class ConnectionRelayServer {
constructor (dht) {
this.dht = dht
this.pairs = new Map()
@@ -15,11 +15,7 @@ exports.ConnectionRelayServer = class ConnectionRelayServer {

this.sessions.add(session)

stream
.on('close', () => {
session.close()
this.sessions.delete(session)
})
stream.on('close', () => session.close())
}
}

@@ -30,7 +26,8 @@ class ConnectionRelaySession {

this.channel = this.mux.createChannel({
protocol: 'hyperdht/relay',
id: stream.remotePublicKey
id: stream.remotePublicKey,
onclose: this._onclose.bind(this)
})

this.messages = {
@@ -47,6 +44,10 @@ class ConnectionRelaySession {
return this.server.dht
}

_onclose () {
this.server.sessions.delete(this)
}

_onrendezvous ({ isInitiator, token, id: remoteId }) {
const keyString = token.toString('hex')

@@ -70,7 +71,7 @@ class ConnectionRelaySession {
for (const session of pair.sessions) {
const { session: { dht }, remoteId } = session

const rawStream = dht._rawStreams.add({
const rawStream = dht.createRawStream({
framed: true,
firewall (socket, port, host) {
rawStream.connect(socket, remoteId, port, host)
@@ -97,7 +98,8 @@ class ConnectionRelaySession {
}

close () {
this.channel.close()
// TODO
// this.channel.close()
}
}

@@ -115,9 +117,17 @@ class ConnectionRelaySessionPair {
}
}

exports.ConnectionRelayClient = class ConnectionRelayClient {
exports.Client = class ConnectionRelayClient {
static clients = new WeakMap()

static from (stream) {
let client = this.clients.get(stream)
if (client) return client
client = new ConnectionRelayClient(stream)
this.clients.set(stream, client)
return client
}

constructor (stream) {
this.mux = Protomux.from(stream)
this.requests = new Map()
@@ -182,26 +192,13 @@ exports.ConnectionRelayClient = class ConnectionRelayClient {
}

_closeMaybe () {
if (this.requests.size === 0) this.close()
// TODO
// if (this.requests.size === 0) this.close()
}

close () {
this.channel.close()
}

static attachTo (stream) {
let client = this.clients.get(stream)
if (client) return client
client = new ConnectionRelayClient(stream)
this.clients.set(stream, client)
return client
}

static token () {
const token = b4a.allocUnsafe(32)
sodium.randombytes_buf(token)
return token
}
}

class ConnectionRelayRequest {
@@ -219,6 +216,11 @@ class ConnectionRelayRequest {
}
}

exports.token = function token (buf = b4a.allocUnsafe(32)) {
sodium.randombytes_buf(buf)
return buf
}

const m = exports.messages = {}

m.rendezvous = {
12 changes: 6 additions & 6 deletions lib/server.js
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ const { FIREWALL, ERROR } = require('./constants')
const { hash } = require('./crypto')
const SecurePayload = require('./secure-payload')
const Holepuncher = require('./holepuncher')
const { ConnectionRelayServer, ConnectionRelayClient } = require('./connection-relay')
const connectionRelay = require('./connection-relay')
const DebuggingStream = require('debugging-stream')
const { isPrivate } = require('bogon')
const { ALREADY_LISTENING, NODE_DESTROYED } = require('./errors')
@@ -37,7 +37,7 @@ module.exports = class Server extends EventEmitter {
this._keyPair = null
this._announcer = null
this._connects = new Map()
this._connectionRelay = opts.connectionRelay ? new ConnectionRelayServer(dht) : null
this._connectionRelay = opts.connectionRelay ? new connectionRelay.Server(dht) : null
this._holepunches = []
this._listening = false
this._closing = null
@@ -187,7 +187,7 @@ module.exports = class Server extends EventEmitter {
if (ourLocalAddrs) addresses.push(...ourLocalAddrs)

if (error === ERROR.NONE) {
hs.rawStream = this.dht._rawStreams.add({
hs.rawStream = this.dht.createRawStream({
framed: true,
firewall (socket, port, host) {
hs.onsocket(socket, port, host)
@@ -238,7 +238,7 @@ module.exports = class Server extends EventEmitter {
}
}

if (this.relayThrough) hs.relayToken = ConnectionRelayClient.token()
if (this.relayThrough) hs.relayToken = connectionRelay.token()

try {
hs.reply = await handshake.send({
@@ -290,8 +290,8 @@ module.exports = class Server extends EventEmitter {
token = remotePayload.relayThrough.token
}

const request = ConnectionRelayClient
.attachTo(this.dht.connect(publicKey, { pool: this.pool }))
const request = connectionRelay.Client
.from(this.dht.connect(publicKey, { pool: this.pool }))
.rendezvous(isInitiator, token, hs.rawStream, hs.onsocket)

const onerror = () => request.destroy()
77 changes: 77 additions & 0 deletions test/relaying.js
Original file line number Diff line number Diff line change
@@ -177,6 +177,83 @@ test('relay connections through node, client and server side', async function (t
await c.destroy()
})

test('relay several connections through node with pool', async function (t) {
const { bootstrap } = await swarm(t)

const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })
const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(10)

const aServer = a.createServer({
shareLocalAddress: false,
holepunch () {
return false
}
}, function (socket) {
lc.pass('server socket opened')
socket
.on('data', (data) => {
lc.alike(data, Buffer.from('hello world'))
})
.on('close', () => {
lc.pass('server socket closed')
})
.end()
})

await aServer.listen()

const bServer = b.createServer({
shareLocalAddress: false,
connectionRelay: true
})

await bServer.listen()

const pool = c.pool()

const aSocket = c.connect(aServer.publicKey, {
fastOpen: false,
localConnection: false,
relayThrough: bServer.publicKey,
pool
})

aSocket
.on('open', () => {
lc.pass('client socket opened')
})
.on('close', () => {
lc.pass('client socket closed')

const aSocket = c.connect(aServer.publicKey, {
fastOpen: false,
localConnection: false,
relayThrough: bServer.publicKey,
pool
})

aSocket
.on('open', () => {
lc.pass('client socket opened')
})
.on('close', () => {
lc.pass('client socket closed')
})
.end('hello world')
})
.end('hello world')

await lc

await a.destroy()
await b.destroy()
await c.destroy()
})

test('server does not support connection relaying', async function (t) {
const { bootstrap } = await swarm(t)

0 comments on commit af335c1

Please sign in to comment.