Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelcol.processor.batch: new component #2333

Merged
merged 4 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ Main (unreleased)
traces, metrics, and logs. Data can then be forwarded to other `otelcol`
components. (@rfratto)

- Flow: add `otelcol.processor.batch` component which batches data from
`otelcol` components before forwarding it to other `otelcol` components.
(@rfratto)

### Features

- Add `agentctl test-logs` command to allow testing log configurations by redirecting
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "github.com/grafana/agent/component/discovery/kubernetes" // Import discovery.kubernetes
_ "github.com/grafana/agent/component/discovery/relabel" // Import discovery.relabel
_ "github.com/grafana/agent/component/local/file" // Import local.file
_ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/agent/component/otelcol/receiver/otlp" // Import otelcol.receiver.otlp
_ "github.com/grafana/agent/component/prometheus/integration/node_exporter" // Import prometheus.integration.node_exporter
_ "github.com/grafana/agent/component/prometheus/relabel" // Import prometheus.relabel
Expand Down
89 changes: 89 additions & 0 deletions component/otelcol/processor/batch/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Package batch provides an otelcol.processor.batch component.
package batch

import (
"fmt"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/processor"
"github.com/grafana/agent/pkg/river"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/processor/batchprocessor"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.batch",
Args: Arguments{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := batchprocessor.NewFactory()
return processor.New(opts, fact, args.(Arguments))
},
})
}

// Arguments configures the otelcol.processor.batch component.
type Arguments struct {
Timeout time.Duration `river:"timeout,attr,optional"`
SendBatchSize uint32 `river:"send_batch_size,attr,optional"`
SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to reuse the forward_to terminology we used for Prometheus, or is this more in-line with what OTel does?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, I think we need to think about it after we have a good set of otelcol components in. It might make sense for us to use different terminology just to help prevent confusing things (i.e., assume that you can use forward_to to send data to a prometheus.* receiver)

}

var (
_ processor.Arguments = Arguments{}
_ river.Unmarshaler = (*Arguments)(nil)
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
Timeout: 200 * time.Millisecond,
SendBatchSize: 8192,
}

// UnmarshalRiver implements river.Unmarshaler. It applies defaults to args and
// validates settings provided by the user.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
*args = DefaultArguments

type arguments Arguments
if err := f((*arguments)(args)); err != nil {
return err
}

if args.SendBatchMaxSize > 0 && args.SendBatchMaxSize < args.SendBatchSize {
return fmt.Errorf("send_batch_max_size must be greater or equal to send_batch_size when not 0")
}
return nil
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() otelconfig.Processor {
return &batchprocessor.Config{
ProcessorSettings: otelconfig.NewProcessorSettings(otelconfig.NewComponentID("batch")),
Timeout: args.Timeout,
SendBatchSize: args.SendBatchSize,
SendBatchMaxSize: args.SendBatchMaxSize,
}
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}
117 changes: 117 additions & 0 deletions component/otelcol/processor/batch/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package batch_test

import (
"context"
"testing"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
"github.com/grafana/agent/component/otelcol/processor/batch"
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/dskit/backoff"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// Test performs a basic integration test which runs the
// otelcol.processor.batch component and ensures that it can accept, process, and forward data.
func Test(t *testing.T) {
ctx := componenttest.TestContext(t)
l := util.TestLogger(t)

ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.batch")
require.NoError(t, err)

cfg := `
timeout = "10ms"

output {
// no-op: will be overridden by test code.
}
`
var args batch.Arguments
require.NoError(t, river.Unmarshal([]byte(cfg), &args))

// Override our arguments so traces get forwarded to traceCh.
traceCh := make(chan ptrace.Traces)
args.Output = makeTracesOutput(traceCh)

go func() {
err := ctrl.Run(ctx, args)
require.NoError(t, err)
}()

require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")

// Send traces in the background to our processor.
go func() {
exports := ctrl.Exports().(otelcol.ConsumerExports)

bo := backoff.New(ctx, backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
})
for bo.Ongoing() {
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
if err != nil {
level.Error(l).Log("msg", "failed to send traces", "err", err)
bo.Wait()
continue
}

return
}
}()

// Wait for our processor to finish and forward data to traceCh.
select {
case <-time.After(time.Second):
require.FailNow(t, "failed waiting for traces")
case tr := <-traceCh:
require.Equal(t, 1, tr.SpanCount())
}
}

// makeTracesOutput returns ConsumerArguments which will forward traces to the
// provided channel.
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
traceConsumer := fakeconsumer.Consumer{
ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- t:
return nil
}
},
}

return &otelcol.ConsumerArguments{
Traces: []otelcol.Consumer{&traceConsumer},
}
}

func createTestTraces() ptrace.Traces {
// Matches format from the protobuf definition:
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
var bb = `{
"resource_spans": [{
"scope_spans": [{
"spans": [{
"name": "TestSpan"
}]
}]
}]
}`

data, err := ptrace.NewJSONUnmarshaler().UnmarshalTraces([]byte(bb))
if err != nil {
panic(err)
}
return data
}
109 changes: 109 additions & 0 deletions docs/sources/flow/reference/components/otelcol.processor.batch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
---
aliases:
- /docs/agent/latest/flow/reference/components/otelcol.processor.batch
title: otelcol.processor.batch
---

# otelcol.processor.batch

`otelcol.processor.batch` accepts telemetry data from other `otelcol`
components and places them into batches. Batching improves the compression of
data and reduces the number of outgoing network requests required to transmit
data.

> **NOTE**: `otelcol.processor.batch` is a wrapper over the upstream
> OpenTelemetry Collector `batch` processor. Bug reports or feature requests
> may be redirected to the upstream repository.

Multiple `otelcol.processor.batch` components can be specified by giving them
different labels.

## Usage

```river
otelcol.processor.batch "LABEL" {
output {
metrics = [...]
logs = [...]
traces = [...]
}
}
```

## Arguments

`otelcol.processor.batch` supports the following arguments:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`timeout` | `duration` | How long to wait before flushing the batch. | `"200ms"` | no
`send_batch_size` | `number` | Amount of data to buffer before flushing the batch. | `8192` | no
`send_batch_max_size` | `number` | Upper limit of a batch size. | `0` | no
tpaschalis marked this conversation as resolved.
Show resolved Hide resolved

`otelcol.processor.batch` accumulates data into a batch until one of the
following events happens:

* The duration specified by `timeout` elapses since the time the last batch was
sent.

* The number of spans, log lines, or metric samples processed exceeds the
number specified by `send_batch_size`.

Use `send_batch_max_size` to limit the amount of data contained in a single
batch. When set to `0`, batches can be any size.

For example, assume `send_batch_size` is set to the default `8192` and there
are currently 8000 batched spans. If the batch processor receives 8000 more
spans at once, the total batch size would be 16,192 which would then be flushed
as a single batch. `send_batch_max_size` constrains how big a batch can get.
When set to a non-zero value, `send_batch_max_size` must be greater or equal to
`send_batch_size`.

## Blocks

The following blocks are supported inside the definition of
`otelcol.processor.batch`:

Hierarchy | Block | Description | Required
--------- | ----- | ----------- | --------
output | [output][] | Configures where to send received telemetry data. | **yes**

[output]: #output-block

### output block

The `output` block configures a set of components to send batched telemetry
data to.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`metrics` | `list(otelcol.Consumer)` | List of consumers to send metrics to. | `[]` | no
`logs` | `list(otelcol.Consumer)` | List of consumers to send logs to. | `[]` | no
`traces` | `list(otelcol.Consumer)` | List of consumers to send traces to. | `[]` | no

The `output` block must be specified, but all of its arguments are optional. By
default, telemetry data is dropped. To send telemetry data to other components,
configure the `metrics`, `logs`, and `traces` arguments accordingly.

## Exported fields

The following fields are exported and can be referenced by other components:

Name | Type | Description
---- | ---- | -----------
`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to.

`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics,
logs, or traces).

## Component health

`otelcol.processor.batch` is only reported as unhealthy if given an invalid
configuration.

## Debug information

`otelcol.processor.batch` does not expose any component-specific debug
information.
20 changes: 20 additions & 0 deletions docs/sources/flow/reference/components/otelcol.receiver.otlp.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ grpc > keepalive > enforcement_policy | [enforcement_policy][] | Enforcement pol
http | [http][] | Configures the HTTP server to receive telemetry data. | no
http > tls | [tls][] | Configures TLS for the HTTP server. | no
http > cors | [cors][] | Configures CORS for the HTTP server. | no
output | [output][] | Configures where to send received telemetry data. | **yes**

The `>` symbol indicates deeper levels of nesting. For example, `grpc > tls`
refers to a `tls` block defined inside a `grpc` block.
Expand All @@ -62,6 +63,7 @@ refers to a `tls` block defined inside a `grpc` block.
[enforcement_policy]: #enforcement_policy-block
[http]: #http-block
[cors]: #cors-block
[output]: #output-block

### grpc block

Expand Down Expand Up @@ -168,6 +170,24 @@ request. The following headers are always implicitly allowed:

If `allowed_headers` includes `"*"`, all headers will be permitted.

### output block

The `output` block configures a set of components to send received telemetry
data to.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`metrics` | `list(otelcol.Consumer)` | List of consumers to send metrics to. | `[]` | no
`logs` | `list(otelcol.Consumer)` | List of consumers to send logs to. | `[]` | no
`traces` | `list(otelcol.Consumer)` | List of consumers to send traces to. | `[]` | no

The `output` block must be specified, but all of its arguments are optional. By
default, telemetry data will be dropped. To send telemetry data to other
components, configure the `metrics`, `logs`, and `traces` arguments
accordingly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it OK that we don't have examples for otelcol.processor.batch and otelcol.processor.batch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm planning on adding examples after #2288 is implemented (since that would allow examples for an entire pipeline of OpenTelemetry Collector components)

## Exported fields

`otelcol.receiver.otlp` does not export any fields.
Expand Down