-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
182 lines (168 loc) · 5.8 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// @ts-check
const { Writable } = require('streamx')
const { TypedEmitter } = require('tiny-typed-emitter')
const { once } = require('events')
const raf = require('random-access-file')
const { discoveryKey } = require('hypercore-crypto')
// const log = require('debug')('multi-core-indexer')
const { CoreIndexStream } = require('./lib/core-index-stream')
const { MultiCoreIndexStream } = require('./lib/multi-core-index-stream')
// Default for the `maxBatch` constructor option.
const DEFAULT_BATCH_SIZE = 100
// The indexing rate (in entries per second) is calculated as an exponential
// moving average. Each new measurement `rate` updates the average as
// `rate + (previous - rate) / MOVING_AVG_FACTOR`, i.e. the previous average is
// weighted by 1 / MOVING_AVG_FACTOR and the new measurement by the remainder
// (0.2 vs 0.8 for a factor of 5). A LARGER factor therefore puts MORE weight on
// the most recent measurement, not on previous values.
const MOVING_AVG_FACTOR = 5
// Symbol keys for the class's internal methods (not reachable via a string
// property name).
const kHandleEntries = Symbol('handleEntries')
const kEmitState = Symbol('emitState')
const kGetState = Symbol('getState')
const kHandleIndexing = Symbol('handleIndexing')
/** @typedef {string | ((name: string) => import('random-access-storage'))} StorageParam */
/** @typedef {import('./lib/types').ValueEncoding} ValueEncoding */
/** @typedef {import('./lib/types').IndexState} IndexState */
/** @typedef {import('./lib/types').IndexEvents} IndexEvents */
/**
* @template {ValueEncoding} [T='binary']
* @typedef {import('./lib/types').Entry<T>} Entry
*/
/**
 * Indexes the entries of a set of hypercores. Entries are read through a
 * `MultiCoreIndexStream` and handed in batches to the user-supplied `batch()`
 * function; once a batch's promise resolves, its entries are marked as indexed.
 *
 * Events: the current state value (`'indexing'` or `'idle'`) is emitted when it
 * changes, and `'index-state'` (with the full state object) is emitted when the
 * number of remaining entries changes.
 *
 * @template {ValueEncoding} [T='binary']
 * @extends {TypedEmitter<IndexEvents>}
 */
class MultiCoreIndexer extends TypedEmitter {
  #indexStream
  #writeStream
  #batch
  #handleIndexingBound = this[kHandleIndexing].bind(this)
  /** @type {import('./lib/types').IndexStateCurrent} */
  #state = 'idle'
  #rateMeasurementStart = Date.now()
  #rate = 0
  #createStorage
  /** @type {IndexState | undefined} */
  #prevEmittedState
  /** @type {Promise<[any[], any[]]> | undefined} */
  #closePromise

  /**
   * @param {import('hypercore')<T, Buffer | string>[]} cores Cores to index
   * @param {object} opts
   * @param {(entries: Entry<T>[]) => Promise<void>} opts.batch Called with each
   *   batch of entries; the entries are only marked as indexed after the
   *   returned promise resolves
   * @param {StorageParam} opts.storage Directory path, or a factory that
   *   returns a random-access-storage instance for a given name; one storage is
   *   created per core and handed to its `CoreIndexStream`
   * @param {number} [opts.maxBatch=100] highWaterMark of both the read stream
   *   and the write stream
   */
  constructor(cores, { batch, maxBatch = DEFAULT_BATCH_SIZE, storage }) {
    super()
    this.#createStorage = MultiCoreIndexer.defaultStorage(storage)
    const coreIndexStreams = cores.map((core) => {
      const storage = this.#createStorage(getStorageName(core))
      return new CoreIndexStream(core, storage)
    })
    this.#indexStream = new MultiCoreIndexStream(coreIndexStreams, {
      highWaterMark: maxBatch,
    })
    this.#batch = batch
    this.#writeStream = /** @type {Writable<Entry<T>>} */ (
      new Writable({
        writev: (entries, cb) => {
          // @ts-ignore - I don't know why TS does not like this
          this[kHandleEntries](entries).then(() => cb(), cb)
        },
        highWaterMark: maxBatch,
        // Count every entry as size 1 so highWaterMark is a number of entries
        byteLength: () => 1,
      })
    )
    this.#indexStream.pipe(this.#writeStream)
    // This is needed because the source streams can start indexing before this
    // stream starts reading data. This ensures that the indexing state is
    // emitted when the source cores first append / download data
    this.#indexStream.on('indexing', this.#handleIndexingBound)
  }

  /**
   * Current indexing state: `current` ('idle' when nothing remains to index,
   * otherwise 'indexing'), `remaining` entry count and the moving-average
   * `entriesPerSecond`.
   * @type {IndexState}
   */
  get state() {
    return this[kGetState]()
  }

  /**
   * Add a core to be indexed
   * @param {import('hypercore')<T, Buffer | string>} core
   */
  addCore(core) {
    const storage = this.#createStorage(getStorageName(core))
    const coreIndexStream = new CoreIndexStream(core, storage)
    this.#indexStream.addStream(coreIndexStream)
  }

  /**
   * Stop indexing and destroy the internal streams. Resolves once both
   * streams have emitted 'close'. Safe to call more than once: every call
   * waits on the same shutdown instead of waiting for a 'close' event that
   * has already fired (which would never resolve).
   */
  async close() {
    if (!this.#closePromise) {
      this.#indexStream.off('indexing', this.#handleIndexingBound)
      // Subscribe before destroying so the 'close' events cannot be missed
      this.#closePromise = Promise.all([
        once(this.#indexStream, 'close'),
        once(this.#writeStream, 'close'),
      ])
      this.#writeStream.destroy()
      this.#indexStream.destroy()
    }
    return this.#closePromise
  }

  /**
   * Handle one batch from the write stream: pass it to `batch()`, mark each
   * entry as indexed, update the indexing-rate estimate and emit state.
   * @param {Entry<T>[]} entries
   */
  async [kHandleEntries](entries) {
    this[kEmitState]()
    /* istanbul ignore if - not sure this is necessary, but better safe than sorry */
    if (!entries.length) return
    await this.#batch(entries)
    for (const { key, index } of entries) {
      this.#indexStream.setIndexed(key.toString('hex'), index)
    }
    // Clamp to at least 1ms: a batch completing within the same millisecond
    // (or a backwards clock step) would otherwise yield an Infinity/negative
    // rate, and Infinity mixed into the moving average below produces NaN.
    const batchTime = Math.max(Date.now() - this.#rateMeasurementStart, 1)
    // Current rate entries per second
    const rate = entries.length / (batchTime / 1000)
    // Exponential moving average: previous value weighted by
    // 1 / MOVING_AVG_FACTOR, current measurement by the remainder. Use the
    // current rate alone if this is the first measurement.
    this.#rate =
      rate + (this.#rate > 0 ? (this.#rate - rate) / MOVING_AVG_FACTOR : 0)
    // Set this at the end of batch rather than start so the timing also
    // includes the reads from the index streams
    this.#rateMeasurementStart = Date.now()
    this[kEmitState]()
  }

  /** Listener for the index stream's 'indexing' event. */
  [kHandleIndexing]() {
    // Compare against the last *emitted* state rather than `#state`: reading
    // the public `state` getter also updates `#state`, which would otherwise
    // suppress this 'indexing' emission until the next batch arrives.
    if (this.#prevEmittedState?.current === 'indexing') return
    this[kEmitState]()
  }

  /**
   * Emit the current state value if it changed since the last emission, and
   * 'index-state' if the remaining count changed.
   */
  [kEmitState]() {
    const state = this[kGetState]()
    if (state.current !== this.#prevEmittedState?.current) {
      this.emit(state.current)
    }
    // Only emit if remaining has changed
    if (state.remaining !== this.#prevEmittedState?.remaining) {
      this.emit('index-state', state)
    }
    this.#prevEmittedState = state
  }

  /**
   * Compute the current state from the index stream's remaining count.
   * Side effects: updates `#state`, and restarts the rate measurement on an
   * idle -> indexing transition.
   * @returns {IndexState}
   */
  [kGetState]() {
    const remaining = this.#indexStream.remaining
    const prevState = this.#state
    this.#state = remaining === 0 ? 'idle' : 'indexing'
    if (this.#state === 'indexing' && prevState === 'idle') {
      this.#rateMeasurementStart = Date.now()
    }
    return {
      current: this.#state,
      remaining,
      entriesPerSecond: this.#rate,
    }
  }

  /**
   * Normalise the `storage` option into a storage factory. A string is treated
   * as a directory: each name becomes a random-access-file inside it. A
   * function is returned unchanged.
   * @param {StorageParam} storage
   * @returns {(name: string) => import('random-access-storage')}
   */
  static defaultStorage(storage) {
    if (typeof storage !== 'string') return storage
    const directory = storage
    return function createFile(name) {
      return new raf(name, { directory })
    }
  }
}
module.exports = MultiCoreIndexer
/**
 * Build the relative storage name for a core: the hex-encoded discovery key
 * (not the public key itself), nested under two directory levels taken from its
 * first and second bytes, e.g. `ab/cd/abcd…` (presumably to avoid putting every
 * core's file in a single directory).
 * @param {{ key: Buffer }} core
 * @returns {string}
 */
function getStorageName(core) {
  const hexId = discoveryKey(core.key).toString('hex')
  const firstLevel = hexId.slice(0, 2)
  const secondLevel = hexId.slice(2, 4)
  return `${firstLevel}/${secondLevel}/${hexId}`
}