Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedupe search records while replaying WAL #940

Merged
merged 9 commits into from
Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* [ENHANCEMENT] Include additional detail when searching for traces [#916](https://github.com/grafana/tempo/pull/916) (@zalegrala)
* [ENHANCEMENT] Add `gen index` and `gen bloom` commands to tempo-cli. [#903](https://github.com/grafana/tempo/pull/903) (@annanay25)
* [ENHANCEMENT] Implement trace comparison in Vulture [#904](https://github.com/grafana/tempo/pull/904) (@zalegrala)
* [ENHANCEMENT] Dedupe search records while replaying WAL [#940](https://github.com/grafana/tempo/pull/940) (@annanay25)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala)
Expand Down
4 changes: 2 additions & 2 deletions modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func (c *Compactor) Owns(hash string) bool {
}

// Combine implements common.ObjectCombiner
func (c *Compactor) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
return model.ObjectCombiner.Combine(objA, objB, dataEncoding)
func (c *Compactor) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
return model.ObjectCombiner.Combine(dataEncoding, objs...)
}

// BlockRetentionForTenant implements CompactorOverrides
Expand Down
7 changes: 5 additions & 2 deletions modules/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ func (m *mockSharder) Owns(hash string) bool {
return true
}

func (m *mockSharder) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
combined, wasCombined, _ := model.CombineTraceBytes(objA, objB, dataEncoding, dataEncoding)
func (m *mockSharder) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
if len(objs) != 2 {
return nil, false
}
combined, wasCombined, _ := model.CombineTraceBytes(objs[0], objs[1], dataEncoding, dataEncoding)
return combined, wasCombined
}

Expand Down
29 changes: 25 additions & 4 deletions pkg/model/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"hash"
"hash/fnv"

"github.com/grafana/tempo/tempodb/encoding/common"

"github.com/go-kit/kit/log/level"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/pkg/errors"
Expand All @@ -18,12 +20,31 @@ type objectCombiner struct{}

var ObjectCombiner = objectCombiner{}

var _ common.ObjectCombiner = (*objectCombiner)(nil)

// Combine implements tempodb/encoding/common.ObjectCombiner
func (o objectCombiner) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
combinedTrace, wasCombined, err := CombineTraceBytes(objA, objB, dataEncoding, dataEncoding)
if err != nil {
level.Error(log.Logger).Log("msg", "error combining trace protos", "err", err.Error())
func (o objectCombiner) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
if len(objs) <= 0 {
return nil, false
}

if len(objs) == 1 {
return objs[0], false
}

combinedTrace := objs[0]
var wasCombined bool
var err error
for _, obj := range objs[1:] {
// Todo: Find an efficient way to combine all objs in a single step
// However, this is ok for now because Combine() is never called with len(objs) > 2
combinedTrace, wasCombined, err = CombineTraceBytes(combinedTrace, obj, dataEncoding, dataEncoding)
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
level.Error(log.Logger).Log("msg", "error combining trace protos", "err", err.Error())
break
}
}

return combinedTrace, wasCombined
}

Expand Down
4 changes: 2 additions & 2 deletions tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ type instrumentedObjectCombiner struct {
}

// Combine wraps the inner combiner with combined metrics
func (i instrumentedObjectCombiner) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
b, wasCombined := i.inner.Combine(objA, objB, dataEncoding)
func (i instrumentedObjectCombiner) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
b, wasCombined := i.inner.Combine(dataEncoding, objs...)
if wasCombined {
metricCompactionObjectsCombined.WithLabelValues(i.compactionLevelLabel).Inc()
}
Expand Down
18 changes: 11 additions & 7 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,22 @@ func (m *mockSharder) Owns(hash string) bool {
return true
}

type mockJobSharder struct{}

func (m *mockJobSharder) Owns(_ string) bool { return true }
func (m *mockSharder) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
if len(objs) != 2 {
return nil, false
}

func (m *mockSharder) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
if len(objA) > len(objB) {
return objA, true
if len(objs[0]) > len(objs[1]) {
return objs[0], true
}

return objB, true
return objs[1], true
}

type mockJobSharder struct{}

func (m *mockJobSharder) Owns(_ string) bool { return true }

type mockOverrides struct {
blockRetention time.Duration
}
Expand Down
4 changes: 2 additions & 2 deletions tempodb/encoding/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ type Record struct {

// ObjectCombiner is used to combine two objects in the backend
type ObjectCombiner interface {
// Combine objA and objB encoded using dataEncoding. The returned object must
// Combine objects encoded using dataEncoding. The returned object must
// use the same dataEncoding. Returns a bool indicating if it the objects required combining and
// the combined slice
Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool)
Combine(dataEncoding string, objs ...[]byte) ([]byte, bool)
}

// DataReader returns a slice of pages in the encoding/v0 format referenced by
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/finder_paged.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (f *pagedFinder) Find(ctx context.Context, id common.ID) ([]byte, error) {
break
}

bytesFound, _ = f.combiner.Combine(bytesFound, bytesOne, f.dataEncoding)
bytesFound, _ = f.combiner.Combine(f.dataEncoding, bytesFound, bytesOne)

// we need to check the next record to see if it also matches our id
i++
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/iterator_deduping.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (i *dedupingIterator) Next(ctx context.Context) (common.ID, []byte, error)
}

i.currentID = id
i.currentObject, _ = i.combiner.Combine(i.currentObject, obj, i.dataEncoding)
i.currentObject, _ = i.combiner.Combine(i.dataEncoding, i.currentObject, obj)
}

return dedupedID, dedupedObject, nil
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/iterator_multiblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (i *multiblockIterator) iterate(ctx context.Context) {
comparison := bytes.Compare(currentID, lowestID)

if comparison == 0 {
lowestObject, _ = i.combiner.Combine(currentObject, lowestObject, i.dataEncoding)
lowestObject, _ = i.combiner.Combine(i.dataEncoding, currentObject, lowestObject)
b.clear()
} else if len(lowestID) == 0 || comparison == -1 {
lowestID = currentID
Expand Down
117 changes: 105 additions & 12 deletions tempodb/search/backend_search_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package search
import (
"bytes"
"context"
"io"
"os"

"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/tempodb/backend"
Expand All @@ -23,6 +26,78 @@ type BackendSearchBlock struct {
l *local.Backend
}

//nolint:golint
type SearchDataCombiner struct{}

func (*SearchDataCombiner) Combine(_ string, searchData ...[]byte) ([]byte, bool) {
annanay25 marked this conversation as resolved.
Show resolved Hide resolved

if len(searchData) <= 0 {
return nil, false
}

if len(searchData) == 1 {
return searchData[0], false
}

// Squash all datas into 1
data := tempofb.SearchEntryMutable{}
kv := &tempofb.KeyValues{} // buffer
for _, sb := range searchData {
sd := tempofb.SearchEntryFromBytes(sb)
for i := 0; i < sd.TagsLength(); i++ {
sd.Tags(kv, i)
for j := 0; j < kv.ValueLength(); j++ {
data.AddTag(string(kv.Key()), string(kv.Value(j)))
}
}
data.SetStartTimeUnixNano(sd.StartTimeUnixNano())
data.SetEndTimeUnixNano(sd.EndTimeUnixNano())
data.TraceID = sd.Id()
}

return data.ToBytes(), true
}

var _ common.ObjectCombiner = (*SearchDataCombiner)(nil)

//nolint:golint
type SearchDataIterator struct {
currentIndex int
records []common.Record
file *os.File

buffer []byte
}

func (s *SearchDataIterator) Next(_ context.Context) (common.ID, []byte, error) {
if s.currentIndex >= len(s.records) {
return nil, nil, io.EOF
}

currentRecord := s.records[s.currentIndex]

// resize/extend buffer
if cap(s.buffer) < int(currentRecord.Length) {
s.buffer = make([]byte, currentRecord.Length)
}
s.buffer = s.buffer[:currentRecord.Length]

_, err := s.file.ReadAt(s.buffer, int64(currentRecord.Start))
if err != nil {
return nil, nil, errors.Wrap(err, "error reading search file")
}

s.currentIndex++

return currentRecord.ID, s.buffer, nil
}

func (*SearchDataIterator) Close() {
// file will be closed by StreamingSearchBlock
}

var _ encoding.Iterator = (*SearchDataIterator)(nil)

// NewBackendSearchBlock iterates through the given WAL search data and writes it to the persistent backend
// in a more efficient paged form. Multiple traces are written in the same page to make sure of the flatbuffer
// CreateSharedString feature which dedupes strings across the entire buffer.
Expand All @@ -42,25 +117,43 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI
pageSizeBytes = defaultBackendSearchBlockPageSize
}

// Copy records into the appender
w, err := newBackendSearchBlockWriter(blockID, tenantID, l, version, enc)
if err != nil {
return err
}

// set up deduping iterator for streaming search block
combiner := &SearchDataCombiner{}
searchIterator := &SearchDataIterator{
records: input.appender.Records(),
file: input.file,
}
iter, err := encoding.NewDedupingIterator(searchIterator, combiner, "")
if err != nil {
return errors.Wrap(err, "error creating deduping iterator")
}
a := encoding.NewBufferedAppenderGeneric(w, pageSizeBytes)
for _, r := range input.appender.Records() {

// Copy records into the appender
for {

// Read
buf := make([]byte, r.Length)
_, err = input.file.ReadAt(buf, int64(r.Start))
if err != nil {
return err
id, data, err := iter.Next(ctx)
if err != nil && err != io.EOF {
return errors.Wrap(err, "error iterating")
}

if id == nil {
break
}

if len(data) == 0 {
continue
}

s := tempofb.SearchEntryFromBytes(buf)
data := &tempofb.SearchEntryMutable{
TraceID: r.ID,
s := tempofb.SearchEntryFromBytes(data)
entry := &tempofb.SearchEntryMutable{
TraceID: id,
StartTimeUnixNano: s.StartTimeUnixNano(),
EndTimeUnixNano: s.EndTimeUnixNano(),
}
Expand All @@ -69,13 +162,13 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI
for i := 0; i < l; i++ {
s.Tags(kv, i)
for j := 0; j < kv.ValueLength(); j++ {
data.AddTag(string(kv.Key()), string(kv.Value(j)))
entry.AddTag(string(kv.Key()), string(kv.Value(j)))
}
}

err = a.Append(ctx, r.ID, data)
err = a.Append(ctx, id, entry)
if err != nil {
return err
return errors.Wrap(err, "error appending to backend block")
}
}

Expand Down
27 changes: 14 additions & 13 deletions tempodb/search/backend_search_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path"
"strconv"
"sync"
"testing"
"time"
Expand All @@ -18,23 +19,24 @@ import (
"github.com/stretchr/testify/require"
)

func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.Encoding, pageSizeBytes int) *BackendSearchBlock {
id := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} // 16-byte ids required
searchData := [][]byte{(&tempofb.SearchEntryMutable{
func genSearchData(traceID []byte, i int) [][]byte {
return [][]byte{(&tempofb.SearchEntryMutable{
TraceID: traceID,
Tags: tempofb.SearchDataMap{
"key1": {"value10", "value11"},
"key2": {"value20", "value21"},
"key3": {"value30", "value31"},
"key4": {"value40", "value41"},
"key" + strconv.Itoa(i): {"value_A_" + strconv.Itoa(i), "value_B_" + strconv.Itoa(i)},
}}).ToBytes()}
}

func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.Encoding, pageSizeBytes int) *BackendSearchBlock {
id := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} // 16-byte ids required

f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644)
require.NoError(t, err)

b1, err := NewStreamingSearchBlockForFile(f)
require.NoError(t, err)
for i := 0; i < traceCount; i++ {
assert.NoError(t, b1.Append(context.Background(), id, searchData))
assert.NoError(t, b1.Append(context.Background(), id, genSearchData(id, i)))
}

l, err := local.NewBackend(&local.Config{
Expand All @@ -52,13 +54,12 @@ func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.E
}

func TestBackendSearchBlockSearch(t *testing.T) {
traceCount := 50_000
traceCount := 1_000

b2 := newBackendSearchBlockWithTraces(t, traceCount, backend.EncNone, 0)

// Matches every trace
p := NewSearchPipeline(&tempopb.SearchRequest{
Tags: map[string]string{"key1": "value10"},
Tags: map[string]string{"key1": "value_A_1", "key20": "value_B_20"},
})

sr := NewResults()
Expand All @@ -75,8 +76,8 @@ func TestBackendSearchBlockSearch(t *testing.T) {
for r := range sr.Results() {
results = append(results, r)
}
require.Equal(t, traceCount, len(results))
require.Equal(t, traceCount, int(sr.TracesInspected()))
require.Equal(t, 1, len(results))
require.Equal(t, 1, int(sr.TracesInspected()))
}

func BenchmarkBackendSearchBlockSearch(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion tempodb/search/backend_search_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// backendSearchBlockWriter is a DataWriter for search data. Instead of receiving bytes slices, it
// receives search data objects and combintes them into a single FlatBuffer Builder and
// receives search data objects and combines them into a single FlatBuffer Builder and
// flushes periodically, one page per flush.
type backendSearchBlockWriter struct {
// input
Expand Down
Loading