Skip to content

Commit

Permalink
feat: add a method for resetting index state
Browse files Browse the repository at this point in the history
This adds
`MultiCoreIndexer.prototype.removeCoreAndUnlinkIndexStorage(core)` which
removes the core from the indexer and destroys the storage.

Addresses [issue #26][0].

[0]: #26
  • Loading branch information
EvanHahn committed Jan 18, 2024
1 parent e73630a commit 9c70871
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 30 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ Type: `Hypercore`
Add a hypercore to the indexer. Must have the same value encoding as other
hypercores already in the indexer.

### indexer.removeCoreAndUnlinkIndexStorage(core)

#### core

_Required_\
Type: `Hypercore`

Stop indexing a core and unlink (delete) the index state stored for it. Only the indexer's own storage for that core is destroyed; the Hypercore's storage is not touched. Returns a `Promise` that resolves once the removal is complete.

If the core is not being indexed (or was previously removed), this is a no-op.

### indexer.close()

Stop the indexer and flush index state to storage. This will not close the
Expand Down
35 changes: 31 additions & 4 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const MOVING_AVG_FACTOR = 5
class MultiCoreIndexer extends TypedEmitter {
#indexStream
#writeStream
/** @type {Map<import('hypercore')<T, any>, CoreIndexStream<T>>} */
#coreIndexStreamsByCore = new Map()
#batch
/** @type {import('./lib/types').IndexStateCurrent} */
#state = 'indexing'
Expand All @@ -53,9 +55,9 @@ class MultiCoreIndexer extends TypedEmitter {
constructor(cores, { batch, maxBatch = DEFAULT_BATCH_SIZE, storage }) {
super()
this.#createStorage = MultiCoreIndexer.defaultStorage(storage)
const coreIndexStreams = cores.map((core) => {
return new CoreIndexStream(core, this.#createStorage)
})
const coreIndexStreams = cores.map((core) =>
this.#createCoreIndexStream(core)
)
this.#indexStream = new MultiCoreIndexStream(coreIndexStreams, {
highWaterMark: maxBatch,
})
Expand Down Expand Up @@ -92,10 +94,25 @@ class MultiCoreIndexer extends TypedEmitter {
* @param {import('hypercore')<T, any>} core
*/
addCore(core) {
const coreIndexStream = new CoreIndexStream(core, this.#createStorage)
const coreIndexStream = this.#createCoreIndexStream(core)
this.#indexStream.addStream(coreIndexStream)
}

/**
* Remove a core from being indexed and unlink its storage. To be clear, this destroys the index's storage and doesn't touch the Hypercore's storage.
*
* If the core is not being indexed (or was previously removed), this is a no-op.
*
* @param {import('hypercore')<T, any>} core
* @returns {Promise<void>}
*/
async removeCoreAndUnlinkIndexStorage(core) {
const coreIndexStream = this.#coreIndexStreamsByCore.get(core)
if (!coreIndexStream) return
await this.#indexStream.removeStreamAndUnlinkStorage(coreIndexStream)
this.#coreIndexStreamsByCore.delete(core)
}

/**
* Resolves when indexing state is 'idle'
*/
Expand All @@ -118,6 +135,16 @@ class MultiCoreIndexer extends TypedEmitter {
])
}

/**
* @param {import('hypercore')<T, any>} core
* @returns {CoreIndexStream<T>}
*/
#createCoreIndexStream(core) {
const coreIndexStream = new CoreIndexStream(core, this.#createStorage)
this.#coreIndexStreamsByCore.set(core, coreIndexStream)
return coreIndexStream
}

/** @param {Entry<T>[]} entries */
async #handleEntries(entries) {
this.#emitState()
Expand Down
21 changes: 21 additions & 0 deletions lib/core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,27 @@ class CoreIndexStream extends Readable {
this.#inProgressBitfield?.set(index, false)
}

/**
* @returns {Promise<void>}
*/
unlinkStorage() {
return new Promise((resolve, reject) => {
const storage = this.#storage

if (storage) {
storage.unlink((err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
} else {
resolve()
}
})
}

async #destroy() {
this.#core.removeListener('append', this.#handleAppendBound)
this.#core.removeListener('download', this.#handleDownloadBound)
Expand Down
85 changes: 59 additions & 26 deletions lib/multi-core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ class MultiCoreIndexStream extends Readable {
#readable = new Set()
#pending = pDefer()
#destroying = false
// We cache drained state here rather than reading all streams every time
#drained

/**
*
Expand All @@ -45,7 +43,6 @@ class MultiCoreIndexStream extends Readable {
highWaterMark: opts.highWaterMark || 16,
byteLength: () => 1,
})
this.#drained = streams.length === 0
this.#handleIndexingBound = this.#handleIndexing.bind(this)
this.#handleDrainedBound = this.#handleDrained.bind(this)
for (const s of streams) {
Expand All @@ -62,7 +59,10 @@ class MultiCoreIndexStream extends Readable {
}

get drained() {
return this.#drained
for (const stream of this.#streams.keys()) {
if (!stream.drained) return false
}
return true
}

/**
Expand All @@ -84,13 +84,14 @@ class MultiCoreIndexStream extends Readable {
*/
addStream(stream) {
if (this.#streams.has(stream)) return
this.#drained = false
// Do this so that we can remove this listener when we destroy the stream
const handleReadableFn = this.#handleReadable.bind(this, stream)
this.#streams.set(stream, handleReadableFn)
stream.core
.ready()
.then(() => {
// This can happen if the stream was removed between the call to `.ready()` and the time it resolved.
if (!this.#streams.has(stream)) return
const coreKey = stream.core.key
/* istanbul ignore next: this is set after ready */
if (!coreKey) return
Expand All @@ -103,6 +104,45 @@ class MultiCoreIndexStream extends Readable {
stream.on('drained', this.#handleDrainedBound)
}

/**
* Remove a stream and unlink its storage. If the stream is not found, this is a no-op.
*
* @param {CoreIndexStream<T>} stream
* @returns {Promise<void>}
*/
async removeStreamAndUnlinkStorage(stream) {
const handleReadableFn = this.#streams.get(stream)
if (!handleReadableFn) return

const wasDrained = this.drained
await this.#removeStream(stream, handleReadableFn)
await stream.unlinkStorage()
if (!wasDrained && this.drained) this.emit('drained')
}

/**
* @param {CoreIndexStream<T>} stream
* @param {() => void} handleReadableFn
* @returns {Promise<void>}
*/
async #removeStream(stream, handleReadableFn) {
// It's important to delete these before returning any control to
// the event loop, otherwise multiple calls to this method could
// cause a stream to be destroyed multiple times.
this.#readable.delete(stream)
this.#streams.delete(stream)
const coreKeyString = stream.core.key?.toString('hex')
if (coreKeyString) this.#streamsById.delete(coreKeyString)

stream.off('readable', handleReadableFn)
stream.off('indexing', this.#handleIndexingBound)
stream.off('drained', this.#handleDrainedBound)

const closePromise = once(stream, 'close')
stream.destroy()
await closePromise
}

/** @param {any} cb */
_open(cb) {
cb()
Expand All @@ -124,15 +164,11 @@ class MultiCoreIndexStream extends Readable {
}

async #destroy() {
const closePromises = []
const removePromises = []
for (const [stream, handleReadableFn] of this.#streams) {
stream.off('readable', handleReadableFn)
stream.off('indexing', this.#handleIndexingBound)
stream.off('drained', this.#handleDrainedBound)
stream.destroy()
closePromises.push(once(stream, 'close'))
removePromises.push(this.#removeStream(stream, handleReadableFn))
}
await Promise.all(closePromises)
await Promise.all(removePromises)
}

async #read() {
Expand Down Expand Up @@ -163,24 +199,21 @@ class MultiCoreIndexStream extends Readable {
this.#pending.resolve()
}

// Whenever a source stream emits an indexing event, bubble it up so that the
// `indexing` event always fires at the start of indexing in the chain of
// streams (the `drained` event should happen at the end of the chain once
// everything is read)
#handleIndexing() {
if (!this.#drained) return
this.#drained = false
this.emit('indexing')
let indexingCount = 0
for (const stream of this.#streams.keys()) {
if (!stream.drained) indexingCount++
// We only care if there's exactly 1, so we can break early as an optimization.
if (indexingCount >= 2) break
}
const isFirstIndexing = indexingCount === 1

if (isFirstIndexing) this.emit('indexing')
}

#handleDrained() {
let drained = true
for (const stream of this.#streams.keys()) {
if (!stream.drained) drained = false
}
if (drained === this.#drained && !drained) return
this.#drained = drained
this.emit('drained')
const allDrained = this.drained
if (allDrained) this.emit('drained')
}
}

Expand Down
Loading

0 comments on commit 9c70871

Please sign in to comment.