diff --git a/lib/announcer.js b/lib/announcer.js index e2e8bba6..6c0a0cc6 100644 --- a/lib/announcer.js +++ b/lib/announcer.js @@ -6,12 +6,15 @@ const m = require('./messages') const Persistent = require('./persistent') const { COMMANDS } = require('./constants') +const MIN_ACTIVE = 3 + module.exports = class Announcer { constructor (dht, keyPair, target, opts = {}) { this.dht = dht this.keyPair = keyPair this.target = target this.relays = [] + this.relayAddresses = [] this.stopped = false this.suspended = false this.record = c.encode(m.peer, { publicKey: keyPair.publicKey, relayAddresses: [] }) @@ -103,8 +106,8 @@ module.exports = class Announcer { pings.push(this.dht.ping(node)) } - const pongs = await allFastest(pings) - if (pongs.length < 3) this.refresh() // we lost too many relay nodes, retry all + const active = await resolved(pings) + if (active < MIN_ACTIVE) this.refresh() // we lost too many relay nodes, retry all if (this.stopped) return @@ -129,8 +132,6 @@ module.exports = class Announcer { async _update () { while (this._unannouncing) await this._unannouncing - const relays = [] - this._cycle() const q = this._activeQuery = this.dht.findPeer(this.target, { hash: false, nodes: this._closestNodes }) @@ -146,10 +147,18 @@ module.exports = class Announcer { if (this.stopped || this.suspended) return const ann = [] - const top = q.closestReplies.slice(0, 5) + const replies = pickBest(q.closestReplies) + + const relays = [] + const relayAddresses = [] + + if (!this.dht.firewalled) { + const addr = this.dht.remoteAddress() + if (addr) relayAddresses.push(addr) + } - for (const msg of top) { - ann.push(this._commit(msg, relays)) + for (const msg of replies) { + ann.push(this._commit(msg, relays, relayAddresses)) } await Promise.allSettled(ann) @@ -157,6 +166,7 @@ module.exports = class Announcer { this._closestNodes = q.closestNodes this.relays = relays + this.relayAddresses = relayAddresses const removed = [] for (const [key, value] of this._serverRelays[1]) { @@ -201,7 +211,7 @@ module.exports = class Announcer { }, to) } - async _commit (msg, relays) { + async _commit (msg, relays, relayAddresses) { const ann = { peer: { publicKey: this.keyPair.publicKey, @@ -222,15 +232,10 @@ module.exports = class Announcer { if (res.error !== 0) return - this._serverRelays[2].set(msg.from.host + ':' + msg.from.port, msg.from) - - if (relays.length < 3) { - relays.push({ relayAddress: msg.from, peerAddress: msg.to }) - } + if (relayAddresses.length < 3) relayAddresses.push({ host: msg.from.host, port: msg.from.port }) + relays.push({ relayAddress: msg.from, peerAddress: msg.to }) - if (relays.length === 3) { - this.relays = relays - } + this._serverRelays[2].set(msg.from.host + ':' + msg.from.port, msg.from) } _cycle () { @@ -242,8 +247,8 @@ module.exports = class Announcer { } } -function allFastest (ps) { - const result = [] +function resolved (ps) { + let replied = 0 let ticks = ps.length + 1 return new Promise((resolve) => { @@ -251,12 +256,16 @@ function allFastest (ps) { tick() function push (v) { - result.push(v) + replied++ tick() } function tick () { - if (--ticks === 0) resolve(result) + if (--ticks === 0) resolve(replied) } }) } + +function pickBest (replies) { // TODO: pick the ones closest to us RTT wise + return replies.slice(0, 3) +} diff --git a/lib/connect.js b/lib/connect.js index 17658031..a9500315 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -45,6 +45,7 @@ module.exports = function connect (dht, publicKey, opts = {}) { const c = { dht, session: dht.session(), + relayAddresses: opts.relayAddresses || [], pool, round: 0, target: hash(publicKey), @@ -284,39 +285,51 @@ async function holepunch (c, opts) { } async function findAndConnect (c, opts) { - c.query = c.dht.findPeer(c.target, { hash: false, session: c.session }) + let attempts = 0 + let closestNodes = (opts.relayAddresses && opts.relayAddresses.length) ? opts.relayAddresses : null + + if (c.dht._persistent) { // check if we know the route ourself... + const route = c.dht._router.get(c.target) + if (route && route.relay !== null) closestNodes = [{ host: route.relay.host, port: route.relay.port }] + } // 2 is how many parallel connect attempts we want to do, we can make this configurable const sem = new Semaphore(2) - let attempts = 0 const signal = sem.signal.bind(sem) + const tries = closestNodes !== null ? 2 : 1 try { - for await (const data of c.query) { - await sem.wait() - if (isDone(c)) return + for (let i = 0; i < tries && !isDone(c) && !c.connect; i++) { + c.query = c.dht.findPeer(c.target, { hash: false, session: c.session, closestNodes, onlyClosestNodes: closestNodes !== null }) - if (c.connect) { - sem.signal() - break + for await (const data of c.query) { + await sem.wait() + if (isDone(c)) return + + if (c.connect) { + sem.signal() + break + } + + attempts++ + connectThroughNode(c, data.from, null).then(signal, signal) } - attempts++ - connectThroughNode(c, data.from, null).then(signal, signal) + closestNodes = null } + + c.query = null + if (isDone(c)) return + + // flush the semaphore + await sem.flush() + if (isDone(c)) return } catch (err) { c.query = null maybeDestroyEncryptedSocket(c, err) return } - c.query = null - if (isDone(c)) return - - // flush the semaphore - await sem.flush() - if (isDone(c)) return - if (!c.connect) { maybeDestroyEncryptedSocket(c, attempts ? PEER_CONNECTION_FAILED() : PEER_NOT_FOUND()) } diff --git a/lib/server.js b/lib/server.js index 4a0e9811..bda29d6a 100644 --- a/lib/server.js +++ b/lib/server.js @@ -23,7 +23,6 @@ module.exports = class Server extends EventEmitter { this.dht = dht this.target = null - this.relayAddresses = null // TODO: populate this this.closed = false this.firewall = opts.firewall || (() => false) this.holepunch = opts.holepunch || (() => true) @@ -47,6 +46,10 @@ module.exports = class Server extends EventEmitter { return this._keyPair && this._keyPair.publicKey } + get relayAddresses () { + return this._announcer ? this._announcer.relayAddresses : [] + } + onconnection (encryptedSocket) { this.emit('connection', encryptedSocket) } diff --git a/test/announces.js b/test/announces.js index aebe8bfd..077d01b8 100644 --- a/test/announces.js +++ b/test/announces.js @@ -102,3 +102,59 @@ test('server suspends and resumes', async function (t) { t.ok((await toArray(b.findPeer(server.publicKey))).length > 0) }) + +test('server announces relay addrs', async function (t) { + const [, a, b] = await swarm(t) + + // ensure dht is fully connected... + await a.findNode(a.id).finished() + await b.findNode(b.id).finished() + + const server = await a.createServer().listen() + const q = b.findPeer(server.publicKey) + const nodes = await toArray(q) + + for (const addr of server.relayAddresses) { + let found = false + + for (const node of nodes) { + found = node.from.port === addr.port && node.from.host === addr.host + if (found) break + } + + if (!found) { + const { host, port } = b.remoteAddress() + found = port === addr.port && host === addr.host + } + + if (!found) { + const { host, port } = a.remoteAddress() + found = port === addr.port && host === addr.host + } + + t.ok(found, 'found addr') + } +}) + +test('connect when we relay ourself', async function (t) { + const testnet = await swarm(t) + + const server = await testnet.nodes[1].createServer(function (sock) { + sock.resume() + sock.end() + }).listen() + + const addr = server.relayAddresses[server.relayAddresses.length - 1] + + for (const node of testnet.nodes) { + const { host, port } = node.remoteAddress() + if (addr.port === port && addr.host === host) { + const sock = node.connect(server.publicKey) + await sock.opened + t.pass('worked') + sock.end() + await new Promise(resolve => sock.once('close', resolve)) + break + } + } +})