diff --git a/lib/announcer.js b/lib/announcer.js index 3d48cf79..e2e8bba6 100644 --- a/lib/announcer.js +++ b/lib/announcer.js @@ -1,5 +1,6 @@ const safetyCatch = require('safety-catch') const c = require('compact-encoding') +const Signal = require('signal-promise') const Sleeper = require('./sleeper') const m = require('./messages') const Persistent = require('./persistent') @@ -12,14 +13,19 @@ module.exports = class Announcer { this.target = target this.relays = [] this.stopped = false + this.suspended = false this.record = c.encode(m.peer, { publicKey: keyPair.publicKey, relayAddresses: [] }) this._refreshing = false this._closestNodes = null this._active = null this._sleeper = new Sleeper() + this._resumed = new Signal() this._signAnnounce = opts.signAnnounce || Persistent.signAnnounce this._signUnannounce = opts.signUnannounce || Persistent.signUnannounce + this._updating = null + this._activeQuery = null + this._unannouncing = null this._serverRelays = [ new Map(), @@ -34,6 +40,28 @@ module.exports = class Announcer { return a.has(id) || b.has(id) || c.has(id) } + async suspend () { + if (this.suspended) return + this.suspended = true + + if (this._activeQuery) this._activeQuery.destroy() + + this._sleeper.resume() + if (this._updating) await this._updating + + if (this.suspended === false || this.stopped) return + await this._unannounceCurrent() + } + + resume () { + if (!this.suspended) return + this.suspended = false + + this.refresh() + this._sleeper.resume() + this._resumed.notify() + } + refresh () { if (this.stopped) return this._refreshing = true @@ -41,7 +69,7 @@ module.exports = class Announcer { async start () { if (this.stopped) return - this._active = this._update() + this._active = this._runUpdate() await this._active if (this.stopped) return this._active = this._background() @@ -50,8 +78,16 @@ module.exports = class Announcer { async stop () { this.stopped = true this._sleeper.resume() + this._resumed.notify() await this._active - await this._unannounceAll(this._serverRelays[2].values()) + await this._unannounceCurrent() + } + + async _unannounceCurrent () { + while (this._unannouncing !== null) await this._unannouncing + const un = this._unannouncing = this._unannounceAll(this._serverRelays[2].values()) + await this._unannouncing + if (un === this._unannouncing) this._unannouncing = null } async _background () { @@ -60,7 +96,7 @@ module.exports = class Announcer { this._refreshing = false // ~5min +- - for (let i = 0; i < 100 && !this.stopped && !this._refreshing; i++) { + for (let i = 0; i < 100 && !this.stopped && !this._refreshing && !this.suspended; i++) { const pings = [] for (const node of this._serverRelays[2].values()) { @@ -68,29 +104,36 @@ module.exports = class Announcer { } const pongs = await allFastest(pings) + if (pongs.length < 3) this.refresh() // we lost too many relay nodes, retry all + if (this.stopped) return - const relays = [] + if (!this.suspended && !this._refreshing) await this._sleeper.pause(3000) + } - for (let i = 0; i < pongs.length && relays.length < 3; i++) { - relays.push(pongs[i].from) - } + while (!this.stopped && this.suspended) await this._resumed.wait() - await this._sleeper.pause(3000) - } - if (!this.stopped) await this._update() + if (!this.stopped) await this._runUpdate() } catch (err) { safetyCatch(err) } } } + async _runUpdate () { + this._updating = this._update() + await this._updating + this._updating = null + } + async _update () { + while (this._unannouncing) await this._unannouncing + const relays = [] this._cycle() - const q = this.dht.findPeer(this.target, { hash: false, nodes: this._closestNodes }) + const q = this._activeQuery = this.dht.findPeer(this.target, { hash: false, nodes: this._closestNodes }) try { await q.finished() @@ -98,7 +141,9 @@ module.exports = class Announcer { // ignore failures... } - if (this.stopped) return + this._activeQuery = null + + if (this.stopped || this.suspended) return const ann = [] const top = q.closestReplies.slice(0, 5) @@ -108,7 +153,7 @@ module.exports = class Announcer { } await Promise.allSettled(ann) - if (this.stopped) return + if (this.stopped || this.suspended) return this._closestNodes = q.closestNodes this.relays = relays diff --git a/lib/server.js b/lib/server.js index d1feb7ce..9e007fbd 100644 --- a/lib/server.js +++ b/lib/server.js @@ -31,6 +31,7 @@ module.exports = class Server extends EventEmitter { this.pool = opts.pool || null this.createHandshake = opts.createHandshake || defaultCreateHandshake this.createSecretStream = opts.createSecretStream || defaultCreateSecretStream + this.suspended = false this._shareLocalAddress = opts.shareLocalAddress !== false this._reusableSocket = !!opts.reusableSocket @@ -50,6 +51,16 @@ module.exports = class Server extends EventEmitter { this.emit('connection', encryptedSocket) } + suspend () { + this.suspended = true + return this._announcer ? this._announcer.suspend() : Promise.resolve() + } + + resume () { + this.suspended = false + return this._announcer ? this._announcer.resume() : Promise.resolve() + } + address () { if (!this._keyPair) return null @@ -122,6 +133,8 @@ module.exports = class Server extends EventEmitter { throw err } + if (this.suspended) await this._announcer.suspend() + if (this.dht.destroyed) throw NODE_DESTROYED() this.dht.listening.add(this) diff --git a/package.json b/package.json index 5f86cce1..573d3574 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "noise-handshake": "^3.0.0", "record-cache": "^1.1.1", "safety-catch": "^1.0.1", + "signal-promise": "^1.0.3", "sodium-universal": "^4.0.0", "xache": "^1.1.0" }, diff --git a/test/announces.js b/test/announces.js index dae625ad..aebe8bfd 100644 --- a/test/announces.js +++ b/test/announces.js @@ -84,3 +84,21 @@ test('server listen returns server', async function (t) { t.ok(result.length > 0, 'has at least one result') t.alike(result[0].peer.publicKey, server.publicKey) }) + +test('server suspends and resumes', async function (t) { + const [a, b] = await swarm(t) + const server = await a.createServer().listen() + + t.ok((await toArray(b.findPeer(server.publicKey))).length > 0) + + await server.suspend() + + t.ok((await toArray(b.findPeer(server.publicKey))).length === 0) + + server.resume() + + // be nice to have an api for the next announce cycle here... + await new Promise((resolve) => setTimeout(resolve, 1000)) + + t.ok((await toArray(b.findPeer(server.publicKey))).length > 0) +})