diff --git a/index.js b/index.js index 97a815ea..73b6f206 100644 --- a/index.js +++ b/index.js @@ -323,7 +323,7 @@ class Hypercore extends EventEmitter { if (opts.userData) { const batch = this.state.storage.createWriteBatch() for (const [key, value] of Object.entries(opts.userData)) { - this.core.setUserData(batch, key, value) + batch.setUserData(key, value) } await batch.flush() } @@ -612,10 +612,30 @@ class Hypercore extends EventEmitter { if (this.opened === false) await this.opening if (!isValidIndex(start) || !isValidIndex(end)) throw ASSERTION('has range is invalid') - if (end === start + 1) return this.state.bitfield.get(start) + if (this.state.isDefault()) { + if (end === start + 1) return this.core.bitfield.get(start) - const i = this.state.bitfield.firstUnset(start) - return i === -1 || i >= end + const i = this.core.bitfield.firstUnset(start) + return i === -1 || i >= end + } + + if (end === start + 1) { + const reader = this.state.storage.createReadBatch() + const block = reader.getBlock(start) + reader.tryFlush() + + return (await block) !== null + } + + let count = 0 + + const stream = this.state.storage.createBlockStream({ gte: start, lt: end }) + for await (const block of stream) { + if (block === null) return false + count++ + } + + return count === (end - start) } async get (index, opts) { @@ -690,7 +710,7 @@ class Hypercore extends EventEmitter { // lets check the bitfield to see if we got it during the above async calls // this is the last resort before replication, so always safe. - if (this.core.state.bitfield.get(index)) { + if (this.core.bitfield.get(index)) { return readBlock(this.state.storage.createReadBatch(), index) } @@ -954,7 +974,7 @@ function initOnce (session, storage, key, opts) { notDownloadingLinger: opts.notDownloadingLinger, allowFork: opts.allowFork !== false, inflightRange: opts.inflightRange, - compat: opts.compat, + compat: opts.compat === true, force: opts.force, createIfMissing: opts.createIfMissing, discoveryKey: opts.discoveryKey, diff --git a/lib/audit.js b/lib/audit.js index 52c4781b..ad552c28 100644 --- a/lib/audit.js +++ b/lib/audit.js @@ -1,14 +1,13 @@ const hypercoreCrypto = require('hypercore-crypto') const flat = require('flat-tree') -const c = require('compact-encoding') const b4a = require('b4a') -const empty = b4a.alloc(32) +const BitInterlude = require('./bit-interlude') // this is optimised for speed over mem atm // can be tweaked in the future -module.exports = async function auditCore (core, update) { +module.exports = async function auditCore (core, storage) { const corrections = { tree: 0, blocks: 0 @@ -16,8 +15,10 @@ module.exports = async function auditCore (core, update) { const length = core.header.tree.length - const data = await readFullStorage(core.blocks.storage) - const tree = await readFullStorage(core.tree.storage) + const bitfield = new BitInterlude() + + const data = await readAllBlocks(core.storage) + const tree = await readAllTreeNodes(core.tree.storage) const valid = new Uint8Array(Math.ceil(tree.byteLength / 40)) const stack = [] @@ -32,8 +33,8 @@ module.exports = async function auditCore (core, update) { if ((node.index & 1) === 0) continue const [left, right] = flat.children(node.index) - const leftNode = getNode(left) - const rightNode = getNode(right) + const leftNode = tree.get(left) + const rightNode = tree.get(right) if (!rightNode && !leftNode) continue @@ -48,8 +49,8 @@ module.exports = async function auditCore (core, update) { } } - if (leftNode.size) clearNode(leftNode) - if (rightNode.size) clearNode(rightNode) + if (leftNode.size) clearNode(left) + if (rightNode.size) clearNode(right) } let i = 0 @@ -69,57 +70,52 @@ module.exports = async function auditCore (core, update) { try { nextOffset = await core.tree.byteOffset(i * 2) } catch { - update.bitfield.set(i, false) + storage.deleteBlock(i) + bitfield.set(i, false) corrections.blocks++ i++ continue } } - const node = getNode(i * 2) - const blk = data.subarray(nextOffset, nextOffset + node.size) + const node = tree.get(i * 2) + const blk = data.get(i) const hash = hypercoreCrypto.data(blk) nextOffset += blk.byteLength if (!b4a.equals(hash, node.hash)) { - update.bitfield.set(i, false) + storage.deleteBlock(i) + bitfield.set(i, false) corrections.blocks++ } i++ } - return corrections + bitfield.flush(storage, core.bitfield) - function getNode (index) { - if (index * 40 + 40 > tree.byteLength) return null - const state = { start: index * 40, end: index * 40 + 40, buffer: tree } - const size = c.uint64.decode(state) - const hash = c.fixed32.decode(state) - if (size === 0 && hash.equals(empty)) return null - return { index, size, hash } - } + return corrections function clearNode (node) { valid[node.index] = 0 + storage.deleteTreeNode(node.index) + corrections.tree++ + } +} - if (node.size) { - b4a.fill(tree, 0, node.index * 40, node.index * 40 + 40) - core.tree.unflushed.set(node.index, core.tree.blankNode(node.index)) - corrections.tree++ - } +async function readAllBlocks (storage) { + const data = new Map() + for await (const block of storage.createBlockStream()) { + data.set(block.index, block.value) } + return data } -function readFullStorage (storage) { - return new Promise((resolve, reject) => { - storage.stat((_, st) => { - if (!st) return resolve(b4a.alloc(0)) - storage.read(0, st.size, (err, data) => { - if (err) reject(err) - else resolve(data) - }) - }) - }) +async function readAllTreeNodes (storage) { + const nodes = new Map() + for await (const node of storage.createTreeNodeStream()) { + nodes.set(node.index, node) + } + return nodes } diff --git a/lib/bit-interlude.js b/lib/bit-interlude.js index e0aab44a..792def1b 100644 --- a/lib/bit-interlude.js +++ b/lib/bit-interlude.js @@ -2,8 +2,7 @@ const b4a = require('b4a') const quickbit = require('./compat').quickbit module.exports = class BitInterlude { - constructor (bitfield) { - this.bitfield = bitfield + constructor () { this.ranges = [] } @@ -40,7 +39,7 @@ module.exports = class BitInterlude { return r.value } - return this.bitfield.get(index) + return false } setRange (start, end, value) { @@ -110,7 +109,7 @@ module.exports = class BitInterlude { this.ranges.push({ start, end, value }) } - flush (writer, debug) { + flush (writer, bitfield) { if (!this.ranges.length) return [] let index = this.ranges[0].start @@ -119,10 +118,10 @@ module.exports = class BitInterlude { let i = 0 while (index < final) { - const page = this.bitfield.getBitfield(index) // read only - const pageIndex = page ? page.index : this.bitfield.getPageIndex(index) + const page = bitfield.getBitfield(index) // read only + const pageIndex = page ? page.index : bitfield.getPageIndex(index) - const buf = b4a.allocUnsafe(this.bitfield.getPageByteLength()) + const buf = b4a.allocUnsafe(bitfield.getPageByteLength()) const view = new DataView( buf.buffer, diff --git a/lib/block-store.js b/lib/block-store.js index 8833175b..fbe44298 100644 --- a/lib/block-store.js +++ b/lib/block-store.js @@ -19,8 +19,7 @@ module.exports = class BlockStore { } } - clear (writer, start, length = -1) { - const end = length === -1 ? -1 : start + length + clear (writer, start, end = -1) { writer.deleteBlockRange(start, end) } } diff --git a/lib/core.js b/lib/core.js index bc3d70b4..5d0ee4f2 100644 --- a/lib/core.js +++ b/lib/core.js @@ -1,9 +1,6 @@ const crypto = require('hypercore-crypto') const b4a = require('b4a') -const assert = require('nanoassert') const unslab = require('unslab') -const flat = require('flat-tree') -const quickbit = require('quickbit-universal') const z32 = require('z32') const Mutex = require('./mutex') const MerkleTree = require('./merkle-tree') @@ -11,598 +8,13 @@ const BlockStore = require('./block-store') const BitInterlude = require('./bit-interlude') const Bitfield = require('./bitfield') const RemoteBitfield = require('./remote-bitfield') -// const Info = require('./info') const { BAD_ARGUMENT, STORAGE_EMPTY, STORAGE_CONFLICT, INVALID_OPERATION, INVALID_SIGNATURE, INVALID_CHECKSUM } = require('hypercore-errors') const Verifier = require('./verifier') const audit = require('./audit') const copyPrologue = require('./copy-prologue') +const SessionState = require('./session-state') const Replicator = require('./replicator') -const HEAD = Symbol.for('head') -const CONTIG = Symbol.for('contig') -const TREE = Symbol.for('tree') -const BITFIELD = Symbol.for('bitfield') -const USER_DATA = Symbol.for('user-data') -const DEPENDENCY = Symbol.for('dependency') - -class Update { - constructor (batch, bitfield, header, state) { - this.batch = batch - this.bitfield = new BitInterlude(bitfield) - - this.state = state - - this.contiguousLength = header.hints.contiguousLength - - this.tree = null - - this.updates = [] - this._coreUpdates = [] - } - - async flushBitfield () { - const update = await this.bitfield.flush(this.batch) - if (update) this.updates.push({ type: BITFIELD, update }) - } - - flushTreeBatch (batch) { - if (batch.commitable()) { - const update = batch.commit(this.batch) - this.updates.push({ type: TREE, update }) - } - - if (batch.upgraded) { - this.tree = { - fork: batch.fork, - length: batch.length, - rootHash: batch.hash(), - signature: batch.signature - } - } - } - - setUserData (key, value) { - this.updates.push({ type: USER_DATA, update: { key, value } }) - this.batch.setUserData(key, value) - } - - updateDependency (length) { - const dependencies = this.state.storage.dependencies - if (!dependencies.length) return // skip default state and overlays of default - - let target = dependencies[dependencies.length - 1] - - for (const dep of dependencies) { - if (dep.length < length) break - target = dep - } - - const dependency = { data: target.data, length } - - this.batch.setDataDependency(dependency) - this.updates.push({ type: DEPENDENCY, update: dependency }) - } - - updateContig (bitfield) { - const contig = updateContigBatch(this.contiguousLength, bitfield, this.bitfield) - - if (contig.length > this.contiguousLength || (bitfield.drop && contig.length < this.contiguousLength)) { - this.contiguousLength = contig.length - this._coreUpdates.push({ type: CONTIG, update: contig.length }) - } - } - - async flush () { - await this.flushBitfield() - - if (this.tree) { - this.batch.setCoreHead(this.tree) - this.updates.push({ type: HEAD, update: this.tree }) - } - - // bitfield flushed before core updates - for (const upd of this._coreUpdates) { - this.updates.push(upd) - } - - await this.batch.flush() - - return this.updates - } - - async truncate (batch, from) { - const bitfield = { - drop: true, - start: batch.ancestors, - length: batch.treeLength - batch.ancestors - } - - this.bitfield.setRange(batch.ancestors, batch.treeLength, false) - this.batch.deleteBlockRange(bitfield.start, bitfield.start + bitfield.length) - - this.flushTreeBatch(batch) - this.updateContig(bitfield) - } -} - -class SessionState { - constructor (core, storage, blocks, tree, bitfield, snapshotLength, name) { - this.core = core - this.index = this.core.sessionStates.push(this) - 1 - - this.storage = storage - this.name = name - this.sessions = [] - - this.mutex = new Mutex() - - this.blocks = blocks - this.tree = tree - this.bitfield = bitfield - this.snapshotLength = snapshotLength - this.snapshotCompatLength = snapshotLength - - this.active = 0 - - this._onflush = null - this._flushing = null - this._activeBatch = null - } - - isSnapshot () { - return this.storage.snapshotted - } - - isDefault () { - return this.core.state === this - } - - addSession (s) { - s._stateIndex = this.sessions.push(s) - 1 - } - - removeSession (s) { - if (s._stateIndex === -1) return - const head = this.sessions.pop() - if (head !== s) this.sessions[(head._stateIndex = s._stateIndex)] = head - s._stateIndex = -1 - } - - flushedLength () { - if (this.isDefault()) return this.tree.length - if (this.isSnapshot()) return this.snapshotLength - - return this.storage.dependencyLength() - } - - unref () { - if (--this.active > 0) return - this.destroy() - } - - ref () { - this.active++ - return this - } - - destroy () { - if (this.index === -1) return - - this.active = 0 - this.storage.destroy() - this.mutex.destroy(new Error('Closed')).catch(noop) - - const head = this.core.sessionStates.pop() - if (head !== this) this.core.sessionStates[(head.index = this.index)] = head - - this.index = -1 - } - - snapshot () { - const s = new SessionState( - this.core, - this.storage.snapshot(), - this.blocks, - this.tree, // todo: should clone also but too many changes atm - this.bitfield, - this.tree.length, - this.name - ) - - return s - } - - memoryOverlay () { - const storage = this.storage.createMemoryOverlay() - const s = new SessionState( - this.core, - storage, - this.blocks, - this.tree.clone(storage), - this.bitfield, - -1, - this.name - ) - - return s - } - - _updateDependencies (dependency) { - assert(!this.isDefault(), 'Default state should have no dependencies') - - const deps = this.storage.dependencies - - for (let i = deps.length - 1; i >= 0; i--) { - if (deps[i].data === dependency.data) { - const updated = deps.slice(0, i) - updated.push(dependency) - - this.storage.dependencies = updated - return - } - } - - throw new Error('Dependency not found') - } - - _clearActiveBatch (err) { - if (!this._activeBatch) return - this._activeBatch.destroy() - - if (this._onflush) this._onflush(err) - - this._onflush = null - this._flushing = null - - this._activeBatch = null - } - - _unlock () { - this._clearActiveBatch() - this.mutex.unlock() - this.core.checkIfIdle() - } - - createUpdate () { - assert(!this._activeBatch && !this.storage.snapshotted) - - this._activeBatch = this.overlay ? this.overlay.createWriteBatch() : this.storage.createWriteBatch() - return new Update(this._activeBatch, this.bitfield, this.core.header, this) - } - - async flushUpdate (u) { - const flushing = this._flushUpdateBatch(u) - - try { - if (!this._flushing) this._flushing = flushing - - await flushing - } finally { - this._clearActiveBatch(this) - } - } - - flushed () { - if (!this._activeBatch) return - - if (this._flushing) return this._flushing - - this._flushing = new Promise(resolve => { - this._onflush = resolve - }) - - return this._flushing - } - - async _flushUpdateBatch (u) { - await u.flush() - - if (!u.updates.length) return - - for (const { type, update } of u.updates) { - switch (type) { - case TREE: // tree - if (!this.isDefault()) this.tree.onupdate(update) - break - - case BITFIELD: // bitfield - this.bitfield.onupdate(update) - break - - case DEPENDENCY: // dependency - this._updateDependencies(update) - break - } - } - - if (!this.isDefault()) return - - this.core._processUpdates(u.updates) - } - - async setUserData (key, value) { - await this.mutex.lock() - - try { - const update = this.createUpdate() - update.setUserData(key, value) - - return await this.flushUpdate(update) - } finally { - this._unlock() - } - } - - async truncate (length, fork, { signature, keyPair } = {}) { - if (this.tree.prologue && length < this.tree.prologue.length) { - throw INVALID_OPERATION('Truncation breaks prologue') - } - - if (!keyPair && this.isDefault()) keyPair = this.core.header.keyPair - - await this.mutex.lock() - - try { - const batch = await this.tree.truncate(length, fork) - - if (!signature && keyPair && length > 0) signature = this.core.verifier.sign(batch, keyPair) - if (signature) batch.signature = signature - - const update = this.createUpdate() - - // upsert compat manifest - if (this.core.verifier === null && keyPair) this.core._setManifest(update, null, keyPair) - - await update.truncate(batch, null) - - if (batch.length < this.flushedLength()) update.updateDependency(batch.length) - - await this.flushUpdate(update) - - this.ontruncate(length, batch.treeLength, fork) - } finally { - this._unlock() - } - } - - async clear (start, end, cleared) { - await this.mutex.lock() - - try { - const bitfield = { - start, - length: end - start, - drop: true - } - - const update = this.createUpdate() - - update.bitfield.setRange(start, end, false) - - end = this.bitfield.firstSet(end) - - // TODO: verify this: - // start = state.bitfield.lastSet(start) + 1 - // end = state.bitfield.firstSet(end) - - if (end === -1) end = this.tree.length - if (start === -1 || start >= this.tree.length) return - - this.blocks.clear(update.batch, start, end - start) - update.updateContig(bitfield) - - if (start < this.flushedLength()) update.updateDependency(start) - - await this.flushUpdate(update) - - if (this.isDefault()) { - this.core.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) - } - } finally { - this._unlock() - } - } - - async append (values, { signature, keyPair, preappend } = {}) { - if (!keyPair && this.isDefault()) keyPair = this.core.header.keyPair - - await this.mutex.lock() - - try { - const update = this.createUpdate() - - // upsert compat manifest - if (this.core.verifier === null && keyPair) this.core._setManifest(update, null, keyPair) - - if (preappend) await preappend(values) - - if (!values.length) { - await this.flushUpdate(update) - return { length: this.tree.length, byteLength: this.tree.byteLength } - } - - const batch = this.tree.batch() - for (const val of values) batch.append(val) - - // only multisig can have prologue so signature is always present - if (this.tree.prologue && batch.length < this.tree.prologue.length) { - throw INVALID_OPERATION('Append is not consistent with prologue') - } - - if (!signature && keyPair) signature = this.core.verifier.sign(batch, keyPair) - if (signature) batch.signature = signature - - update.flushTreeBatch(batch) - update.bitfield.setRange(batch.ancestors, batch.length, true) - - this.blocks.putBatch(update.batch, this.tree.length, values) - - const bitfield = { - drop: false, - start: batch.ancestors, - length: values.length - } - - update.updateContig(bitfield) - - await this.flushUpdate(update) - - this.onappend(bitfield) - return { length: batch.length, byteLength: batch.byteLength } - } finally { - this._unlock() - } - } - - onappend (bitfield) { - if (this.isDefault()) { - if (bitfield) { - this.core.replicator.cork() - this.core.replicator.onupgrade() - this.core.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) - this.core.replicator.uncork() - } else { - this.core.replicator.onupgrade() - } - } - - for (let i = this.sessions.length - 1; i >= 0; i--) { - this.sessions[i].emit('append') - } - } - - ontruncate (to, from, fork) { - if (this.isDefault()) { - const length = from - to - - this.core.replicator.cork() - this.core.replicator.ontruncate(to, length) - this.core.replicator.onhave(to, length, true) - this.core.replicator.onupgrade() - this.core.replicator.uncork() - - for (const sessionState of this.core.sessionStates) { - if (to < sessionState.snapshotCompatLength) sessionState.snapshotCompatLength = to - } - } - - for (let i = this.sessions.length - 1; i >= 0; i--) { - this.sessions[i].emit('truncate', to, fork) - } - } - - async _overwrite (source, length, treeLength, signature) { - const blockPromises = [] - const treePromises = [] - const rootPromises = [] - - const reader = source.storage.createReadBatch() - - for (const root of flat.fullRoots(length * 2)) { - rootPromises.push(reader.getTreeNode(root)) - } - - for (const index of flat.patch(treeLength * 2, length * 2)) { - treePromises.push(reader.getTreeNode(index)) - } - - for (let i = treeLength; i < length; i++) { - treePromises.push(reader.getTreeNode(i * 2)) - treePromises.push(reader.getTreeNode(i * 2 + 1)) - blockPromises.push(reader.getBlock(i)) - } - - reader.tryFlush() - - const blocks = await Promise.all(blockPromises) - const nodes = await Promise.all(treePromises) - const roots = await Promise.all(rootPromises) - - if (signature) { - const batch = this.tree.batch() - batch.roots = roots - batch.length = length - - if (!this.core.verifier.verify(batch, signature)) { - throw INVALID_SIGNATURE('Signature is not valid over committed tree') - } - } - - const writer = this.storage.createWriteBatch() - - // truncate existing tree - if (treeLength < this.tree.length) { - writer.deleteBlockRange(treeLength, this.tree.length) - } - - for (const node of nodes) { - if (node !== null) writer.putTreeNode(node) - } - - for (let i = 0; i < blocks.length; i++) { - writer.putBlock(i + treeLength, blocks[i]) - } - - const totalLength = Math.max(length, this.tree.length) - - const firstPage = getBitfieldPage(treeLength) - const lastPage = getBitfieldPage(totalLength) - - let index = treeLength - - for (let i = firstPage; i <= lastPage; i++) { - const page = b4a.alloc(Bitfield.BYTES_PER_PAGE) - writer.putBitfieldPage(i, page) - - if (index < length) { - index = fillBitfieldPage(page, index, length, i, true) - if (index < length) continue - } - - if (index < this.tree.length) { - index = fillBitfieldPage(page, index, this.tree.length, i, false) - } - } - - const tree = { - fork: this.tree.fork, - length, - rootHash: crypto.tree(roots), - signature - } - - const upgraded = treeLength < this.tree.length || this.tree.length < length - - if (upgraded) { - writer.setCoreHead(tree) - this.tree.setRoots(roots, signature) - } - - await writer.flush() - - return tree - } - - async overwrite (state, { length = state.tree.length, treeLength = state.flushedLength() } = {}) { - assert(!this.isDefault(), 'Cannot overwrite signed state') // TODO: make this check better - - await this.mutex.lock() - - try { - await this._overwrite(state, length, treeLength, null) - - return { - length: this.tree.length, - byteLength: this.tree.byteLength - } - } finally { - this._clearActiveBatch() - this.updating = false - this.mutex.unlock() - } - } -} - module.exports = class Core { constructor (db, opts = {}) { this.db = db @@ -842,7 +254,7 @@ module.exports = class Core { this.blocks = blocks this.bitfield = bitfield this.verifier = verifier - this.state = new SessionState(this, storage, this.blocks, tree, bitfield, -1, null) + this.state = new SessionState(this, storage, this.blocks, tree, -1, null) this.state.ref() @@ -900,22 +312,24 @@ module.exports = class Core { prologue: this.tree.prologue }) - return new SessionState(this, storage, this.blocks, tree, bitfield, -1, name) + return new SessionState(this, storage, this.blocks, tree, -1, name) } - async audit (state = this.state) { - await state.mutex.lock() + async audit () { + await this.state.mutex.lock() try { - const update = state.createUpdate() - const corrections = await audit(this, update) + const storage = this.state.createWriteBatch() + + // TODO: refactor audit + const corrections = await audit(this, storage) if (corrections.blocks || corrections.tree) { - await state.flushUpdate(update) + await this.state.flushUpdate(storage) } return corrections } finally { - state._unlock() + this.state._unlock() } } @@ -926,10 +340,10 @@ module.exports = class Core { if (manifest && this.header.manifest === null) { if (!Verifier.isValidManifest(this.header.key, manifest)) throw INVALID_CHECKSUM('Manifest hash does not match') - const update = this.state.createUpdate() - this._setManifest(update, Verifier.createManifest(manifest), null) + const storage = this.state.createWriteBatch() + this._setManifest(storage, Verifier.createManifest(manifest), null) - await this.state.flushUpdate(update) + await this.state.flushWriteBatch(storage) this.replicator.onupgrade() } } finally { @@ -937,7 +351,7 @@ module.exports = class Core { } } - _setManifest (update, manifest, keyPair) { + _setManifest (storage, manifest, keyPair) { if (!manifest && b4a.equals(keyPair.publicKey, this.header.key)) manifest = Verifier.defaultSignerManifest(this.header.key) if (!manifest) return @@ -947,7 +361,7 @@ module.exports = class Core { this.manifest = this.header.manifest = manifest - update.batch.setCoreAuth({ key: this.header.key, manifest: Verifier.encodeManifest(manifest) }) + storage.setCoreAuth({ key: this.header.key, manifest: Verifier.encodeManifest(manifest) }) this.compat = verifier.compat this.verifier = verifier @@ -995,62 +409,6 @@ module.exports = class Core { return this.state.flushed() } - async _processUpdates (updates) { - for (const { type, update } of updates) { - switch (type) { - case HEAD: { - this.header.tree = update - break - } - - case CONTIG: { // contig - this.header.hints.contiguousLength = update - break - } - - case TREE: // tree - if (update.truncated) addReorgHint(this.header.hints.reorgs, this.tree, update) - this.tree.onupdate(update) - break - - case BITFIELD: // bitfield - if (this.skipBitfield !== null) this._updateSkipBitfield(update) - break - - case USER_DATA: { // user data - let exists = false - for (const entry of this.header.userData) { - if (entry.key !== update.key) continue - - entry.value = update.value - exists = true - break - } - - if (exists) continue - - this.header.userData.push({ key: update.key, value: unslab(update.value) }) - break - } - } - } - } - - _writeBlock (writer, index, value) { - this.blocks.put(writer, index, value) - } - - userData (key, value) { - const update = this.state.createUpdate() - this.setUserData(update, key, value) - - return this.state.flushUpdate(update) - } - - setUserData (update, key, value) { - return update.setUserData(key, value) - } - async commit (state, { signature, keyPair = this.header.keyPair, length = state.tree.length, treeLength = state.flushedLength(), overwrite = false } = {}) { let sourceLocked = false @@ -1085,13 +443,14 @@ module.exports = class Core { // gc blocks from source if (treeLength < length) { - const source = state.createUpdate() + const storage = state.createWriteBatch() - state.blocks.clear(source.batch, treeLength, length - treeLength) + state.blocks.clear(storage, treeLength, length) + const dependency = state.updateDependency(storage, length) - source.updateDependency(length) + await state.flushWriteBatch(storage) - await state.flushUpdate(source) + if (dependency) state.refreshDependencies(dependency) } if (this.header.hints.contiguousLength === treeLength) { @@ -1100,7 +459,7 @@ module.exports = class Core { } // update in memory bitfield - this.bitfield.setRange(treeLength, length, true) + this._setBitfieldRanges(treeLength, length, true) if (this.header.tree.length < tree.length || treeLength < this.header.tree.length) { this.header.tree = tree @@ -1126,7 +485,7 @@ module.exports = class Core { } } - _verifyBatchUpgrade (update, batch, manifest) { + _verifyBatchUpgrade (batch, manifest) { if (!this.header.manifest) { if (!manifest && this.compat) manifest = Verifier.defaultSignerManifest(this.header.key) @@ -1142,39 +501,18 @@ module.exports = class Core { if (!verifier.verify(batch, batch.signature)) { throw INVALID_SIGNATURE('Proof contains an invalid signature') } - - if (!this.header.manifest && update !== null) this._setManifest(update, manifest, null) } - async _verifyExclusive ({ batch, bitfield, value, manifest, from }) { - await this.state.mutex.lock() - - const update = this.state.createUpdate() - - try { - this._verifyBatchUpgrade(update, batch, manifest) - - if (!batch.commitable()) return false - this.updating = true - - if (this.preupdate !== null) await this.preupdate(batch, this.header.key) - if (bitfield) this._writeBlock(update.batch, bitfield.start, value) - - if (bitfield) { - update.bitfield.setRange(bitfield.start, bitfield.start + 1, true) - update.updateContig(bitfield) - } + async _verifyExclusive ({ batch, bitfield, value, manifest }) { + this._verifyBatchUpgrade(batch, manifest) + if (!batch.commitable()) return false - update.flushTreeBatch(batch) + if (this.preupdate !== null) await this.preupdate(batch, this.header.key) - await this.state.flushUpdate(update) + await this.state._verifyBlock(batch, bitfield, value, this.header.manifest ? null : manifest) - if (batch.upgraded) this.state.onappend(bitfield) - else if (bitfield) this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) - } finally { - this.state._clearActiveBatch() - this.updating = false - this.state.mutex.unlock() + if (!batch.upgraded && bitfield) { + this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) } return true @@ -1185,7 +523,7 @@ module.exports = class Core { await this.state.mutex.lock() - const update = this.state.createUpdate() + const storage = this.state.createWriteBatch() const verifies = this._verifies this._verifies = null @@ -1196,10 +534,13 @@ module.exports = class Core { if (!batch.commitable()) continue if (bitfield) { - this._writeBlock(update.batch, bitfield.start, value) + storage.putBlock(bitfield.start, value) } } + const bits = new BitInterlude() + const treeUpdates = [] + for (let i = 0; i < verifies.length; i++) { const { batch, bitfield, manifest } = verifies[i] @@ -1209,25 +550,36 @@ module.exports = class Core { } if (bitfield) { - update.bitfield.setRange(bitfield.start, bitfield.start + 1, true) + bits.setRange(bitfield.start, bitfield.start + 1, true) } // if we got a manifest AND its strictly a non compat one, lets store it if (manifest && this.header.manifest === null) { if (!Verifier.isValidManifest(this.header.key, manifest)) throw INVALID_CHECKSUM('Manifest hash does not match') - this._setManifest(update, manifest, null) + this._setManifest(storage, manifest, null) } - if (bitfield) update.updateContig(bitfield) + if (batch.commitable()) treeUpdates.push(batch.commit(storage)) + } + + const ranges = bits.flush(storage, this.bitfield) + + await this.state.flushWriteBatch(storage) - update.flushTreeBatch(batch) + for (const batch of treeUpdates) { + this.state.tree.onupdate(batch) } - await this.state.flushUpdate(update) + for (const { start, end, value } of ranges) { + this._setBitfieldRanges(start, end, value) + } for (let i = 0; i < verifies.length; i++) { const bitfield = verifies[i] && verifies[i].bitfield - if (bitfield) this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) + if (bitfield) { + this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) + this.updateContiguousLength(bitfield) + } } } finally { this.state._clearActiveBatch() @@ -1245,15 +597,21 @@ module.exports = class Core { const batch = this.tree.verifyFullyRemote(proof) + try { + this._verifyBatchUpgrade(batch, proof.manifest) + } catch { + return true + } + await this.state.mutex.lock() try { - const update = this.state.createUpdate() - this._verifyBatchUpgrade(update, batch, proof.manifest) + const storage = this.state.createWriteBatch() + if (this.header.manifest === null && proof.manifest) { + this._setManifest(storage, proof.manifest, null) + } - await this.state.flushUpdate(update) - } catch { - return true + await this.state.flushWriteBatch(storage) } finally { this.state.mutex.unlock() } @@ -1269,7 +627,7 @@ module.exports = class Core { async verifyReorg (proof) { const batch = await this.tree.reorg(proof) - this._verifyBatchUpgrade(null, batch, proof.manifest) + this._verifyBatchUpgrade(batch, proof.manifest) return batch } @@ -1308,25 +666,15 @@ module.exports = class Core { return this._verified } - async reorg (batch, from) { + async reorg (batch) { if (!batch.commitable()) return false this.truncating++ - await this.state.mutex.lock() try { - if (!batch.commitable()) return false - - const update = this.state.createUpdate() - await update.truncate(batch, from) - - await this.state.flushUpdate(update) - - this.state.ontruncate(batch.ancestors, batch.treeLength, batch.fork) + await this.state.reorg(batch) } finally { - this.state._clearActiveBatch() this.truncating-- - this.state.mutex.unlock() } return true @@ -1341,10 +689,9 @@ module.exports = class Core { return this.skipBitfield } - _updateSkipBitfield (ranges) { - for (const { start, end, value } of ranges) { - this.skipBitfield.setRange(start, end, value) - } + _setBitfieldRanges (start, end, value) { + this.bitfield.setRange(start, end, value) + if (this.skipBitfield !== null) this.skipBitfield.setRange(start, end, value) } close () { @@ -1352,6 +699,52 @@ module.exports = class Core { return this.closing } + updateContiguousLength (bitfield) { + const contig = updateContigBatch(this.header.hints.contiguousLength, bitfield, this.bitfield) + + if (contig.length !== -1 && contig.length !== this.header.hints.contiguousLength) { + this.header.hints.contiguousLength = contig.length + } + } + + onappend (tree, bitfield) { + this.header.tree = tree + + if (!bitfield) { + this.replicator.onupgrade() + return + } + + this.replicator.cork() + + this._setBitfieldRanges(bitfield.start, bitfield.start + bitfield.length, true) + this.updateContiguousLength(bitfield) + + this.replicator.onupgrade() + this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) + this.replicator.uncork() + } + + ontruncate (tree, to, from) { + if (tree) this.header.tree = tree + + this.replicator.cork() + + const length = from - to + + this.replicator.ontruncate(to, length) + this.replicator.onhave(to, length, true) + this.replicator.onupgrade() + this.replicator.uncork() + + for (const sessionState of this.sessionStates) { + if (to < sessionState.snapshotCompatLength) sessionState.snapshotCompatLength = to + } + + this._setBitfieldRanges(to, from, false) + this.updateContiguousLength({ start: to, length, drop: true }) + } + async _onconflict (proof, from) { await this.replicator.onconflict(from) @@ -1411,7 +804,7 @@ function updateContigBatch (start, upd, bitfield) { if (c === start) { return { - length: null + length: -1 } } @@ -1426,18 +819,6 @@ function updateContigBatch (start, upd, bitfield) { } } -function addReorgHint (list, tree, batch) { - if (tree.length === 0 || tree.fork === batch.fork) return - - while (list.length >= 4) list.shift() // 4 here is arbitrary, just want it to be small (hints only) - while (list.length > 0) { - if (list[list.length - 1].ancestors > batch.ancestors) list.pop() - else break - } - - list.push({ from: tree.fork, to: batch.fork, ancestors: batch.ancestors }) -} - function getDefaultTree () { return { fork: 0, @@ -1490,23 +871,3 @@ async function getCoreInfo (storage) { head: await head } } - -function getBitfieldPage (index) { - return Math.floor(index / Bitfield.BITS_PER_PAGE) -} - -function getBitfieldOffset (index) { - return index & (Bitfield.BITS_PER_PAGE - 1) -} - -function fillBitfieldPage (page, start, end, pageIndex, value) { - const last = ((pageIndex + 1) * Bitfield.BITS_PER_PAGE) - 1 - const from = getBitfieldOffset(start) - - const index = last < end ? last : end - const to = getBitfieldOffset(index) - - quickbit.fill(page, value, from, to) - - return index -} diff --git a/lib/session-state.js b/lib/session-state.js new file mode 100644 index 00000000..b5a89fc4 --- /dev/null +++ b/lib/session-state.js @@ -0,0 +1,592 @@ +const crypto = require('hypercore-crypto') +const b4a = require('b4a') +const assert = require('nanoassert') +const flat = require('flat-tree') +const quickbit = require('quickbit-universal') +const Mutex = require('./mutex') +const Bitfield = require('./bitfield') +const { INVALID_OPERATION, INVALID_SIGNATURE } = require('hypercore-errors') + +module.exports = class SessionState { + constructor (core, storage, blocks, tree, snapshotLength, name) { + this.core = core + this.index = this.core.sessionStates.push(this) - 1 + + this.storage = storage + this.name = name + this.sessions = [] + + this.mutex = new Mutex() + + this.blocks = blocks + this.tree = tree + this.snapshotLength = snapshotLength + this.snapshotCompatLength = snapshotLength + + this.active = 0 + + this._onflush = null + this._flushing = null + this._activeBatch = null + } + + isSnapshot () { + return this.storage.snapshotted + } + + isDefault () { + return this.core.state === this + } + + addSession (s) { + s._stateIndex = this.sessions.push(s) - 1 + } + + removeSession (s) { + if (s._stateIndex === -1) return + const head = this.sessions.pop() + if (head !== s) this.sessions[(head._stateIndex = s._stateIndex)] = head + s._stateIndex = -1 + } + + flushedLength () { + if (this.isDefault()) return this.tree.length + if (this.isSnapshot()) return this.snapshotLength + + return this.storage.dependencyLength() + } + + unref () { + if (--this.active > 0) return + this.destroy() + } + + ref () { + this.active++ + return this + } + + destroy () { + if (this.index === -1) return + + this.active = 0 + this.storage.destroy() + this.mutex.destroy(new Error('Closed')).catch(noop) + + const head = this.core.sessionStates.pop() + if (head !== this) this.core.sessionStates[(head.index = this.index)] = head + + this.index = -1 + } + + snapshot () { + const s = new SessionState( + this.core, + this.storage.snapshot(), + this.blocks, + this.tree, // todo: should clone also but too many changes atm + this.tree.length, + this.name + ) + + return s + } + + memoryOverlay () { + const storage = this.storage.createMemoryOverlay() + const s = new SessionState( + this.core, + storage, + this.blocks, + this.tree.clone(storage), + -1, + this.name + ) + + return s + } + + updateDependency (storage, length) { + const dependency = updateDependency(this, length) + if (dependency) storage.setDataDependency(dependency) + + return dependency + } + + refreshDependencies (dependency) { + assert(!this.isDefault(), 'Default state should have no dependencies') + + const deps = this.storage.dependencies + + for (let i = deps.length - 1; i >= 0; i--) { + if (deps[i].data === dependency.data) { + const updated = deps.slice(0, i) + updated.push(dependency) + + this.storage.dependencies = updated + return + } + } + + throw new Error('Dependency not found') + } + + _clearActiveBatch (err) { + if (!this._activeBatch) return + this._activeBatch.destroy() + + if (this._onflush) this._onflush(err) + + this._onflush = null + this._flushing = null + + this._activeBatch = null + } + + createWriteBatch () { + assert(!this._activeBatch && !this.storage.snapshotted) + + this._activeBatch = this.storage.createWriteBatch() + return this._activeBatch + } + + _unlock () { + this._clearActiveBatch() + this.mutex.unlock() + this.core.checkIfIdle() + } + + async flushWriteBatch (writer) { + const flushing = writer.flush() + + try { + if (!this._flushing) this._flushing = flushing + + await flushing + } finally { + this._clearActiveBatch() + } + } + + flushed () { + if (!this._activeBatch) return + + if (this._flushing) return this._flushing + + this._flushing = new Promise(resolve => { + this._onflush = resolve + }) + + return this._flushing + } + + async setUserData (key, value) { + await this.mutex.lock() + + try { + const storage = this.createWriteBatch() + storage.setUserData(key, value) + + return await this.flushWriteBatch(storage) + } finally { + this._unlock() + } + } + + async _verifyBlock (batch, bitfield, value, manifest, from) { + await this.mutex.lock() + + try { + const storage = this.createWriteBatch() + this.updating = true + + if (bitfield) this.blocks.put(storage, bitfield.start, value) + + if (bitfield && this.isDefault()) { + await storeBitfieldRange(this.storage, this.storage, storage, bitfield.start, bitfield.start + 1, true) + } + + if (manifest) this.core._setManifest(storage, manifest, null) + + const treeUpdate = batch.commitable() ? batch.commit(storage) : null + + const tree = { + fork: batch.fork, + length: batch.length, + rootHash: batch.hash(), + signature: batch.signature + } + + if (batch.upgraded) storage.setCoreHead(tree) + + await this.flushWriteBatch(storage) + + if (treeUpdate) this.tree.onupdate(treeUpdate) + + if (batch.upgraded) this.onappend(tree, bitfield) + } finally { + this._clearActiveBatch() + this.updating = false + this.mutex.unlock() + } + } + + async truncate (length, fork, { signature, keyPair } = {}) { + if (this.tree.prologue && length < this.tree.prologue.length) { + throw INVALID_OPERATION('Truncation breaks prologue') + } + + if (!keyPair && this.isDefault()) keyPair = this.core.header.keyPair + + await this.mutex.lock() + + try { + const batch = await this.tree.truncate(length, fork) + + if (!signature && keyPair && length > 0) signature = this.core.verifier.sign(batch, keyPair) + if (signature) batch.signature = signature + + const storage = this.createWriteBatch() + + // upsert compat manifest + if (this.core.verifier === null && keyPair) this.core._setManifest(storage, null, keyPair) + + const { dependency, tree, treeUpdate } = await this._truncate(storage, batch) + + await this.flushWriteBatch(storage) + + if (dependency) this.refreshDependencies(dependency) + this.tree.onupdate(treeUpdate) + + this.ontruncate(tree, tree.length, batch.treeLength) + } finally { + this._unlock() + } + } + + async reorg (batch) { + await this.mutex.lock() + + const storage = this.createWriteBatch() + + try { + if (!batch.commitable()) return false + + const { dependency, tree, treeUpdate } = await this._truncate(storage, batch) + + await this.flushWriteBatch(storage) + + if (dependency) this.refreshDependencies(dependency) + this.tree.onupdate(treeUpdate) + + this.ontruncate(tree, batch.ancestors, batch.treeLength) + } finally { + this._unlock() + } + } + + async _truncate (storage, batch) { + storage.deleteBlockRange(batch.ancestors, batch.treeLength) + + const treeUpdate = batch.commitable() ? batch.commit(storage) : null + + const tree = { + fork: batch.fork, + length: batch.length, + rootHash: batch.hash(), + signature: batch.signature + } + + if (tree) storage.setCoreHead(tree) + + const truncated = batch.length < this.flushedLength() + const dependency = truncated ? updateDependency(this, batch.length) : null + + if (this.isDefault()) { + await storeBitfieldRange(this.storage, storage, batch.ancestors, batch.treeLength, false) + } + + if (dependency) storage.setDataDependency(dependency) + + return { dependency, tree, treeUpdate } + } + + async clear (start, end, cleared) { + await this.mutex.lock() + + try { + const storage = this.createWriteBatch() + + if (this.isDefault()) await storeBitfieldRange(this.storage, storage, start, end, false) + + this.blocks.clear(storage, start, end) + + const dependency = start < this.flushedLength() ? updateDependency(this, start) : null + + await this.flushWriteBatch(storage) + + if (dependency) this.refreshDependencies(dependency) + + if (this.isDefault()) { + const length = end - start + this.core.updateContiguousLength({ start, length, drop: true }) + this.core._setBitfieldRanges(start, end, false) + this.core.replicator.onhave(start, length, true) + } + } finally { + this._unlock() + } + } + + async append (values, { signature, keyPair, preappend } = {}) { + if (!keyPair && this.isDefault()) keyPair = this.core.header.keyPair + + await this.mutex.lock() + + try { + const storage = this.createWriteBatch() + + // upsert compat manifest + if (this.core.verifier === null && keyPair) this.core._setManifest(storage, null, keyPair) + + if (preappend) await preappend(values) + + if (!values.length) { + await this.flushWriteBatch(storage) + return { length: this.tree.length, byteLength: this.tree.byteLength } + } + + const batch = this.tree.batch() + for (const val of values) batch.append(val) + + // only multisig can have prologue so signature is always present + if (this.tree.prologue && batch.length < this.tree.prologue.length) { + throw INVALID_OPERATION('Append is not consistent with prologue') + } + + if (!signature && keyPair) signature = this.core.verifier.sign(batch, keyPair) + if (signature) batch.signature = signature + + const treeUpdate = batch.commitable() ? batch.commit(storage) : null + + const tree = { + fork: batch.fork, + length: batch.length, + rootHash: batch.hash(), + signature: batch.signature + } + + storage.setCoreHead(tree) + + if (this.isDefault()) await storeBitfieldRange(this.storage, storage, batch.ancestors, batch.length, true) + + this.blocks.putBatch(storage, this.tree.length, values) + + const bitfield = { + drop: false, + start: batch.ancestors, + length: values.length + } + + await this.flushWriteBatch(storage) + + this.tree.onupdate(treeUpdate) + this.onappend(tree, bitfield) + + return { length: batch.length, byteLength: batch.byteLength } + } finally { + this._unlock() + } + } + + onappend (tree, bitfield) { + if (this.isDefault()) this.core.onappend(tree, bitfield) + + for (let i = this.sessions.length - 1; i >= 0; i--) { + this.sessions[i].emit('append') + } + } + + ontruncate (tree, to, from) { + if (this.isDefault()) this.core.ontruncate(tree, to, from) + + for (let i = this.sessions.length - 1; i >= 0; i--) { + this.sessions[i].emit('truncate', to, tree.fork) + } + } + + async _overwrite (source, length, treeLength, signature) { + const blockPromises = [] + const treePromises = [] + const rootPromises = [] + + const reader = source.storage.createReadBatch() + + for (const root of flat.fullRoots(length * 2)) { + rootPromises.push(reader.getTreeNode(root)) + } + + for (const index of flat.patch(treeLength * 2, length * 2)) { + treePromises.push(reader.getTreeNode(index)) + } + + for (let i = treeLength; i < length; i++) { + treePromises.push(reader.getTreeNode(i * 2)) + treePromises.push(reader.getTreeNode(i * 2 + 1)) + blockPromises.push(reader.getBlock(i)) + } + + reader.tryFlush() + + const blocks = await Promise.all(blockPromises) + const nodes = await Promise.all(treePromises) + const roots = await Promise.all(rootPromises) + + if (signature) { + const batch = this.tree.batch() + batch.roots = roots + batch.length = length + + if (!this.core.verifier.verify(batch, signature)) { + throw INVALID_SIGNATURE('Signature is not valid over committed tree') + } + } + + const writer = this.createWriteBatch() + + // truncate existing tree + if (treeLength < this.tree.length) { + writer.deleteBlockRange(treeLength, this.tree.length) + } + + for (const node of nodes) { + if (node !== null) writer.putTreeNode(node) + } + + for (let i = 0; i < blocks.length; i++) { + writer.putBlock(i + treeLength, blocks[i]) + } + + const totalLength = Math.max(length, this.tree.length) + + const firstPage = getBitfieldPage(treeLength) + const lastPage = getBitfieldPage(totalLength) + + let index = treeLength + + for (let i = firstPage; i <= lastPage; i++) { + const page = b4a.alloc(Bitfield.BYTES_PER_PAGE) + writer.putBitfieldPage(i, page) + + if (index < length) { + index = fillBitfieldPage(page, index, length, i, true) + if (index < length) continue + } + + if (index < this.tree.length) { + index = fillBitfieldPage(page, index, this.tree.length, i, false) + } + } + + const tree = { + fork: this.tree.fork, + length, + rootHash: crypto.tree(roots), + signature + } + + const upgraded = treeLength < this.tree.length || this.tree.length < length + + if (upgraded) { + writer.setCoreHead(tree) + this.tree.setRoots(roots, signature) + } + + await this.flushWriteBatch(writer) + + return tree + } + + async overwrite (state, { length = state.tree.length, treeLength = state.flushedLength() } = {}) { + assert(!this.isDefault(), 'Cannot overwrite signed state') // TODO: make this check better + + await this.mutex.lock() + + try { + await this._overwrite(state, length, treeLength, null) + + return { + length: this.tree.length, + byteLength: this.tree.byteLength + } + } finally { + this._clearActiveBatch() + this.updating = false + this.mutex.unlock() + } + } +} + +function noop () {} + +function getBitfieldPage (index) { + return Math.floor(index / Bitfield.BITS_PER_PAGE) +} + +function getBitfieldOffset (index) { + return index & (Bitfield.BITS_PER_PAGE - 1) +} + +function fillBitfieldPage (page, start, end, pageIndex, value) { + const last = ((pageIndex + 1) * Bitfield.BITS_PER_PAGE) - 1 + const from = getBitfieldOffset(start) + + const index = last < end ? last : end + const to = getBitfieldOffset(index) + + quickbit.fill(page, value, from, to) + + return index +} + +async function storeBitfieldRange (storage, writer, from, to, value) { + const firstPage = getBitfieldPage(from) + const lastPage = getBitfieldPage(to) + + let index = from + + const reader = storage.createReadBatch() + + const promises = [] + for (let i = firstPage; i <= lastPage; i++) { + promises.push(reader.getBitfieldPage(i)) + } + + reader.tryFlush() + const pages = await Promise.all(promises) + + for (let i = 0; i <= lastPage - firstPage; i++) { + const pageIndex = i + firstPage + if (!pages[i]) pages[i] = b4a.alloc(Bitfield.BYTES_PER_PAGE) + + index = fillBitfieldPage(pages[i], index, to, pageIndex, true) + writer.putBitfieldPage(pageIndex, pages[i]) + } +} + +function updateDependency (state, length) { + const dependencies = state.storage.dependencies + if (!dependencies.length) return null // skip default state and overlays of default + + let target = dependencies[dependencies.length - 1] + + for (const dep of dependencies) { + if (dep.length < length) break + target = dep + } + + return { + data: target.data, + length + } +} diff --git a/test/bitfield.js b/test/bitfield.js index c7c14eb0..13f0f6c0 100644 --- a/test/bitfield.js +++ b/test/bitfield.js @@ -58,11 +58,11 @@ test('bitfield - reload', async function (t) { { const storage = await createStorage(t, dir) const bitfield = await Bitfield.open(storage, 0) - const b = new BitInterlude(bitfield) + const b = new BitInterlude() b.setRange(142, 143, true) b.setRange(40000, 40001, true) b.setRange(1424242424, 1424242425, true) - await flush(storage, b) + await flush(storage, b, bitfield) await storage.db.close() } @@ -282,8 +282,8 @@ async function createStorage (t, dir) { return (await db.resume(dkey)) || (await db.create({ key: dkey, discoveryKey: dkey })) } -async function flush (s, b) { +async function flush (s, b, bitfield) { const w = s.createWriteBatch() - b.flush(w) + b.flush(w, bitfield) await w.flush() } diff --git a/test/core.js b/test/core.js index 5b55ab66..2224a537 100644 --- a/test/core.js +++ b/test/core.js @@ -60,7 +60,6 @@ test('core - append and truncate', async function (t) { t.is(core.tree.length, 3) t.is(core.tree.byteLength, 12) t.is(core.tree.fork, 1) - t.alike(core.header.hints.reorgs, [{ from: 0, to: 1, ancestors: 3 }]) await core.state.append([ b4a.from('a'), @@ -74,12 +73,9 @@ test('core - append and truncate', async function (t) { t.is(core.tree.length, 3) t.is(core.tree.byteLength, 12) t.is(core.tree.fork, 2) - t.alike(core.header.hints.reorgs, [{ from: 0, to: 1, ancestors: 3 }, { from: 1, to: 2, ancestors: 3 }]) await core.state.truncate(2, 3) - t.alike(core.header.hints.reorgs, [{ from: 2, to: 3, ancestors: 2 }]) - await core.state.append([b4a.from('a')]) await core.state.truncate(2, 4) @@ -92,8 +88,6 @@ test('core - append and truncate', async function (t) { await core.state.append([b4a.from('a')]) await core.state.truncate(2, 7) - t.is(core.header.hints.reorgs.length, 4) - // check that it was persisted const coreReopen = await reopen() @@ -103,39 +97,8 @@ test('core - append and truncate', async function (t) { // t.is(coreReopen.header.hints.reorgs.length, 4) }) -test('core - user data', async function (t) { - const { core, reopen } = await create(t) - - await setUserData(core, 'hello', b4a.from('world')) - t.alike(await getUserData(core.storage, 'hello'), b4a.from('world')) - - await setUserData(core, 'hej', b4a.from('verden')) - t.alike(await getUserData(core.storage, 'hello'), b4a.from('world')) - t.alike(await getUserData(core.storage, 'hej'), b4a.from('verden')) - - await setUserData(core, 'hello', null) - t.alike(await getUserData(core.storage, 'hello'), null) - t.alike(await getUserData(core.storage, 'hej'), b4a.from('verden')) - - await setUserData(core, 'hej', b4a.from('world')) - t.alike(await getUserData(core.storage, 'hej'), b4a.from('world')) - - // check that it was persisted - const coreReopen = await reopen() - - t.alike(await getUserData(coreReopen.storage, 'hej'), b4a.from('world')) - - function getUserData (storage, key) { - const b = storage.createReadBatch() - const p = b.getUserData(key) - b.tryFlush() - return p - } -}) - test('core - header does not retain slabs', async function (t) { const { core, reopen } = await create(t) - await setUserData(core, 'hello', b4a.from('world')) t.is(core.header.key.buffer.byteLength, 32, 'unslabbed key') t.is(core.header.keyPair.publicKey.buffer.byteLength, 32, 'unslabbed public key') @@ -143,8 +106,6 @@ test('core - header does not retain slabs', async function (t) { t.is(core.header.manifest.signers[0].namespace.buffer.byteLength, 32, 'unslabbed signers namespace') t.is(core.header.manifest.signers[0].publicKey.buffer.byteLength, 32, 'unslabbed signers publicKey') - t.is(core.header.userData[0].value.buffer.byteLength, 5, 'unslabbed the userdata value') - // check the different code path when re-opening const coreReopen = await reopen() @@ -154,8 +115,6 @@ test('core - header does not retain slabs', async function (t) { t.is(coreReopen.header.manifest.signers[0].namespace.buffer.byteLength, 32, 'reopen unslabbed signers namespace') t.is(coreReopen.header.manifest.signers[0].publicKey.buffer.byteLength, 32, 'reopen unslabbed signers publicKey') - t.is(coreReopen.header.userData[0].value.buffer.byteLength, 5, 'reopen unslabbed the userdata value') - await coreReopen.close() }) @@ -213,8 +172,6 @@ test('core - verify parallel upgrades', async function (t) { test('core - clone', async function (t) { const { core } = await create(t) - await setUserData(core, 'hello', b4a.from('world')) - await core.state.append([ b4a.from('hello'), b4a.from('world') @@ -225,12 +182,6 @@ test('core - clone', async function (t) { await copy.copyPrologue(core.state) - const userData = [] - const str = copy.storage.createUserDataStream() - for await (const { key, value } of str) userData.push({ key, value }) - - t.alike(userData, [{ key: 'hello', value: b4a.from('world') }]) - t.alike([ await getBlock(copy, 0), await getBlock(copy, 1) @@ -430,10 +381,6 @@ async function getBlock (core, i) { return p } -async function setUserData (core, key, value) { - return core.userData(key, value) -} - async function getProof (core, req) { const batch = core.storage.createReadBatch() const p = await core.tree.proof(batch, req) diff --git a/test/manifest.js b/test/manifest.js index 05b21ea3..b903c049 100644 --- a/test/manifest.js +++ b/test/manifest.js @@ -742,12 +742,12 @@ test('multisig - persist to disk', async function (t) { await core.close() await storage.close() - const clone = new Hypercore(await createStorage(t, dir), { manifest }) - await t.execution(clone.ready()) + const reopened = new Hypercore(await createStorage(t, dir), { manifest }) + await t.execution(reopened.ready()) const core2 = await create(t, { manifest }) - const s1 = clone.replicate(true) + const s1 = reopened.replicate(true) const s2 = core2.replicate(false) const p = new Promise((resolve, reject) => { @@ -761,13 +761,13 @@ test('multisig - persist to disk', async function (t) { await t.execution(p) - t.is(core2.length, clone.length) + t.is(core2.length, reopened.length) - await core2.download({ start: 0, end: clone.length }).downloaded() + await core2.download({ start: 0, end: reopened.length }).downloaded() t.alike(await core2.get(0), b4a.from('0')) - await clone.close() + await reopened.close() }) test('multisig - overlapping appends', async function (t) { diff --git a/test/replicate.js b/test/replicate.js index 13330690..1ecc1cf8 100644 --- a/test/replicate.js +++ b/test/replicate.js @@ -1630,7 +1630,7 @@ test('merkle-tree signature gets unslabbed', async function (t) { t.is( b.core.tree.signature.buffer.byteLength, - 64, + b.core.tree.signature.byteLength, 'Signature got unslabbed' ) })