From 77a5b8ed1a959a099ae7a6587db9ada4944d60bc Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Fri, 18 Oct 2024 20:47:58 +0000 Subject: [PATCH 1/3] Pool zstd encoding/decoding --- tempodb/backend/compression.go | 45 ++++++++++++++++++++++++++++++++++ tempodb/backend/tenantindex.go | 33 ++++++------------------- 2 files changed, 52 insertions(+), 26 deletions(-) create mode 100644 tempodb/backend/compression.go diff --git a/tempodb/backend/compression.go b/tempodb/backend/compression.go new file mode 100644 index 00000000000..dac5085dd5f --- /dev/null +++ b/tempodb/backend/compression.go @@ -0,0 +1,45 @@ +package backend + +import ( + "sync" + + "github.com/klauspost/compress/zstd" +) + +var _ Codec = (*ZstdCodec)(nil) + +type Codec interface { + Encode([]byte, []byte) ([]byte, error) + Decode([]byte) ([]byte, error) +} + +type ZstdCodec struct { + encoders sync.Pool // *zstd.Encoder + decoders sync.Pool // *zstd.Decoder +} + +func (c *ZstdCodec) Encode(src, dst []byte) ([]byte, error) { + e, _ := c.encoders.Get().(*zstd.Encoder) + if e == nil { + var err error + e, err = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) + if err != nil { + return nil, err + } + } + defer c.encoders.Put(e) + return e.EncodeAll(src, dst), nil +} + +func (c *ZstdCodec) Decode(buf []byte) ([]byte, error) { + d, _ := c.decoders.Get().(*zstd.Decoder) + if d == nil { + var err error + d, err = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) + if err != nil { + return nil, err + } + } + defer c.decoders.Put(d) + return d.DecodeAll(buf, nil) +} diff --git a/tempodb/backend/tenantindex.go b/tempodb/backend/tenantindex.go index dcebcde1acc..a11f1f04f48 100644 --- a/tempodb/backend/tenantindex.go +++ b/tempodb/backend/tenantindex.go @@ -8,14 +8,16 @@ import ( proto "github.com/gogo/protobuf/proto" "github.com/klauspost/compress/gzip" - "github.com/klauspost/compress/zstd" ) const ( internalFilename = "index.json" ) -var _ proto.Message = (*TenantIndex)(nil) +var ( + _ proto.Message = (*TenantIndex)(nil) + Zstd = &ZstdCodec{} +) func newTenantIndex(meta []*BlockMeta, compactedMeta []*CompactedBlockMeta) *TenantIndex { return &TenantIndex{ @@ -63,38 +65,17 @@ func (b *TenantIndex) unmarshal(buffer []byte) error { } func (b *TenantIndex) marshalPb() ([]byte, error) { - buffer := &bytes.Buffer{} - - z, err := zstd.NewWriter(buffer) - if err != nil { - return nil, err - } - pbBytes, err := proto.Marshal(b) if err != nil { return nil, err } - if _, err = z.Write(pbBytes); err != nil { - return nil, err - } - if err = z.Flush(); err != nil { - return nil, err - } - if err = z.Close(); err != nil { - return nil, err - } - - return buffer.Bytes(), nil + buffer := []byte{} + return Zstd.Encode(pbBytes, buffer) } func (b *TenantIndex) unmarshalPb(buffer []byte) error { - decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) - if err != nil { - return fmt.Errorf("error creating zstd decoder: %w", err) - } - - bb, err := decoder.DecodeAll(buffer, nil) + bb, err := Zstd.Decode(buffer) if err != nil { return fmt.Errorf("error decoding zstd: %w", err) } From a41401923b99bcb993db4e099ca17a82a59006c4 Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Fri, 18 Oct 2024 20:53:38 +0000 Subject: [PATCH 2/3] Include benchmark --- tempodb/backend/tenantindex_benchmark_test.go | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tempodb/backend/tenantindex_benchmark_test.go b/tempodb/backend/tenantindex_benchmark_test.go index 1a48d82caa5..aeb2eff6a99 100644 --- a/tempodb/backend/tenantindex_benchmark_test.go +++ b/tempodb/backend/tenantindex_benchmark_test.go @@ -85,3 +85,44 @@ func BenchmarkIndexUnmarshal(b *testing.B) { _ = unIdx.unmarshal(buf) } } + +func BenchmarkIndexUnmarshalPb(b *testing.B) { + idx := &TenantIndex{ + Meta: []*BlockMeta{ + NewBlockMeta("test", uuid.New(), "v1", EncGZIP, "adsf"), + NewBlockMeta("test", uuid.New(), "v2", EncNone, "adsf"), + NewBlockMeta("test", uuid.New(), "v3", EncLZ4_4M, "adsf"), + }, + CompactedMeta: []*CompactedBlockMeta{ + { + BlockMeta: *NewBlockMeta("test", uuid.New(), "v1", EncGZIP, "adsf"), + CompactedTime: time.Now(), + }, + { + BlockMeta: *NewBlockMeta("test", uuid.New(), "v1", EncZstd, "adsf"), + CompactedTime: time.Now(), + }, + { + BlockMeta: *NewBlockMeta("test", uuid.New(), "v1", EncSnappy, "adsf"), + CompactedTime: time.Now(), + }, + }, + } + + for i := range idx.Meta { + idx.Meta[i].DedicatedColumns = DedicatedColumns{ + {Scope: "resource", Name: "namespace", Type: "string"}, + {Scope: "span", Name: "http.method", Type: "string"}, + {Scope: "span", Name: "namespace", Type: "string"}, + } + } + + buf, err := idx.marshalPb() + require.NoError(b, err) + + unIdx := &TenantIndex{} + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = unIdx.unmarshalPb(buf) + } +} From 365dcc3ba507032167a59c7bc2dd5744ea1965a1 Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Mon, 21 Oct 2024 13:34:14 +0000 Subject: [PATCH 3/3] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ed04b3de2a..4221d358f0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ * [ENHANCEMENT] Support exporting internal Tempo traces via OTLP exporter when `use_otel_tracer` is enabled. Use the OpenTelemetry SDK environment variables to configure the span exporter. [#4028](https://github.com/grafana/tempo/pull/4028) (@andreasgerstmayr) * [ENHANCEMENT] TraceQL metrics queries: add min_over_time [#3975](https://github.com/grafana/tempo/pull/3975) (@javiermolinar) * [ENHANCEMENT] Write tenantindex as proto and json with a prefernce for proto [#4072](https://github.com/grafana/tempo/pull/4072) (@zalegrala) +* [ENHANCEMENT] Pool zstd encoding/decoding for tmepodb/backend [#4208](https://github.com/grafana/tempo/pull/4208) (@zalegrala) # v2.6.1