Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rocksdb: decouple session state from bitfield #601

Merged
merged 13 commits into from
Nov 27, 2024
17 changes: 11 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class Hypercore extends EventEmitter {
if (opts.userData) {
const batch = this.state.storage.createWriteBatch()
for (const [key, value] of Object.entries(opts.userData)) {
this.core.setUserData(batch, key, value)
batch.setUserData(key, value)
}
await batch.flush()
}
Expand Down Expand Up @@ -612,10 +612,15 @@ class Hypercore extends EventEmitter {
if (this.opened === false) await this.opening
if (!isValidIndex(start) || !isValidIndex(end)) throw ASSERTION('has range is invalid')

if (end === start + 1) return this.state.bitfield.get(start)
let count = 0

const i = this.state.bitfield.firstUnset(start)
return i === -1 || i >= end
const stream = this.state.storage.createBlockStream({ gte: start, lt: end })
for await (const block of stream) {
if (block === null) return false
count++
}

return count === (end - start)
}

async get (index, opts) {
Expand Down Expand Up @@ -690,7 +695,7 @@ class Hypercore extends EventEmitter {

// lets check the bitfield to see if we got it during the above async calls
// this is the last resort before replication, so always safe.
if (this.core.state.bitfield.get(index)) {
if (this.core.bitfield.get(index)) {
return readBlock(this.state.storage.createReadBatch(), index)
}

Expand Down Expand Up @@ -954,7 +959,7 @@ function initOnce (session, storage, key, opts) {
notDownloadingLinger: opts.notDownloadingLinger,
allowFork: opts.allowFork !== false,
inflightRange: opts.inflightRange,
compat: opts.compat,
compat: opts.compat === true,
force: opts.force,
createIfMissing: opts.createIfMissing,
discoveryKey: opts.discoveryKey,
Expand Down
68 changes: 34 additions & 34 deletions lib/audit.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
const hypercoreCrypto = require('hypercore-crypto')
const flat = require('flat-tree')
const c = require('compact-encoding')
const b4a = require('b4a')

const BitInterlude = require('./bit-interlude')

// this is optimised for speed over mem atm
// can be tweaked in the future

module.exports = async function auditCore (core, update) {
module.exports = async function auditCore (core, storage) {
const corrections = {
tree: 0,
blocks: 0
}

const length = core.header.tree.length

const data = await readFullStorage(core.blocks.storage)
const tree = await readFullStorage(core.tree.storage)
const bitfield = new BitInterlude()

const data = await readAllBlocks(core.storage)
const tree = await readAllTreeNodes(core.tree.storage)

const valid = new Uint8Array(Math.ceil(tree.byteLength / 40))
const stack = []
Expand All @@ -30,8 +33,8 @@ module.exports = async function auditCore (core, update) {
if ((node.index & 1) === 0) continue

const [left, right] = flat.children(node.index)
const leftNode = getNode(left)
const rightNode = getNode(right)
const leftNode = tree.get(left)
const rightNode = tree.get(right)

stack.push(leftNode, rightNode)

Expand All @@ -44,8 +47,8 @@ module.exports = async function auditCore (core, update) {
}
}

if (leftNode.size) clearNode(leftNode)
if (rightNode.size) clearNode(rightNode)
if (leftNode.size) clearNode(left)
if (rightNode.size) clearNode(right)
}

let i = 0
Expand All @@ -65,55 +68,52 @@ module.exports = async function auditCore (core, update) {
try {
nextOffset = await core.tree.byteOffset(i * 2)
} catch {
update.bitfield.set(i, false)
storage.deleteBlock(i)
bitfield.set(i, false)
corrections.blocks++
i++
continue
}
}

const node = getNode(i * 2)
const blk = data.subarray(nextOffset, nextOffset + node.size)
const node = tree.get(i * 2)
const blk = data.get(i)
const hash = hypercoreCrypto.data(blk)

nextOffset += blk.byteLength

if (!b4a.equals(hash, node.hash)) {
update.bitfield.set(i, false)
storage.deleteBlock(i)
bitfield.set(i, false)
corrections.blocks++
}

i++
}

return corrections
bitfield.flush(storage, core.bitfield)

function getNode (index) {
const state = { start: index * 40, end: index * 40 + 40, buffer: tree }
const size = c.uint64.decode(state)
const hash = c.fixed32.decode(state)
return { index, size, hash }
}
return corrections

function clearNode (node) {
valid[node.index] = 0
storage.deleteTreeNode(node.index)
corrections.tree++
}
}

if (node.size) {
b4a.fill(tree, 0, node.index * 40, node.index * 40 + 40)
core.tree.unflushed.set(node.index, core.tree.blankNode(node.index))
corrections.tree++
}
async function readAllBlocks (storage) {
const data = new Map()
for await (const block of storage.createBlockStream()) {
data.set(block.index, block.value)
}
return data
}

function readFullStorage (storage) {
return new Promise((resolve, reject) => {
storage.stat((_, st) => {
if (!st) return resolve(b4a.alloc(0))
storage.read(0, st.size, (err, data) => {
if (err) reject(err)
else resolve(data)
})
})
})
async function readAllTreeNodes (storage) {
const nodes = new Map()
for await (const node of storage.createTreeNodeStream()) {
nodes.set(node.index, node)
}
return nodes
}
13 changes: 6 additions & 7 deletions lib/bit-interlude.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ const b4a = require('b4a')
const quickbit = require('./compat').quickbit

module.exports = class BitInterlude {
constructor (bitfield) {
this.bitfield = bitfield
constructor () {
this.ranges = []
}

Expand Down Expand Up @@ -40,7 +39,7 @@ module.exports = class BitInterlude {
return r.value
}

return this.bitfield.get(index)
return false
}

setRange (start, end, value) {
Expand Down Expand Up @@ -110,7 +109,7 @@ module.exports = class BitInterlude {
this.ranges.push({ start, end, value })
}

flush (writer, debug) {
flush (writer, bitfield) {
if (!this.ranges.length) return []

let index = this.ranges[0].start
Expand All @@ -119,10 +118,10 @@ module.exports = class BitInterlude {
let i = 0

while (index < final) {
const page = this.bitfield.getBitfield(index) // read only
const pageIndex = page ? page.index : this.bitfield.getPageIndex(index)
const page = bitfield.getBitfield(index) // read only
const pageIndex = page ? page.index : bitfield.getPageIndex(index)

const buf = b4a.allocUnsafe(this.bitfield.getPageByteLength())
const buf = b4a.allocUnsafe(bitfield.getPageByteLength())

const view = new DataView(
buf.buffer,
Expand Down
3 changes: 1 addition & 2 deletions lib/block-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ module.exports = class BlockStore {
}
}

clear (writer, start, length = -1) {
const end = length === -1 ? -1 : start + length
clear (writer, start, end = -1) {
writer.deleteBlockRange(start, end)
}
}
Loading