Skip to content

Commit

Permalink
[chore] Refactor log switching (open-telemetry#35984)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Oct 24, 2024
1 parent 7c7b7c8 commit eb48ed7
Showing 1 changed file with 62 additions and 11 deletions.
73 changes: 62 additions & 11 deletions connector/routingconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/plogutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
)
Expand Down Expand Up @@ -60,6 +61,49 @@ func (c *logsConnector) Capabilities() consumer.Capabilities {
}

func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if c.config.MatchOnce {
return c.switchLogs(ctx, ld)
}
return c.matchAll(ctx, ld)
}

// switchLogs removes items from the original plog.Logs as they are matched,
// and sends them to the appropriate consumer.
func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error {
groups := make(map[consumer.Logs]plog.Logs)
var errs error
for _, route := range c.router.routeSlice {
matchedLogs := plog.NewLogs()

plogutil.MoveResourcesIf(ld, matchedLogs,
func(rl plog.ResourceLogs) bool {
rtx := ottlresource.NewTransformContext(rl.Resource(), rl)
_, isMatch, err := route.statement.Execute(ctx, rtx)
errs = errors.Join(errs, err)
return isMatch
},
)

if errs != nil {
if c.config.ErrorMode == ottl.PropagateError {
return errs
}
groupAll(groups, c.router.defaultConsumer, matchedLogs)

}
groupAll(groups, route.consumer, matchedLogs)
}

// anything left wasn't matched by any route. Send to default consumer
groupAll(groups, c.router.defaultConsumer, ld)

for consumer, group := range groups {
errs = errors.Join(errs, consumer.ConsumeLogs(ctx, group))
}
return errs
}

func (c *logsConnector) matchAll(ctx context.Context, ld plog.Logs) error {
// routingEntry is used to group plog.ResourceLogs that are routed to
// the same set of exporters.
// This way we're not ending up with all the logs split up which would cause
Expand All @@ -78,22 +122,19 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if c.config.ErrorMode == ottl.PropagateError {
return err
}
c.group(groups, c.router.defaultConsumer, rlogs)
group(groups, c.router.defaultConsumer, rlogs)
continue
}
if isMatch {
noRoutesMatch = false
c.group(groups, route.consumer, rlogs)
if c.config.MatchOnce {
break
}
group(groups, route.consumer, rlogs)
}

}

if noRoutesMatch {
// no route conditions are matched, add resource logs to default exporters group
c.group(groups, c.router.defaultConsumer, rlogs)
group(groups, c.router.defaultConsumer, rlogs)
}
}
for consumer, group := range groups {
Expand All @@ -102,18 +143,28 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return errs
}

func (c *logsConnector) group(
func groupAll(
groups map[consumer.Logs]plog.Logs,
cons consumer.Logs,
logs plog.Logs,
) {
for i := 0; i < logs.ResourceLogs().Len(); i++ {
group(groups, cons, logs.ResourceLogs().At(i))
}
}

func group(
groups map[consumer.Logs]plog.Logs,
consumer consumer.Logs,
cons consumer.Logs,
logs plog.ResourceLogs,
) {
if consumer == nil {
if cons == nil {
return
}
group, ok := groups[consumer]
group, ok := groups[cons]
if !ok {
group = plog.NewLogs()
}
logs.CopyTo(group.ResourceLogs().AppendEmpty())
groups[consumer] = group
groups[cons] = group
}

0 comments on commit eb48ed7

Please sign in to comment.