From 07a7ca94b813b4e4d59043a1bbd01f0142f09713 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Tue, 5 Nov 2024 21:12:07 -0500 Subject: [PATCH] [connector/routing] Add ability to route metrics and traces by request context (#36143) --- .chloggen/routing-connector-by-request.yaml | 2 +- connector/routingconnector/README.md | 1 - connector/routingconnector/logs_test.go | 12 +- connector/routingconnector/metrics.go | 5 + connector/routingconnector/metrics_test.go | 218 +++++++++++++++----- connector/routingconnector/traces.go | 5 + connector/routingconnector/traces_test.go | 184 ++++++++++++++--- 7 files changed, 342 insertions(+), 85 deletions(-) diff --git a/.chloggen/routing-connector-by-request.yaml b/.chloggen/routing-connector-by-request.yaml index 77cdfec98d48..0ef130b131ec 100644 --- a/.chloggen/routing-connector-by-request.yaml +++ b/.chloggen/routing-connector-by-request.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: routingconnector # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Add ability to route logs by request metadata. +note: Add ability to route by request metadata. # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [19738] diff --git a/connector/routingconnector/README.md b/connector/routingconnector/README.md index e22814c4c7dd..b4f06f607a6b 100644 --- a/connector/routingconnector/README.md +++ b/connector/routingconnector/README.md @@ -44,7 +44,6 @@ The following settings are available: ### Limitations - The `match_once` setting is only supported when using the `resource` context. If any routes use `log` or `request` context, `match_once` must be set to `true`. -- The `request` context is only supported for logs at this time. - The `request` context requires use of the `condition` setting, and relies on a very limited grammar. Conditions must be in the form of `request["key"] == "value"` or `request["key"] != "value"`. (In the future, this grammar may be expanded to support more complex conditions.) ### Supported [OTTL] functions diff --git a/connector/routingconnector/logs_test.go b/connector/routingconnector/logs_test.go index 2fefc28b4b97..24747154c213 100644 --- a/connector/routingconnector/logs_test.go +++ b/connector/routingconnector/logs_test.go @@ -819,7 +819,7 @@ func TestLogsConnectorDetailed(t *testing.T) { ), }, { - name: "match_resource_then_logs", + name: "mixed/match_resource_then_logs", cfg: testConfig( withRoute("resource", isResourceA, idSink0), withRoute("log", isLogE, idSink1), @@ -831,7 +831,7 @@ func TestLogsConnectorDetailed(t *testing.T) { expectSinkD: plogutiltest.NewLogs("B", "CD", "F"), }, { - name: "match_logs_then_resource", + name: "mixed/match_logs_then_resource", cfg: testConfig( withRoute("log", isLogE, idSink0), withRoute("resource", isResourceB, idSink1), @@ -843,7 +843,7 @@ func TestLogsConnectorDetailed(t *testing.T) { expectSinkD: plogutiltest.NewLogs("A", "CD", "F"), }, { - name: "match_resource_then_grpc_request", + name: "mixed/match_resource_then_grpc_request", cfg: testConfig( withRoute("resource", isResourceA, idSink0), withRoute("request", isAcme, idSink1), @@ -856,7 +856,7 @@ func TestLogsConnectorDetailed(t *testing.T) { expectSinkD: plog.Logs{}, }, { - name: "match_logs_then_grpc_request", + name: "mixed/match_logs_then_grpc_request", cfg: testConfig( withRoute("log", isLogF, idSink0), withRoute("request", isAcme, idSink1), @@ -869,7 +869,7 @@ func TestLogsConnectorDetailed(t *testing.T) { expectSinkD: plog.Logs{}, }, { - name: "match_resource_then_http_request", + name: "mixed/match_resource_then_http_request", cfg: testConfig( withRoute("resource", isResourceA, idSink0), withRoute("request", isAcme, idSink1), @@ -882,7 +882,7 @@ func TestLogsConnectorDetailed(t *testing.T) { expectSinkD: plog.Logs{}, }, { - name: "match_logs_then_http_request", + name: "mixed/match_logs_then_http_request", cfg: testConfig( withRoute("log", isLogF, idSink0), withRoute("request", isAcme, idSink1), diff --git a/connector/routingconnector/metrics.go b/connector/routingconnector/metrics.go index 025a5bc95bb5..8f25c586bf71 100644 --- a/connector/routingconnector/metrics.go +++ b/connector/routingconnector/metrics.go @@ -74,6 +74,11 @@ func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics route := c.router.routeSlice[i] matchedMetrics := pmetric.NewMetrics() switch route.statementContext { + case "request": + if route.requestCondition.matchRequest(ctx) { + groupAllMetrics(groups, route.consumer, md) + md = pmetric.NewMetrics() // all metrics have been routed + } case "", "resource": pmetricutil.MoveResourcesIf(md, matchedMetrics, func(rs pmetric.ResourceMetrics) bool { diff --git a/connector/routingconnector/metrics_test.go b/connector/routingconnector/metrics_test.go index 661d182c7760..0fba4eabc748 100644 --- a/connector/routingconnector/metrics_test.go +++ b/connector/routingconnector/metrics_test.go @@ -503,114 +503,233 @@ func TestMetricsConnectorDetailed(t *testing.T) { idSink1 := pipeline.NewIDWithName(pipeline.SignalMetrics, "1") idSinkD := pipeline.NewIDWithName(pipeline.SignalMetrics, "default") - isNotNil := `attributes["resourceName"] != nil` - isA := `attributes["resourceName"] == "resourceA"` - isB := `attributes["resourceName"] == "resourceB"` - isX := `attributes["resourceName"] == "resourceX"` - isY := `attributes["resourceName"] == "resourceY"` + isAcme := `request["X-Tenant"] == "acme"` + + isAnyResource := `attributes["resourceName"] != nil` + isResourceA := `attributes["resourceName"] == "resourceA"` + isResourceB := `attributes["resourceName"] == "resourceB"` + isResourceX := `attributes["resourceName"] == "resourceX"` + isResourceY := `attributes["resourceName"] == "resourceY"` testCases := []struct { name string cfg *Config + ctx context.Context input pmetric.Metrics expectSink0 pmetric.Metrics expectSink1 pmetric.Metrics expectSinkD pmetric.Metrics }{ { - name: "all_match_first_only", + name: "request/no_request_values", cfg: testConfig( - withRoute("resource", isNotNil, idSink0), - withRoute("resource", isY, idSink1), + withRoute("request", isAcme, idSink0), withDefault(idSinkD), ), - input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), - expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), + ctx: context.Background(), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetric.Metrics{}, + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + }, + { + name: "request/match_any_value", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withGRPCMetadata( + withHTTPMetadata( + context.Background(), + map[string][]string{"X-Tenant": {"acme"}}, + ), + map[string]string{"X-Tenant": "notacme"}, + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), expectSink1: pmetric.Metrics{}, expectSinkD: pmetric.Metrics{}, }, { - name: "all_match_last_only", + name: "request/match_grpc_value", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isNotNil, idSink1), + withRoute("request", isAcme, idSink0), withDefault(idSinkD), ), - input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), + ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "acme"}), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetric.Metrics{}, + }, + { + name: "request/match_no_grpc_value", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "notacme"}), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), expectSink0: pmetric.Metrics{}, - expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + }, + { + name: "request/match_http_value", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"acme"}}), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink1: pmetric.Metrics{}, expectSinkD: pmetric.Metrics{}, }, { - name: "all_match_only_once", + name: "request/match_http_value2", cfg: testConfig( - withRoute("resource", isNotNil, idSink0), - withRoute("resource", isA+" or "+isB, idSink1), + withRoute("request", isAcme, idSink0), withDefault(idSinkD), ), - input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), - expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"notacme", "acme"}}), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), expectSink1: pmetric.Metrics{}, expectSinkD: pmetric.Metrics{}, }, { - name: "each_matches_one", + name: "request/match_no_http_value", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"notacme"}}), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetric.Metrics{}, + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + }, + { + name: "resource/all_match_first_only", cfg: testConfig( - withRoute("resource", isA, idSink0), - withRoute("resource", isB, idSink1), + withRoute("resource", isAnyResource, idSink0), + withRoute("resource", isResourceY, idSink1), withDefault(idSinkD), ), - input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), - expectSink0: pmetricutiltest.NewMetrics("A", "CD", "EF", "FG"), - expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "FG"), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink1: pmetric.Metrics{}, expectSinkD: pmetric.Metrics{}, }, { - name: "some_match_with_default", + name: "resource/all_match_last_only", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isB, idSink1), + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isAnyResource, idSink1), withDefault(idSinkD), ), - input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), expectSink0: pmetric.Metrics{}, - expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "FG"), - expectSinkD: pmetricutiltest.NewMetrics("A", "CD", "EF", "FG"), + expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSinkD: pmetric.Metrics{}, }, { - name: "some_match_without_default", + name: "resource/all_match_only_once", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isB, idSink1), + withRoute("resource", isAnyResource, idSink0), + withRoute("resource", isResourceA+" or "+isResourceB, idSink1), + withDefault(idSinkD), ), - input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetric.Metrics{}, + }, + { + name: "resource/each_matches_one", + cfg: testConfig( + withRoute("resource", isResourceA, idSink0), + withRoute("resource", isResourceB, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"), + expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"), + expectSinkD: pmetric.Metrics{}, + }, + { + name: "resource/some_match_with_default", + cfg: testConfig( + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isResourceB, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetric.Metrics{}, + expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"), + expectSinkD: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"), + }, + { + name: "resource/some_match_without_default", + cfg: testConfig( + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isResourceB, idSink1), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), expectSink0: pmetric.Metrics{}, - expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "FG"), + expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"), expectSinkD: pmetric.Metrics{}, }, { - name: "match_none_with_default", + name: "resource/match_none_with_default", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isY, idSink1), + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isResourceY, idSink1), withDefault(idSinkD), ), - input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), expectSink0: pmetric.Metrics{}, expectSink1: pmetric.Metrics{}, - expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), + expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), }, { - name: "match_none_without_default", + name: "resource/match_none_without_default", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isY, idSink1), + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isResourceY, idSink1), ), - input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), expectSink0: pmetric.Metrics{}, expectSink1: pmetric.Metrics{}, expectSinkD: pmetric.Metrics{}, }, + { + name: "mixed/match_resource_then_grpc_request", + cfg: testConfig( + withRoute("resource", isResourceA, idSink0), + withRoute("request", isAcme, idSink1), + withDefault(idSinkD), + ), + ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "acme"}), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"), + expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"), + expectSinkD: pmetric.Metrics{}, + }, + { + name: "mixed/match_resource_then_http_request", + cfg: testConfig( + withRoute("resource", isResourceA, idSink0), + withRoute("request", isAcme, idSink1), + withDefault(idSinkD), + ), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"acme"}}), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"), + expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"), + expectSinkD: pmetric.Metrics{}, + }, } for _, tt := range testCases { @@ -630,7 +749,12 @@ func TestMetricsConnectorDetailed(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, conn.ConsumeMetrics(context.Background(), tt.input)) + ctx := context.Background() + if tt.ctx != nil { + ctx = tt.ctx + } + + require.NoError(t, conn.ConsumeMetrics(ctx, tt.input)) assertExpected := func(sink *consumertest.MetricsSink, expected pmetric.Metrics, name string) { if expected == (pmetric.Metrics{}) { diff --git a/connector/routingconnector/traces.go b/connector/routingconnector/traces.go index dd5966e6a66d..a82ee85a9973 100644 --- a/connector/routingconnector/traces.go +++ b/connector/routingconnector/traces.go @@ -74,6 +74,11 @@ func (c *tracesConnector) switchTraces(ctx context.Context, td ptrace.Traces) er route := c.router.routeSlice[i] matchedSpans := ptrace.NewTraces() switch route.statementContext { + case "request": + if route.requestCondition.matchRequest(ctx) { + groupAllTraces(groups, route.consumer, td) + td = ptrace.NewTraces() // all traces have been routed + } case "", "resource": ptraceutil.MoveResourcesIf(td, matchedSpans, func(rs ptrace.ResourceSpans) bool { diff --git a/connector/routingconnector/traces_test.go b/connector/routingconnector/traces_test.go index dd6c862d965a..291e8fd230af 100644 --- a/connector/routingconnector/traces_test.go +++ b/connector/routingconnector/traces_test.go @@ -427,25 +427,118 @@ func TestTracesConnectorDetailed(t *testing.T) { idSink1 := pipeline.NewIDWithName(pipeline.SignalTraces, "1") idSinkD := pipeline.NewIDWithName(pipeline.SignalTraces, "default") - isNotNil := `attributes["resourceName"] != nil` - isA := `attributes["resourceName"] == "resourceA"` - isB := `attributes["resourceName"] == "resourceB"` - isX := `attributes["resourceName"] == "resourceX"` - isY := `attributes["resourceName"] == "resourceY"` + isAcme := `request["X-Tenant"] == "acme"` + + isAnyResource := `attributes["resourceName"] != nil` + isResourceA := `attributes["resourceName"] == "resourceA"` + isResourceB := `attributes["resourceName"] == "resourceB"` + isResourceX := `attributes["resourceName"] == "resourceX"` + isResourceY := `attributes["resourceName"] == "resourceY"` testCases := []struct { name string cfg *Config + ctx context.Context input ptrace.Traces expectSink0 ptrace.Traces expectSink1 ptrace.Traces expectSinkD ptrace.Traces }{ { - name: "all_match_first_only", + name: "request/no_request_values", cfg: testConfig( - withRoute("resource", isNotNil, idSink0), - withRoute("resource", isY, idSink1), + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: context.Background(), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptrace.Traces{}, + expectSink1: ptrace.Traces{}, + expectSinkD: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + }, + { + name: "request/match_any_value", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withGRPCMetadata( + withHTTPMetadata( + context.Background(), + map[string][]string{"X-Tenant": {"acme"}}, + ), + map[string]string{"X-Tenant": "notacme"}, + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink1: ptrace.Traces{}, + expectSinkD: ptrace.Traces{}, + }, + { + name: "request/match_grpc_value", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "acme"}), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink1: ptrace.Traces{}, + expectSinkD: ptrace.Traces{}, + }, + { + name: "request/match_no_grpc_value", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "notacme"}), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptrace.Traces{}, + expectSink1: ptrace.Traces{}, + expectSinkD: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + }, + { + name: "request/match_http_value", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"acme"}}), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink1: ptrace.Traces{}, + expectSinkD: ptrace.Traces{}, + }, + { + name: "request/match_http_value2", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"notacme", "acme"}}), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink1: ptrace.Traces{}, + expectSinkD: ptrace.Traces{}, + }, + { + name: "request/match_no_http_value", + cfg: testConfig( + withRoute("request", isAcme, idSink0), + withDefault(idSinkD), + ), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"notacme"}}), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptrace.Traces{}, + expectSink1: ptrace.Traces{}, + expectSinkD: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + }, + { + name: "resource/all_match_first_only", + cfg: testConfig( + withRoute("resource", isAnyResource, idSink0), + withRoute("resource", isResourceY, idSink1), withDefault(idSinkD), ), input: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), @@ -454,10 +547,10 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSinkD: ptrace.Traces{}, }, { - name: "all_match_last_only", + name: "resource/all_match_last_only", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isNotNil, idSink1), + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isAnyResource, idSink1), withDefault(idSinkD), ), input: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), @@ -466,10 +559,10 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSinkD: ptrace.Traces{}, }, { - name: "all_match_only_once", + name: "resource/all_match_only_once", cfg: testConfig( - withRoute("resource", isNotNil, idSink0), - withRoute("resource", isA+" or "+isB, idSink1), + withRoute("resource", isAnyResource, idSink0), + withRoute("resource", isResourceA+" or "+isResourceB, idSink1), withDefault(idSinkD), ), input: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), @@ -478,10 +571,10 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSinkD: ptrace.Traces{}, }, { - name: "each_matches_one", + name: "resource/each_matches_one", cfg: testConfig( - withRoute("resource", isA, idSink0), - withRoute("resource", isB, idSink1), + withRoute("resource", isResourceA, idSink0), + withRoute("resource", isResourceB, idSink1), withDefault(idSinkD), ), input: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), @@ -490,10 +583,10 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSinkD: ptrace.Traces{}, }, { - name: "some_match_with_default", + name: "resource/some_match_with_default", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isB, idSink1), + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isResourceB, idSink1), withDefault(idSinkD), ), input: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), @@ -502,10 +595,10 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSinkD: ptraceutiltest.NewTraces("A", "CD", "EF", "FG"), }, { - name: "some_match_without_default", + name: "resource/some_match_without_default", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isB, idSink1), + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isResourceB, idSink1), ), input: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), expectSink0: ptrace.Traces{}, @@ -513,10 +606,10 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSinkD: ptrace.Traces{}, }, { - name: "match_none_with_default", + name: "resource/match_none_with_default", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isY, idSink1), + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isResourceY, idSink1), withDefault(idSinkD), ), input: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), @@ -525,16 +618,42 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSinkD: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), }, { - name: "match_none_without_default", + name: "resource/match_none_without_default", cfg: testConfig( - withRoute("resource", isX, idSink0), - withRoute("resource", isY, idSink1), + withRoute("resource", isResourceX, idSink0), + withRoute("resource", isResourceY, idSink1), ), input: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), expectSink0: ptrace.Traces{}, expectSink1: ptrace.Traces{}, expectSinkD: ptrace.Traces{}, }, + { + name: "mixed/match_resource_then_grpc_request", + cfg: testConfig( + withRoute("resource", isResourceA, idSink0), + withRoute("request", isAcme, idSink1), + withDefault(idSinkD), + ), + ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "acme"}), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("A", "CD", "EF", "GH"), + expectSink1: ptraceutiltest.NewTraces("B", "CD", "EF", "GH"), + expectSinkD: ptrace.Traces{}, + }, + { + name: "mixed/match_resource_then_http_request", + cfg: testConfig( + withRoute("resource", isResourceA, idSink0), + withRoute("request", isAcme, idSink1), + withDefault(idSinkD), + ), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"acme"}}), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("A", "CD", "EF", "GH"), + expectSink1: ptraceutiltest.NewTraces("B", "CD", "EF", "GH"), + expectSinkD: ptrace.Traces{}, + }, } for _, tt := range testCases { @@ -554,7 +673,12 @@ func TestTracesConnectorDetailed(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, conn.ConsumeTraces(context.Background(), tt.input)) + ctx := context.Background() + if tt.ctx != nil { + ctx = tt.ctx + } + + require.NoError(t, conn.ConsumeTraces(ctx, tt.input)) assertExpected := func(sink *consumertest.TracesSink, expected ptrace.Traces, name string) { if expected == (ptrace.Traces{}) {