Skip to content

Commit

Permalink
support suspend/resume on servers (#150)
Browse files Browse the repository at this point in the history
* support suspend/resume on servers

* only one activeQuery at the time

* add test
  • Loading branch information
mafintosh authored Nov 8, 2023
1 parent 487e493 commit 7afd303
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 13 deletions.
71 changes: 58 additions & 13 deletions lib/announcer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const safetyCatch = require('safety-catch')
const c = require('compact-encoding')
const Signal = require('signal-promise')
const Sleeper = require('./sleeper')
const m = require('./messages')
const Persistent = require('./persistent')
Expand All @@ -12,14 +13,19 @@ module.exports = class Announcer {
this.target = target
this.relays = []
this.stopped = false
this.suspended = false
this.record = c.encode(m.peer, { publicKey: keyPair.publicKey, relayAddresses: [] })

this._refreshing = false
this._closestNodes = null
this._active = null
this._sleeper = new Sleeper()
this._resumed = new Signal()
this._signAnnounce = opts.signAnnounce || Persistent.signAnnounce
this._signUnannounce = opts.signUnannounce || Persistent.signUnannounce
this._updating = null
this._activeQuery = null
this._unannouncing = null

this._serverRelays = [
new Map(),
Expand All @@ -34,14 +40,36 @@ module.exports = class Announcer {
return a.has(id) || b.has(id) || c.has(id)
}

async suspend () {
if (this.suspended) return
this.suspended = true

if (this._activeQuery) this._activeQuery.destroy()

this._sleeper.resume()
if (this._updating) await this._updating

if (this.suspended === false || this.stopped) return
await this._unannounceCurrent()
}

resume () {
if (!this.suspended) return
this.suspended = false

this.refresh()
this._sleeper.resume()
this._resumed.notify()
}

refresh () {
if (this.stopped) return
this._refreshing = true
}

async start () {
if (this.stopped) return
this._active = this._update()
this._active = this._runUpdate()
await this._active
if (this.stopped) return
this._active = this._background()
Expand All @@ -50,8 +78,16 @@ module.exports = class Announcer {
async stop () {
this.stopped = true
this._sleeper.resume()
this._resumed.notify()
await this._active
await this._unannounceAll(this._serverRelays[2].values())
await this._unannounceCurrent()
}

async _unannounceCurrent () {
while (this._unannouncing !== null) await this._unannouncing
const un = this._unannouncing = this._unannounceAll(this._serverRelays[2].values())
await this._unannouncing
if (un === this._unannouncing) this._unannouncing = null
}

async _background () {
Expand All @@ -60,45 +96,54 @@ module.exports = class Announcer {
this._refreshing = false

// ~5min +-
for (let i = 0; i < 100 && !this.stopped && !this._refreshing; i++) {
for (let i = 0; i < 100 && !this.stopped && !this._refreshing && !this.suspended; i++) {
const pings = []

for (const node of this._serverRelays[2].values()) {
pings.push(this.dht.ping(node))
}

const pongs = await allFastest(pings)
if (pongs.length < 3) this.refresh() // we lost too many relay nodes, retry all

if (this.stopped) return

const relays = []
if (!this.suspended && !this._refreshing) await this._sleeper.pause(3000)
}

for (let i = 0; i < pongs.length && relays.length < 3; i++) {
relays.push(pongs[i].from)
}
while (!this.stopped && this.suspended) await this._resumed.wait()

await this._sleeper.pause(3000)
}
if (!this.stopped) await this._update()
if (!this.stopped) await this._runUpdate()
} catch (err) {
safetyCatch(err)
}
}
}

async _runUpdate () {
this._updating = this._update()
await this._updating
this._updating = null
}

async _update () {
while (this._unannouncing) await this._unannouncing

const relays = []

this._cycle()

const q = this.dht.findPeer(this.target, { hash: false, nodes: this._closestNodes })
const q = this._activeQuery = this.dht.findPeer(this.target, { hash: false, nodes: this._closestNodes })

try {
await q.finished()
} catch {
// ignore failures...
}

if (this.stopped) return
this._activeQuery = null

if (this.stopped || this.suspended) return

const ann = []
const top = q.closestReplies.slice(0, 5)
Expand All @@ -108,7 +153,7 @@ module.exports = class Announcer {
}

await Promise.allSettled(ann)
if (this.stopped) return
if (this.stopped || this.suspended) return

this._closestNodes = q.closestNodes
this.relays = relays
Expand Down
13 changes: 13 additions & 0 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports = class Server extends EventEmitter {
this.pool = opts.pool || null
this.createHandshake = opts.createHandshake || defaultCreateHandshake
this.createSecretStream = opts.createSecretStream || defaultCreateSecretStream
this.suspended = false

this._shareLocalAddress = opts.shareLocalAddress !== false
this._reusableSocket = !!opts.reusableSocket
Expand All @@ -50,6 +51,16 @@ module.exports = class Server extends EventEmitter {
this.emit('connection', encryptedSocket)
}

suspend () {
this.suspended = true
return this._announcer ? this._announcer.suspend() : Promise.resolve()
}

resume () {
this.suspended = false
return this._announcer ? this._announcer.resume() : Promise.resolve()
}

address () {
if (!this._keyPair) return null

Expand Down Expand Up @@ -122,6 +133,8 @@ module.exports = class Server extends EventEmitter {
throw err
}

if (this.suspended) await this._announcer.suspend()

if (this.dht.destroyed) throw NODE_DESTROYED()

this.dht.listening.add(this)
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"noise-handshake": "^3.0.0",
"record-cache": "^1.1.1",
"safety-catch": "^1.0.1",
"signal-promise": "^1.0.3",
"sodium-universal": "^4.0.0",
"xache": "^1.1.0"
},
Expand Down
18 changes: 18 additions & 0 deletions test/announces.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,21 @@ test('server listen returns server', async function (t) {
t.ok(result.length > 0, 'has at least one result')
t.alike(result[0].peer.publicKey, server.publicKey)
})

test('server suspends and resumes', async function (t) {
const [a, b] = await swarm(t)
const server = await a.createServer().listen()

t.ok((await toArray(b.findPeer(server.publicKey))).length > 0)

await server.suspend()

t.ok((await toArray(b.findPeer(server.publicKey))).length === 0)

server.resume()

// be nice to have an api for the next announce cycle here...
await new Promise((resolve) => setTimeout(resolve, 1000))

t.ok((await toArray(b.findPeer(server.publicKey))).length > 0)
})

0 comments on commit 7afd303

Please sign in to comment.