Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server relay addresses #151

Merged
merged 4 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions lib/announcer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ const m = require('./messages')
const Persistent = require('./persistent')
const { COMMANDS } = require('./constants')

const MIN_ACTIVE = 3

module.exports = class Announcer {
constructor (dht, keyPair, target, opts = {}) {
this.dht = dht
this.keyPair = keyPair
this.target = target
this.relays = []
this.relayAddresses = []
this.stopped = false
this.suspended = false
this.record = c.encode(m.peer, { publicKey: keyPair.publicKey, relayAddresses: [] })
Expand Down Expand Up @@ -103,8 +106,8 @@ module.exports = class Announcer {
pings.push(this.dht.ping(node))
}

const pongs = await allFastest(pings)
if (pongs.length < 3) this.refresh() // we lost too many relay nodes, retry all
const active = await resolved(pings)
if (active < MIN_ACTIVE) this.refresh() // we lost too many relay nodes, retry all

if (this.stopped) return

Expand All @@ -129,8 +132,6 @@ module.exports = class Announcer {
async _update () {
while (this._unannouncing) await this._unannouncing

const relays = []

this._cycle()

const q = this._activeQuery = this.dht.findPeer(this.target, { hash: false, nodes: this._closestNodes })
Expand All @@ -146,17 +147,26 @@ module.exports = class Announcer {
if (this.stopped || this.suspended) return

const ann = []
const top = q.closestReplies.slice(0, 5)
const replies = pickBest(q.closestReplies)

const relays = []
const relayAddresses = []

if (!this.dht.firewalled) {
const addr = this.dht.remoteAddress()
if (addr) relayAddresses.push(addr)
}

for (const msg of top) {
ann.push(this._commit(msg, relays))
for (const msg of replies) {
ann.push(this._commit(msg, relays, relayAddresses))
}

await Promise.allSettled(ann)
if (this.stopped || this.suspended) return

this._closestNodes = q.closestNodes
this.relays = relays
this.relayAddresses = relayAddresses

const removed = []
for (const [key, value] of this._serverRelays[1]) {
Expand Down Expand Up @@ -201,7 +211,7 @@ module.exports = class Announcer {
}, to)
}

async _commit (msg, relays) {
async _commit (msg, relays, relayAddresses) {
const ann = {
peer: {
publicKey: this.keyPair.publicKey,
Expand All @@ -222,15 +232,10 @@ module.exports = class Announcer {

if (res.error !== 0) return

this._serverRelays[2].set(msg.from.host + ':' + msg.from.port, msg.from)

if (relays.length < 3) {
relays.push({ relayAddress: msg.from, peerAddress: msg.to })
}
if (relayAddresses.length < 3) relayAddresses.push({ host: msg.from.host, port: msg.from.port })
relays.push({ relayAddress: msg.from, peerAddress: msg.to })

if (relays.length === 3) {
this.relays = relays
}
this._serverRelays[2].set(msg.from.host + ':' + msg.from.port, msg.from)
}

_cycle () {
Expand All @@ -242,21 +247,25 @@ module.exports = class Announcer {
}
}

function allFastest (ps) {
const result = []
function resolved (ps) {
let replied = 0
let ticks = ps.length + 1

return new Promise((resolve) => {
for (const p of ps) p.then(push, tick)
tick()

function push (v) {
result.push(v)
replied++
tick()
}

function tick () {
if (--ticks === 0) resolve(result)
if (--ticks === 0) resolve(replied)
}
})
}

function pickBest (replies) { // TODO: pick the ones closest to us RTT wise
return replies.slice(0, 3)
}
47 changes: 30 additions & 17 deletions lib/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ module.exports = function connect (dht, publicKey, opts = {}) {
const c = {
dht,
session: dht.session(),
relayAddresses: opts.relayAddresses || [],
pool,
round: 0,
target: hash(publicKey),
Expand Down Expand Up @@ -284,39 +285,51 @@ async function holepunch (c, opts) {
}

async function findAndConnect (c, opts) {
c.query = c.dht.findPeer(c.target, { hash: false, session: c.session })
let attempts = 0
let closestNodes = (opts.relayAddresses && opts.relayAddresses.length) ? opts.relayAddresses : null

if (c.dht._persistent) { // check if we know the route ourself...
const route = c.dht._router.get(c.target)
if (route && route.relay !== null) closestNodes = [{ host: route.relay.host, port: route.relay.port }]
}

// 2 is how many parallel connect attempts we want to do, we can make this configurable
const sem = new Semaphore(2)
let attempts = 0
const signal = sem.signal.bind(sem)
const tries = closestNodes !== null ? 2 : 1

try {
for await (const data of c.query) {
await sem.wait()
if (isDone(c)) return
for (let i = 0; i < tries && !isDone(c) && !c.connect; i++) {
c.query = c.dht.findPeer(c.target, { hash: false, session: c.session, closestNodes, onlyClosestNodes: closestNodes !== null })

if (c.connect) {
sem.signal()
break
for await (const data of c.query) {
await sem.wait()
if (isDone(c)) return

if (c.connect) {
sem.signal()
break
}

attempts++
connectThroughNode(c, data.from, null).then(signal, signal)
}

attempts++
connectThroughNode(c, data.from, null).then(signal, signal)
closestNodes = null
}

c.query = null
if (isDone(c)) return

// flush the semaphore
await sem.flush()
if (isDone(c)) return
} catch (err) {
c.query = null
maybeDestroyEncryptedSocket(c, err)
return
}

c.query = null
if (isDone(c)) return

// flush the semaphore
await sem.flush()
if (isDone(c)) return

if (!c.connect) {
maybeDestroyEncryptedSocket(c, attempts ? PEER_CONNECTION_FAILED() : PEER_NOT_FOUND())
}
Expand Down
5 changes: 4 additions & 1 deletion lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ module.exports = class Server extends EventEmitter {
this.dht = dht
this.target = null

this.relayAddresses = null // TODO: populate this
this.closed = false
this.firewall = opts.firewall || (() => false)
this.holepunch = opts.holepunch || (() => true)
Expand All @@ -47,6 +46,10 @@ module.exports = class Server extends EventEmitter {
return this._keyPair && this._keyPair.publicKey
}

get relayAddresses () {
return this._announcer ? this._announcer.relayAddresses : []
}

onconnection (encryptedSocket) {
this.emit('connection', encryptedSocket)
}
Expand Down
56 changes: 56 additions & 0 deletions test/announces.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,59 @@ test('server suspends and resumes', async function (t) {

t.ok((await toArray(b.findPeer(server.publicKey))).length > 0)
})

test('server announces relay addrs', async function (t) {
const [, a, b] = await swarm(t)

// ensure dht is fully connected...
await a.findNode(a.id).finished()
await b.findNode(b.id).finished()

const server = await a.createServer().listen()
const q = b.findPeer(server.publicKey)
const nodes = await toArray(q)

for (const addr of server.relayAddresses) {
let found = false

for (const node of nodes) {
found = node.from.port === addr.port && node.from.host === addr.host
if (found) break
}

if (!found) {
const { host, port } = b.remoteAddress()
found = port === addr.port && host === addr.host
}

if (!found) {
const { host, port } = a.remoteAddress()
found = port === addr.port && host === addr.host
}

t.ok(found, 'found addr')
}
})

test('connect when we relay ourself', async function (t) {
const testnet = await swarm(t)

const server = await testnet.nodes[1].createServer(function (sock) {
sock.resume()
sock.end()
}).listen()

const addr = server.relayAddresses[server.relayAddresses.length - 1]

for (const node of testnet.nodes) {
const { host, port } = node.remoteAddress()
if (addr.port === port && addr.host === host) {
const sock = node.connect(server.publicKey)
await sock.opened
t.pass('worked')
sock.end()
await new Promise(resolve => sock.once('close', resolve))
break
}
}
})