Skip to content

Commit

Permalink
Tag name filtering (#3822)
Browse files Browse the repository at this point in the history
* tag name filtering added in the block

Signed-off-by: Joe Elliott <number101010@gmail.com>

* add well known and dedicated cols

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Support for scopes at FetchTags

Signed-off-by: Joe Elliott <number101010@gmail.com>

* conditionally add selectAs in the trace level iterator

Signed-off-by: Joe Elliott <number101010@gmail.com>

* add support for intrinsics

Signed-off-by: Joe Elliott <number101010@gmail.com>

* add scope

Signed-off-by: Joe Elliott <number101010@gmail.com>

* all but vp4

Signed-off-by: Joe Elliott <number101010@gmail.com>

* all todos

Signed-off-by: Joe Elliott <number101010@gmail.com>

* vp4

Signed-off-by: Joe Elliott <number101010@gmail.com>

* pipe through

Signed-off-by: Joe Elliott <number101010@gmail.com>

* add to request

Signed-off-by: Joe Elliott <number101010@gmail.com>

* parse query

Signed-off-by: Joe Elliott <number101010@gmail.com>

* tempodb tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* docs

Signed-off-by: Joe Elliott <number101010@gmail.com>

* integration

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* vendor check

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* remove hardcoded scopes

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Update docs/sources/tempo/api_docs/_index.md

Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com>

* review

Signed-off-by: Joe Elliott <number101010@gmail.com>

---------

Signed-off-by: Joe Elliott <number101010@gmail.com>
Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com>
  • Loading branch information
joe-elliott and knylander-grafana authored Jul 10, 2024
1 parent cfacd81 commit 89e6789
Show file tree
Hide file tree
Showing 28 changed files with 2,094 additions and 336 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [FEATURE] TraecQL support for event attributes [#3708](https://github.com/grafana/tempo/pull/3748) (@ie-pham)
* [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno)
* [FEATURE] Add new compare() metrics function [#3695](https://github.com/grafana/tempo/pull/3695) (@mdisibio)
* [FEATURE] Add a `q` parameter to `/api/v2/serach/tags` for tag name filtering [#3822](https://github.com/grafana/tempo/pull/3822) (@joe-elliott)
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio)
* [ENHANCEMENT] Add local disk caching of metrics queries in local-blocks processor [#3799](https://github.com/grafana/tempo/pull/3799) (@mdisibio)
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ Parameters:
- `scope = (resource|span|intrinsic)`
Specifies the scope of the tags, this is an optional parameter, if not specified it means all scopes.
Default = `all`
- `q = (traceql query)`
Optional. A TraceQL query to filter tag names by. Currently only works for a single spanset of `&&`ed conditions. For example: `{ span.foo = "bar" && resource.baz = "bat" ...}`. See also [Filtered tag values](#filtered-tag-values).
- `start = (unix epoch seconds)`
Optional. Along with `end` define a time range from which tags should be returned.
- `end = (unix epoch seconds)`
Expand Down
339 changes: 337 additions & 2 deletions integration/e2e/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
"slices"
"sort"
"strconv"
"testing"
Expand All @@ -24,6 +25,249 @@ const (
resourceX = "resource.xx"
)

func TestSearchTagsV2(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneLocal, "config.yaml"))
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))

jaegerClient, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, jaegerClient)

type batchTmpl struct {
spanCount int
name string
resourceAttVal, spanAttVal string
resourceAttr, SpanAttr string
}

firstBatch := batchTmpl{spanCount: 2, name: "foo", resourceAttVal: "bar", spanAttVal: "bar", resourceAttr: "firstRes", SpanAttr: "firstSpan"}
secondBatch := batchTmpl{spanCount: 2, name: "baz", resourceAttVal: "qux", spanAttVal: "qux", resourceAttr: "secondRes", SpanAttr: "secondSpan"}

batch := makeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal, firstBatch.resourceAttr, firstBatch.SpanAttr)
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

batch = makeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal, secondBatch.resourceAttr, secondBatch.SpanAttr)
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

// Wait for the traces to be written to the WAL
time.Sleep(time.Second * 3)

testCases := []struct {
name string
query string
scope string
expected tempopb.SearchTagsV2Response
}{
{
name: "no filtering",
query: "",
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "first batch - resource",
query: fmt.Sprintf(`{ name="%s" }`, firstBatch.name),
scope: "resource",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "second batch with incomplete query - span",
query: fmt.Sprintf(`{ name="%s" && span.x = }`, secondBatch.name),
scope: "span",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{secondBatch.SpanAttr},
},
},
},
},
{
name: "first batch - resource att - span",
query: fmt.Sprintf(`{ resource.%s="%s" }`, firstBatch.resourceAttr, firstBatch.resourceAttVal),
scope: "span",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr},
},
},
},
},
{
name: "first batch - resource att - resource",
query: fmt.Sprintf(`{ resource.%s="%s" }`, firstBatch.resourceAttr, firstBatch.resourceAttVal),
scope: "resource",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "second batch - resource attribute - span",
query: fmt.Sprintf(`{ resource.%s="%s" }`, secondBatch.resourceAttr, secondBatch.resourceAttVal),
scope: "span",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{secondBatch.SpanAttr},
},
},
},
},
{
name: "too restrictive query",
query: fmt.Sprintf(`{ resource.%s="%s" && resource.y="%s" }`, firstBatch.resourceAttr, firstBatch.resourceAttVal, secondBatch.resourceAttVal),
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "resource",
Tags: []string{"service.name"}, // well known column so included
},
},
},
},
// Unscoped not supported, unfiltered results.
{
name: "unscoped span attribute",
query: fmt.Sprintf(`{ .x="%s" }`, firstBatch.spanAttVal),
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "unscoped res attribute",
query: fmt.Sprintf(`{ .xx="%s" }`, firstBatch.resourceAttVal),
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "both batches - name and resource attribute",
query: `{ resource.service.name="my-service"}`,
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "bad query - unfiltered results",
query: fmt.Sprintf("%s = bar", spanX), // bad query, missing quotes
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
callSearchTagsV2AndAssert(t, tempo, tc.scope, tc.query, tc.expected, 0, 0)
})
}

// Wait to block flushed to backend, 20 seconds is the complete_block_timeout configuration on all in one, we add
// 2s for security.
callFlush(t, tempo)
time.Sleep(time.Second * 22)
callFlush(t, tempo)

// test metrics
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_cleared_total"))

// Assert no more on the ingester
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
callSearchTagsV2AndAssert(t, tempo, tc.scope, tc.query, tempopb.SearchTagsV2Response{}, 0, 0)
})
}

// Wait to blocklist_poll to be completed
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))

// Assert tags on storage backend
now := time.Now()
start := now.Add(-2 * time.Hour)
end := now.Add(2 * time.Hour)

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
callSearchTagsV2AndAssert(t, tempo, tc.scope, tc.query, tc.expected, start.Unix(), end.Unix())
})
}
}

func TestSearchTagValuesV2(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
Expand All @@ -46,10 +290,10 @@ func TestSearchTagValuesV2(t *testing.T) {
firstBatch := batchTmpl{spanCount: 2, name: "foo", resourceAttVal: "bar", spanAttVal: "bar"}
secondBatch := batchTmpl{spanCount: 2, name: "baz", resourceAttVal: "qux", spanAttVal: "qux"}

batch := makeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal)
batch := makeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal, "xx", "x")
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

batch = makeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal)
batch = makeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal, "xx", "x")
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

// Wait for the traces to be written to the WAL
Expand Down Expand Up @@ -345,6 +589,97 @@ func callSearchTagValuesV2AndAssert(t *testing.T, svc *e2e.HTTPService, tagName,
require.Equal(t, expected, actualGrpcResp)
}

func callSearchTagsV2AndAssert(t *testing.T, svc *e2e.HTTPService, scope, query string, expected tempopb.SearchTagsV2Response, start, end int64) {
urlPath := fmt.Sprintf(`/api/v2/search/tags?scope=%s&q=%s`, scope, url.QueryEscape(query))

// expected will not have the intrinsic scope since it's the same every time, add it here.
if scope == "none" || scope == "" || scope == "intrinsic" {
expected.Scopes = append(expected.Scopes, &tempopb.SearchTagsV2Scope{
Name: "intrinsic",
Tags: []string{"duration", "event:name", "kind", "name", "rootName", "rootServiceName", "span:duration", "span:kind", "span:name", "span:status", "span:statusMessage", "status", "statusMessage", "trace:duration", "trace:rootName", "trace:rootService", "traceDuration"},
})
}
sort.Slice(expected.Scopes, func(i, j int) bool { return expected.Scopes[i].Name < expected.Scopes[j].Name })
for _, scope := range expected.Scopes {
slices.Sort(scope.Tags)
}

// search for tag values
req, err := http.NewRequest(http.MethodGet, "http://"+svc.Endpoint(3200)+urlPath, nil)
require.NoError(t, err)

q := req.URL.Query()

if start != 0 {
q.Set("start", strconv.Itoa(int(start)))
}

if end != 0 {
q.Set("end", strconv.Itoa(int(end)))
}

req.URL.RawQuery = q.Encode()

res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// read body and print it
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
defer res.Body.Close()

// parse response
var response tempopb.SearchTagsV2Response
require.NoError(t, json.Unmarshal(body, &response))

prepTagsResponse(&response)
require.Equal(t, expected, response)

// streaming
grpcReq := &tempopb.SearchTagsRequest{
Scope: scope,
Query: query,
Start: uint32(start),
End: uint32(end),
}

grpcClient, err := util.NewSearchGRPCClient(context.Background(), svc.Endpoint(3200))
require.NoError(t, err)

respTagsValuesV2, err := grpcClient.SearchTagsV2(context.Background(), grpcReq)
require.NoError(t, err)
var grpcResp *tempopb.SearchTagsV2Response
for {
resp, err := respTagsValuesV2.Recv()
if resp != nil {
grpcResp = resp
}
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
}
require.NotNil(t, grpcResp)

prepTagsResponse(&response)
require.Equal(t, expected, response)
}

func prepTagsResponse(resp *tempopb.SearchTagsV2Response) {
if len(resp.Scopes) == 0 {
resp.Scopes = nil
}
sort.Slice(resp.Scopes, func(i, j int) bool { return resp.Scopes[i].Name < resp.Scopes[j].Name })
for _, scope := range resp.Scopes {
if len(scope.Tags) == 0 {
scope.Tags = nil
}

slices.Sort(scope.Tags)
}
}

func callSearchTagsAndAssert(t *testing.T, svc *e2e.HTTPService, expected searchTagsResponse, start, end int64) {
urlPath := "/api/search/tags"
// search for tag values
Expand Down
Loading

0 comments on commit 89e6789

Please sign in to comment.