Skip to content

Commit

Permalink
[connector/routing] Add ability to route metrics and traces by reques…
Browse files Browse the repository at this point in the history
…t context (#36143)
  • Loading branch information
djaglowski authored Nov 6, 2024
1 parent 106eda9 commit 07a7ca9
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .chloggen/routing-connector-by-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ change_type: enhancement
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add ability to route logs by request metadata.
note: Add ability to route by request metadata.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [19738]
Expand Down
1 change: 0 additions & 1 deletion connector/routingconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ The following settings are available:
### Limitations

- The `match_once` setting is only supported when using the `resource` context. If any routes use `log` or `request` context, `match_once` must be set to `true`.
- The `request` context is only supported for logs at this time.
- The `request` context requires use of the `condition` setting, and relies on a very limited grammar. Conditions must be in the form of `request["key"] == "value"` or `request["key"] != "value"`. (In the future, this grammar may be expanded to support more complex conditions.)

### Supported [OTTL] functions
Expand Down
12 changes: 6 additions & 6 deletions connector/routingconnector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ func TestLogsConnectorDetailed(t *testing.T) {
),
},
{
name: "match_resource_then_logs",
name: "mixed/match_resource_then_logs",
cfg: testConfig(
withRoute("resource", isResourceA, idSink0),
withRoute("log", isLogE, idSink1),
Expand All @@ -831,7 +831,7 @@ func TestLogsConnectorDetailed(t *testing.T) {
expectSinkD: plogutiltest.NewLogs("B", "CD", "F"),
},
{
name: "match_logs_then_resource",
name: "mixed/match_logs_then_resource",
cfg: testConfig(
withRoute("log", isLogE, idSink0),
withRoute("resource", isResourceB, idSink1),
Expand All @@ -843,7 +843,7 @@ func TestLogsConnectorDetailed(t *testing.T) {
expectSinkD: plogutiltest.NewLogs("A", "CD", "F"),
},
{
name: "match_resource_then_grpc_request",
name: "mixed/match_resource_then_grpc_request",
cfg: testConfig(
withRoute("resource", isResourceA, idSink0),
withRoute("request", isAcme, idSink1),
Expand All @@ -856,7 +856,7 @@ func TestLogsConnectorDetailed(t *testing.T) {
expectSinkD: plog.Logs{},
},
{
name: "match_logs_then_grpc_request",
name: "mixed/match_logs_then_grpc_request",
cfg: testConfig(
withRoute("log", isLogF, idSink0),
withRoute("request", isAcme, idSink1),
Expand All @@ -869,7 +869,7 @@ func TestLogsConnectorDetailed(t *testing.T) {
expectSinkD: plog.Logs{},
},
{
name: "match_resource_then_http_request",
name: "mixed/match_resource_then_http_request",
cfg: testConfig(
withRoute("resource", isResourceA, idSink0),
withRoute("request", isAcme, idSink1),
Expand All @@ -882,7 +882,7 @@ func TestLogsConnectorDetailed(t *testing.T) {
expectSinkD: plog.Logs{},
},
{
name: "match_logs_then_http_request",
name: "mixed/match_logs_then_http_request",
cfg: testConfig(
withRoute("log", isLogF, idSink0),
withRoute("request", isAcme, idSink1),
Expand Down
5 changes: 5 additions & 0 deletions connector/routingconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics
route := c.router.routeSlice[i]
matchedMetrics := pmetric.NewMetrics()
switch route.statementContext {
case "request":
if route.requestCondition.matchRequest(ctx) {
groupAllMetrics(groups, route.consumer, md)
md = pmetric.NewMetrics() // all metrics have been routed
}
case "", "resource":
pmetricutil.MoveResourcesIf(md, matchedMetrics,
func(rs pmetric.ResourceMetrics) bool {
Expand Down
218 changes: 171 additions & 47 deletions connector/routingconnector/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,114 +503,233 @@ func TestMetricsConnectorDetailed(t *testing.T) {
idSink1 := pipeline.NewIDWithName(pipeline.SignalMetrics, "1")
idSinkD := pipeline.NewIDWithName(pipeline.SignalMetrics, "default")

isNotNil := `attributes["resourceName"] != nil`
isA := `attributes["resourceName"] == "resourceA"`
isB := `attributes["resourceName"] == "resourceB"`
isX := `attributes["resourceName"] == "resourceX"`
isY := `attributes["resourceName"] == "resourceY"`
isAcme := `request["X-Tenant"] == "acme"`

isAnyResource := `attributes["resourceName"] != nil`
isResourceA := `attributes["resourceName"] == "resourceA"`
isResourceB := `attributes["resourceName"] == "resourceB"`
isResourceX := `attributes["resourceName"] == "resourceX"`
isResourceY := `attributes["resourceName"] == "resourceY"`

testCases := []struct {
name string
cfg *Config
ctx context.Context
input pmetric.Metrics
expectSink0 pmetric.Metrics
expectSink1 pmetric.Metrics
expectSinkD pmetric.Metrics
}{
{
name: "all_match_first_only",
name: "request/no_request_values",
cfg: testConfig(
withRoute("resource", isNotNil, idSink0),
withRoute("resource", isY, idSink1),
withRoute("request", isAcme, idSink0),
withDefault(idSinkD),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
ctx: context.Background(),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetric.Metrics{},
expectSink1: pmetric.Metrics{},
expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
},
{
name: "request/match_any_value",
cfg: testConfig(
withRoute("request", isAcme, idSink0),
withDefault(idSinkD),
),
ctx: withGRPCMetadata(
withHTTPMetadata(
context.Background(),
map[string][]string{"X-Tenant": {"acme"}},
),
map[string]string{"X-Tenant": "notacme"},
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink1: pmetric.Metrics{},
expectSinkD: pmetric.Metrics{},
},
{
name: "all_match_last_only",
name: "request/match_grpc_value",
cfg: testConfig(
withRoute("resource", isX, idSink0),
withRoute("resource", isNotNil, idSink1),
withRoute("request", isAcme, idSink0),
withDefault(idSinkD),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "acme"}),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink1: pmetric.Metrics{},
expectSinkD: pmetric.Metrics{},
},
{
name: "request/match_no_grpc_value",
cfg: testConfig(
withRoute("request", isAcme, idSink0),
withDefault(idSinkD),
),
ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "notacme"}),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetric.Metrics{},
expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
expectSink1: pmetric.Metrics{},
expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
},
{
name: "request/match_http_value",
cfg: testConfig(
withRoute("request", isAcme, idSink0),
withDefault(idSinkD),
),
ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"acme"}}),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink1: pmetric.Metrics{},
expectSinkD: pmetric.Metrics{},
},
{
name: "all_match_only_once",
name: "request/match_http_value2",
cfg: testConfig(
withRoute("resource", isNotNil, idSink0),
withRoute("resource", isA+" or "+isB, idSink1),
withRoute("request", isAcme, idSink0),
withDefault(idSinkD),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"notacme", "acme"}}),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink1: pmetric.Metrics{},
expectSinkD: pmetric.Metrics{},
},
{
name: "each_matches_one",
name: "request/match_no_http_value",
cfg: testConfig(
withRoute("request", isAcme, idSink0),
withDefault(idSinkD),
),
ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"notacme"}}),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetric.Metrics{},
expectSink1: pmetric.Metrics{},
expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
},
{
name: "resource/all_match_first_only",
cfg: testConfig(
withRoute("resource", isA, idSink0),
withRoute("resource", isB, idSink1),
withRoute("resource", isAnyResource, idSink0),
withRoute("resource", isResourceY, idSink1),
withDefault(idSinkD),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
expectSink0: pmetricutiltest.NewMetrics("A", "CD", "EF", "FG"),
expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "FG"),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink1: pmetric.Metrics{},
expectSinkD: pmetric.Metrics{},
},
{
name: "some_match_with_default",
name: "resource/all_match_last_only",
cfg: testConfig(
withRoute("resource", isX, idSink0),
withRoute("resource", isB, idSink1),
withRoute("resource", isResourceX, idSink0),
withRoute("resource", isAnyResource, idSink1),
withDefault(idSinkD),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetric.Metrics{},
expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "FG"),
expectSinkD: pmetricutiltest.NewMetrics("A", "CD", "EF", "FG"),
expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSinkD: pmetric.Metrics{},
},
{
name: "some_match_without_default",
name: "resource/all_match_only_once",
cfg: testConfig(
withRoute("resource", isX, idSink0),
withRoute("resource", isB, idSink1),
withRoute("resource", isAnyResource, idSink0),
withRoute("resource", isResourceA+" or "+isResourceB, idSink1),
withDefault(idSinkD),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink1: pmetric.Metrics{},
expectSinkD: pmetric.Metrics{},
},
{
name: "resource/each_matches_one",
cfg: testConfig(
withRoute("resource", isResourceA, idSink0),
withRoute("resource", isResourceB, idSink1),
withDefault(idSinkD),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"),
expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"),
expectSinkD: pmetric.Metrics{},
},
{
name: "resource/some_match_with_default",
cfg: testConfig(
withRoute("resource", isResourceX, idSink0),
withRoute("resource", isResourceB, idSink1),
withDefault(idSinkD),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetric.Metrics{},
expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"),
expectSinkD: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"),
},
{
name: "resource/some_match_without_default",
cfg: testConfig(
withRoute("resource", isResourceX, idSink0),
withRoute("resource", isResourceB, idSink1),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetric.Metrics{},
expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "FG"),
expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"),
expectSinkD: pmetric.Metrics{},
},
{
name: "match_none_with_default",
name: "resource/match_none_with_default",
cfg: testConfig(
withRoute("resource", isX, idSink0),
withRoute("resource", isY, idSink1),
withRoute("resource", isResourceX, idSink0),
withRoute("resource", isResourceY, idSink1),
withDefault(idSinkD),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetric.Metrics{},
expectSink1: pmetric.Metrics{},
expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
},
{
name: "match_none_without_default",
name: "resource/match_none_without_default",
cfg: testConfig(
withRoute("resource", isX, idSink0),
withRoute("resource", isY, idSink1),
withRoute("resource", isResourceX, idSink0),
withRoute("resource", isResourceY, idSink1),
),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "FG"),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetric.Metrics{},
expectSink1: pmetric.Metrics{},
expectSinkD: pmetric.Metrics{},
},
{
name: "mixed/match_resource_then_grpc_request",
cfg: testConfig(
withRoute("resource", isResourceA, idSink0),
withRoute("request", isAcme, idSink1),
withDefault(idSinkD),
),
ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "acme"}),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"),
expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"),
expectSinkD: pmetric.Metrics{},
},
{
name: "mixed/match_resource_then_http_request",
cfg: testConfig(
withRoute("resource", isResourceA, idSink0),
withRoute("request", isAcme, idSink1),
withDefault(idSinkD),
),
ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"acme"}}),
input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectSink0: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"),
expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"),
expectSinkD: pmetric.Metrics{},
},
}

for _, tt := range testCases {
Expand All @@ -630,7 +749,12 @@ func TestMetricsConnectorDetailed(t *testing.T) {
)
require.NoError(t, err)

require.NoError(t, conn.ConsumeMetrics(context.Background(), tt.input))
ctx := context.Background()
if tt.ctx != nil {
ctx = tt.ctx
}

require.NoError(t, conn.ConsumeMetrics(ctx, tt.input))

assertExpected := func(sink *consumertest.MetricsSink, expected pmetric.Metrics, name string) {
if expected == (pmetric.Metrics{}) {
Expand Down
5 changes: 5 additions & 0 deletions connector/routingconnector/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (c *tracesConnector) switchTraces(ctx context.Context, td ptrace.Traces) er
route := c.router.routeSlice[i]
matchedSpans := ptrace.NewTraces()
switch route.statementContext {
case "request":
if route.requestCondition.matchRequest(ctx) {
groupAllTraces(groups, route.consumer, td)
td = ptrace.NewTraces() // all traces have been routed
}
case "", "resource":
ptraceutil.MoveResourcesIf(td, matchedSpans,
func(rs ptrace.ResourceSpans) bool {
Expand Down
Loading

0 comments on commit 07a7ca9

Please sign in to comment.