-
Notifications
You must be signed in to change notification settings - Fork 20
/
lucene_codec.h
375 lines (299 loc) · 17.3 KB
/
lucene_codec.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
#pragma once
#include "codecs.h"
static_assert(sizeof(Trinity::isrc_docid_t) <= sizeof(uint32_t));
// You should edit the Makefile accordingly. That is, the Makefile's LUCENE_ENCODING_SCHEME should be set in accordance with the LUCENE_USE_X macro set here.
// You should probably use streaming vbyte, by defining LUCENE_USE_STREAMVBYTE and setting LUCENE_ENCODING_SCHEME to streamvbyte.
// It's not the default here because you may have Lucene indices already created using the Lucene codec and the PFOR encoding scheme.
#if 0
// Faster than both PFOR and MaskedVByte
// https://github.com/lemire/streamvbyte and https://lemire.me/blog/2017/09/27/stream-vbyte-breaking-new-speed-records-for-integer-compression/
//
// Results in larger indices compared to PFOR though
#define LUCENE_USE_STREAMVBYTE 1
#elif 0
// Slower than both PFOR and StreamingVByte
// http://maskedvbyte.org
#define LUCENE_USE_MASKEDVBYTE 1
#else
// Smaller indices in terms of size, but slower than StreamVByte
// https://github.com/lemire/FastPFor
#include <ext/FastPFor/headers/fastpfor.h>
#define LUCENE_USE_FASTPFOR 1
// if enabled, we will use thread-local storage for those so that we can reuse them
// as opposed to creating and destroying new FastPFor objects very frequently, which is
// expensive especially during merge where we would otherwise need to do this potentially thousands of times
#define LUCENE_USE_FASTPFOR_TL 1
#endif
#if defined(LUCENE_USE_FASTPFOR) && defined(LUCENE_USE_FASTPFOR_TL)
// Acquire/release pair for FastPFor<4> instances, declared only when both FastPFor and
// thread-local reuse are enabled. Both functions are defined outside this header.
// The codec classes in this header call _acquire_tl_fastpfor() from their constructors
// and _release_tl_fastpfor() from their destructors.
// NOTE(review): the pooling/ownership policy behind these calls is not visible here —
// confirm against the definitions before relying on it.
FastPForLib::FastPFor<4> *_acquire_tl_fastpfor();
void _release_tl_fastpfor(FastPForLib::FastPFor<4> *);
#endif
namespace Trinity {
namespace Codecs {
namespace Lucene {
// Compile-time feature switches for this codec.
// LUCENE_LAZY_SKIPLIST_INIT adds a `skiplistSize` member to Decoder; any further effect of
// these two macros is in the implementation file and not visible in this header.
#define LUCENE_SKIPLIST_SEEK_EARLY 1
#define LUCENE_LAZY_SKIPLIST_INIT 1
// We can't use this encoding idea because we can't handle deltas >=
// (DocIDsEND>>1), so that if e.g the first document ID for a term
// is (1<<31) + 15, this will fail. However, see IndexSource::translate_docid() for how that would work with docIDs translations
// #define LUCENE_ENCODE_FREQ1_DOCDELTA 1
// BLOCK_SIZE is the capacity of the per-block buffers (document deltas, frequencies,
// hit position deltas, payload sizes) held by Encoder and PostingsListIterator.
// Its value depends on the encoding scheme selected at the top of this header.
#ifdef LUCENE_USE_MASKEDVBYTE
static constexpr size_t BLOCK_SIZE{64};
#elif defined(LUCENE_USE_STREAMVBYTE)
// XXX: 256 may make more sense here
static constexpr size_t BLOCK_SIZE{128};
#else
// FastPFor (the default scheme)
static constexpr size_t BLOCK_SIZE{128};
#endif
static constexpr size_t SKIPLIST_STEP{1}; // every (SKIPLIST_STEP * BLOCK_SIZE) documents
// Index session for the "LUCENE" codec.
//
// Advertises the AppendIndexChunk and Merge capabilities to the base session, keeps the
// buffered positions output (`positionsOut`) together with the descriptor it is flushed to,
// and, when FastPFor is the selected scheme, holds the FastPFor instance used by the session.
struct IndexSession final
    : public Trinity::Codecs::IndexSession {
#ifdef LUCENE_USE_FASTPFOR
        // FastPFor instance used by this session; handy for merge()
        FastPForLib::FastPFor<4> *forUtil;
#ifndef LUCENE_USE_FASTPFOR_TL
        // Owns the instance `forUtil` points at when thread-local reuse is disabled
        std::unique_ptr<FastPForLib::FastPFor<4>> forUtil_local;
#endif
#endif

        // TODO: support for periodic flushing
        // i.e in either Encoder::end_term() or Encoder::end_document()
        IOBuffer positionsOut;
        uint32_t positionsOutFlushed{0};
        // -1 means "no descriptor open"; the destructor only closes a descriptor other than -1
        int      positionsOutFd{-1};
        uint32_t flushFreq{0};

        // private
        void flush_positions_data();

        // bp is forwarded to the base session along with the capabilities mask.
        IndexSession(const char *bp)
            : Trinity::Codecs::IndexSession{bp,
                                            unsigned(Capabilities::AppendIndexChunk) |
                                                unsigned(Capabilities::Merge)} {
#if defined(LUCENE_USE_FASTPFOR) && defined(LUCENE_USE_FASTPFOR_TL)
                // Thread-local reuse enabled: borrow an instance, handed back in the destructor
                forUtil = _acquire_tl_fastpfor();
#elif defined(LUCENE_USE_FASTPFOR)
                // No reuse: this session owns its own instance
                forUtil_local = std::make_unique<FastPForLib::FastPFor<4>>();
                forUtil       = forUtil_local.get();
#endif
        }

        ~IndexSession() {
                if (positionsOutFd != -1) {
                        close(positionsOutFd);
                }

#if defined(LUCENE_USE_FASTPFOR) && defined(LUCENE_USE_FASTPFOR_TL)
                _release_tl_fastpfor(forUtil);
#endif
        }

        // Stores f in flushFreq.
        constexpr void set_flush_freq(const uint32_t f) {
                flushFreq = f;
        }

        void begin() override final;

        void end() override final;

        Trinity::Codecs::Encoder *new_encoder() override final;

        // Identifier of this codec: "LUCENE"
        strwlen8_t codec_identifier() override final {
                return "LUCENE"_s8;
        }

        range32_t append_index_chunk(const Trinity::Codecs::AccessProxy *, const term_index_ctx srcTCTX) override final;

        void merge(merge_participant *, const uint16_t, Trinity::Codecs::Encoder *) override final;
};
// Encoder for the "LUCENE" codec.
//
// Accumulates per-block document deltas, frequencies and hit data in fixed BLOCK_SIZE
// buffers, and records skiplist entries for the term being encoded. The encoding logic
// itself (begin_term(), begin_document(), new_hit(), end_document(), end_term(),
// output_block()) is defined outside this header.
class Encoder final
    : public Trinity::Codecs::Encoder {
      private:
        struct skiplist_entry final {
                // offset to the index relative to the term base offset
                uint32_t indexOffset;

                // previous to the first document id in the block
                // i.e last document ID in the previous block
                isrc_docid_t lastDocID;

                // offset to the hits relative to the term base offset
                uint32_t lastHitsBlockOffset;

                uint32_t totalDocumentsSoFar;
                uint32_t lastHitsBlockTotalHits;
                uint16_t curHitsBlockHits;
        };

      private:
        std::vector<skiplist_entry> skiplist;
        isrc_docid_t                lastDocID;

        // Per-block buffers, BLOCK_SIZE entries each
        uint32_t docDeltas[BLOCK_SIZE];
        uint32_t docFreqs[BLOCK_SIZE];
        uint32_t hitPayloadSizes[BLOCK_SIZE];
        uint32_t hitPosDeltas[BLOCK_SIZE];

        uint32_t   buffered;
        uint32_t   totalHits;
        uint32_t   sumHits;
        uint32_t   termDocuments;
        tokenpos_t lastPosition;
        uint32_t   termIndexOffset;
        uint32_t   termPositionsOffset;

#ifdef LUCENE_USE_FASTPFOR
#ifndef LUCENE_USE_FASTPFOR_TL
        // Owns the instance `forUtil` points at when thread-local reuse is disabled
        std::unique_ptr<FastPForLib::FastPFor<4>> forUtil_local;
#endif
        FastPForLib::FastPFor<4> *forUtil;
#endif

        IOBuffer       payloadsBuf;
        uint32_t       skiplistCountdown;
        uint32_t       lastHitsBlockOffset;
        uint32_t       lastHitsBlockTotalHits;
        skiplist_entry cur_block;

      private:
        void output_block();

      public:
        // Creates an encoder bound to session s. With FastPFor enabled, the encoder either
        // borrows a thread-local instance or allocates and owns one.
        Encoder(Trinity::Codecs::IndexSession *s)
            : Trinity::Codecs::Encoder{s} {
#if defined(LUCENE_USE_FASTPFOR) && defined(LUCENE_USE_FASTPFOR_TL)
                forUtil = _acquire_tl_fastpfor();
#elif defined(LUCENE_USE_FASTPFOR)
                forUtil_local = std::make_unique<FastPForLib::FastPFor<4>>();
                forUtil       = forUtil_local.get();
#endif
        }

#ifdef LUCENE_USE_FASTPFOR
        // Creates an encoder bound to session s that uses the caller-provided FastPFor instance p.
        // NOTE(review): with LUCENE_USE_FASTPFOR_TL the destructor passes `forUtil` to
        // _release_tl_fastpfor() no matter which constructor set it — confirm callers expect that.
        Encoder(Trinity::Codecs::IndexSession *const s, FastPForLib::FastPFor<4> *const p)
            : Trinity::Codecs::Encoder{s}
            , forUtil{p} {
        }
#endif

#if defined(LUCENE_USE_FASTPFOR) && defined(LUCENE_USE_FASTPFOR_TL)
        ~Encoder() {
                _release_tl_fastpfor(forUtil);
        }
#endif

        void begin_term() override final;

        void begin_document(const isrc_docid_t documentID) override final;

        void new_hit(const uint32_t pos, const range_base<const uint8_t *, const uint8_t> payload) override final;

        // Convenience wrapper: registers a hit at pos with an empty payload.
        void new_position(const uint32_t pos) {
                new_hit(pos, {});
        }

        void end_document() override final;

        void end_term(term_index_ctx *tctx) override final;
};
// Access proxy for the "LUCENE" codec: reports the codec identifier and creates decoders
// for a term's index context. The constructor, destructor and new_decoder() are only
// declared here; their definitions live outside this header.
struct AccessProxy final
: public Trinity::Codecs::AccessProxy {
// NOTE(review): presumably the base of the hits data, taken from the constructor's `hd`
// argument — confirm against the constructor definition.
const uint8_t *hitsDataPtr;
// Defaults to 0.
// NOTE(review): presumably the size in bytes of the data at hitsDataPtr — confirm.
uint64_t hitsDataSize{0};
// `hd` is optional and defaults to nullptr.
AccessProxy(const char *bp, const uint8_t *p, const uint8_t *hd = nullptr);
~AccessProxy();
// Identifier of this codec: "LUCENE"
strwlen8_t codec_identifier() override final {
return "LUCENE"_s8;
}
// Returns a new decoder for the term described by tctx (defined outside this header).
Trinity::Codecs::Decoder *new_decoder(const term_index_ctx &tctx) override final;
};
// Forward declaration: PostingsListIterator names Decoder before Decoder is defined.
class Decoder;
// Postings list iterator for the "LUCENE" codec.
//
// The iterator only holds per-iterator decoding state; next(), advance() and
// materialize_hits() forward to the Lucene::Decoder that created it, passing `this`.
// Decoder is a friend so that it can read and update the protected state below.
struct PostingsListIterator final
: public Trinity::Codecs::PostingsListIterator {
friend class Decoder;
protected:
// NOTE(review): p / hdp look like read cursors into the postings and hits data — confirm
// against the Decoder implementation.
const uint8_t *p;
const uint8_t *hdp;
const uint8_t *payloadsIt, *payloadsEnd;
// Base that Decoder::update_curdoc() adds docDeltas[docsIndex] to when computing the current document ID
isrc_docid_t lastDocID;
uint32_t lastPosition{0};
uint32_t docsLeft, hitsLeft;
// docsIndex selects the entry of docDeltas / docFreqs that Decoder::update_curdoc() reads
uint16_t docsIndex, hitsIndex;
uint16_t bufferedDocs, bufferedHits;
uint32_t skippedHits;
// Per-block buffers, BLOCK_SIZE entries each
uint32_t docDeltas[BLOCK_SIZE], docFreqs[BLOCK_SIZE], hitsPositionDeltas[BLOCK_SIZE], hitsPayloadLengths[BLOCK_SIZE];
uint32_t skipListIdx;
// Starts out as DocIDsEND
isrc_docid_t curSkipListLastDocID{DocIDsEND};
public:
// Each of the three forwards to the owning decoder; next() and advance() then return curDocument.id.
inline isrc_docid_t next() override final;
inline isrc_docid_t advance(const isrc_docid_t) override final;
inline void materialize_hits(DocWordsSpace *dwspace, term_hit *out) override final;
// Lucene::Decoder is still an incomplete type at this point, so the derived-to-base
// conversion cannot be expressed with static_cast; reinterpret_cast is used instead.
// NOTE(review): this relies on the Trinity::Codecs::Decoder base subobject living at
// offset 0 of Lucene::Decoder — confirm.
PostingsListIterator(Decoder *const d)
: Trinity::Codecs::PostingsListIterator{reinterpret_cast<Trinity::Codecs::Decoder *>(d)} {
}
};
// Decoder for the "LUCENE" codec.
//
// Creates PostingsListIterator instances (new_iterator()) and implements their next(),
// advance() and materialize_hits() on their behalf; the iterators forward to the protected
// methods below and pass themselves as the first argument. Most of the implementation is
// defined outside this header.
class Decoder final
    : public Trinity::Codecs::Decoder {
        friend struct PostingsListIterator;

      private:
        // Pretty much the only shared state among iterators created by
        // this decoder is the skiplist, which may be initialized once and owned by the Decoder.
        struct skiplist_entry final {
                uint32_t     indexOffset;
                isrc_docid_t lastDocID;
                uint32_t     lastHitsBlockOffset;
                uint32_t     totalDocumentsSoFar;
                uint32_t     totalHitsSoFar;
                uint16_t     curHitsBlockHits;
        };

      protected:
        // Back-ends of the PostingsListIterator methods with the same names
        void next(PostingsListIterator *);
        void advance(PostingsListIterator *, const isrc_docid_t);
        void materialize_hits(PostingsListIterator *, DocWordsSpace *, term_hit *);

      private:
        const uint8_t *chunkEnd;
#ifdef LUCENE_LAZY_SKIPLIST_INIT
        uint16_t skiplistSize;
#endif

#ifdef LUCENE_USE_FASTPFOR
        // when we are decoding during merge, this is apparently expensive now because
        // we wind up creating and destroying vectors
        // so we are going to allow for reuse - a thread local seems like a good choice here
#ifndef LUCENE_USE_FASTPFOR_TL
        // Only needed when thread-local reuse is disabled: owns the instance `forUtil` points at.
        // (The guard must be #ifndef: the default constructor uses this member only in its
        // non-thread-local branch, matching IndexSession and Encoder.)
        std::unique_ptr<FastPForLib::FastPFor<4>> forUtil_local;
#endif
        FastPForLib::FastPFor<4> *forUtil;
#endif

        struct skiplist_struct final {
                // Released with std::free() on destruction when size is non-zero
                skiplist_entry *data{nullptr};
                uint16_t        size{0};

                ~skiplist_struct() noexcept {
                        if (size) {
                                std::free(data);
                        }
                }
        } skiplist;

        const uint8_t *postingListBase, *hitsBase;
        uint32_t       totalDocuments, totalHits;

      private:
        void init_skiplist(const uint16_t);

        uint32_t skiplist_search(PostingsListIterator *, const isrc_docid_t) const noexcept;

        void refill_hits(PostingsListIterator *);

        void refill_documents(PostingsListIterator *);

        // Publishes the document at it->docsIndex as the iterator's current document:
        // id = lastDocID + docDeltas[docsIndex], freq = docFreqs[docsIndex].
        [[gnu::always_inline]] void update_curdoc(PostingsListIterator *const __restrict__ it) noexcept {
                const auto docsIndex{it->docsIndex};
                auto &     curDocument{it->curDocument};

                curDocument.id = it->lastDocID + it->docDeltas[docsIndex];
                it->freq       = it->docFreqs[docsIndex];
        }

        // Marks the iterator as exhausted by setting its current document ID to DocIDsEND.
        inline void finalize(PostingsListIterator *const it) noexcept {
                it->curDocument.id = DocIDsEND;
        }

        void decode_next_block(PostingsListIterator *);

        void skip_hits(PostingsListIterator *, const uint32_t);

      public:
        void init(const term_index_ctx &tctx, Trinity::Codecs::AccessProxy *access) override final;

        Trinity::Codecs::PostingsListIterator *new_iterator() override final;

#ifdef LUCENE_USE_FASTPFOR
        // Default construction: borrow a thread-local FastPFor instance when reuse is enabled,
        // otherwise allocate one owned by this decoder.
        Decoder() {
#ifdef LUCENE_USE_FASTPFOR_TL
                forUtil = _acquire_tl_fastpfor();
#else
                forUtil_local.reset(new FastPForLib::FastPFor<4>());
                forUtil = forUtil_local.get();
#endif
        }

        // Uses the caller-provided FastPFor instance p.
        // NOTE(review): with LUCENE_USE_FASTPFOR_TL the destructor passes `forUtil` to
        // _release_tl_fastpfor() no matter which constructor set it — confirm callers expect that.
        Decoder(decltype(forUtil) p)
            : forUtil{p} {
                //
        }
#endif

#if defined(LUCENE_USE_FASTPFOR) && defined(LUCENE_USE_FASTPFOR_TL)
        ~Decoder() {
                _release_tl_fastpfor(forUtil);
        }
#endif
};
// Asks the owning Lucene decoder to step this iterator, then reports the resulting current document ID.
isrc_docid_t PostingsListIterator::next() {
        auto *const owner = static_cast<Codecs::Lucene::Decoder *>(dec);

        owner->next(this);
        return curDocument.id;
}
// Asks the owning Lucene decoder to advance this iterator towards `target`, then reports the
// resulting current document ID.
isrc_docid_t PostingsListIterator::advance(const isrc_docid_t target) {
        auto *const owner = static_cast<Codecs::Lucene::Decoder *>(dec);

        owner->advance(this, target);
        return curDocument.id;
}
// Forwards to the owning Lucene decoder, passing this iterator along with dwspace and out.
void PostingsListIterator::materialize_hits(DocWordsSpace *dwspace, term_hit *out) {
        auto *const owner = static_cast<Codecs::Lucene::Decoder *>(dec);

        owner->materialize_hits(this, dwspace, out);
}
} // namespace Lucene
} // namespace Codecs
} // namespace Trinity