Add otelcol.processor.groupbyattrs component (#1300)
* Add `otelcol.processor.groupbyattrs` component

* Add groupbyattrs converter

---------

Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com>
Co-authored-by: Paulin Todev <paulin.todev@gmail.com>
3 people authored Aug 27, 2024
1 parent 4494649 commit de79265
Showing 11 changed files with 537 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -27,6 +27,8 @@ Main (unreleased)

- Updated Snowflake exporter with performance improvements for larger environments.
Also added a new panel to track deleted tables to the Snowflake mixin. (@Caleb-Hurshman)
- Add an `otelcol.processor.groupbyattrs` component to reassociate collected metrics that match specified attributes
from OpenTelemetry. (@kehindesalaam)

- Live debugging of `loki.process` will now also print the timestamp of incoming and outgoing log lines.
This is helpful for debugging `stage.timestamp`. (@ptodev)
2 changes: 2 additions & 0 deletions docs/sources/reference/compatibility/_index.md
@@ -301,6 +301,7 @@ The following components, grouped by namespace, _export_ OpenTelemetry `otelcol.
- [otelcol.processor.deltatocumulative](../components/otelcol/otelcol.processor.deltatocumulative)
- [otelcol.processor.discovery](../components/otelcol/otelcol.processor.discovery)
- [otelcol.processor.filter](../components/otelcol/otelcol.processor.filter)
- [otelcol.processor.groupbyattrs](../components/otelcol/otelcol.processor.groupbyattrs)
- [otelcol.processor.k8sattributes](../components/otelcol/otelcol.processor.k8sattributes)
- [otelcol.processor.memory_limiter](../components/otelcol/otelcol.processor.memory_limiter)
- [otelcol.processor.probabilistic_sampler](../components/otelcol/otelcol.processor.probabilistic_sampler)
@@ -337,6 +338,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol
- [otelcol.processor.deltatocumulative](../components/otelcol/otelcol.processor.deltatocumulative)
- [otelcol.processor.discovery](../components/otelcol/otelcol.processor.discovery)
- [otelcol.processor.filter](../components/otelcol/otelcol.processor.filter)
- [otelcol.processor.groupbyattrs](../components/otelcol/otelcol.processor.groupbyattrs)
- [otelcol.processor.k8sattributes](../components/otelcol/otelcol.processor.k8sattributes)
- [otelcol.processor.memory_limiter](../components/otelcol/otelcol.processor.memory_limiter)
- [otelcol.processor.probabilistic_sampler](../components/otelcol/otelcol.processor.probabilistic_sampler)
@@ -0,0 +1,242 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.processor.groupbyattrs/
description: Learn about otelcol.processor.groupbyattrs
title: otelcol.processor.groupbyattrs
---

# otelcol.processor.groupbyattrs

`otelcol.processor.groupbyattrs` accepts logs, metrics, and traces from other `otelcol`
components and groups the spans, log records, and metric data points that share the
specified attribute values under the same resource.

{{% admonition type="note" %}}
`otelcol.processor.groupbyattrs` is a wrapper over the upstream OpenTelemetry
Collector `groupbyattrs` processor. If necessary, bug reports or feature requests
will be redirected to the upstream repository.
{{% /admonition %}}

We recommend that you use `otelcol.processor.groupbyattrs` together with [otelcol.processor.batch][] as consecutive steps in a pipeline. This reduces data fragmentation by grouping records together under the matching Resource and InstrumentationLibrary.
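
The following is a minimal sketch of such a pipeline rather than a prescribed setup. It assumes that batching happens first and that an `otelcol.exporter.otlp` component labeled `default` is defined elsewhere in the configuration. Neither the ordering nor the component labels are specified on this page.

```alloy
// Assumption: batch first, then regroup the batched data.
otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.processor.groupbyattrs.default.input]
    logs    = [otelcol.processor.groupbyattrs.default.input]
    traces  = [otelcol.processor.groupbyattrs.default.input]
  }
}

// No keys are set, so the processor only compacts matching resources.
otelcol.processor.groupbyattrs "default" {
  output {
    // Assumption: this exporter is defined elsewhere.
    metrics = [otelcol.exporter.otlp.default.input]
    logs    = [otelcol.exporter.otlp.default.input]
    traces  = [otelcol.exporter.otlp.default.input]
  }
}
```

Upstream components then send their data to `otelcol.processor.batch.default.input` instead of addressing the `groupbyattrs` processor directly.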

You can specify multiple `otelcol.processor.groupbyattrs` components by giving them
different labels.

## Usage

```alloy
otelcol.processor.groupbyattrs "LABEL" {
  output {
    metrics = [...]
    logs    = [...]
    traces  = [...]
  }
}
```

## Arguments

The following arguments are supported:

| Name | Type | Description | Default | Required |
|-----------------|-------------------|-----------------------------------------------------------------------------------------|---------|----------|
| `keys` | `list(string)` | Keys that will be used to group the spans, log records, or metric data points together. | `[]` | no |

`keys` is a list of attribute keys used to group the data.
If `keys` is empty, the processor performs compaction and reassociates all spans, log records, and metric data points that have a matching Resource and InstrumentationLibrary.
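
As an illustrative sketch, `keys` can list more than one attribute. The attribute names and the component label below are hypothetical, and the `otelcol.exporter.otlp` component labeled `default` is assumed to be defined elsewhere.

```alloy
otelcol.processor.groupbyattrs "by_host_and_service" {
  // Hypothetical grouping attributes. Replace them with attributes
  // that are set on your own spans, log records, or data points.
  keys = ["host.name", "service.name"]

  output {
    metrics = [otelcol.exporter.otlp.default.input]
    logs    = [otelcol.exporter.otlp.default.input]
    traces  = [otelcol.exporter.otlp.default.input]
  }
}
```

Omitting `keys`, or setting it to `[]`, leaves the processor in the compaction mode described above.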

## Blocks

The following blocks are supported inside the definition of `otelcol.processor.groupbyattrs`:

Hierarchy | Block | Description | Required
--------- | ----------- | ------------------------------------------------- | --------
output | [output][] | Configures where to send received telemetry data. | yes
debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no

[output]: #output-block
[debug_metrics]: #debug_metrics-block

### output block

{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

### debug_metrics block

{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

## Exported fields

The following fields are exported and can be referenced by other components:

| Name | Type | Description |
|---------|--------------------|---------------------------------------------------------------|
| `input` | `otelcol.Consumer` | Accepts `otelcol.Consumer` data for metrics, logs, or traces. |

`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics,
logs, or traces).
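
As a sketch of how another component can reference the exported `input` field, the following receiver forwards all three signals to a `groupbyattrs` processor. It assumes that a processor labeled `default` exists, and the receiver settings are illustrative only.

```alloy
otelcol.receiver.otlp "default" {
  // Use the default gRPC and HTTP listener settings.
  grpc {}
  http {}

  output {
    // Assumption: otelcol.processor.groupbyattrs "default" is defined elsewhere.
    metrics = [otelcol.processor.groupbyattrs.default.input]
    logs    = [otelcol.processor.groupbyattrs.default.input]
    traces  = [otelcol.processor.groupbyattrs.default.input]
  }
}
```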

## Component health

`otelcol.processor.groupbyattrs` is only reported as unhealthy if given an invalid
configuration.

## Debug information

`otelcol.processor.groupbyattrs` doesn't expose any component-specific debug
information.

## Debug metrics

`otelcol.processor.groupbyattrs` doesn't expose any component-specific debug metrics.

## Examples

### Grouping metrics

Consider the following metrics, all originally associated with the same Resource:

```
Resource {host.name="localhost",source="prom"}
  Metric "gauge-1" (GAUGE)
    DataPoint {host.name="host-A",id="eth0"}
    DataPoint {host.name="host-A",id="eth0"}
    DataPoint {host.name="host-B",id="eth0"}
  Metric "gauge-1" (GAUGE) // Identical to previous Metric
    DataPoint {host.name="host-A",id="eth0"}
    DataPoint {host.name="host-A",id="eth0"}
    DataPoint {host.name="host-B",id="eth0"}
  Metric "mixed-type" (GAUGE)
    DataPoint {host.name="host-A",id="eth0"}
    DataPoint {host.name="host-A",id="eth0"}
    DataPoint {host.name="host-B",id="eth0"}
  Metric "mixed-type" (SUM)
    DataPoint {host.name="host-A",id="eth0"}
    DataPoint {host.name="host-A",id="eth0"}
  Metric "dont-move" (Gauge)
    DataPoint {id="eth0"}
```

With the following configuration, `otelcol.processor.groupbyattrs` reassociates the metrics with either `host-A` or `host-B`, based on the value of the `host.name` attribute.

```alloy
otelcol.processor.groupbyattrs "default" {
  keys = [ "host.name" ]
  output {
    metrics = [otelcol.exporter.otlp.default.input]
  }
}
```

The output of the processor will therefore be:

```
Resource {host.name="localhost",source="prom"}
  Metric "dont-move" (Gauge)
    DataPoint {id="eth0"}
Resource {host.name="host-A",source="prom"}
  Metric "gauge-1"
    DataPoint {id="eth0"}
    DataPoint {id="eth0"}
    DataPoint {id="eth0"}
    DataPoint {id="eth0"}
  Metric "mixed-type" (GAUGE)
    DataPoint {id="eth0"}
    DataPoint {id="eth0"}
  Metric "mixed-type" (SUM)
    DataPoint {id="eth0"}
    DataPoint {id="eth0"}
Resource {host.name="host-B",source="prom"}
  Metric "gauge-1"
    DataPoint {id="eth0"}
    DataPoint {id="eth0"}
  Metric "mixed-type" (GAUGE)
    DataPoint {id="eth0"}
```

This output demonstrates how `otelcol.processor.groupbyattrs` works in various situations:

- The DataPoints for the `gauge-1` (GAUGE) metric were originally split under 2 Metric instances and have been merged in the output.
- The DataPoints of the `mixed-type` (GAUGE) and `mixed-type` (SUM) metrics have not been merged under the same Metric, because their DataType is different.
- The `dont-move` metric DataPoints don't have a `host.name` attribute and therefore remained under the original Resource.
- The new Resources inherited the attributes from the original Resource (`source="prom"`), plus the specified attributes from the processed metrics (`host.name="host-A"` or `host.name="host-B"`).
- The specified "grouping" attributes that are set on the new Resources are also removed from the metric DataPoints.
- While not shown in the above example, the processor also merges collections of records under matching InstrumentationLibrary.

### Compaction

Sometimes telemetry data can become fragmented due to multiple duplicated ResourceSpans/ResourceLogs/ResourceMetrics objects.
This leads to additional memory consumption, increased processing costs, inefficient serialization, and an increased number of export requests.
In such situations, `otelcol.processor.groupbyattrs` can be used to compact the data with matching Resource and InstrumentationLibrary properties.

For example, consider this input data:

```
Resource {host.name="localhost"}
  InstrumentationLibrary {name="MyLibrary"}
    Spans
      Span {span_id=1, ...}
  InstrumentationLibrary {name="OtherLibrary"}
    Spans
      Span {span_id=2, ...}
Resource {host.name="localhost"}
  InstrumentationLibrary {name="MyLibrary"}
    Spans
      Span {span_id=3, ...}
Resource {host.name="localhost"}
  InstrumentationLibrary {name="MyLibrary"}
    Spans
      Span {span_id=4, ...}
Resource {host.name="otherhost"}
  InstrumentationLibrary {name="MyLibrary"}
    Spans
      Span {span_id=5, ...}
```

You can use `otelcol.processor.groupbyattrs` with its default configuration to compact the data:

```alloy
otelcol.processor.groupbyattrs "default" {
  output {
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```

The output will be:

```
Resource {host.name="localhost"}
  InstrumentationLibrary {name="MyLibrary"}
    Spans
      Span {span_id=1, ...}
      Span {span_id=3, ...}
      Span {span_id=4, ...}
  InstrumentationLibrary {name="OtherLibrary"}
    Spans
      Span {span_id=2, ...}
Resource {host.name="otherhost"}
  InstrumentationLibrary {name="MyLibrary"}
    Spans
      Span {span_id=5, ...}
```
[otelcol.processor.batch]: ../otelcol.processor.batch/
<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components

`otelcol.processor.groupbyattrs` can accept arguments from the following components:

- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters)

`otelcol.processor.groupbyattrs` has exports that can be consumed by the following components:

- Components that consume [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-consumers)

{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}

<!-- END GENERATED COMPATIBLE COMPONENTS -->
1 change: 1 addition & 0 deletions go.mod
@@ -122,6 +122,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.105.0
2 changes: 2 additions & 0 deletions go.sum
@@ -1966,6 +1966,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumul
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.105.0/go.mod h1:PKox+dLnO2bWc1qUN6WZnyHPV0MpWZ10arqGV5v69kI=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.105.0 h1:oRa+acTM4f5rjTT3+hjOVM1LYrlwrm6CSNG4o/RIqcA=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.105.0/go.mod h1:66cZFd4X8vQBTmvm1hPHxrSNHS474iUEsAVbYk9xQBU=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0 h1:OYsGaSC9G7pAVYKTd1+D0f7HTHcxuQfoEHyQy+a1NKk=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0/go.mod h1:WCesGEakYveZYZH4o3cUTLt3UB7JxE+yDiiphRHoJoc=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0 h1:ScIwuYg6l79Ta+deOyZIADXrBlXSdeAZ7sp3MXhm7JY=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0/go.mod h1:pranRmnWRkzDsn9a16BzSqX6HJ6XjjVVFmMhyZPEzt0=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.105.0 h1:mFAlBmDFELQJS8uj1M8csB/vQqjpq6W9/9k9izh9Hr4=
1 change: 1 addition & 0 deletions internal/component/all/all.go
@@ -83,6 +83,7 @@ import (
_ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative
_ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
_ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter
_ "github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs" // Import otelcol.processor.groupbyattrs
_ "github.com/grafana/alloy/internal/component/otelcol/processor/k8sattributes" // Import otelcol.processor.k8sattributes
_ "github.com/grafana/alloy/internal/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter
_ "github.com/grafana/alloy/internal/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler
83 changes: 83 additions & 0 deletions internal/component/otelcol/processor/groupbyattrs/groupbyattrs.go
@@ -0,0 +1,83 @@
package groupbyattrs

import (
	"github.com/grafana/alloy/internal/component"
	"github.com/grafana/alloy/internal/component/otelcol"
	otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
	"github.com/grafana/alloy/internal/component/otelcol/processor"
	"github.com/grafana/alloy/internal/featuregate"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor"
	otelcomponent "go.opentelemetry.io/collector/component"
	otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
	component.Register(component.Registration{
		Name:      "otelcol.processor.groupbyattrs",
		Stability: featuregate.StabilityGenerallyAvailable,
		Exports:   otelcol.ConsumerExports{},
		Args:      Arguments{},

		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			fact := groupbyattrsprocessor.NewFactory()
			return processor.New(opts, fact, args.(Arguments))
		},
	})
}

type Arguments struct {

	// Keys is a list of attribute keys used to group spans, log records, and metric data points.
	Keys []string `alloy:"keys,attr,optional"`

	// Output configures where to send processed data. Required.
	Output *otelcol.ConsumerArguments `alloy:"output,block"`

	// DebugMetrics configures component internal metrics. Optional.
	DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"`
}

var (
	_ processor.Arguments = Arguments{}
)

// SetToDefault implements syntax.Defaulter.
func (args *Arguments) SetToDefault() {
	*args = Arguments{
		Keys: []string{},
	}
	args.DebugMetrics.SetToDefault()
}

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
	return nil
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
	return &groupbyattrsprocessor.Config{
		GroupByKeys: args.Keys,
	}, nil
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
	return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
	return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
	return args.Output
}

// DebugMetricsConfig implements processor.Arguments.
func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments {
	return args.DebugMetrics
}