From d79a638b85b1d73c8063e8fa291bff96c4373e47 Mon Sep 17 00:00:00 2001 From: Vocanno2 <1162013672@qq.com> Date: Fri, 25 Jan 2019 01:15:40 +0800 Subject: [PATCH] Fix duplicated spans when querying Elasticsearch Signed-off-by: Pavol Loffay --- plugin/storage/es/spanstore/reader.go | 5 +- plugin/storage/es/spanstore/reader_test.go | 76 ++++++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 41ca50f9054..0c631eb4fc1 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -308,9 +308,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st if len(traceIDs) == 0 { return []*model.Trace{}, nil } - searchRequests := make([]*elastic.SearchRequest, len(traceIDs)) - var traces []*model.Trace // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. 
indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) @@ -323,7 +321,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st if len(traceIDs) == 0 { break } - + searchRequests := make([]*elastic.SearchRequest, len(traceIDs)) for i, traceID := range traceIDs { query := elastic.NewTermQuery("traceID", traceID.String()) if val, ok := searchAfterTime[traceID]; ok { @@ -375,6 +373,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st } } + var traces []*model.Trace for _, trace := range tracesMap { traces = append(traces, trace) } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 14d96896f62..e2aeec27c12 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -205,6 +205,82 @@ func TestSpanReader_GetTrace(t *testing.T) { }) } +func TestSpanReader_multiRead_followUp_query(t *testing.T) { + withSpanReader(func(r *spanReaderTest) { + now := time.Now() + spanID1 := dbmodel.Span{SpanID: "0", TraceID:"1", StartTime: model.TimeAsEpochMicroseconds(now)} + spanBytesID1, err := json.Marshal(spanID1) + require.NoError(t, err) + spanID2 := dbmodel.Span{SpanID: "0", TraceID:"2", StartTime: model.TimeAsEpochMicroseconds(now)} + spanBytesID2, err := json.Marshal(spanID2) + require.NoError(t, err) + + id1Query := elastic.NewTermQuery("traceID", model.TraceID{High: 0, Low:1}.String()) + id1Search := elastic.NewSearchRequest(). + IgnoreUnavailable(true). + Type(spanType). + Source(r.reader.sourceFn(id1Query, model.TimeAsEpochMicroseconds(now.Add(-time.Hour)))) + id2Query := elastic.NewTermQuery("traceID", model.TraceID{High: 0, Low:2}.String()) + id2Search := elastic.NewSearchRequest(). + IgnoreUnavailable(true). + Type(spanType). + Source(r.reader.sourceFn(id2Query, model.TimeAsEpochMicroseconds(now.Add(-time.Hour)))) + id1SearchSpanTime := elastic.NewSearchRequest(). 
+ IgnoreUnavailable(true). + Type(spanType). + Source(r.reader.sourceFn(id1Query, spanID1.StartTime)) + + multiSearchService := &mocks.MultiSearchService{} + firstMultiSearch := &mocks.MultiSearchService{} + secondMultiSearch := &mocks.MultiSearchService{} + multiSearchService.On("Add", id1Search, id2Search).Return(firstMultiSearch) + multiSearchService.On("Add", id1SearchSpanTime).Return(secondMultiSearch) + + firstMultiSearch.On("Index", mock.AnythingOfType("string")).Return(firstMultiSearch) + secondMultiSearch.On("Index", mock.AnythingOfType("string")).Return(secondMultiSearch) + r.client.On("MultiSearch").Return(multiSearchService) + + fistMultiSearchMock := firstMultiSearch.On("Do", mock.AnythingOfType("*context.emptyCtx")) + secondMultiSearchMock := secondMultiSearch.On("Do", mock.AnythingOfType("*context.emptyCtx")) + + // Set TotalHits to two to trigger the follow-up query: + // the client returns only one span, so the implementation + // triggers a follow-up query for the same traceID with the timestamp of the last span. + searchHitsID1 := &elastic.SearchHits{Hits: []*elastic.SearchHit{ + {Source: (*json.RawMessage)(&spanBytesID1)}, + }, TotalHits: 2} + fistMultiSearchMock. + Return(&elastic.MultiSearchResult{ + Responses: []*elastic.SearchResult{ + {Hits: searchHitsID1}, + }, + }, nil) + + searchHitsID2 := &elastic.SearchHits{Hits: []*elastic.SearchHit{ + {Source: (*json.RawMessage)(&spanBytesID2)}, + }, TotalHits: 1} + secondMultiSearchMock. 
+ Return(&elastic.MultiSearchResult{ + Responses: []*elastic.SearchResult{ + {Hits: searchHitsID2}, + }, + }, nil) + + traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High:0, Low:1}, {High:0, Low:2}}, now, now) + require.NoError(t, err) + require.NotNil(t, traces) + require.Len(t, traces, 2) + + toDomain := dbmodel.NewToDomain("-") + sModel1, err := toDomain.SpanToDomain(&spanID1) + require.NoError(t, err) + sModel2, err := toDomain.SpanToDomain(&spanID2) + require.NoError(t, err) + assert.EqualValues(t, traces[0].Spans[0], sModel1) + assert.EqualValues(t, traces[1].Spans[0], sModel2) + }) +} + func TestSpanReader_SearchAfter(t *testing.T) { withSpanReader(func(r *spanReaderTest) { var hits []*elastic.SearchHit