Skip to content

Commit

Permalink
otelcol.processor.batch: new component
Browse files Browse the repository at this point in the history
Introduce a `otelcol.processor.batch` component which wraps around the
upstream batch processor.

Closes #2285.
  • Loading branch information
rfratto committed Oct 7, 2022
1 parent 54c3fea commit dfde58e
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ Main (unreleased)
traces, metrics, and logs. Data can then be forwarded to other `otelcol`
components. (@rfratto)

- Flow: add `otelcol.processor.batch` component which batches data from
`otelcol` components before forwarding it to other `otelcol` components.
(@rfratto)

### Features

- Add `agentctl test-logs` command to allow testing log configurations by redirecting
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "github.com/grafana/agent/component/discovery/kubernetes" // Import discovery.kubernetes
_ "github.com/grafana/agent/component/discovery/relabel" // Import discovery.relabel
_ "github.com/grafana/agent/component/local/file" // Import local.file
_ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/agent/component/otelcol/receiver/otlp" // Import otelcol.receiver.otlp
_ "github.com/grafana/agent/component/prometheus/integration/node_exporter" // Import prometheus.integration.node_exporter
_ "github.com/grafana/agent/component/prometheus/relabel" // Import prometheus.relabel
Expand Down
87 changes: 87 additions & 0 deletions component/otelcol/processor/batch/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Package batch provides an otelcol.processor.batch component.
package batch

import (
"fmt"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/processor"
"github.com/grafana/agent/pkg/river"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/processor/batchprocessor"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.batch",
Args: Arguments{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := batchprocessor.NewFactory()
return processor.New(opts, fact, args.(Arguments))
},
})
}

// Arguments configures the otelcol.processor.batch component.
type Arguments struct {
Timeout time.Duration `river:"timeout,attr,optional"`
SendBatchSize uint32 `river:"send_batch_size,attr,optional"`
SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
}

var (
_ processor.Arguments = Arguments{}
_ river.Unmarshaler = (*Arguments)(nil)
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
Timeout: 200 * time.Millisecond,
SendBatchSize: 8192,
}

func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
*args = DefaultArguments

type arguments Arguments
if err := f((*arguments)(args)); err != nil {
return err
}

if args.SendBatchMaxSize > 0 && args.SendBatchMaxSize < args.SendBatchSize {
return fmt.Errorf("send_batch_max_size must be greater or equal to send_batch_size when not 0")
}
return nil
}

// Convert implements receiver.Arguments.
func (args Arguments) Convert() otelconfig.Processor {
return &batchprocessor.Config{
ProcessorSettings: otelconfig.NewProcessorSettings(otelconfig.NewComponentID("batch")),
Timeout: args.Timeout,
SendBatchSize: args.SendBatchSize,
SendBatchMaxSize: args.SendBatchMaxSize,
}
}

// Extensions implements receiver.Arguments.
func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
return nil
}

// Exporters implements receiver.Arguments.
func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
return nil
}

// NextConsumers implements receiver.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}
117 changes: 117 additions & 0 deletions component/otelcol/processor/batch/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package batch_test

import (
"context"
"testing"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
"github.com/grafana/agent/component/otelcol/processor/batch"
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/dskit/backoff"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// Test performs a basic integration test which runs the
// otelcol.processor.batch component and ensures that it can accept, process, and forward data.
func Test(t *testing.T) {
ctx := componenttest.TestContext(t)
l := util.TestLogger(t)

ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.batch")
require.NoError(t, err)

cfg := `
timeout = "10ms"
output {
// no-op: will be overridden by test code.
}
`
var args batch.Arguments
require.NoError(t, river.Unmarshal([]byte(cfg), &args))

// Override our arguments so traces get forwarded to traceCh.
traceCh := make(chan ptrace.Traces)
args.Output = makeTracesOutput(traceCh)

go func() {
err := ctrl.Run(ctx, args)
require.NoError(t, err)
}()

require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")

// Send traces in the background to our receiver
go func() {
exports := ctrl.Exports().(otelcol.ConsumerExports)

bo := backoff.New(ctx, backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
})
for bo.Ongoing() {
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
if err != nil {
level.Error(l).Log("msg", "failed to send traces", "err", err)
bo.Wait()
continue
}

return
}
}()

// Wait for our processor to finish and forward data to traceCh.
select {
case <-time.After(time.Second):
require.FailNow(t, "failed waiting for traces")
case tr := <-traceCh:
require.Equal(t, 1, tr.SpanCount())
}
}

// makeTracesOutput returns ConsumerArguments which will forward traces to the
// provided channel.
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
traceConsumer := fakeconsumer.Consumer{
ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- t:
return nil
}
},
}

return &otelcol.ConsumerArguments{
Traces: []otelcol.Consumer{&traceConsumer},
}
}

func createTestTraces() ptrace.Traces {
// Matches format from the protobuf definition:
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
var bb = `{
"resource_spans": [{
"scope_spans": [{
"spans": [{
"name": "TestSpan"
}]
}]
}]
}`

data, err := ptrace.NewJSONUnmarshaler().UnmarshalTraces([]byte(bb))
if err != nil {
panic(err)
}
return data
}
110 changes: 110 additions & 0 deletions docs/sources/flow/reference/components/otelcol.processor.batch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
---
aliases:
- /docs/agent/latest/flow/reference/components/otelcol.processor.batch
title: otelcol.processor.batch
---

# otelcol.processor.batch

`otelcol.processor.batch` accepts telemetry data from other `otelcol`
components and places them into batches. Batching improves the compression of
data and reduces the number of outgoing network requests required to transmit
data.

> **NOTE**: `otelcol.processor.batch` is a wrapper over the upstream
> OpenTelemetry Collector `batch` processor. Bug reports or feature requests
> may be redirected to the upstream repository.
Multiple `otelcol.processor.batch` components can be specified by giving them
different labels.

## Usage

```river
otelcol.processor.batch "LABEL" {
output {
metrics = [...]
logs = [...]
traces = [...]
}
}
```

## Arguments

`otelcol.processor.batch` supports the following arguments:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`timeout` | `duration` | How long to wait before flushing the batch. | `"200ms"` | no
`send_batch_size` | `number` | Amount of data to buffer before flushing the batch. | `8192` | no
`send_batch_max_size` | `number` | Upper limit of a batch size. | `0` | no

`otelcol.processor.batch` accumulates data into a batch until one of the
following events happens:

* The duration specified by `timeout` elapses since the time the last batch was
sent.

* The number of spans, log lines, or metric samples processed goes above the
number specified by `send_batch_size`.

`send_batch_max_size` can be used to limit the amount of data contained in a
single batch. When set to `0`, batches are allowed to be any size.

For example, assume `send_batch_size` is set to the default `8192` and there
are currently 8000 batched spans. If 8000 more spans are received at once, it
would bring the total batch size to 16,192, which would then be flushed as a
single batch. `send_batch_max_size` allows to constrain how big a batch can
get. When set to a non-zero value, `send_batch_max_size` must be greater or
equal to `send_batch_size`.

## Blocks

The following blocks are supported inside the definition of
`otelcol.processor.batch`:

Hierarchy | Block | Description | Required
--------- | ----- | ----------- | --------
output | [output][] | Configures where to send received telemetry data. | **yes**

[output]: #output-block

### output block

The `output` block configures a set of components to send batched telemetry
data to.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`metrics` | `list(otelcol.Consumer)` | List of consumers to send metrics to. | `[]` | no
`logs` | `list(otelcol.Consumer)` | List of consumers to send logs to. | `[]` | no
`traces` | `list(otelcol.Consumer)` | List of consumers to send traces to. | `[]` | no

The `output` block must be specified, but all of its arguments are optional. By
default, telemetry data will be dropped. To send telemetry data to other
components, configure the `metrics`, `logs`, and `traces` arguments
accordingly.

## Exported fields

The following fields are exported and can be referenced by other components:

Name | Type | Description
---- | ---- | -----------
`input` | `otelcol.Consumer` | A value which other components can use to send telemetry data to.

`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics,
logs, or traces).

## Component health

`otelcol.processor.batch` is only reported as unhealthy if given an invalid
configuration.

## Debug information

`otelcol.processor.batch` does not expose any component-specific debug
information.
20 changes: 20 additions & 0 deletions docs/sources/flow/reference/components/otelcol.receiver.otlp.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ grpc > keepalive > enforcement_policy | [enforcement_policy][] | Enforcement pol
http | [http][] | Configures the HTTP server to receive telemetry data. | no
http > tls | [tls][] | Configures TLS for the HTTP server. | no
http > cors | [cors][] | Configures CORS for the HTTP server. | no
output | [output][] | Configures where to send received telemetry data. | **yes**

The `>` symbol indicates deeper levels of nesting. For example, `grpc > tls`
refers to a `tls` block defined inside a `grpc` block.
Expand All @@ -62,6 +63,7 @@ refers to a `tls` block defined inside a `grpc` block.
[enforcement_policy]: #enforcement_policy-block
[http]: #http-block
[cors]: #cors-block
[output]: #output-block

### grpc block

Expand Down Expand Up @@ -168,6 +170,24 @@ request. The following headers are always implicitly allowed:

If `allowed_headers` includes `"*"`, all headers will be permitted.

### output block

The `output` block configures a set of components to send received telemetry
data to.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`metrics` | `list(otelcol.Consumer)` | List of consumers to send metrics to. | `[]` | no
`logs` | `list(otelcol.Consumer)` | List of consumers to send logs to. | `[]` | no
`traces` | `list(otelcol.Consumer)` | List of consumers to send traces to. | `[]` | no

The `output` block must be specified, but all of its arguments are optional. By
default, telemetry data will be dropped. To send telemetry data to other
components, configure the `metrics`, `logs`, and `traces` arguments
accordingly.

## Exported fields

`otelcol.receiver.otlp` does not export any fields.
Expand Down

0 comments on commit dfde58e

Please sign in to comment.