Skip to content

Commit

Permalink
[sumconnector] implement summing logic (open-telemetry#34797)
Browse files Browse the repository at this point in the history
**Description:** 
- Adds connector and summing logic
- Adds testing of traces and spans summing

Note: testing and test data makes up the bulk of the lines in this PR

**Link to tracking Issue:** 32669

**Testing:** 
- condition, attribute, default attribute, multiple condition, multiple
attributes, multiple metrics for traces / spans telemetry types

**Documentation:** No new docs

---------

Co-authored-by: Antoine Toulme <antoine@toulme.name>
  • Loading branch information
2 people authored and jriguera committed Oct 4, 2024
1 parent ac9623a commit 3ca682d
Show file tree
Hide file tree
Showing 15 changed files with 2,375 additions and 6 deletions.
27 changes: 27 additions & 0 deletions .chloggen/32669-sumconnector-sum-logic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'sumconnector'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "adds connector and summing logic along with tests"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32669]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
12 changes: 6 additions & 6 deletions connector/sumconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ The `sum` connector can be used to sum attribute values from spans, span events,

If you are not already familiar with connectors, you may find it helpful to first visit the [Connectors README](https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md).

### Configuration

#### Basic configuration
### Basic configuration

This configuration will sum numerical values found within the attribute `attribute.with.numerical.value` of any log telemetry routed to the connector. It will then output a metric time series with the name `my.example.metric.name` with those summed values.

Note: Values found within an attribute will be converted into a float regardless of their original type before being summed and output as a metric value. Non-convertible strings will be dropped and not included.

```yaml
receivers:
foo:
Expand All @@ -58,15 +58,15 @@ service:
The sum connector has three required configuration settings and numerous optional settings
- Telemetry type: Nested below the `sum:` connector declaration. Declared as `logs:` in the [Basic Example](#basic-configuration).
- Telemetry type: Nested below the `sum:` connector declaration. Declared as `logs:` in the [Basic Example](#basic-configuration).
- Can be any of `spans`, `spanevents`, `metrics`, `datapoints`, or `logs`.
- Metric name: Nested below the telemetry type; this is the metric name the sum connector will output summed values to. Declared as `my.example.metric.name` in the [Basic Example](#basic-configuration)
- `source_attribute`: A specific attribute to search for within the source telemetry being fed to the connector. This attribute is where the connector will look for numerical values to sum into the output metric value. Declared as `attribute.with.numerical.value` in the [Basic Example](#basic-configuration)

#### Optional Settings

- `conditions`: [OTTL syntax](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/LANGUAGE.md) can be used to provide conditions for processing incoming telemetry. Conditions are ORed together, so if any condition is met the attribute's value will be included in the resulting sum.
- `attributes`: Declaration of attributes to include. Any of these attributes found will generate a separate sum for each set of unique combination of attribute values and output as its own datapoint in the metric time series.
- `conditions`: [OTTL syntax](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/LANGUAGE.md) can be used to provide conditions for processing incoming telemetry. Conditions are ORed together, so if any condition is met the attribute's value will be included in the resulting sum.
- `attributes`: Declaration of attributes to include. Any of these attributes found will generate a separate sum for each set of unique combination of attribute values and output as its own datapoint in the metric time series.
- `key`: (required for `attributes`) the attribute name to match against
- `default_value`: (optional for `attributes`) a default value for the attribute when no matches are found. The `default_value` value can be of type string, integer, or float.

Expand Down
139 changes: 139 additions & 0 deletions connector/sumconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package sumconnector // import "github.com/open-telemetry/opentelemetry-collecto

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -38,22 +41,158 @@ func (c *sum) Capabilities() consumer.Capabilities {
}

func (c *sum) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
var multiError error
sumMetrics := pmetric.NewMetrics()
sumMetrics.ResourceMetrics().EnsureCapacity(td.ResourceSpans().Len())
for i := 0; i < td.ResourceSpans().Len(); i++ {
resourceSpan := td.ResourceSpans().At(i)
spansSummer := newSummer[ottlspan.TransformContext](c.spansMetricDefs)
spanEventsSummer := newSummer[ottlspanevent.TransformContext](c.spanEventsMetricDefs)

for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
scopeSpan := resourceSpan.ScopeSpans().At(j)

for k := 0; k < scopeSpan.Spans().Len(); k++ {
span := scopeSpan.Spans().At(k)
sCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
multiError = errors.Join(multiError, spansSummer.update(ctx, span.Attributes(), sCtx))

for l := 0; l < span.Events().Len(); l++ {
event := span.Events().At(l)
eCtx := ottlspanevent.NewTransformContext(event, span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
multiError = errors.Join(multiError, spanEventsSummer.update(ctx, event.Attributes(), eCtx))
}
}
}

if len(spansSummer.sums)+len(spanEventsSummer.sums) == 0 {
continue // don't add an empty resource
}

sumResource := sumMetrics.ResourceMetrics().AppendEmpty()
resourceSpan.Resource().Attributes().CopyTo(sumResource.Resource().Attributes())

sumResource.ScopeMetrics().EnsureCapacity(resourceSpan.ScopeSpans().Len())
sumScope := sumResource.ScopeMetrics().AppendEmpty()

spansSummer.appendMetricsTo(sumScope.Metrics())
spanEventsSummer.appendMetricsTo(sumScope.Metrics())
}
if multiError != nil {
return multiError
}
return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics)
}

func (c *sum) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
var multiError error
sumMetrics := pmetric.NewMetrics()
sumMetrics.ResourceMetrics().EnsureCapacity(md.ResourceMetrics().Len())
for i := 0; i < md.ResourceMetrics().Len(); i++ {
resourceMetric := md.ResourceMetrics().At(i)
metricsSummer := newSummer[ottlmetric.TransformContext](c.metricsMetricDefs)
dataPointsSummer := newSummer[ottldatapoint.TransformContext](c.dataPointsMetricDefs)

for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
scopeMetrics := resourceMetric.ScopeMetrics().At(j)

for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
metric := scopeMetrics.Metrics().At(k)
mCtx := ottlmetric.NewTransformContext(metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, metricsSummer.update(ctx, pcommon.NewMap(), mCtx))

//exhaustive:enforce
// For metric types each must be handled in exactly the same way
// Switch case required because each type calls DataPoints() differently
switch metric.Type() {
case pmetric.MetricTypeGauge:
dps := metric.Gauge().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeSummary:
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
multiError = errors.Join(multiError, dataPointsSummer.update(ctx, dps.At(i).Attributes(), dCtx))
}
case pmetric.MetricTypeEmpty:
multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type()))
}
}
}

if len(metricsSummer.sums)+len(dataPointsSummer.sums) == 0 {
continue // don't add an empty resource
}

sumResource := sumMetrics.ResourceMetrics().AppendEmpty()
resourceMetric.Resource().Attributes().CopyTo(sumResource.Resource().Attributes())

sumResource.ScopeMetrics().EnsureCapacity(resourceMetric.ScopeMetrics().Len())
sumScope := sumResource.ScopeMetrics().AppendEmpty()

metricsSummer.appendMetricsTo(sumScope.Metrics())
dataPointsSummer.appendMetricsTo(sumScope.Metrics())
}
if multiError != nil {
return multiError
}
return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics)
}

func (c *sum) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var multiError error
sumMetrics := pmetric.NewMetrics()
sumMetrics.ResourceMetrics().EnsureCapacity(ld.ResourceLogs().Len())
for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLog := ld.ResourceLogs().At(i)
summer := newSummer[ottllog.TransformContext](c.logsMetricDefs)

for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLogs := resourceLog.ScopeLogs().At(j)

for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
logRecord := scopeLogs.LogRecords().At(k)

lCtx := ottllog.NewTransformContext(logRecord, scopeLogs.Scope(), resourceLog.Resource(), scopeLogs, resourceLog)
multiError = errors.Join(multiError, summer.update(ctx, logRecord.Attributes(), lCtx))
}
}

if len(summer.sums) == 0 {
continue // don't add an empty resource
}

sumResource := sumMetrics.ResourceMetrics().AppendEmpty()
resourceLog.Resource().Attributes().CopyTo(sumResource.Resource().Attributes())

sumResource.ScopeMetrics().EnsureCapacity(resourceLog.ScopeLogs().Len())
sumScope := sumResource.ScopeMetrics().AppendEmpty()

summer.appendMetricsTo(sumScope.Metrics())
}
if multiError != nil {
return multiError
}
return c.metricsConsumer.ConsumeMetrics(ctx, sumMetrics)
}
Loading

0 comments on commit 3ca682d

Please sign in to comment.