diff --git a/pkg/storage/wal/index/buffer.go b/pkg/storage/wal/index/buffer.go
index c75665b87eba..e8ed9b55a05c 100644
--- a/pkg/storage/wal/index/buffer.go
+++ b/pkg/storage/wal/index/buffer.go
@@ -86,6 +86,11 @@ func (fw *BufferWriter) Close() error {
 	return nil
 }
 
+func (fw *BufferWriter) Reset() {
+	fw.pos = 0
+	fw.buf.Reset()
+}
+
 func (fw *BufferWriter) Remove() error {
 	return nil
 }
diff --git a/pkg/storage/wal/index/index.go b/pkg/storage/wal/index/index.go
index 28d57ef55f4d..29436bd2044b 100644
--- a/pkg/storage/wal/index/index.go
+++ b/pkg/storage/wal/index/index.go
@@ -118,8 +118,6 @@ type PostingsEncoder func(*encoding.Encbuf, []uint32) error
 
 // Writer implements the IndexWriter interface for the standard
 // serialization format.
 type Writer struct {
-	ctx context.Context
-
 	// For the main index file.
 	f *BufferWriter
@@ -197,9 +195,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
 
 // NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
 // It uses the given encoder to encode each postings list.
-func NewWriterWithEncoder(ctx context.Context, encoder PostingsEncoder) (*Writer, error) {
+func NewWriterWithEncoder(encoder PostingsEncoder) (*Writer, error) {
 	iw := &Writer{
-		ctx: ctx,
 		f:   NewBufferWriter(),
 		fP:  NewBufferWriter(),
 		fPO: NewBufferWriter(),
@@ -222,8 +219,8 @@ func NewWriterWithEncoder(ctx context.Context, encoder PostingsEncoder) (*Writer
 
 // NewWriter creates a new index writer using the default encoder. See
 // NewWriterWithEncoder.
-func NewWriter(ctx context.Context) (*Writer, error) {
-	return NewWriterWithEncoder(ctx, EncodePostingsRaw)
+func NewWriter() (*Writer, error) {
+	return NewWriterWithEncoder(EncodePostingsRaw)
 }
 
 func (w *Writer) write(bufs ...[]byte) error {
@@ -242,15 +239,36 @@ func (w *Writer) Buffer() ([]byte, io.Closer, error) {
 	return w.f.Buffer()
 }
 
+func (w *Writer) Reset() error {
+	w.f.Reset()
+	w.fP.Reset()
+	w.fPO.Reset()
+	w.buf1.Reset()
+	w.buf2.Reset()
+	w.stage = idxStageNone
+	w.toc = TOC{}
+	w.postingsStart = 0
+	w.numSymbols = 0
+	w.symbols = nil
+	w.symbolFile = nil
+	w.lastSymbol = ""
+	w.symbolCache = make(map[string]symbolCacheEntry, 1<<8)
+	w.labelIndexes = w.labelIndexes[:0]
+	w.labelNames = make(map[string]uint64, 1<<8)
+	w.lastSeries = nil
+	w.lastSeriesRef = 0
+	w.lastChunkRef = 0
+	w.cntPO = 0
+	w.crc32.Reset()
+	if err := w.writeMeta(); err != nil {
+		return err
+	}
+	return nil
+}
+
 // ensureStage handles transitions between write stages and ensures that IndexWriter
 // methods are called in an order valid for the implementation.
 func (w *Writer) ensureStage(s indexWriterStage) error {
-	select {
-	case <-w.ctx.Done():
-		return w.ctx.Err()
-	default:
-	}
-
 	if w.stage == s {
 		return nil
 	}
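Note on the new `Writer.Reset`: it truncates all three buffers, returns the stage machine to `idxStageNone`, and rewrites the meta header via `writeMeta`, so one `Writer` can now be recycled across segment flushes instead of being reallocated per flush. A minimal sketch of the intended reuse cycle, assuming a caller-side loop; `segment` and `buildIndex` are hypothetical stand-ins, not part of this diff:

```go
// Sketch: recycling one index.Writer across segment flushes.
func writeAll(segments []segment) error {
	w, err := index.NewWriter()
	if err != nil {
		return err
	}
	for _, seg := range segments {
		if err := w.Reset(); err != nil { // back to idxStageNone, fresh meta header
			return err
		}
		if err := buildIndex(w, seg); err != nil { // AddSymbol/AddSeries/Close as before
			return err
		}
	}
	return nil
}
```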
@@ -691,7 +709,6 @@ func (w *Writer) writePostingsOffsetTable() error {
 	if err := w.fPO.Remove(); err != nil {
 		return err
 	}
-	w.fPO = nil
 
 	err = w.writeLengthAndHash(startPos)
 	if err != nil {
@@ -854,11 +871,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
 			}
 		}
 	}
-	select {
-	case <-w.ctx.Done():
-		return w.ctx.Err()
-	default:
-	}
+
 	}
 	return nil
 }
@@ -936,7 +949,6 @@ func (w *Writer) writePostings() error {
 	if err := w.fP.Remove(); err != nil {
 		return err
 	}
-	w.fP = nil
 
 	return nil
 }
diff --git a/pkg/storage/wal/index/index_test.go b/pkg/storage/wal/index/index_test.go
index 76089ebf6ff0..bb1f136b319f 100644
--- a/pkg/storage/wal/index/index_test.go
+++ b/pkg/storage/wal/index/index_test.go
@@ -135,7 +135,7 @@ func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder,
 
 func TestIndexRW_Create_Open(t *testing.T) {
 	// An empty index must still result in a readable file.
-	iw, err := NewWriter(context.Background())
+	iw, err := NewWriter()
 	require.NoError(t, err)
 	require.NoError(t, iw.Close())
 
@@ -160,7 +160,7 @@ func TestIndexRW_Postings(t *testing.T) {
 			labels: labels.FromStrings("a", "1", "b", strconv.Itoa(i)),
 		})
 	}
-	ir, buf, _ := createReader(ctx, t, input)
+	ir, buf, _ := createReader(t, input)
 
 	p, err := ir.Postings(ctx, "a", "1")
 	require.NoError(t, err)
@@ -271,7 +271,7 @@ func TestPostingsMany(t *testing.T) {
 			labels: labels.FromStrings("i", v, "foo", "bar"),
 		})
 	}
-	ir, _, symbols := createReader(ctx, t, input)
+	ir, _, symbols := createReader(t, input)
 
 	cases := []struct {
 		in []string
@@ -353,7 +353,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 		})
 	}
 
-	ir, _, _ := createReader(ctx, t, input)
+	ir, _, _ := createReader(t, input)
 
 	// Population procedure as done by compaction.
 	var (
@@ -435,7 +435,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 }
 
 func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
-	w, err := NewWriter(context.Background())
+	w, err := NewWriter()
 	require.NoError(t, err)
 
 	require.NoError(t, w.AddSymbol("__name__"))
@@ -523,7 +523,7 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
 			labels: labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)),
 		})
 	}
-	ir, _, _ := createReader(ctx, b, input)
+	ir, _, _ := createReader(b, input)
 
 	b.ResetTimer()
 	for n := 0; n < b.N; n++ {
@@ -540,7 +540,7 @@ func TestDecoder_Postings_WrongInput(t *testing.T) {
 }
 
 func TestChunksRefOrdering(t *testing.T) {
-	idx, err := NewWriter(context.Background())
+	idx, err := NewWriter()
 	require.NoError(t, err)
 
 	require.NoError(t, idx.AddSymbol("1"))
@@ -558,7 +558,7 @@ func TestChunksRefOrdering(t *testing.T) {
 }
 
 func TestChunksTimeOrdering(t *testing.T) {
-	idx, err := NewWriter(context.Background())
+	idx, err := NewWriter()
 	require.NoError(t, err)
 
 	require.NoError(t, idx.AddSymbol("1"))
@@ -585,10 +585,10 @@ func TestChunksTimeOrdering(t *testing.T) {
 
 // createFileReader creates a temporary index file. It writes the provided input to this file.
 // It returns a Reader for this file, the file's name, and the symbol map.
-func createReader(ctx context.Context, tb testing.TB, input indexWriterSeriesSlice) (*Reader, []byte, map[string]struct{}) {
+func createReader(tb testing.TB, input indexWriterSeriesSlice) (*Reader, []byte, map[string]struct{}) {
 	tb.Helper()
 
-	iw, err := NewWriter(ctx)
+	iw, err := NewWriter()
 	require.NoError(tb, err)
 
 	symbols := map[string]struct{}{}
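With the `ctx` field gone, `ensureStage` and `writePostingsToTmpFiles` no longer observe cancellation: once a write starts, it runs to completion. Callers that still care about deadlines can check their own context before handing work to the writer; a sketch under that assumption (the wrapping function is illustrative, not part of this diff):

```go
func flushIndex(ctx context.Context, w *index.Writer) error {
	// The writer itself is no longer cancellable, so bail out up front
	// if the caller's context is already done.
	if err := ctx.Err(); err != nil {
		return err
	}
	return w.Close()
}
```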
diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go
index 4c1d134fe2ca..8ced5da1cef4 100644
--- a/pkg/storage/wal/segment.go
+++ b/pkg/storage/wal/segment.go
@@ -8,8 +8,7 @@ import (
 	"fmt"
 	"io"
 	"sort"
-
-	"github.com/dolthub/swiss"
+	"sync"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
@@ -24,8 +23,15 @@ import (
 
 // LOKW is the magic number for the Loki WAL format.
 var (
-	magicNumber = uint32(0x4C4F4B57)
-	magicBuf    [4]byte
+	magicNumber       = uint32(0x4C4F4B57)
+	magicBuf          [4]byte
+	streamSegmentPool = sync.Pool{
+		New: func() interface{} {
+			return &streamSegment{
+				entries: make([]*logproto.Entry, 0, 4096),
+			}
+		},
+	}
 )
 
 func init() {
@@ -37,9 +43,10 @@ type streamID struct {
 }
 
 type SegmentWriter struct {
-	streams   *swiss.Map[streamID, *streamSegment]
+	streams   map[streamID]*streamSegment
 	buf1      encoding.Encbuf
 	inputSize int64
+	idxWriter *index.Writer
 }
 
 type streamSegment struct {
@@ -49,12 +56,21 @@ type streamSegment struct {
 	maxt     int64
 }
 
+func (s *streamSegment) Reset() {
+	s.entries = s.entries[:0]
+}
+
 // NewWalSegmentWriter creates a new WalSegmentWriter.
-func NewWalSegmentWriter() *SegmentWriter {
-	return &SegmentWriter{
-		streams: swiss.NewMap[streamID, *streamSegment](64),
-		buf1:    encoding.EncWith(make([]byte, 0, 4)),
+func NewWalSegmentWriter() (*SegmentWriter, error) {
+	idxWriter, err := index.NewWriter()
+	if err != nil {
+		return nil, err
 	}
+	return &SegmentWriter{
+		streams:   make(map[streamID]*streamSegment, 64),
+		buf1:      encoding.EncWith(make([]byte, 0, 4)),
+		idxWriter: idxWriter,
+	}, nil
 }
 
 // Labels are passed a string `{foo="bar",baz="qux"}` `{foo="foo",baz="foo"}`. labels.Labels => Symbols foo, baz , qux
@@ -66,22 +82,18 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
 		b.inputSize += int64(len(e.Line))
 	}
 	id := streamID{labels: labelsString, tenant: tenantID}
-	s, ok := b.streams.Get(id)
+	s, ok := b.streams[id]
 	if !ok {
 		if lbls.Get(tsdb.TenantLabel) == "" {
 			lbls = labels.NewBuilder(lbls).Set(tsdb.TenantLabel, tenantID).Labels()
 		}
-		s = &streamSegment{
-			// todo: should be pooled.
-			// prometheus bucketed pool
-			// https://pkg.go.dev/github.com/prometheus/prometheus/util/pool
-			entries:  make([]*logproto.Entry, 0, 64),
-			lbls:     lbls,
-			tenantID: tenantID,
-		}
+		s = streamSegmentPool.Get().(*streamSegment)
+		s.Reset()
+		s.lbls = lbls
+		s.tenantID = tenantID
 		s.maxt = entries[len(entries)-1].Timestamp.UnixNano()
 		s.entries = append(s.entries, entries...)
-		b.streams.Put(id, s)
+		b.streams[id] = s
 		return
 	}
 
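`Append` now takes stream buffers from a package-level `sync.Pool` instead of allocating a fresh `streamSegment` per stream, resolving the earlier `todo: should be pooled` comment. The contract is the usual pool one: `Get` may hand back a dirty object, so it is `Reset` before use, and `Put` must be the last touch. A self-contained sketch of the same pattern (names are illustrative, not from this diff):

```go
package main

import (
	"fmt"
	"sync"
)

// buffer mirrors the streamSegment pooling above: Reset truncates the slice
// but keeps its backing array, so the 4096-entry capacity is paid for once
// and amortized across all later owners.
type buffer struct {
	entries []string
}

func (b *buffer) Reset() { b.entries = b.entries[:0] }

var bufPool = sync.Pool{
	New: func() interface{} {
		return &buffer{entries: make([]string, 0, 4096)}
	},
}

func main() {
	b := bufPool.Get().(*buffer)
	b.Reset() // Get may return a previously used object; clear it first
	b.entries = append(b.entries, "entry 1", "entry 2")
	fmt.Println(len(b.entries), cap(b.entries)) // 2 4096
	bufPool.Put(b) // must be the last use of b
}
```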
@@ -105,14 +117,17 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 	var (
 		total   int64
-		streams = make([]*streamSegment, 0, b.streams.Count())
+		streams = make([]*streamSegment, 0, len(b.streams))
 	)
 	// Collect all streams and sort them by tenantID and labels.
-	b.streams.Iter(func(k streamID, v *streamSegment) bool {
-		streams = append(streams, v)
-		return false
-	})
+	for _, s := range b.streams {
+		if len(s.entries) == 0 {
+			continue
+		}
+		streams = append(streams, s)
+	}
+
 	sort.Slice(streams, func(i, j int) bool {
 		if streams[i].tenantID != streams[j].tenantID {
 			return streams[i].tenantID < streams[j].tenantID
@@ -120,7 +135,7 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 		return labels.Compare(streams[i].lbls, streams[j].lbls) < 0
 	})
 
-	idxw, err := index.NewWriter(context.TODO())
+	err := b.idxWriter.Reset()
 	if err != nil {
 		return total, err
 	}
@@ -143,7 +158,7 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 
 	// Add symbols
 	for _, symbol := range symbols {
-		if err := idxw.AddSymbol(symbol); err != nil {
+		if err := b.idxWriter.AddSymbol(symbol); err != nil {
 			return total, err
 		}
 	}
@@ -163,7 +178,7 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 		if err != nil {
 			return total, err
 		}
-		err = idxw.AddSeries(storage.SeriesRef(i), s.lbls, chunks.Meta{
+		err = b.idxWriter.AddSeries(storage.SeriesRef(i), s.lbls, chunks.Meta{
 			MinTime: s.entries[0].Timestamp.UnixNano(),
 			MaxTime: s.entries[len(s.entries)-1].Timestamp.UnixNano(),
 			Ref:     chunks.NewChunkRef(uint64(total), uint64(n)),
@@ -175,11 +190,11 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 	}
 
-	if err := idxw.Close(); err != nil {
+	if err := b.idxWriter.Close(); err != nil {
 		return total, err
 	}
 
-	buf, closer, err := idxw.Buffer()
+	buf, closer, err := b.idxWriter.Buffer()
 	if err != nil {
 		return total, err
 	}
@@ -226,7 +241,11 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
 // Reset clears the writer.
 // After calling Reset, the writer can be reused.
 func (b *SegmentWriter) Reset() {
-	b.streams.Clear()
+	for _, s := range b.streams {
+		s := s
+		streamSegmentPool.Put(s)
+	}
+	b.streams = make(map[streamID]*streamSegment, 64)
 	b.buf1.Reset()
 	b.inputSize = 0
 }
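`SegmentWriter.Reset` now returns every `streamSegment` to the pool and swaps in a fresh map, while `WriteTo` both skips empty segments (so a pooled object that was reset but never refilled cannot leak stale entries into the output) and resets the long-lived index writer at the top. The resulting reuse cycle, which `TestReset` below pins down byte-for-byte, looks roughly like this (`fillWriter` and `dst` are illustrative):

```go
// Sketch of one flush cycle; repeatable indefinitely on the same writer.
func flush(w *SegmentWriter, dst io.Writer) error {
	fillWriter(w) // some number of Append calls
	if _, err := w.WriteTo(dst); err != nil { // resets and rebuilds the index internally
		return err
	}
	w.Reset() // stream segments return to the pool; w is ready for the next segment
	return nil
}
```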
diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go
index f1755c975abb..ddcc7afc16b3 100644
--- a/pkg/storage/wal/segment_test.go
+++ b/pkg/storage/wal/segment_test.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"fmt"
 	"sort"
+	"sync"
 	"testing"
 	"time"
 
@@ -104,7 +105,8 @@ func TestWalSegmentWriter_Append(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			t.Parallel()
 			// Create a new WalSegmentWriter
-			w := NewWalSegmentWriter()
+			w, err := NewWalSegmentWriter()
+			require.NoError(t, err)
 			// Append the entries
 			for _, batch := range tt.batches {
 				for _, stream := range batch {
@@ -116,7 +118,7 @@ func TestWalSegmentWriter_Append(t *testing.T) {
 			require.NotEmpty(t, tt.expected, "expected entries are empty")
 			// Check the entries
 			for _, expected := range tt.expected {
-				stream, ok := w.streams.Get(streamID{labels: expected.labels, tenant: expected.tenant})
+				stream, ok := w.streams[streamID{labels: expected.labels, tenant: expected.tenant}]
 				require.True(t, ok)
 				lbs, err := syntax.ParseLabels(expected.labels)
 				require.NoError(t, err)
@@ -130,7 +132,8 @@ func TestWalSegmentWriter_Append(t *testing.T) {
 }
 
 func TestMultiTenantWrite(t *testing.T) {
-	w := NewWalSegmentWriter()
+	w, err := NewWalSegmentWriter()
+	require.NoError(t, err)
 	dst := bytes.NewBuffer(nil)
 
 	lbls := []labels.Labels{
@@ -199,7 +202,8 @@ func TestCompression(t *testing.T) {
 }
 
 func testCompression(t *testing.T, maxInputSize int64) {
-	w := NewWalSegmentWriter()
+	w, err := NewWalSegmentWriter()
+	require.NoError(t, err)
 	dst := bytes.NewBuffer(nil)
 	files := testdata.Files()
 	lbls := []labels.Labels{}
@@ -253,3 +257,106 @@ func testCompression(t *testing.T, maxInputSize int64) {
 	}
 	t.Logf("Series sizes: [%s]\n", sizesString)
 }
+
+func TestReset(t *testing.T) {
+	w, err := NewWalSegmentWriter()
+	require.NoError(t, err)
+	dst := bytes.NewBuffer(nil)
+
+	w.Append("tenant", "foo", labels.FromStrings("container", "foo", "namespace", "dev"), []*push.Entry{
+		{Timestamp: time.Unix(0, 0), Line: "Entry 1"},
+		{Timestamp: time.Unix(1, 0), Line: "Entry 2"},
+		{Timestamp: time.Unix(2, 0), Line: "Entry 3"},
+	})
+
+	n, err := w.WriteTo(dst)
+	require.NoError(t, err)
+	require.True(t, n > 0)
+
+	copyBuffer := bytes.NewBuffer(nil)
+
+	w.Reset()
+	w.Append("tenant", "foo", labels.FromStrings("container", "foo", "namespace", "dev"), []*push.Entry{
+		{Timestamp: time.Unix(0, 0), Line: "Entry 1"},
+		{Timestamp: time.Unix(1, 0), Line: "Entry 2"},
+		{Timestamp: time.Unix(2, 0), Line: "Entry 3"},
+	})
+
+	n, err = w.WriteTo(copyBuffer)
+	require.NoError(t, err)
+	require.True(t, n > 0)
+
+	require.Equal(t, dst.Bytes(), copyBuffer.Bytes())
+}
+
+func BenchmarkWrites(b *testing.B) {
+	files := testdata.Files()
+	lbls := []labels.Labels{}
+	generators := []*testdata.LogGenerator{}
+
+	for _, file := range files {
+		lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "dev"))
+		lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "prod"))
+		g := testdata.NewLogGenerator(b, file)
+		generators = append(generators, g, g)
+	}
+	inputSize := int64(0)
+	data := []struct {
+		tenant  string
+		labels  string
+		lbls    labels.Labels
+		entries []*push.Entry
+	}{}
+	for inputSize < 5<<20 {
+		for i, lbl := range lbls {
+			more, line := generators[i].Next()
+			if !more {
+				continue
+			}
+			inputSize += int64(len(line))
+			data = append(data, struct {
+				tenant  string
+				labels  string
+				lbls    labels.Labels
+				entries []*push.Entry
+			}{
+				tenant: "tenant",
+				labels: lbl.String(),
+				lbls:   lbl,
+				entries: []*push.Entry{
+					{Timestamp: time.Unix(0, int64(i*1e9)), Line: string(line)},
+				},
+			})
+
+		}
+	}
+
+	dst := bytes.NewBuffer(make([]byte, 0, inputSize))
+
+	pool := sync.Pool{
+		New: func() interface{} {
+			writer, err := NewWalSegmentWriter()
+			if err != nil {
+				panic(err)
+			}
+			return writer
+		},
+	}
+
+	b.ResetTimer()
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		writer := pool.Get().(*SegmentWriter)
+
+		dst.Reset()
+		writer.Reset()
+
+		for _, d := range data {
+			writer.Append(d.tenant, d.labels, d.lbls, d.entries)
+		}
+		n, err := writer.WriteTo(dst)
+		require.NoError(b, err)
+		require.True(b, n > 0)
+		pool.Put(writer)
+	}
+}
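`BenchmarkWrites` exercises the full recycling path by pooling entire `SegmentWriter`s. Outside a benchmark, the same pattern applies wherever segments are flushed concurrently; a sketch of a caller-side pool (`writerPool` and `flushSegment` are illustrative names, not part of this change):

```go
var writerPool = sync.Pool{
	New: func() interface{} {
		w, err := NewWalSegmentWriter()
		if err != nil {
			panic(err) // construction only fails if the index writer cannot be built
		}
		return w
	},
}

func flushSegment(dst io.Writer, fill func(*SegmentWriter)) (int64, error) {
	w := writerPool.Get().(*SegmentWriter)
	defer writerPool.Put(w)
	w.Reset() // drop any state left by the previous user
	fill(w)
	return w.WriteTo(dst)
}
```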