Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(TSDB): add hooks for in-memory only tsdb creation #14734

Merged
merged 14 commits into from
Nov 4, 2024
95 changes: 69 additions & 26 deletions pkg/storage/stores/shipper/indexshipper/tsdb/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsdb
import (
"context"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -111,12 +112,47 @@ func (b *Builder) Build(
name := fmt.Sprintf("%s-%x.staging", index.IndexFilename, rng)
tmpPath := filepath.Join(scratchDir, name)

var writer *index.Creator
writer, err := index.NewFileWriterWithVersion(ctx, b.version, tmpPath)
if err != nil {
return id, err
}

writer, err = index.NewWriterWithVersion(ctx, b.version, tmpPath)
if _, err := b.build(writer, false); err != nil {
return id, err
}

reader, err := index.NewFileReader(tmpPath)
if err != nil {
return id, err
}

from, through := reader.Bounds()

// load the newly compacted index to grab checksum, promptly close
dst := createFn(model.Time(from), model.Time(through), reader.Checksum())

reader.Close()
defer func() {
if err != nil {
os.RemoveAll(tmpPath)
}
}()

if err := chunk_util.EnsureDirectory(filepath.Dir(dst.Path())); err != nil {
return id, err
}
dstPath := dst.Path()
if err := os.Rename(tmpPath, dstPath); err != nil {
return id, err
}

return dst, nil
}

func (b *Builder) build(
writer *index.Creator,
reader bool, // whether to return the ReadCloser of the underlying DB
) (io.ReadCloser, error) {
// TODO(owen-d): multithread

// Sort series
Expand Down Expand Up @@ -155,7 +191,7 @@ func (b *Builder) Build(
// Add symbols
for _, symbol := range symbols {
if err := writer.AddSymbol(symbol); err != nil {
return id, err
return nil, err
}
}

Expand All @@ -165,38 +201,45 @@ func (b *Builder) Build(
s.chunks = s.chunks.Finalize()
}
if err := writer.AddSeries(storage.SeriesRef(i), s.labels, s.fp, s.chunks...); err != nil {
return id, err
return nil, err
}
}

if _, err := writer.Close(false); err != nil {
return id, err
}
return writer.Close(reader)
}

reader, err := index.NewFileReader(tmpPath)
func (b *Builder) BuildInMemory(
ctx context.Context,
// Determines how to create the resulting Identifier and file name.
// This is variable as we use Builder for multiple reasons,
// such as building multi-tenant tsdbs on the ingester
// and per tenant ones during compaction
createFn func(from, through model.Time, checksum uint32) Identifier,
) (id Identifier, data []byte, err error) {
writer, err := index.NewMemWriterWithVersion(ctx, b.version)
if err != nil {
return id, err
return id, nil, err
}

from, through := reader.Bounds()

// load the newly compacted index to grab checksum, promptly close
dst := createFn(model.Time(from), model.Time(through), reader.Checksum())

reader.Close()
defer func() {
if err != nil {
os.RemoveAll(tmpPath)
}
}()
readCloser, err := b.build(writer, true)
if err != nil {
return id, nil, err
}
defer readCloser.Close()

if err := chunk_util.EnsureDirectory(filepath.Dir(dst.Path())); err != nil {
return id, err
data, err = io.ReadAll(readCloser)
if err != nil {
return nil, nil, err
}
dstPath := dst.Path()
if err := os.Rename(tmpPath, dstPath); err != nil {
return id, err

reader, err := index.NewReader(index.RealByteSlice(data))
if err != nil {
return id, nil, err
}
defer reader.Close()

return dst, nil
from, through := reader.Bounds()
id = createFn(model.Time(from), model.Time(through), reader.Checksum())

return id, data, nil
}
38 changes: 33 additions & 5 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}, nil
}

func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) {
// NewFileWriterWithVersion returns a Creator for writing TSDBs using temporary files.
func NewFileWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) {
dir := filepath.Dir(fn)

df, err := fileutil.OpenDir(dir)
Expand Down Expand Up @@ -243,12 +244,39 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator
return nil, errors.Wrap(err, "sync dir")
}

return newWriter(
ctx,
version,
f,
fP,
fPO,
)
}

// NewMemWriterWithVersion returns a Creator for writing TSDBs in memory.
// The main index writer, the postings writer and the posting-offsets writer
// are each given their own NewBufferWriter instance.
func NewMemWriterWithVersion(ctx context.Context, version int) (*Creator, error) {
	var (
		indexBuf          = NewBufferWriter()
		postingsBuf       = NewBufferWriter()
		postingOffsetsBuf = NewBufferWriter()
	)
	return newWriter(ctx, version, indexBuf, postingsBuf, postingOffsetsBuf)
}

func newWriter(
ctx context.Context,
version int,
fWriter writer,
postingsWriter writer,
postingOffsetsWriter writer,
) (*Creator, error) {
iw := &Creator{
Version: version,
ctx: ctx,
f: f,
fP: fP,
fPO: fPO,
f: fWriter,
fP: postingsWriter,
fPO: postingOffsetsWriter,
stage: idxStageNone,

// Reusable memory.
Expand All @@ -267,7 +295,7 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator

// NewWriter returns a new Writer to the given filename.
func NewWriter(ctx context.Context, indexFormat int, fn string) (*Creator, error) {
return NewWriterWithVersion(ctx, indexFormat, fn)
return NewFileWriterWithVersion(ctx, indexFormat, fn)
}

func (w *Creator) write(bufs ...[]byte) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func TestDecoder_ChunkSamples(t *testing.T) {
},
} {
t.Run(name, func(t *testing.T) {
iw, err := NewWriterWithVersion(context.Background(), FormatV2, filepath.Join(dir, name))
iw, err := NewFileWriterWithVersion(context.Background(), FormatV2, filepath.Join(dir, name))
require.NoError(t, err)

syms := []string{}
Expand Down
Loading