Skip to content

Commit

Permalink
[TraceQL] Multiple &&ed or ||ed should return unique spans (#2254)
Browse files Browse the repository at this point in the history
* added uniquespans bench and test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* more tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* sort spans slice

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* clarify

Signed-off-by: Joe Elliott <number101010@gmail.com>

---------

Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott authored Mar 23, 2023
1 parent 060f8be commit 0ed2362
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* [BUGFIX] Fix not closing WAL block file before attempting to delete the folder. [#2139](https://github.com/grafana/tempo/pull/2152) (@kostya9)
* [BUGFIX] Stop searching for virtual tags if there are any hits.
This prevents invalid values from showing up for intrinsics like `status` [#2219](https://github.com/grafana/tempo/pull/2219) (@joe-elliott)
* [BUGFIX] Correctly return unique spans when &&ing and ||ing spansets. [#2254](https://github.com/grafana/tempo/pull/2254) (@joe-elliott)

## v2.0.1 / 2023-03-03

Expand Down
53 changes: 42 additions & 11 deletions pkg/traceql/ast_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,6 @@ import (
"regexp"
)

// appendSpans appends the spans of every spanset in input, in order, to buffer
// and returns the extended slice. No de-duplication is performed.
func appendSpans(buffer []Span, input []*Spanset) []Span {
	for idx := range input {
		buffer = append(buffer, input[idx].Spans...)
	}
	return buffer
}

func (o SpansetOperation) evaluate(input []*Spanset) (output []*Spanset, err error) {

for i := range input {
Expand All @@ -33,16 +26,14 @@ func (o SpansetOperation) evaluate(input []*Spanset) (output []*Spanset, err err
case OpSpansetAnd:
if len(lhs) > 0 && len(rhs) > 0 {
matchingSpanset := input[i].clone()
matchingSpanset.Spans = appendSpans(nil, lhs)
matchingSpanset.Spans = appendSpans(matchingSpanset.Spans, rhs)
matchingSpanset.Spans = uniqueSpans(lhs, rhs)
output = append(output, matchingSpanset)
}

case OpSpansetUnion:
if len(lhs) > 0 || len(rhs) > 0 {
matchingSpanset := input[i].clone()
matchingSpanset.Spans = appendSpans(nil, lhs)
matchingSpanset.Spans = appendSpans(matchingSpanset.Spans, rhs)
matchingSpanset.Spans = uniqueSpans(lhs, rhs)
output = append(output, matchingSpanset)
}

Expand Down Expand Up @@ -274,3 +265,43 @@ func (a Attribute) execute(span Span) (Static, error) {

return NewStaticNil(), nil
}

// uniqueSpans merges the spans of ss1 and ss2 into a single slice, skipping any
// span on the larger side that is also present on the smaller side.
//
// The side with fewer total spans (ss2 on a tie) is emitted first, in its
// original order, and is used to build the lookup set. The spans of the other
// side follow, in their original order, minus those found in the set. Spans are
// compared using map-key equality on the Span value. Duplicates that occur
// within a single side are not removed.
func uniqueSpans(ss1 []*Spanset, ss2 []*Spanset) []Span {
	ss1Count := 0
	for _, ss := range ss1 {
		ss1Count += len(ss.Spans)
	}
	ss2Count := 0
	for _, ss := range ss2 {
		ss2Count += len(ss.Spans)
	}

	// Build the set from the smaller side so the map stays as small as possible.
	smaller, smallerCount, larger := ss2, ss2Count, ss1
	if ss1Count < ss2Count {
		smaller, smallerCount, larger = ss1, ss1Count, ss2
	}

	output := make([]Span, 0, ss1Count+ss2Count)

	// The number of keys is known up front; pre-size to avoid incremental growth.
	seen := make(map[Span]struct{}, smallerCount)
	for _, ss := range smaller {
		for _, span := range ss.Spans {
			seen[span] = struct{}{}
			output = append(output, span)
		}
	}

	// Only add the spans from the larger side that are not already in the set.
	for _, ss := range larger {
		for _, span := range ss.Spans {
			if _, ok := seen[span]; !ok {
				output = append(output, span)
			}
		}
	}

	return output
}
57 changes: 55 additions & 2 deletions pkg/traceql/ast_execute_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package traceql

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -181,8 +182,8 @@ func TestSpansetOperationEvaluate(t *testing.T) {
},
[]*Spanset{
{Spans: []Span{
&mockSpan{id: []byte{1}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}},
&mockSpan{id: []byte{2}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("b")}},
&mockSpan{id: []byte{1}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}},
}},
},
},
Expand All @@ -201,14 +202,40 @@ func TestSpansetOperationEvaluate(t *testing.T) {
},
[]*Spanset{
{Spans: []Span{
&mockSpan{id: []byte{1}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}},
&mockSpan{id: []byte{2}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("b")}},
&mockSpan{id: []byte{1}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}},
}},
{Spans: []Span{
&mockSpan{id: []byte{3}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("b")}},
}},
},
},
{
"{ true } && { true } && { true }",
[]*Spanset{
{Spans: []Span{
&mockSpan{id: []byte{1}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}},
}},
},
[]*Spanset{
{Spans: []Span{
&mockSpan{id: []byte{1}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}},
}},
},
},
{
"{ true } || { true } || { true }",
[]*Spanset{
{Spans: []Span{
&mockSpan{id: []byte{1}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}},
}},
},
[]*Spanset{
{Spans: []Span{
&mockSpan{id: []byte{1}, attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}},
}},
},
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -676,3 +703,29 @@ func BenchmarkBinOp(b *testing.B) {
})
}
}

// BenchmarkUniqueSpans measures uniqueSpans for every pairing of left- and
// right-hand span counts drawn from sizes. Each side is a single spanset of
// freshly allocated mock spans.
func BenchmarkUniqueSpans(b *testing.B) {
	sizes := []int{1, 10, 100, 1000, 10000}

	// newSpansets builds one spanset holding count newly allocated mock spans.
	newSpansets := func(count int) []*Spanset {
		spans := make([]Span, count)
		for idx := range spans {
			spans[idx] = &mockSpan{id: []byte{byte(idx)}}
		}
		return []*Spanset{{Spans: spans}}
	}

	for _, lhs := range sizes {
		// Walk the right-hand sizes from largest to smallest.
		for pos := len(sizes) - 1; pos >= 0; pos-- {
			rhs := sizes[pos]
			b.Run(fmt.Sprintf("%d|%d", rhs, lhs), func(b *testing.B) {
				lhsSpansets := newSpansets(lhs)
				rhsSpansets := newSpansets(rhs)

				// Exclude fixture construction from the measurement.
				b.ResetTimer()
				for n := 0; n < b.N; n++ {
					uniqueSpans(lhsSpansets, rhsSpansets)
				}
			})
		}
	}
}
7 changes: 7 additions & 0 deletions tempodb/encoding/vparquet/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math"
"reflect"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -295,6 +296,12 @@ func (i *spansetIterator) Next() (*span, error) {
}
}

// Spans returned from the filter are not guaranteed to be in file order, but
// the meta iterators need file order to work correctly, so sort them here.
sort.Slice(i.currentSpans, func(j, k int) bool {
return parquetquery.CompareRowNumbers(DefinitionLevelResourceSpans, i.currentSpans[j].rowNum, i.currentSpans[k].rowNum) == -1
})

// found something!
if len(i.currentSpans) > 0 {
ret := i.currentSpans[0]
Expand Down
2 changes: 2 additions & 0 deletions tempodb/tempodb_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ func testAdvancedTraceQLCompleteBlock(t *testing.T, blockVersion string) {
{Query: fmt.Sprintf("{} | count() != %d", totalSpans+1)},
{Query: fmt.Sprintf("{} | count() <= %d", totalSpans)},
{Query: fmt.Sprintf("{} | count() >= %d", totalSpans)},
{Query: fmt.Sprintf("{ true } && { true } | count() = %d", totalSpans)},
{Query: fmt.Sprintf("{ true } || { true } | count() = %d", totalSpans)},
// avgs
{Query: "{ } | avg(duration) > 0"}, // todo: make this better
}
Expand Down

0 comments on commit 0ed2362

Please sign in to comment.