diff --git a/CHANGELOG.md b/CHANGELOG.md index 20c88bf4280..9f9faa7b73f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala) * [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio) * [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio) +* [BUGFIX] Fix an issue with WAL replay of zero-length search data when search is disabled. [#968](https://github.com/grafana/tempo/pull/968) (@annanay25) * [ENHANCEMENT] Added "query blocks" cli option. [#876](https://github.com/grafana/tempo/pull/876) (@joe-elliott) * [ENHANCEMENT] Added traceid to `trace too large message`. [#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394) * [ENHANCEMENT] Add support to tempo workloads to `overrides` from single configmap in microservice mode. [#896](https://github.com/grafana/tempo/pull/896) (@kavirajk) diff --git a/modules/distributor/search_data_test.go b/modules/distributor/search_data_test.go index ebb2927ad11..58a9c57c27f 100644 --- a/modules/distributor/search_data_test.go +++ b/modules/distributor/search_data_test.go @@ -14,7 +14,7 @@ import ( ) func TestExtractSearchData(t *testing.T) { - traceIDA := []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} + traceIDA := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} testCases := []struct { name string diff --git a/tempodb/search/backend_search_block_test.go b/tempodb/search/backend_search_block_test.go index 5322c6b5336..2756c45ab60 100644 --- a/tempodb/search/backend_search_block_test.go +++ b/tempodb/search/backend_search_block_test.go @@ -81,6 +81,84 @@ func TestBackendSearchBlockSearch(t *testing.T) { require.Equal(t, traceCount, int(sr.TracesInspected())) } +func TestBackendSearchBlockDedupesWAL(t *testing.T) { + traceCount := 1_000 + + testCases := []struct { + name string + searchDataGenerator func(traceID []byte, i int) [][]byte + searchTags map[string]string + expectedLenResults int + expectedLenInspected int + }{ + { + name: "distinct traces", + searchDataGenerator: genSearchData, + searchTags: map[string]string{"key10": "value_A_10", "key20": "value_B_20"}, + expectedLenResults: 1, + expectedLenInspected: 1, + }, + { + name: "empty traces", + searchDataGenerator: func(traceID []byte, i int) [][]byte { + return [][]byte{} + }, + searchTags: map[string]string{"key10": "value_A_10"}, + expectedLenResults: 0, + expectedLenInspected: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) + require.NoError(t, err) + + b1, err := NewStreamingSearchBlockForFile(f) + require.NoError(t, err) + + id := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} + for i := 0; i < traceCount; i++ { + require.NoError(t, b1.Append(context.Background(), id, tc.searchDataGenerator(id, i))) + } + + l, err := local.NewBackend(&local.Config{ + Path: t.TempDir(), + }) + require.NoError(t, err) + + blockID := uuid.New() + tenantID := "fake" + err = NewBackendSearchBlock(b1, l, blockID, tenantID, backend.EncNone, 0) + require.NoError(t, err) + + b2 := OpenBackendSearchBlock(l, blockID, tenantID) + + p := NewSearchPipeline(&tempopb.SearchRequest{ + Tags: tc.searchTags, + }) + + sr := NewResults() + + sr.StartWorker() + go func() { + defer sr.FinishWorker() + err := b2.Search(context.TODO(), p, sr) + require.NoError(t, err) + }() + sr.AllWorkersStarted() + + var results []*tempopb.TraceSearchMetadata + for r := range sr.Results() { + results = append(results, r) + } + require.Equal(t, tc.expectedLenResults, len(results)) + require.Equal(t, tc.expectedLenInspected, int(sr.TracesInspected())) + }) + } + +} + func BenchmarkBackendSearchBlockSearch(b *testing.B) { pageSizesMB := []float32{0.5, 1, 2} diff --git a/tempodb/search/data_combiner.go b/tempodb/search/data_combiner.go index 41387ed0b9d..5b51846837b 100644 --- a/tempodb/search/data_combiner.go +++ b/tempodb/search/data_combiner.go @@ -25,6 +25,10 @@ func (*DataCombiner) Combine(_ string, searchData ...[]byte) ([]byte, bool) { data := tempofb.SearchEntryMutable{} kv := &tempofb.KeyValues{} // buffer for _, sb := range searchData { + // we append zero-length entries to the WAL even when search is disabled. skipping to prevent unmarshalling and panik :) + if len(sb) == 0 { + continue + } sd := tempofb.SearchEntryFromBytes(sb) for i, ii := 0, sd.TagsLength(); i < ii; i++ { sd.Tags(kv, i)