Skip to content

Commit

Permalink
chore(TSDB): add hooks for in-memory only tsdb creation (#14734)
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d authored Nov 4, 2024
1 parent 0704f5d commit 1c993f9
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 32 deletions.
95 changes: 69 additions & 26 deletions pkg/storage/stores/shipper/indexshipper/tsdb/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsdb
import (
"context"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -111,12 +112,47 @@ func (b *Builder) Build(
name := fmt.Sprintf("%s-%x.staging", index.IndexFilename, rng)
tmpPath := filepath.Join(scratchDir, name)

var writer *index.Creator
writer, err := index.NewFileWriterWithVersion(ctx, b.version, tmpPath)
if err != nil {
return id, err
}

writer, err = index.NewWriterWithVersion(ctx, b.version, tmpPath)
if _, err := b.build(writer, false); err != nil {
return id, err
}

reader, err := index.NewFileReader(tmpPath)
if err != nil {
return id, err
}

from, through := reader.Bounds()

// load the newly compacted index to grab checksum, promptly close
dst := createFn(model.Time(from), model.Time(through), reader.Checksum())

reader.Close()
defer func() {
if err != nil {
os.RemoveAll(tmpPath)
}
}()

if err := chunk_util.EnsureDirectory(filepath.Dir(dst.Path())); err != nil {
return id, err
}
dstPath := dst.Path()
if err := os.Rename(tmpPath, dstPath); err != nil {
return id, err
}

return dst, nil
}

func (b *Builder) build(
writer *index.Creator,
reader bool, // whether to return the ReadCloser of the underlying DB
) (io.ReadCloser, error) {
// TODO(owen-d): multithread

// Sort series
Expand Down Expand Up @@ -155,7 +191,7 @@ func (b *Builder) Build(
// Add symbols
for _, symbol := range symbols {
if err := writer.AddSymbol(symbol); err != nil {
return id, err
return nil, err
}
}

Expand All @@ -165,38 +201,45 @@ func (b *Builder) Build(
s.chunks = s.chunks.Finalize()
}
if err := writer.AddSeries(storage.SeriesRef(i), s.labels, s.fp, s.chunks...); err != nil {
return id, err
return nil, err
}
}

if _, err := writer.Close(false); err != nil {
return id, err
}
return writer.Close(reader)
}

reader, err := index.NewFileReader(tmpPath)
func (b *Builder) BuildInMemory(
ctx context.Context,
// Determines how to create the resulting Identifier and file name.
// This is variable as we use Builder for multiple reasons,
// such as building multi-tenant tsdbs on the ingester
// and per tenant ones during compaction
createFn func(from, through model.Time, checksum uint32) Identifier,
) (id Identifier, data []byte, err error) {
writer, err := index.NewMemWriterWithVersion(ctx, b.version)
if err != nil {
return id, err
return id, nil, err
}

from, through := reader.Bounds()

// load the newly compacted index to grab checksum, promptly close
dst := createFn(model.Time(from), model.Time(through), reader.Checksum())

reader.Close()
defer func() {
if err != nil {
os.RemoveAll(tmpPath)
}
}()
readCloser, err := b.build(writer, true)
if err != nil {
return id, nil, err
}
defer readCloser.Close()

if err := chunk_util.EnsureDirectory(filepath.Dir(dst.Path())); err != nil {
return id, err
data, err = io.ReadAll(readCloser)
if err != nil {
return nil, nil, err
}
dstPath := dst.Path()
if err := os.Rename(tmpPath, dstPath); err != nil {
return id, err

reader, err := index.NewReader(index.RealByteSlice(data))
if err != nil {
return id, nil, err
}
defer reader.Close()

return dst, nil
from, through := reader.Bounds()
id = createFn(model.Time(from), model.Time(through), reader.Checksum())

return id, data, nil
}
38 changes: 33 additions & 5 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}, nil
}

func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) {
// For writing TSDBs using temporary files
func NewFileWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) {
dir := filepath.Dir(fn)

df, err := fileutil.OpenDir(dir)
Expand Down Expand Up @@ -243,12 +244,39 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator
return nil, errors.Wrap(err, "sync dir")
}

return newWriter(
ctx,
version,
f,
fP,
fPO,
)
}

// For writing TSDBs in memory
func NewMemWriterWithVersion(ctx context.Context, version int) (*Creator, error) {
return newWriter(
ctx,
version,
NewBufferWriter(),
NewBufferWriter(),
NewBufferWriter(),
)
}

func newWriter(
ctx context.Context,
version int,
fWriter writer,
postingsWriter writer,
postingOffsetsWriter writer,
) (*Creator, error) {
iw := &Creator{
Version: version,
ctx: ctx,
f: f,
fP: fP,
fPO: fPO,
f: fWriter,
fP: postingsWriter,
fPO: postingOffsetsWriter,
stage: idxStageNone,

// Reusable memory.
Expand All @@ -267,7 +295,7 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator

// NewWriter returns a new Writer to the given filename.
func NewWriter(ctx context.Context, indexFormat int, fn string) (*Creator, error) {
return NewWriterWithVersion(ctx, indexFormat, fn)
return NewFileWriterWithVersion(ctx, indexFormat, fn)
}

func (w *Creator) write(bufs ...[]byte) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func TestDecoder_ChunkSamples(t *testing.T) {
},
} {
t.Run(name, func(t *testing.T) {
iw, err := NewWriterWithVersion(context.Background(), FormatV2, filepath.Join(dir, name))
iw, err := NewFileWriterWithVersion(context.Background(), FormatV2, filepath.Join(dir, name))
require.NoError(t, err)

syms := []string{}
Expand Down

0 comments on commit 1c993f9

Please sign in to comment.