diff --git a/connector/routingconnector/logs.go b/connector/routingconnector/logs.go index 95012465aab1..dca421b74f9d 100644 --- a/connector/routingconnector/logs.go +++ b/connector/routingconnector/logs.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/plogutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" ) @@ -60,6 +61,49 @@ func (c *logsConnector) Capabilities() consumer.Capabilities { } func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + if c.config.MatchOnce { + return c.switchLogs(ctx, ld) + } + return c.matchAll(ctx, ld) +} + +// switchLogs removes items from the original plog.Logs as they are matched, +// and sends them to the appropriate consumer. +func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error { + groups := make(map[consumer.Logs]plog.Logs) + var errs error + for _, route := range c.router.routeSlice { + matchedLogs := plog.NewLogs() + + plogutil.MoveResourcesIf(ld, matchedLogs, + func(rl plog.ResourceLogs) bool { + rtx := ottlresource.NewTransformContext(rl.Resource(), rl) + _, isMatch, err := route.statement.Execute(ctx, rtx) + errs = errors.Join(errs, err) + return isMatch + }, + ) + + if errs != nil { + if c.config.ErrorMode == ottl.PropagateError { + return errs + } + groupAll(groups, c.router.defaultConsumer, matchedLogs) + + } + groupAll(groups, route.consumer, matchedLogs) + } + + // anything left wasn't matched by any route. Send to default consumer + groupAll(groups, c.router.defaultConsumer, ld) + + for consumer, group := range groups { + errs = errors.Join(errs, consumer.ConsumeLogs(ctx, group)) + } + return errs +} + +func (c *logsConnector) matchAll(ctx context.Context, ld plog.Logs) error { // routingEntry is used to group plog.ResourceLogs that are routed to // the same set of exporters. // This way we're not ending up with all the logs split up which would cause @@ -78,22 +122,19 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error { if c.config.ErrorMode == ottl.PropagateError { return err } - c.group(groups, c.router.defaultConsumer, rlogs) + group(groups, c.router.defaultConsumer, rlogs) continue } if isMatch { noRoutesMatch = false - c.group(groups, route.consumer, rlogs) - if c.config.MatchOnce { - break - } + group(groups, route.consumer, rlogs) } } if noRoutesMatch { // no route conditions are matched, add resource logs to default exporters group - c.group(groups, c.router.defaultConsumer, rlogs) + group(groups, c.router.defaultConsumer, rlogs) } } for consumer, group := range groups { @@ -102,18 +143,28 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return errs } -func (c *logsConnector) group( +func groupAll( + groups map[consumer.Logs]plog.Logs, + cons consumer.Logs, + logs plog.Logs, +) { + for i := 0; i < logs.ResourceLogs().Len(); i++ { + group(groups, cons, logs.ResourceLogs().At(i)) + } +} + +func group( groups map[consumer.Logs]plog.Logs, - consumer consumer.Logs, + cons consumer.Logs, logs plog.ResourceLogs, ) { - if consumer == nil { + if cons == nil { return } - group, ok := groups[consumer] + group, ok := groups[cons] if !ok { group = plog.NewLogs() } logs.CopyTo(group.ResourceLogs().AppendEmpty()) - groups[consumer] = group + groups[cons] = group }