Skip to content

Commit

Permalink
feat: Implement WAL segment ingestion via Kafka with partition ring (#…
Browse files Browse the repository at this point in the history
…14043)

Co-authored-by: George Robinson <george.robinson@grafana.com>
  • Loading branch information
cyriltovena and grobinson-grafana authored Sep 10, 2024
1 parent 55e374e commit d178f4c
Show file tree
Hide file tree
Showing 105 changed files with 23,877 additions and 629 deletions.
191 changes: 98 additions & 93 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1031,74 +1031,56 @@ metastore_client:
# Configures the gRPC client used to communicate with the metastore.
[grpc_client_config: <grpc_client>]

partition_ring:
# The key-value store used to share the hash ring across multiple instances.
# This option needs to be set on ingesters, distributors, queriers, and rulers
# when running in microservices mode.
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
# CLI flag: -ingester.partition-ring.store
[store: <string> | default = "memberlist"]

# The prefix for the keys in the store. Should end with a /.
# CLI flag: -ingester.partition-ring.prefix
[prefix: <string> | default = "collectors/"]

# Configuration for a Consul client. Only applies if the selected kvstore is
# consul.
# The CLI flags prefix for this block configuration is:
# ingester.partition-ring.consul
[consul: <consul>]

# Configuration for an ETCD v3 client. Only applies if the selected kvstore
# is etcd.
# The CLI flags prefix for this block configuration is:
# ingester.partition-ring.etcd
[etcd: <etcd>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -ingester.partition-ring.multi.primary
[primary: <string> | default = ""]

# Secondary backend storage used by multi-client.
# CLI flag: -ingester.partition-ring.multi.secondary
[secondary: <string> | default = ""]

# Mirror writes to secondary store.
# CLI flag: -ingester.partition-ring.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]

# Timeout for storing value to secondary store.
# CLI flag: -ingester.partition-ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

# Minimum number of owners to wait before a PENDING partition gets switched to
# ACTIVE.
# CLI flag: -ingester.partition-ring.min-partition-owners-count
[min_partition_owners_count: <int> | default = 1]

# How long the minimum number of owners is enforced before a PENDING
# partition gets switched to ACTIVE.
# CLI flag: -ingester.partition-ring.min-partition-owners-duration
[min_partition_owners_duration: <duration> | default = 10s]

# How long to wait before an INACTIVE partition is eligible for deletion. The
# partition is deleted only if it has been in INACTIVE state for at least the
# configured duration and it has no owners registered. A value of 0 disables
# partition deletion.
# CLI flag: -ingester.partition-ring.delete-inactive-partition-after
[delete_inactive_partition_after: <duration> | default = 13h]

kafka_config:
# the kafka endpoint to connect to
# CLI flag: -address
# The Kafka backend address.
# CLI flag: -kafka.address
[address: <string> | default = "localhost:9092"]

# The Kafka topic name.
# CLI flag: -.topic
[topic: <string> | default = "loki.push"]
# CLI flag: -kafka.topic
[topic: <string> | default = ""]

# The Kafka client ID.
# CLI flag: -kafka.client-id
[client_id: <string> | default = ""]

# The maximum time allowed to open a connection to a Kafka broker.
# CLI flag: -kafka.dial-timeout
[dial_timeout: <duration> | default = 2s]

# How long to wait for an incoming write request to be successfully committed
# to the Kafka backend.
# CLI flag: -kafka.write-timeout
[write_timeout: <duration> | default = 10s]

# The consumer group used by the consumer to track the last consumed offset.
# The consumer group must be different for each ingester. If the configured
# consumer group contains the '<partition>' placeholder, it is replaced with
# the actual partition ID owned by the ingester. When empty (recommended),
# Loki uses the ingester instance ID to guarantee uniqueness.
# CLI flag: -kafka.consumer-group
[consumer_group: <string> | default = ""]

# How long to retry a failed request to get the last produced offset.
# CLI flag: -kafka.last-produced-offset-retry-timeout
[last_produced_offset_retry_timeout: <duration> | default = 10s]

# Enable auto-creation of Kafka topic if it doesn't exist.
# CLI flag: -kafka.auto-create-topic-enabled
[auto_create_topic_enabled: <boolean> | default = true]

# The maximum size of a Kafka record data that should be generated by the
# producer. An incoming write request larger than this size is split into
# multiple Kafka records. We strongly recommend not changing this setting
# except for testing purposes.
# CLI flag: -kafka.producer-max-record-size-bytes
[producer_max_record_size_bytes: <int> | default = 15983616]

# The maximum size of (uncompressed) buffered and unacknowledged produced
# records sent to Kafka. The produce request fails once this limit is reached.
# This limit is per Kafka client. 0 to disable the limit.
# CLI flag: -kafka.producer-max-buffered-bytes
[producer_max_buffered_bytes: <int> | default = 1073741824]

kafka_ingester:
# Whether the kafka ingester is enabled.
Expand Down Expand Up @@ -1251,46 +1233,75 @@ kafka_ingester:
# CLI flag: -kafka-ingester.shutdown-marker-path
[shutdown_marker_path: <string> | default = ""]

# The interval at which the ingester will flush and commit offsets to Kafka.
# If not set, the default flush interval will be used.
# CLI flag: -kafka-ingester.flush-interval
[flush_interval: <duration> | default = 15s]

# The size at which the ingester will flush and commit offsets to Kafka. If
# not set, the default flush size will be used.
# CLI flag: -kafka-ingester.flush-size
[flush_size: <int> | default = 314572800]

partition_ring:
# The key-value store used to share the hash ring across multiple instances.
# This option needs to be set on ingesters, distributors, queriers, and rulers
# when running in microservices mode.
kvstore:
[store: <string> | default = ""]
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
# CLI flag: -ingester.partition-ring.store
[store: <string> | default = "memberlist"]

[prefix: <string> | default = ""]
# The prefix for the keys in the store. Should end with a /.
# CLI flag: -ingester.partition-ring.prefix
[prefix: <string> | default = "collectors/"]

# Configuration for a Consul client. Only applies if the selected kvstore
# is consul.
# The CLI flags prefix for this block configuration is:
# common.storage.ring.consul
# ingester.partition-ring.consul
[consul: <consul>]

# Configuration for an ETCD v3 client. Only applies if the selected
# kvstore is etcd.
# The CLI flags prefix for this block configuration is:
# common.storage.ring.etcd
# ingester.partition-ring.etcd
[etcd: <etcd>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -ingester.partition-ring.multi.primary
[primary: <string> | default = ""]

# Secondary backend storage used by multi-client.
# CLI flag: -ingester.partition-ring.multi.secondary
[secondary: <string> | default = ""]

[mirror_enabled: <boolean>]

[mirror_timeout: <duration>]

[min_partition_owners_count: <int>]
# Mirror writes to secondary store.
# CLI flag: -ingester.partition-ring.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]

[min_partition_owners_duration: <duration>]
# Timeout for storing value to secondary store.
# CLI flag: -ingester.partition-ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

[delete_inactive_partition_after: <duration>]
# Minimum number of owners to wait before a PENDING partition gets switched
# to ACTIVE.
# CLI flag: -ingester.partition-ring.min-partition-owners-count
[min_partition_owners_count: <int> | default = 1]

kafkaconfig:
[address: <string> | default = ""]
# How long the minimum number of owners is enforced before a PENDING
# partition gets switched to ACTIVE.
# CLI flag: -ingester.partition-ring.min-partition-owners-duration
[min_partition_owners_duration: <duration> | default = 10s]

[topic: <string> | default = ""]
# How long to wait before an INACTIVE partition is eligible for deletion.
# The partition is deleted only if it has been in INACTIVE state for at
# least the configured duration and it has no owners registered. A value of
# 0 disables partition deletion.
# CLI flag: -ingester.partition-ring.delete-inactive-partition-after
[delete_inactive_partition_after: <duration> | default = 13h]

# Configuration for 'runtime config' module, responsible for reloading runtime
# configuration file.
Expand Down Expand Up @@ -2244,10 +2255,14 @@ ring:
# Configuration for a Consul client. Only applies if the selected kvstore is
# consul.
# The CLI flags prefix for this block configuration is:
# common.storage.ring.consul
[consul: <consul>]
# Configuration for an ETCD v3 client. Only applies if the selected kvstore
# is etcd.
# The CLI flags prefix for this block configuration is:
# common.storage.ring.etcd
[etcd: <etcd>]
multi:
Expand Down Expand Up @@ -3578,26 +3593,16 @@ The `ingester_client` block configures how the distributor will connect to inges
```yaml
# Configures how connections are pooled.
pool_config:
# How frequently to clean up clients for ingesters that have gone away.
# CLI flag: -distributor.client-cleanup-period
[client_cleanup_period: <duration> | default = 15s]
[client_cleanup_period: <duration>]
# Run a health check on each ingester client during periodic cleanup.
# CLI flag: -distributor.health-check-ingesters
[health_check_ingesters: <boolean> | default = true]
[health_check_ingesters: <boolean>]
# How quickly a dead client is removed after it has been detected as gone. Set
# this to a value that allows time for a secondary health check to recover the
# missing client.
# CLI flag: -ingester.client.healthcheck-timeout
[remote_timeout: <duration> | default = 1s]
[remote_timeout: <duration>]
# The remote request timeout on the client side.
# CLI flag: -ingester.client.timeout
[remote_timeout: <duration> | default = 5s]
[remote_timeout: <duration>]
# Configures how the gRPC connection to ingesters works as a client.
# The CLI flags prefix for this block configuration is: ingester.client
# The CLI flags prefix for this block configuration is: ingester-rf1.client
[grpc_client_config: <grpc_client>]
```

Expand Down
9 changes: 6 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ require (
github.com/shirou/gopsutil/v4 v4.24.0-alpha.1
github.com/thanos-io/objstore v0.0.0-20240818203309-0363dadfdfb1
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/franz-go/pkg/kfake v0.0.0-20240821035758-b77dd13e2bfa
github.com/twmb/franz-go/pkg/kmsg v1.8.0
github.com/twmb/franz-go/plugin/kotel v1.5.0
github.com/twmb/franz-go/plugin/kprom v1.1.0
github.com/willf/bloom v2.0.3+incompatible
go.opentelemetry.io/collector/pdata v1.12.0
Expand Down Expand Up @@ -178,7 +182,6 @@ require (
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect
Expand Down Expand Up @@ -347,9 +350,9 @@ require (
go.opentelemetry.io/collector/semconv v0.105.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/mod v0.19.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1816,8 +1816,14 @@ github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+l
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ=
github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8XMQBEC+60=
github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20240821035758-b77dd13e2bfa h1:OmQ4DJhqeOPdIH60Psut1vYU8A6LGyxJbF09w5RAa2w=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20240821035758-b77dd13e2bfa/go.mod h1:nkBI/wGFp7t1NJnnCeJdS4sX5atPAqwCPpDXKuI7SC8=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/plugin/kotel v1.5.0 h1:TiPfGUbQK384OO7ZYGdo7JuPCbJn+/8njQ/D9Je9CDE=
github.com/twmb/franz-go/plugin/kotel v1.5.0/go.mod h1:wRXzRo76x1myOUMaVHAyraXoGBdEcvlLChGTVv5+DWU=
github.com/twmb/franz-go/plugin/kprom v1.1.0 h1:grGeIJbm4llUBF8jkDjTb/b8rKllWSXjMwIqeCCcNYQ=
github.com/twmb/franz-go/plugin/kprom v1.1.0/go.mod h1:cTDrPMSkyrO99LyGx3AtiwF9W6+THHjZrkDE2+TEBIU=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
Expand Down
Loading

0 comments on commit d178f4c

Please sign in to comment.