Skip to content

Commit

Permalink
Add hotswaps (#597)
Browse files Browse the repository at this point in the history
* wip

* gogo hotswaps

* missing file

* comment

* fix hans bug

* Add metric and test

---------

Co-authored-by: HDegroote <75906619+HDegroote@users.noreply.github.com>
  • Loading branch information
mafintosh and HDegroote authored Nov 18, 2024
1 parent 69d15e9 commit b771d91
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 5 deletions.
1 change: 0 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,6 @@ module.exports = class Hypercore extends EventEmitter {
if (this.opened === false) await this.opening

const activeRequests = (range && range.activeRequests) || this.activeRequests

return this.replicator.addRange(activeRequests, range)
}

Expand Down
60 changes: 60 additions & 0 deletions lib/hotswap-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
const TICKS = 16

module.exports = class HotswapQueue {
constructor () {
this.priorities = [[], [], []]
}

* pick (peer) {
for (let i = 0; i < this.priorities.length; i++) {
// try first one more than second one etc etc
let ticks = (this.priorities.length - i) * TICKS
const queue = this.priorities[i]

for (let j = 0; j < queue.length; j++) {
const r = j + Math.floor(Math.random() * queue.length - j)
const a = queue[j]
const b = queue[r]

if (r !== j) {
queue[(b.hotswap.index = j)] = b
queue[(a.hotswap.index = r)] = a
}

if (hasInflight(b, peer)) continue

yield b

if (--ticks <= 0) break
}
}
}

add (block) {
if (block.hotswap !== null) this.remove(block)
if (block.inflight.length === 0 || block.inflight.length >= 3) return

// TODO: also use other stuff to determine queue prio
const queue = this.priorities[block.inflight.length - 1]

const index = queue.push(block) - 1
block.hotswap = { ref: this, queue, index }
}

remove (block) {
const hotswap = block.hotswap
if (hotswap === null) return

block.hotswap = null
const head = hotswap.queue.pop()
if (head === block) return
hotswap.queue[(head.hotswap.index = hotswap.index)] = head
}
}

function hasInflight (block, peer) {
for (let j = 0; j < block.inflight.length; j++) {
if (block.inflight[j].peer === peer) return true
}
return false
}
51 changes: 49 additions & 2 deletions lib/replicator.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const safetyCatch = require('safety-catch')
const RandomIterator = require('random-array-iterator')
const flatTree = require('flat-tree')
const ReceiverQueue = require('./receiver-queue')
const HotswapQueue = require('./hotswap-queue')
const RemoteBitfield = require('./remote-bitfield')
const { REQUEST_CANCELLED, REQUEST_TIMEOUT, INVALID_CAPABILITY, SNAPSHOT_NOT_AVAILABLE } = require('hypercore-errors')
const m = require('./messages')
Expand Down Expand Up @@ -139,6 +140,7 @@ class BlockRequest extends Attachable {
this.priority = priority
this.inflight = []
this.queued = false
this.hotswap = null
this.tracker = tracker
}

Expand All @@ -150,6 +152,7 @@ class BlockRequest extends Attachable {
}

this.tracker.remove(this.index)
removeHotswap(this)
}
}

Expand Down Expand Up @@ -368,7 +371,8 @@ class Peer {
wireWant: { tx: 0, rx: 0 },
wireBitfield: { tx: 0, rx: 0 },
wireRange: { tx: 0, rx: 0 },
wireExtension: { tx: 0, rx: 0 }
wireExtension: { tx: 0, rx: 0 },
hotswaps: 0
}

this.receiverQueue = new ReceiverQueue()
Expand Down Expand Up @@ -428,6 +432,11 @@ class Peer {
return Math.max(this.inflightRange[0], Math.round(Math.min(this.inflightRange[1], this.inflightRange[0] * scale)))
}

getMaxHotswapInflight () {
const inf = this.getMaxInflight()
return Math.max(16, inf / 2)
}

signalUpgrade () {
if (this._shouldUpdateCanUpgrade() === true) this._updateCanUpgradeAndSync()
else this.sendSync()
Expand Down Expand Up @@ -1187,6 +1196,7 @@ class Peer {
this.replicator._markInflight(b.index)

b.inflight.push(req)
this.replicator.hotswaps.add(b)
this._send(req)
}

Expand Down Expand Up @@ -1421,6 +1431,7 @@ module.exports = class Replicator {
this.downloading = false
this.activeSessions = 0

this.hotswaps = new HotswapQueue()
this.inflightRange = inflightRange || DEFAULT_MAX_INFLIGHT

// Note: nodata and unwant not currently tracked
Expand All @@ -1433,7 +1444,8 @@ module.exports = class Replicator {
wireWant: { tx: 0, rx: 0 },
wireBitfield: { tx: 0, rx: 0 },
wireRange: { tx: 0, rx: 0 },
wireExtension: { tx: 0, rx: 0 }
wireExtension: { tx: 0, rx: 0 },
hotswaps: 0
}

this._attached = new Set()
Expand Down Expand Up @@ -1826,6 +1838,7 @@ module.exports = class Replicator {
// sure to clear them afterwards.
for (const b of clear) {
this._blocks.remove(b.index)
removeHotswap(b)
}
}

Expand All @@ -1834,10 +1847,18 @@ module.exports = class Replicator {
if (b === null) return false

removeInflight(b.inflight, req)
removeHotswap(b)
b.queued = false

b.resolve(value)

if (b.inflight.length > 0) { // if anything is still inflight, cancel it
for (let i = b.inflight.length - 1; i >= 0; i--) {
const req = b.inflight[i]
req.peer._cancelRequest(req)
}
}

return true
}

Expand Down Expand Up @@ -1868,6 +1889,10 @@ module.exports = class Replicator {

if (b === null || removeInflight(b.inflight, req) === false) return

if (removeHotswap(b) === true && b.inflight.length > 0) {
this.hotswaps.add(b)
}

if (b.refs.length > 0 && isBlock === true) {
this._queueBlock(b)
return
Expand Down Expand Up @@ -2162,6 +2187,18 @@ module.exports = class Replicator {
return false
}

_updateHotswap (peer) {
const maxHotswaps = peer.getMaxHotswapInflight()
if (!peer.isActive() || peer.inflight >= maxHotswaps) return

for (const b of this.hotswaps.pick(peer)) {
if (peer._requestBlock(b) === false) continue
peer.stats.hotswaps++
peer.replicator.stats.hotswaps++
if (peer.inflight >= maxHotswaps) break
}
}

_updatePeer (peer) {
if (!peer.isActive() || peer.inflight >= peer.getMaxInflight()) {
return false
Expand Down Expand Up @@ -2231,6 +2268,10 @@ module.exports = class Replicator {
while (this._updatePeer(peer) === true);
while (this._updatePeerNonPrimary(peer) === true);

if (this.peers.length > 1 && this._blocks.isEmpty() === false) {
this._updateHotswap(peer)
}

this._checkUpgradeIfAvailable()
this._maybeResolveIfAvailableRanges()
}
Expand Down Expand Up @@ -2407,6 +2448,12 @@ function matchingRequest (req, data) {
return req.fork === data.fork
}

function removeHotswap (block) {
if (block.hotswap === null) return false
block.hotswap.ref.remove(block)
return true
}

function removeInflight (inf, req) {
const i = inf.indexOf(req)
if (i === -1) return false
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
},
"devDependencies": {
"brittle": "^3.0.0",
"debugging-stream": "^3.1.0",
"hyperswarm": "^4.3.6",
"rache": "^1.0.0",
"random-access-memory": "^6.1.0",
Expand Down
33 changes: 33 additions & 0 deletions test/helpers/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const Hypercore = require('../../')
const RAM = require('random-access-memory')
const DebuggingStream = require('debugging-stream')

exports.create = async function create (...args) {
const core = new Hypercore(RAM, ...args)
Expand Down Expand Up @@ -56,6 +57,38 @@ exports.unreplicate = function unreplicate (streams) {
}))
}

exports.replicateDebugStream = function replicate (a, b, t, opts = {}) {
const { latency, speed, jitter } = opts

const s1 = a.replicate(true, { keepAlive: false, ...opts })
const s2Base = b.replicate(false, { keepAlive: false, ...opts })
const s2 = new DebuggingStream(s2Base, { latency, speed, jitter })

s1.on('error', err => t.comment(`replication stream error (initiator): ${err}`))
s2.on('error', err => t.comment(`replication stream error (responder): ${err}`))

if (opts.teardown !== false) {
t.teardown(async function () {
let missing = 2
await new Promise(resolve => {
s1.on('close', onclose)
s1.destroy()

s2.on('close', onclose)
s2.destroy()

function onclose () {
if (--missing === 0) resolve()
}
})
})
}

s1.pipe(s2).pipe(s1)

return [s1, s2]
}

exports.eventFlush = async function eventFlush () {
await new Promise(resolve => setImmediate(resolve))
}
43 changes: 41 additions & 2 deletions test/replicate.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const test = require('brittle')
const b4a = require('b4a')
const RAM = require('random-access-memory')
const NoiseSecretStream = require('@hyperswarm/secret-stream')
const { create, replicate, unreplicate, eventFlush } = require('./helpers')
const { create, replicate, unreplicate, eventFlush, replicateDebugStream } = require('./helpers')
const { makeStreamPair } = require('./helpers/networking.js')
const Hypercore = require('../')

Expand Down Expand Up @@ -51,9 +51,10 @@ test('basic replication stats', async function (t) {
t.is(aStats.wireExtension.tx, 0, 'wireExtension init 0')
t.is(aStats.wireCancel.rx, 0, 'wireCancel init 0')
t.is(aStats.wireCancel.tx, 0, 'wireCancel init 0')
t.is(aStats.hotswaps, 0, 'hotswaps init 0')

const initStatsLength = [...Object.keys(aStats)].length
t.is(initStatsLength, 8, 'Expected amount of stats')
t.is(initStatsLength, 9, 'Expected amount of stats')

replicate(a, b, t)

Expand Down Expand Up @@ -1778,6 +1779,44 @@ test('replication count should never go negative', async function (t) {
}
})

test('uses hotswaps to avoid long download tail', async t => {
const core = await create()
const slowCore = await create(core.key)

const batch = []
while (batch.length < 100) {
batch.push(Buffer.allocUnsafe(60000))
}
await core.append(batch)

replicate(core, slowCore, t)
await slowCore.download({ start: 0, end: core.length }).done()

t.is(slowCore.contiguousLength, 100, 'sanity check')

const peerCore = await create(core.key)
await peerCore.ready()
const [fastStream] = replicateDebugStream(core, peerCore, t, { speed: 10_000_000 })
const [slowStream] = replicateDebugStream(slowCore, peerCore, t, { speed: 1_000_000 })
const fastKey = fastStream.publicKey
const slowKey = slowStream.publicKey
const peerKey = fastStream.remotePublicKey
t.alike(peerKey, slowStream.remotePublicKey, 'sanity check')

await peerCore.download({ start: 0, end: core.length }).done()

const fastPeer = peerCore.replicator.peers.filter(
p => b4a.equals(p.stream.remotePublicKey, fastKey))[0]
const slowPeer = peerCore.replicator.peers.filter(
p => b4a.equals(p.stream.remotePublicKey, slowKey))[0]

t.ok(fastPeer.stats.hotswaps > 0, 'hotswaps happened for fast peer')
t.ok(fastPeer.stats.hotswaps > 0, 'No hotswaps happened for slow peer')
t.ok(slowPeer.stats.wireCancel.tx > 0, 'slow peer cancelled requests')
t.ok(fastPeer.stats.wireData.rx > slowPeer.stats.wireData.rx, 'sanity check: received more data from fast peer')
t.ok(slowPeer.stats.wireData.rx > 0, 'sanity check: still received data from slow peer')
})

async function waitForRequestBlock (core) {
while (true) {
const reqBlock = core.replicator._inflight._requests.find(req => req && req.block)
Expand Down

0 comments on commit b771d91

Please sign in to comment.