From 9188a9fd696f6539a5584157fae5dc8c0e0ef5c3 Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Fri, 9 Dec 2022 12:10:03 -0500
Subject: [PATCH] otelcol.receiver.kafka: new component

Introduce a `otelcol.receiver.kafka` component which receives telemetry data
from a Kafka topic. The component wraps around the upstream Kafka receiver
from the `otelcol-contrib` distribution.

Closes #2291.
---
 CHANGELOG.md                                  |   9 +-
 component/otelcol/receiver/kafka/kafka.go     | 283 ++++++++++++++++
 .../components/otelcol.receiver.kafka.md      | 304 ++++++++++++++++++
 3 files changed, 593 insertions(+), 3 deletions(-)
 create mode 100644 component/otelcol/receiver/kafka/kafka.go
 create mode 100644 docs/sources/flow/reference/components/otelcol.receiver.kafka.md

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e85cc28c98ba..c8f6aab8d74d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -56,6 +56,8 @@ Main (unreleased)
   - `loki.process` receives log entries from other `loki` components and runs
     one or more processing stages. (@tpaschalis)
 
+  - `otelcol.receiver.kafka` receives telemetry data from Kafka. (@rfratto)
+
 ### Enhancements
 
 - Integrations: Always use direct connection in mongodb_exporter integration. (@v-zhuravlev)
@@ -83,7 +85,7 @@ Main (unreleased)
 - Native histograms are now supported in the static Grafana Agent and in
   `prometheus.*` Flow components. Native histograms will be automatically
   collected from supported targets. remote_write must be configured to forward
-  native histograms from the WAL to the specified endpoints.
+  native histograms from the WAL to the specified endpoints. (@rfratto)
 
 - Flow: metrics generated by upstream OpenTelemetry Collector components are
   now exposed at the `/metrics` endpoint of Grafana Agent Flow. (@rfratto)
@@ -98,8 +100,9 @@ Main (unreleased)
 
 - Remove duplicate `oauth2` key from `metricsinstances` CRD. (@daper)
 
-- Fix issue where on checking whether to restart integrations the Integration Manager was comparing
-  configs with secret values scrubbed, preventing reloads if only secrets were updated. (@spartan0x117)
+- Fix issue where on checking whether to restart integrations the Integration
+  Manager was comparing configs with secret values scrubbed, preventing reloads
+  if only secrets were updated. (@spartan0x117)
 
 ### Other changes
 
diff --git a/component/otelcol/receiver/kafka/kafka.go b/component/otelcol/receiver/kafka/kafka.go
new file mode 100644
index 000000000000..dcb36f58be1a
--- /dev/null
+++ b/component/otelcol/receiver/kafka/kafka.go
@@ -0,0 +1,283 @@
+// Package kafka provides an otelcol.receiver.kafka component.
+package kafka
+
+import (
+	"time"
+
+	"github.com/grafana/agent/component"
+	"github.com/grafana/agent/component/otelcol"
+	"github.com/grafana/agent/component/otelcol/receiver"
+	"github.com/grafana/agent/pkg/flow/rivertypes"
+	"github.com/grafana/agent/pkg/river"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
+	otelcomponent "go.opentelemetry.io/collector/component"
+	otelconfig "go.opentelemetry.io/collector/config"
+)
+
+func init() {
+	component.Register(component.Registration{
+		Name: "otelcol.receiver.kafka",
+		Args: Arguments{},
+
+		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
+			fact := kafkareceiver.NewFactory()
+			return receiver.New(opts, fact, args.(Arguments))
+		},
+	})
+}
+
+// Arguments configures the otelcol.receiver.kafka component.
+type Arguments struct {
+	Brokers         []string `river:"brokers,attr"`
+	ProtocolVersion string   `river:"protocol_version,attr"`
+	Topic           string   `river:"topic,attr,optional"`
+	Encoding        string   `river:"encoding,attr,optional"`
+	GroupID         string   `river:"group_id,attr,optional"`
+	ClientID        string   `river:"client_id,attr,optional"`
+
+	Authentication AuthenticationArguments `river:"authentication,block,optional"`
+	Metadata       MetadataArguments       `river:"metadata,block,optional"`
+	AutoCommit     AutoCommitArguments     `river:"autocommit,block,optional"`
+	MessageMarking MessageMarkingArguments `river:"message_marking,block,optional"`
+
+	// Output configures where to send received data. Required.
+	Output *otelcol.ConsumerArguments `river:"output,block"`
+}
+
+var (
+	_ river.Unmarshaler  = (*Arguments)(nil)
+	_ receiver.Arguments = Arguments{}
+)
+
+// DefaultArguments holds default values for Arguments.
+var DefaultArguments = Arguments{
+	// We use the defaults from the upstream OpenTelemetry Collector component
+	// for compatibility, even though that means using a client and group ID of
+	// "otel-collector".
+
+	Topic:    "otlp_spans",
+	Encoding: "otlp_proto",
+	Brokers:  []string{"localhost:9092"},
+	ClientID: "otel-collector",
+	GroupID:  "otel-collector",
+	Metadata: MetadataArguments{
+		IncludeAllTopics: true,
+		Retry: MetadataRetryArguments{
+			MaxRetries: 3,
+			Backoff:    250 * time.Millisecond,
+		},
+	},
+	AutoCommit: AutoCommitArguments{
+		Enable:   true,
+		Interval: time.Second,
+	},
+	MessageMarking: MessageMarkingArguments{
+		AfterExecution:      false,
+		IncludeUnsuccessful: false,
+	},
+}
+
+// UnmarshalRiver implements river.Unmarshaler and applies default settings.
+func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
+	*args = DefaultArguments
+
+	type arguments Arguments
+	return f((*arguments)(args))
+}
+
+// Convert implements receiver.Arguments.
+func (args Arguments) Convert() otelconfig.Receiver {
+	return &kafkareceiver.Config{
+		Brokers:         args.Brokers,
+		ProtocolVersion: args.ProtocolVersion,
+		Topic:           args.Topic,
+		Encoding:        args.Encoding,
+		GroupID:         args.GroupID,
+		ClientID:        args.ClientID,
+
+		Authentication: args.Authentication.Convert(),
+		Metadata:       args.Metadata.Convert(),
+		AutoCommit:     args.AutoCommit.Convert(),
+		MessageMarking: args.MessageMarking.Convert(),
+	}
+}
+
+// Extensions implements receiver.Arguments.
+func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
+	return nil
+}
+
+// Exporters implements receiver.Arguments.
+func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
+	return nil
+}
+
+// NextConsumers implements receiver.Arguments.
+func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
+	return args.Output
+}
+
+// AuthenticationArguments configures how to authenticate to the Kafka broker.
+type AuthenticationArguments struct {
+	Plaintext *PlaintextArguments         `river:"plaintext,block,optional"`
+	SASL      *SASLArguments              `river:"sasl,block,optional"`
+	TLS       *otelcol.TLSClientArguments `river:"tls,block,optional"`
+	Kerberos  *KerberosArguments          `river:"kerberos,block,optional"`
+}
+
+// Convert converts args into the upstream type.
+func (args AuthenticationArguments) Convert() kafkaexporter.Authentication {
+	var res kafkaexporter.Authentication
+
+	if args.Plaintext != nil {
+		conv := args.Plaintext.Convert()
+		res.PlainText = &conv
+	}
+	if args.SASL != nil {
+		conv := args.SASL.Convert()
+		res.SASL = &conv
+	}
+	if args.TLS != nil {
+		res.TLS = args.TLS.Convert()
+	}
+	if args.Kerberos != nil {
+		conv := args.Kerberos.Convert()
+		res.Kerberos = &conv
+	}
+
+	return res
+}
+
+// PlaintextArguments configures plaintext authentication against the Kafka
+// broker.
+type PlaintextArguments struct {
+	Username string            `river:"username,attr"`
+	Password rivertypes.Secret `river:"password,attr"`
+}
+
+// Convert converts args into the upstream type.
+func (args PlaintextArguments) Convert() kafkaexporter.PlainTextConfig {
+	return kafkaexporter.PlainTextConfig{
+		Username: args.Username,
+		Password: string(args.Password),
+	}
+}
+
+// SASLArguments configures SASL authentication against the Kafka broker.
+type SASLArguments struct {
+	Username  string            `river:"username,attr"`
+	Password  rivertypes.Secret `river:"password,attr"`
+	Mechanism string            `river:"mechanism,attr"`
+	AWSMSK    AWSMSKArguments   `river:"aws_msk,block,optional"`
+}
+
+// Convert converts args into the upstream type.
+func (args SASLArguments) Convert() kafkaexporter.SASLConfig {
+	return kafkaexporter.SASLConfig{
+		Username:  args.Username,
+		Password:  string(args.Password),
+		Mechanism: args.Mechanism,
+		AWSMSK:    args.AWSMSK.Convert(),
+	}
+}
+
+// AWSMSKArguments exposes additional SASL authentication measures required to
+// use the AWS_MSK_IAM mechanism.
+type AWSMSKArguments struct {
+	Region     string `river:"region,attr"`
+	BrokerAddr string `river:"broker_addr,attr"`
+}
+
+// Convert converts args into the upstream type.
+func (args AWSMSKArguments) Convert() kafkaexporter.AWSMSKConfig {
+	return kafkaexporter.AWSMSKConfig{
+		Region:     args.Region,
+		BrokerAddr: args.BrokerAddr,
+	}
+}
+
+// KerberosArguments configures Kerberos authentication against the Kafka
+// broker.
+type KerberosArguments struct {
+	ServiceName string            `river:"service_name,attr,optional"`
+	Realm       string            `river:"realm,attr,optional"`
+	UseKeyTab   bool              `river:"use_keytab,attr,optional"`
+	Username    string            `river:"username,attr"`
+	Password    rivertypes.Secret `river:"password,attr,optional"`
+	ConfigPath  string            `river:"config_file,attr,optional"`
+	KeyTabPath  string            `river:"keytab_file,attr,optional"`
+}
+
+// Convert converts args into the upstream type.
+func (args KerberosArguments) Convert() kafkaexporter.KerberosConfig {
+	return kafkaexporter.KerberosConfig{
+		ServiceName: args.ServiceName,
+		Realm:       args.Realm,
+		UseKeyTab:   args.UseKeyTab,
+		Username:    args.Username,
+		Password:    string(args.Password),
+		ConfigPath:  args.ConfigPath,
+		KeyTabPath:  args.KeyTabPath,
+	}
+}
+
+// MetadataArguments configures how the otelcol.receiver.kafka component will
+// retrieve metadata from the Kafka broker.
+type MetadataArguments struct {
+	IncludeAllTopics bool                   `river:"include_all_topics,attr,optional"`
+	Retry            MetadataRetryArguments `river:"retry,block,optional"`
+}
+
+// Convert converts args into the upstream type.
+func (args MetadataArguments) Convert() kafkaexporter.Metadata {
+	return kafkaexporter.Metadata{
+		Full:  args.IncludeAllTopics,
+		Retry: args.Retry.Convert(),
+	}
+}
+
+// MetadataRetryArguments configures how to retry retrieving metadata from the
+// Kafka broker.
+// Retrying is useful to avoid race conditions when the Kafka broker is
+// starting at the same time as the otelcol.receiver.kafka component.
+type MetadataRetryArguments struct {
+	MaxRetries int           `river:"max_retries,attr,optional"`
+	Backoff    time.Duration `river:"backoff,attr,optional"`
+}
+
+// Convert converts args into the upstream type.
+func (args MetadataRetryArguments) Convert() kafkaexporter.MetadataRetry {
+	return kafkaexporter.MetadataRetry{
+		Max:     args.MaxRetries,
+		Backoff: args.Backoff,
+	}
+}
+
+// AutoCommitArguments configures how to automatically commit updated topic
+// offsets back to the Kafka broker.
+type AutoCommitArguments struct {
+	Enable   bool          `river:"enable,attr,optional"`
+	Interval time.Duration `river:"interval,attr,optional"`
+}
+
+// Convert converts args into the upstream type.
+func (args AutoCommitArguments) Convert() kafkareceiver.AutoCommit {
+	return kafkareceiver.AutoCommit{
+		Enable:   args.Enable,
+		Interval: args.Interval,
+	}
+}
+
+// MessageMarkingArguments configures when Kafka messages are marked as read.
+type MessageMarkingArguments struct {
+	AfterExecution      bool `river:"after_execution,attr,optional"`
+	IncludeUnsuccessful bool `river:"include_unsuccessful,attr,optional"`
+}
+
+// Convert converts args into the upstream type.
+func (args MessageMarkingArguments) Convert() kafkareceiver.MessageMarking {
+	return kafkareceiver.MessageMarking{
+		After:   args.AfterExecution,
+		OnError: args.IncludeUnsuccessful,
+	}
+}
diff --git a/docs/sources/flow/reference/components/otelcol.receiver.kafka.md b/docs/sources/flow/reference/components/otelcol.receiver.kafka.md
new file mode 100644
index 000000000000..f4d9aac1003c
--- /dev/null
+++ b/docs/sources/flow/reference/components/otelcol.receiver.kafka.md
@@ -0,0 +1,304 @@
+---
+aliases:
+- /docs/agent/latest/flow/reference/components/otelcol.receiver.kafka
+title: otelcol.receiver.kafka
+---
+
+# otelcol.receiver.kafka
+
+`otelcol.receiver.kafka` accepts telemetry data from a Kafka broker and
+forwards it to other `otelcol.*` components.
+
+> **NOTE**: `otelcol.receiver.kafka` is a wrapper over the upstream
+> OpenTelemetry Collector `kafka` receiver from the `otelcol-contrib`
+> distribution. Bug reports or feature requests will be redirected to the
+> upstream repository, if necessary.
+
+Multiple `otelcol.receiver.kafka` components can be specified by giving them
+different labels.
+
+## Usage
+
+```river
+otelcol.receiver.kafka "LABEL" {
+  brokers          = ["BROKER_ADDR"]
+  protocol_version = "PROTOCOL_VERSION"
+
+  output {
+    metrics = [...]
+    logs    = [...]
+    traces  = [...]
+  }
+}
+```
+
+## Arguments
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`brokers` | `array(string)` | Kafka brokers to connect to. | | yes
+`protocol_version` | `string` | Kafka protocol version to use. | | yes
+`topic` | `string` | Kafka topic to read from. | `"otlp_spans"` | no
+`encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no
+`group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no
+`client_id` | `string` | Consumer client ID to use. | `"otel-collector"` | no
+
+The `encoding` argument determines how to decode messages read from Kafka.
+`encoding` must be one of the following strings:
+
+* `"otlp_proto"`: Decode messages as OTLP protobuf.
+* `"jaeger_proto"`: Decode messages as a single Jaeger protobuf span.
+* `"jaeger_json"`: Decode messages as a single Jaeger JSON span.
+* `"zipkin_proto"`: Decode messages as a list of Zipkin protobuf spans.
+* `"zipkin_json"`: Decode messages as a list of Zipkin JSON spans.
+* `"zipkin_thrift"`: Decode messages as a list of Zipkin Thrift spans.
+* `"raw"`: Copy the message bytes into the body of a log record.
+
+`"otlp_proto"` must be used to read all telemetry types from Kafka; other
+encodings are signal-specific.
+
+## Blocks
+
+The following blocks are supported inside the definition of
+`otelcol.receiver.kafka`:
+
+Hierarchy | Block | Description | Required
+--------- | ----- | ----------- | --------
+authentication | [authentication][] | Configures authentication for connecting to Kafka brokers. | no
+authentication > plaintext | [plaintext][] | Authenticates against Kafka brokers with plaintext. | no
+authentication > sasl | [sasl][] | Authenticates against Kafka brokers with SASL. | no
+authentication > sasl > aws_msk | [aws_msk][] | Additional SASL parameters when using AWS_MSK_IAM. | no
+authentication > tls | [tls][] | Configures TLS for connecting to the Kafka brokers. | no
+authentication > kerberos | [kerberos][] | Authenticates against Kafka brokers with Kerberos. | no
+metadata | [metadata][] | Configures how to retrieve metadata from Kafka brokers. | no
+metadata > retry | [retry][] | Configures how to retry metadata retrieval. | no
+autocommit | [autocommit][] | Configures how to automatically commit updated topic offsets back to the Kafka brokers. | no
+message_marking | [message_marking][] | Configures when Kafka messages are marked as read. | no
+output | [output][] | Configures where to send received telemetry data. | yes
+
+The `>` symbol indicates deeper levels of nesting. For example, `authentication
+> tls` refers to a `tls` block defined inside an `authentication` block.
+
+[authentication]: #authentication-block
+[plaintext]: #plaintext-block
+[sasl]: #sasl-block
+[aws_msk]: #aws_msk-block
+[tls]: #tls-block
+[kerberos]: #kerberos-block
+[metadata]: #metadata-block
+[retry]: #retry-block
+[autocommit]: #autocommit-block
+[message_marking]: #message_marking-block
+[output]: #output-block
+
+### authentication block
+
+The `authentication` block holds the definition of different authentication
+mechanisms to use when connecting to Kafka brokers. It doesn't support any
+arguments and is configured fully through inner blocks.
+
+### plaintext block
+
+The `plaintext` block configures `PLAIN` authentication against Kafka brokers.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`username` | `string` | Username to use for `PLAIN` authentication. | | yes
+`password` | `secret` | Password to use for `PLAIN` authentication. | | yes
+
+### sasl block
+
+The `sasl` block configures SASL authentication against Kafka brokers.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`username` | `string` | Username to use for SASL authentication. | | yes
+`password` | `secret` | Password to use for SASL authentication. | | yes
+`mechanism` | `string` | Mechanism to use when authenticating. | | yes
+
+The `mechanism` argument may be set to one of the following strings:
+
+* `"PLAIN"`
+* `"AWS_MSK_IAM"`
+* `"SCRAM-SHA-256"`
+* `"SCRAM-SHA-512"`
+
+When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk]
+must also be provided.
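+
+For example, the following snippet sketches a `sasl` block that authenticates
+with `SCRAM-SHA-512`. The broker address, protocol version, username, and
+environment variable name are placeholders for illustration only, not values
+from a real deployment:
+
+```river
+otelcol.receiver.kafka "sasl_example" {
+  brokers          = ["kafka-broker:9092"]
+  protocol_version = "2.0.0"
+
+  authentication {
+    sasl {
+      username  = "example-user"
+      password  = env("KAFKA_PASSWORD")
+      mechanism = "SCRAM-SHA-512"
+    }
+  }
+
+  output {
+    traces = [...]
+  }
+}
+```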
+
+### aws_msk block
+
+The `aws_msk` block configures extra parameters for SASL authentication when
+using the `AWS_MSK_IAM` mechanism.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`region` | `string` | AWS region the MSK cluster is based in. | | yes
+`broker_addr` | `string` | MSK address to connect to for authentication. | | yes
+
+### tls block
+
+The `tls` block configures TLS settings used for connecting to the Kafka
+brokers. If the `tls` block isn't provided, TLS won't be used for
+communication.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`ca_file` | `string` | Path to the CA file. | | no
+`cert_file` | `string` | Path to the TLS certificate. | | no
+`key_file` | `string` | Path to the TLS certificate key. | | no
+`min_version` | `string` | Minimum acceptable TLS version for connections. | `"TLS 1.2"` | no
+`max_version` | `string` | Maximum acceptable TLS version for connections. | `"TLS 1.3"` | no
+`reload_interval` | `duration` | Frequency to reload the certificates. | | no
+`client_ca_file` | `string` | Path to the CA file used to authenticate client certificates. | | no
+
+### kerberos block
+
+The `kerberos` block configures Kerberos authentication against the Kafka
+broker.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`service_name` | `string` | Kerberos service name. | | no
+`realm` | `string` | Kerberos realm. | | no
+`use_keytab` | `bool` | Enables using keytab instead of password. | | no
+`username` | `string` | Kerberos username to authenticate as. | | yes
+`password` | `secret` | Kerberos password to authenticate with. | | no
+`config_file` | `string` | Path to Kerberos configuration file (for example, `/etc/krb5.conf`). | | no
+`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no
+
+When `use_keytab` is `false`, the `password` argument is required. When
+`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is
+used for authentication instead. At most one of `password` or `keytab_file`
+must be provided.
+
+### metadata block
+
+The `metadata` block configures how to retrieve and store metadata from the
+Kafka broker.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no
+
+If the `include_all_topics` argument is `true`, `otelcol.receiver.kafka`
+maintains a full set of metadata for all topics rather than the minimal set
+that has been necessary so far. Including the full set of metadata is more
+convenient for users, but it can consume a substantial amount of memory if you
+have many topics and partitions.
+
+Retrieving metadata may fail if the Kafka broker is starting up at the same
+time as the `otelcol.receiver.kafka` component. The [`retry` child
+block][retry] can be provided to customize retry behavior.
+
+### retry block
+
+The `retry` block configures how to retry retrieving metadata when retrieval
+fails.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no
+`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no
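+
+As an illustration, the snippet below sketches a `metadata` block that turns
+off full-topic metadata and loosens the retry behavior. The broker address,
+protocol version, and retry values are placeholders rather than
+recommendations:
+
+```river
+otelcol.receiver.kafka "metadata_example" {
+  brokers          = ["kafka-broker:9092"]
+  protocol_version = "2.0.0"
+
+  metadata {
+    include_all_topics = false
+
+    retry {
+      max_retries = 5
+      backoff     = "500ms"
+    }
+  }
+
+  output {
+    traces = [...]
+  }
+}
+```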
+
+### autocommit block
+
+The `autocommit` block configures how to automatically commit updated topic
+offsets back to the Kafka brokers.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`enable` | `bool` | Enable autocommitting updated topic offsets. | `true` | no
+`interval` | `duration` | How frequently to autocommit. | `"1s"` | no
+
+### message_marking block
+
+The `message_marking` block configures when Kafka messages are marked as read.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`after_execution` | `bool` | Mark messages after forwarding telemetry data to other components. | `false` | no
+`include_unsuccessful` | `bool` | Whether failed forwards should be marked as read. | `false` | no
+
+By default, a Kafka message is marked as read immediately after it is retrieved
+from the Kafka broker. If the `after_execution` argument is true, messages are
+only marked as read after the telemetry data is forwarded to components
+specified in [the `output` block][output].
+
+When `after_execution` is true, messages are only marked as read when they are
+decoded successfully and the components the data was forwarded to did not
+return an error. If the `include_unsuccessful` argument is true, messages are
+marked as read even if decoding or forwarding failed. Setting
+`include_unsuccessful` has no effect if `after_execution` is `false`.
+
+> **WARNING**: Setting `after_execution` to `true` and `include_unsuccessful`
+> to `false` can block the entire Kafka partition if message processing returns
+> a permanent error, such as failing to decode.
+
+### output block
+
+{{< docs/shared lookup="flow/reference/components/output-block.md" source="agent" >}}
+
+## Exported fields
+
+`otelcol.receiver.kafka` does not export any fields.
+
+## Component health
+
+`otelcol.receiver.kafka` is only reported as unhealthy if given an invalid
+configuration.
+
+## Debug information
+
+`otelcol.receiver.kafka` does not expose any component-specific debug
+information.
+
+## Example
+
+This example forwards read telemetry data through a batch processor before
+finally sending it to an OTLP-capable endpoint:
+
+```river
+otelcol.receiver.kafka "default" {
+  brokers          = ["localhost:9092"]
+  protocol_version = "2.0.0"
+
+  output {
+    metrics = [otelcol.processor.batch.default.input]
+    logs    = [otelcol.processor.batch.default.input]
+    traces  = [otelcol.processor.batch.default.input]
+  }
+}
+
+otelcol.processor.batch "default" {
+  output {
+    metrics = [otelcol.exporter.otlp.default.input]
+    logs    = [otelcol.exporter.otlp.default.input]
+    traces  = [otelcol.exporter.otlp.default.input]
+  }
+}
+
+otelcol.exporter.otlp "default" {
+  client {
+    endpoint = env("OTLP_ENDPOINT")
+  }
+}
+```
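+
+The following sketch extends the example above with plaintext authentication
+and TLS. The broker address, username, CA path, and environment variable are
+placeholder values used for illustration only:
+
+```river
+otelcol.receiver.kafka "authenticated" {
+  brokers          = ["kafka-broker:9093"]
+  protocol_version = "2.0.0"
+
+  authentication {
+    plaintext {
+      username = "example-user"
+      password = env("KAFKA_PASSWORD")
+    }
+
+    tls {
+      ca_file = "/etc/ssl/certs/ca.crt"
+    }
+  }
+
+  output {
+    traces = [otelcol.processor.batch.default.input]
+  }
+}
+```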