From dfde58ec659313eed877602bb9683c95b88af2bf Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Fri, 7 Oct 2022 11:45:49 -0400 Subject: [PATCH 1/4] otelcol.processor.batch: new component Introduce a `otelcol.processor.batch` component which wraps around the upstream batch processor. Closes #2285. --- CHANGELOG.md | 4 + component/all/all.go | 1 + component/otelcol/processor/batch/batch.go | 87 +++++++++++++ .../otelcol/processor/batch/batch_test.go | 117 ++++++++++++++++++ .../components/otelcol.processor.batch.md | 110 ++++++++++++++++ .../components/otelcol.receiver.otlp.md | 20 +++ 6 files changed, 339 insertions(+) create mode 100644 component/otelcol/processor/batch/batch.go create mode 100644 component/otelcol/processor/batch/batch_test.go create mode 100644 docs/sources/flow/reference/components/otelcol.processor.batch.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 8dc3543e08e2..aa6e88600496 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,10 @@ Main (unreleased) traces, metrics, and logs. Data can then be forwarded to other `otelcol` components. (@rfratto) +- Flow: add `otelcol.processor.batch` component which batches data from + `otelcol` components before forwarding it to other `otelcol` components. + (@rfratto) + ### Features - Add `agentctl test-logs` command to allow testing log configurations by redirecting diff --git a/component/all/all.go b/component/all/all.go index 7c18ed3c2481..69d1d7e20a3c 100644 --- a/component/all/all.go +++ b/component/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/grafana/agent/component/discovery/kubernetes" // Import discovery.kubernetes _ "github.com/grafana/agent/component/discovery/relabel" // Import discovery.relabel _ "github.com/grafana/agent/component/local/file" // Import local.file + _ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch _ "github.com/grafana/agent/component/otelcol/receiver/otlp" // Import otelcol.receiver.otlp _ "github.com/grafana/agent/component/prometheus/integration/node_exporter" // Import prometheus.integration.node_exporter _ "github.com/grafana/agent/component/prometheus/relabel" // Import prometheus.relabel diff --git a/component/otelcol/processor/batch/batch.go b/component/otelcol/processor/batch/batch.go new file mode 100644 index 000000000000..ce9b687df11f --- /dev/null +++ b/component/otelcol/processor/batch/batch.go @@ -0,0 +1,87 @@ +// Package batch provides an otelcol.processor.batch component. +package batch + +import ( + "fmt" + "time" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/processor" + "github.com/grafana/agent/pkg/river" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/processor/batchprocessor" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.processor.batch", + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := batchprocessor.NewFactory() + return processor.New(opts, fact, args.(Arguments)) + }, + }) +} + +// Arguments configures the otelcol.processor.batch component. +type Arguments struct { + Timeout time.Duration `river:"timeout,attr,optional"` + SendBatchSize uint32 `river:"send_batch_size,attr,optional"` + SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"` + + // Output configures where to send processed data. Required. 
+ Output *otelcol.ConsumerArguments `river:"output,block"` +} + +var ( + _ processor.Arguments = Arguments{} + _ river.Unmarshaler = (*Arguments)(nil) +) + +// DefaultArguments holds default settings for Arguments. +var DefaultArguments = Arguments{ + Timeout: 200 * time.Millisecond, + SendBatchSize: 8192, +} + +func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error { + *args = DefaultArguments + + type arguments Arguments + if err := f((*arguments)(args)); err != nil { + return err + } + + if args.SendBatchMaxSize > 0 && args.SendBatchMaxSize < args.SendBatchSize { + return fmt.Errorf("send_batch_max_size must be greater or equal to send_batch_size when not 0") + } + return nil +} + +// Convert implements receiver.Arguments. +func (args Arguments) Convert() otelconfig.Processor { + return &batchprocessor.Config{ + ProcessorSettings: otelconfig.NewProcessorSettings(otelconfig.NewComponentID("batch")), + Timeout: args.Timeout, + SendBatchSize: args.SendBatchSize, + SendBatchMaxSize: args.SendBatchMaxSize, + } +} + +// Extensions implements receiver.Arguments. +func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension { + return nil +} + +// Exporters implements receiver.Arguments. +func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter { + return nil +} + +// NextConsumers implements receiver.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} diff --git a/component/otelcol/processor/batch/batch_test.go b/component/otelcol/processor/batch/batch_test.go new file mode 100644 index 000000000000..bcb89091cb10 --- /dev/null +++ b/component/otelcol/processor/batch/batch_test.go @@ -0,0 +1,117 @@ +package batch_test + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log/level" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/internal/fakeconsumer" + "github.com/grafana/agent/component/otelcol/processor/batch" + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/grafana/agent/pkg/river" + "github.com/grafana/agent/pkg/util" + "github.com/grafana/dskit/backoff" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// Test performs a basic integration test which runs the +// otelcol.processor.batch component and ensures that it can accept, process, and forward data. +func Test(t *testing.T) { + ctx := componenttest.TestContext(t) + l := util.TestLogger(t) + + ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.batch") + require.NoError(t, err) + + cfg := ` + timeout = "10ms" + + output { + // no-op: will be overridden by test code. + } + ` + var args batch.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + // Override our arguments so traces get forwarded to traceCh. 
+ traceCh := make(chan ptrace.Traces) + args.Output = makeTracesOutput(traceCh) + + go func() { + err := ctrl.Run(ctx, args) + require.NoError(t, err) + }() + + require.NoError(t, ctrl.WaitRunning(time.Second), "component never started") + require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything") + + // Send traces in the background to our receiver + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our processor to finish and forward data to traceCh. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for traces") + case tr := <-traceCh: + require.Equal(t, 1, tr.SpanCount()) + } +} + +// makeTracesOutput returns ConsumerArguments which will forward traces to the +// provided channel. +func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments { + traceConsumer := fakeconsumer.Consumer{ + ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- t: + return nil + } + }, + } + + return &otelcol.ConsumerArguments{ + Traces: []otelcol.Consumer{&traceConsumer}, + } +} + +func createTestTraces() ptrace.Traces { + // Matches format from the protobuf definition: + // https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto + var bb = `{ + "resource_spans": [{ + "scope_spans": [{ + "spans": [{ + "name": "TestSpan" + }] + }] + }] + }` + + data, err := ptrace.NewJSONUnmarshaler().UnmarshalTraces([]byte(bb)) + if err != nil { + panic(err) + } + return data +} diff --git a/docs/sources/flow/reference/components/otelcol.processor.batch.md b/docs/sources/flow/reference/components/otelcol.processor.batch.md new file mode 100644 index 000000000000..3a5ec93b23fb --- /dev/null +++ b/docs/sources/flow/reference/components/otelcol.processor.batch.md @@ -0,0 +1,110 @@ +--- +aliases: +- /docs/agent/latest/flow/reference/components/otelcol.processor.batch +title: otelcol.processor.batch +--- + +# otelcol.processor.batch + +`otelcol.processor.batch` accepts telemetry data from other `otelcol` +components and places them into batches. Batching improves the compression of +data and reduces the number of outgoing network requests required to transmit +data. + +> **NOTE**: `otelcol.processor.batch` is a wrapper over the upstream +> OpenTelemetry Collector `batch` processor. Bug reports or feature requests +> may be redirected to the upstream repository. + +Multiple `otelcol.processor.batch` components can be specified by giving them +different labels. + +## Usage + +```river +otelcol.processor.batch "LABEL" { + output { + metrics = [...] + logs = [...] + traces = [...] + } +} +``` + +## Arguments + +`otelcol.processor.batch` supports the following arguments: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`timeout` | `duration` | How long to wait before flushing the batch. | `"200ms"` | no +`send_batch_size` | `number` | Amount of data to buffer before flushing the batch. | `8192` | no +`send_batch_max_size` | `number` | Upper limit of a batch size. 
| `0` | no + +`otelcol.processor.batch` accumulates data into a batch until one of the +following events happens: + +* The duration specified by `timeout` elapses since the time the last batch was + sent. + +* The number of spans, log lines, or metric samples processed goes above the + number specified by `send_batch_size`. + +`send_batch_max_size` can be used to limit the amount of data contained in a +single batch. When set to `0`, batches are allowed to be any size. + +For example, assume `send_batch_size` is set to the default `8192` and there +are currently 8000 batched spans. If 8000 more spans are received at once, it +would bring the total batch size to 16,192, which would then be flushed as a +single batch. `send_batch_max_size` allows to constrain how big a batch can +get. When set to a non-zero value, `send_batch_max_size` must be greater or +equal to `send_batch_size`. + +## Blocks + +The following blocks are supported inside the definition of +`otelcol.processor.batch`: + +Hierarchy | Block | Description | Required +--------- | ----- | ----------- | -------- +output | [output][] | Configures where to send received telemetry data. | **yes** + +[output]: #output-block + +### output block + +The `output` block configures a set of components to send batched telemetry +data to. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`metrics` | `list(otelcol.Consumer)` | List of consumers to send metrics to. | `[]` | no +`logs` | `list(otelcol.Consumer)` | List of consumers to send logs to. | `[]` | no +`traces` | `list(otelcol.Consumer)` | List of consumers to send traces to. | `[]` | no + +The `output` block must be specified, but all of its arguments are optional. By +default, telemetry data will be dropped. To send telemetry data to other +components, configure the `metrics`, `logs`, and `traces` arguments +accordingly. + +## Exported fields + +The following fields are exported and can be referenced by other components: + +Name | Type | Description +---- | ---- | ----------- +`input` | `otelcol.Consumer` | A value which other components can use to send telemetry data to. + +`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics, +logs, or traces). + +## Component health + +`otelcol.processor.batch` is only reported as unhealthy if given an invalid +configuration. + +## Debug information + +`otelcol.processor.batch` does not expose any component-specific debug +information. diff --git a/docs/sources/flow/reference/components/otelcol.receiver.otlp.md b/docs/sources/flow/reference/components/otelcol.receiver.otlp.md index ba9aed4ef9e3..78693ec7ba9c 100644 --- a/docs/sources/flow/reference/components/otelcol.receiver.otlp.md +++ b/docs/sources/flow/reference/components/otelcol.receiver.otlp.md @@ -51,6 +51,7 @@ grpc > keepalive > enforcement_policy | [enforcement_policy][] | Enforcement pol http | [http][] | Configures the HTTP server to receive telemetry data. | no http > tls | [tls][] | Configures TLS for the HTTP server. | no http > cors | [cors][] | Configures CORS for the HTTP server. | no +output | [output][] | Configures where to send received telemetry data. | **yes** The `>` symbol indicates deeper levels of nesting. For example, `grpc > tls` refers to a `tls` block defined inside a `grpc` block. @@ -62,6 +63,7 @@ refers to a `tls` block defined inside a `grpc` block. 
[enforcement_policy]: #enforcement_policy-block
[http]: #http-block
[cors]: #cors-block
+[output]: #output-block

### grpc block

@@ -168,6 +170,24 @@ request. The following headers are always implicitly allowed:

If `allowed_headers` includes `"*"`, all headers will be permitted.

+### output block
+
+The `output` block configures a set of components to send received telemetry
+data to.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`metrics` | `list(otelcol.Consumer)` | List of consumers to send metrics to. | `[]` | no
+`logs` | `list(otelcol.Consumer)` | List of consumers to send logs to. | `[]` | no
+`traces` | `list(otelcol.Consumer)` | List of consumers to send traces to. | `[]` | no
+
+The `output` block must be specified, but all of its arguments are optional. By
+default, telemetry data will be dropped. To send telemetry data to other
+components, configure the `metrics`, `logs`, and `traces` arguments
+accordingly.
+
## Exported fields

`otelcol.receiver.otlp` does not export any fields.

From 5d409251604c55946339ef9ae0ffee88023903fd Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Fri, 7 Oct 2022 13:34:13 -0400
Subject: [PATCH 2/4] address review feedback

---
 .../components/otelcol.processor.batch.md     | 23 +++++++++----------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/docs/sources/flow/reference/components/otelcol.processor.batch.md b/docs/sources/flow/reference/components/otelcol.processor.batch.md
index 3a5ec93b23fb..38915544f1bb 100644
--- a/docs/sources/flow/reference/components/otelcol.processor.batch.md
+++ b/docs/sources/flow/reference/components/otelcol.processor.batch.md
@@ -46,18 +46,18 @@ following events happens:
* The duration specified by `timeout` elapses since the time the last batch was
  sent.

-* The number of spans, log lines, or metric samples processed goes above the
+* The number of spans, log lines, or metric samples processed exceeds the
  number specified by `send_batch_size`.

-`send_batch_max_size` can be used to limit the amount of data contained in a
-single batch. When set to `0`, batches are allowed to be any size.
+Use `send_batch_max_size` to limit the amount of data contained in a single
+batch. When set to `0`, batches can be any size.

For example, assume `send_batch_size` is set to the default `8192` and there
-are currently 8000 batched spans. If 8000 more spans are received at once, it
-would bring the total batch size to 16,192, which would then be flushed as a
-single batch. `send_batch_max_size` allows to constrain how big a batch can
-get. When set to a non-zero value, `send_batch_max_size` must be greater or
-equal to `send_batch_size`.
+are currently 8000 batched spans. If the batch processor receives 8000 more
+spans at once, the total batch size would be 16,000, which would then be flushed
+as a single batch. `send_batch_max_size` constrains how big a batch can get.
+When set to a non-zero value, `send_batch_max_size` must be greater or equal to
+`send_batch_size`.

## Blocks

@@ -84,9 +84,8 @@ Name | Type | Description | Default | Required
`traces` | `list(otelcol.Consumer)` | List of consumers to send traces to. | `[]` | no

The `output` block must be specified, but all of its arguments are optional. By
-default, telemetry data will be dropped. To send telemetry data to other
-components, configure the `metrics`, `logs`, and `traces` arguments
-accordingly.
+default, telemetry data is dropped.
To send telemetry data to other components, +configure the `metrics`, `logs`, and `traces` arguments accordingly. ## Exported fields @@ -94,7 +93,7 @@ The following fields are exported and can be referenced by other components: Name | Type | Description ---- | ---- | ----------- -`input` | `otelcol.Consumer` | A value which other components can use to send telemetry data to. +`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to. `input` accepts `otelcol.Consumer` data for any telemetry signal (metrics, logs, or traces). From bf0fabaa24f5684092ee832dfa9a8a940a7772e8 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 11 Oct 2022 08:09:19 -0400 Subject: [PATCH 3/4] fix misclassification of processor --- component/otelcol/processor/batch/batch.go | 8 ++++---- component/otelcol/processor/batch/batch_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/component/otelcol/processor/batch/batch.go b/component/otelcol/processor/batch/batch.go index ce9b687df11f..e9f939584614 100644 --- a/component/otelcol/processor/batch/batch.go +++ b/component/otelcol/processor/batch/batch.go @@ -61,7 +61,7 @@ func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error { return nil } -// Convert implements receiver.Arguments. +// Convert implements processor.Arguments. func (args Arguments) Convert() otelconfig.Processor { return &batchprocessor.Config{ ProcessorSettings: otelconfig.NewProcessorSettings(otelconfig.NewComponentID("batch")), @@ -71,17 +71,17 @@ func (args Arguments) Convert() otelconfig.Processor { } } -// Extensions implements receiver.Arguments. +// Extensions implements processor.Arguments. func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension { return nil } -// Exporters implements receiver.Arguments. +// Exporters implements processor.Arguments. func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter { return nil } -// NextConsumers implements receiver.Arguments. +// NextConsumers implements processor.Arguments. func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { return args.Output } diff --git a/component/otelcol/processor/batch/batch_test.go b/component/otelcol/processor/batch/batch_test.go index bcb89091cb10..9f8bed232414 100644 --- a/component/otelcol/processor/batch/batch_test.go +++ b/component/otelcol/processor/batch/batch_test.go @@ -48,7 +48,7 @@ func Test(t *testing.T) { require.NoError(t, ctrl.WaitRunning(time.Second), "component never started") require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything") - // Send traces in the background to our receiver + // Send traces in the background to our processor. go func() { exports := ctrl.Exports().(otelcol.ConsumerExports) From b0ca21fbcc81164faca3ff16b45cbdeee2a141d3 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 11 Oct 2022 08:29:02 -0400 Subject: [PATCH 4/4] fix lint error --- component/otelcol/processor/batch/batch.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/component/otelcol/processor/batch/batch.go b/component/otelcol/processor/batch/batch.go index e9f939584614..c3c716d839c4 100644 --- a/component/otelcol/processor/batch/batch.go +++ b/component/otelcol/processor/batch/batch.go @@ -47,6 +47,8 @@ var DefaultArguments = Arguments{ SendBatchSize: 8192, } +// UnmarshalRiver implements river.Unmarshaler. It applies defaults to args and +// validates settings provided by the user. 
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error { *args = DefaultArguments
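For illustration only (not part of the patches above): the receiver and processor documented in this series could be wired together in a Flow configuration along the lines of the sketch below. The `default` labels, the `2s` timeout, and the batch sizes are arbitrary values chosen for the example, and the processor's `output` block is left empty because no downstream `otelcol` exporter component is added by these commits.

```river
// Illustrative sketch: an OTLP receiver feeding the new batch processor.
otelcol.receiver.otlp "default" {
  grpc {
    // Accept OTLP over gRPC using the upstream server defaults.
  }

  output {
    // Forward received traces to the batch processor's exported consumer.
    traces = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  // Flush after 2s or once 8192 spans are buffered, whichever comes first,
  // and never emit a single batch larger than 16384 spans.
  timeout             = "2s"
  send_batch_size     = 8192
  send_batch_max_size = 16384

  output {
    // Downstream consumers would be listed here; with no consumers configured,
    // batched data is dropped, as described in the documentation above.
  }
}
```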