Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(TSDB): add hooks for in-memory only tsdb creation #14734

Merged
merged 14 commits into from
Nov 4, 2024
Prev Previous commit
Next Next commit
uses writer, removes mmap from tsdb writing
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
owen-d committed Oct 31, 2024
commit fddaddace593805f60dfb2a9d220da6d0d73a24b
81 changes: 35 additions & 46 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
@@ -111,13 +111,13 @@ type symbolCacheEntry struct {
type Creator struct {
ctx context.Context

// For the main index file.
f *FileWriter
// For the main index.
f writer

// Temporary file for postings.
fP *FileWriter
// Temporary file for posting offsets table.
fPO *FileWriter
// Temporary writer for postings.
fP writer
// Temporary writer for posting offsets table.
fPO writer
cntPO uint64

toc TOC
@@ -130,7 +130,6 @@ type Creator struct {

numSymbols int
symbols *Symbols
symbolFile *fileutil.MmapFile
lastSymbol string
symbolCache map[string]symbolCacheEntry

@@ -273,8 +272,9 @@ func (w *Creator) write(bufs ...[]byte) error {
return w.f.WriteBufs(bufs...)
}

func (w *Creator) writeAt(buf []byte, pos uint64) error {
return w.f.WriteAt(buf, pos)
func (w *Creator) writeAt(buf []byte, pos int64) error {
_, err := w.f.WriteAt(buf, pos)
return err
}

func (w *Creator) addPadding(size int) error {
@@ -594,7 +594,7 @@ func (w *Creator) finishSymbols() error {
w.buf1.Reset()
w.buf1.PutBE32int(int(symbolTableSize))
w.buf1.PutBE32int(w.numSymbols)
if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(w.toc.Symbols)); err != nil {
return err
}

@@ -608,20 +608,22 @@ func (w *Creator) finishSymbols() error {
return err
}

sf, err := fileutil.OpenMmapFile(w.f.name)
symbolBytes, err := w.f.Bytes()
if err != nil {
return err
}
w.symbolFile = sf
hash := crc32.Checksum(w.symbolFile.Bytes()[w.toc.Symbols+4:hashPos], castagnoliTable)
hash := crc32.Checksum(symbolBytes[w.toc.Symbols+4:hashPos], castagnoliTable)
w.buf1.Reset()
w.buf1.PutBE32(hash)
if err := w.writeAt(w.buf1.Get(), hashPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(hashPos)); err != nil {
return err
}

// Load in the symbol table efficiently for the rest of the index writing.
w.symbols, err = NewSymbols(RealByteSlice(w.symbolFile.Bytes()), w.Version, int(w.toc.Symbols))
// Now that the checksum has been calculated and written out via writeAt,
// also copy it into symbolBytes, which was fetched before the checksum was
// written, so the in-memory bytes can be reused to load the symbol table
// efficiently for the rest of the index writing.
copy(symbolBytes[hashPos:], w.buf1.Get())
w.symbols, err = NewSymbols(RealByteSlice(symbolBytes), w.Version, int(w.toc.Symbols))
if err != nil {
return errors.Wrap(err, "read symbols")
}
@@ -633,14 +635,12 @@ func (w *Creator) writeLabelIndices() error {
return err
}

// Find all the label values in the tmp posting offset table.
f, err := fileutil.OpenMmapFile(w.fPO.name)
pOffsets, err := w.fPO.Bytes()
if err != nil {
return err
}
defer f.Close()

d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.Pos())))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(pOffsets), int(w.fPO.Pos())))
cnt := w.cntPO
current := []byte{}
values := []uint32{}
@@ -723,7 +723,7 @@ func (w *Creator) writeLabelIndex(name string, values []uint32) error {
return errors.Errorf("label index size exceeds 4 bytes: %d", l)
}
w.buf1.PutBE32int(int(l))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(startPos)); err != nil {
return err
}

@@ -767,7 +767,7 @@ func (w *Creator) writeLabelIndexesOffsetTable() error {
return errors.Errorf("label indexes offset table size exceeds 4 bytes: %d", l)
}
w.buf1.PutBE32int(int(l))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(startPos)); err != nil {
return err
}

@@ -801,16 +801,12 @@ func (w *Creator) writePostingsOffsetTable() error {
return err
}

f, err := fileutil.OpenMmapFile(w.fPO.name)
pOffsets, err := w.fPO.Bytes()
if err != nil {
return err
}
defer func() {
if f != nil {
f.Close()
}
}()
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.Pos())))

d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(pOffsets), int(w.fPO.Pos())))
cnt := w.cntPO
for d.Err() == nil && cnt > 0 {
w.buf1.Reset()
@@ -828,11 +824,6 @@ func (w *Creator) writePostingsOffsetTable() error {
return d.Err()
}

// Cleanup temporary file.
if err := f.Close(); err != nil {
return err
}
f = nil
if err := w.fPO.Close(); err != nil {
return err
}
@@ -848,7 +839,7 @@ func (w *Creator) writePostingsOffsetTable() error {
return errors.Errorf("postings offset table size exceeds 4 bytes: %d", l)
}
w.buf1.PutBE32int(int(l))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(startPos)); err != nil {
return err
}

@@ -922,15 +913,15 @@ func (w *Creator) writePostingsToTmpFiles() error {
if err := w.f.Flush(); err != nil {
return err
}
f, err := fileutil.OpenMmapFile(w.f.name)

b, err := w.f.Bytes()
if err != nil {
return err
}
defer f.Close()

// Write out the special all posting.
offsets := []uint32{}
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(b), int(w.toc.LabelIndices)))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
@@ -976,7 +967,7 @@ func (w *Creator) writePostingsToTmpFiles() error {
// Label name -> label value -> positions.
postings := map[uint32]map[uint32][]uint32{}

d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(b), int(w.toc.LabelIndices)))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
@@ -1086,11 +1077,14 @@ func (w *Creator) writePostings() error {
if err := w.fP.Flush(); err != nil {
return err
}
if _, err := w.fP.f.Seek(0, 0); err != nil {
// NB(owen-d): inefficient, but avoids the complexity of `Pos()` altering `Seek` logic.
postings, err := w.fP.Bytes()
if err != nil {
return err
}

// Don't need to calculate a checksum, so can copy directly.
n, err := io.CopyBuffer(w.f, w.fP.f, make([]byte, 1<<20))
n, err := io.CopyBuffer(w.f, bytes.NewReader(postings), make([]byte, 1<<20))
if err != nil {
return err
}
@@ -1123,11 +1117,6 @@ func (w *Creator) Close() error {
// Even if this fails, we need to close all the files.
ensureErr := w.ensureStage(idxStageDone)

if w.symbolFile != nil {
if err := w.symbolFile.Close(); err != nil {
return err
}
}
if w.fP != nil {
if err := w.fP.Close(); err != nil {
return err