Make concurrentFetchers change its concurrency dynamically #9437
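This PR replaces the single `fetch_concurrency`/`records_per_fetch` pair with separate startup and ongoing settings, so an ingester can fetch aggressively while replaying its backlog at startup and then drop to a lighter concurrency once it has caught up. As a rough sketch, the resulting configuration looks like the following (the concurrency values are illustrative, not recommendations):

```yaml
ingest_storage:
  kafka:
    # Fetch aggressively while the ingester catches up during startup.
    # 0 disables concurrent fetching entirely.
    startup_fetch_concurrency: 4    # illustrative value
    startup_records_per_fetch: 2500 # default
    # Drop to lighter fetching once caught up. Must be > 0 whenever
    # startup_fetch_concurrency is > 0 (enforced by Validate below).
    ongoing_fetch_concurrency: 2    # illustrative value
    ongoing_records_per_fetch: 30   # default
```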

36 changes: 28 additions & 8 deletions cmd/mimir/config-descriptor.json
@@ -6650,22 +6650,42 @@
 },
 {
   "kind": "field",
-  "name": "fetch_concurrency",
+  "name": "startup_fetch_concurrency",
   "required": false,
-  "desc": "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.",
+  "desc": "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.",
   "fieldValue": null,
-  "fieldDefaultValue": 1,
-  "fieldFlag": "ingest-storage.kafka.fetch-concurrency",
+  "fieldDefaultValue": 0,
+  "fieldFlag": "ingest-storage.kafka.startup-fetch-concurrency",
   "fieldType": "int"
 },
 {
   "kind": "field",
-  "name": "records_per_fetch",
+  "name": "startup_records_per_fetch",
   "required": false,
-  "desc": "The number of records to fetch from Kafka in a single request.",
+  "desc": "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0.",
   "fieldValue": null,
-  "fieldDefaultValue": 128,
-  "fieldFlag": "ingest-storage.kafka.records-per-fetch",
+  "fieldDefaultValue": 2500,
+  "fieldFlag": "ingest-storage.kafka.startup-records-per-fetch",
   "fieldType": "int"
 },
+{
+  "kind": "field",
+  "name": "ongoing_fetch_concurrency",
+  "required": false,
+  "desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. It is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. When enabled, it must be greater than 0.",
+  "fieldValue": null,
+  "fieldDefaultValue": 0,
+  "fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
+  "fieldType": "int"
+},
+{
+  "kind": "field",
+  "name": "ongoing_records_per_fetch",
+  "required": false,
+  "desc": "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.",
+  "fieldValue": null,
+  "fieldDefaultValue": 30,
+  "fieldFlag": "ingest-storage.kafka.ongoing-records-per-fetch",
+  "fieldType": "int"
+},
 {
12 changes: 8 additions & 4 deletions cmd/mimir/help-all.txt.tmpl
@@ -1363,8 +1363,6 @@ Usage of ./cmd/mimir/mimir:
 	How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
 -ingest-storage.kafka.dial-timeout duration
 	The maximum time allowed to open a connection to a Kafka broker. (default 2s)
--ingest-storage.kafka.fetch-concurrency int
-	The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1)
 -ingest-storage.kafka.ingestion-concurrency int
 	The number of concurrent ingestion streams to the TSDB head. 0 to disable.
 -ingest-storage.kafka.ingestion-concurrency-batch-size int
@@ -1375,12 +1373,18 @@ Usage of ./cmd/mimir/mimir:
 	How long to retry a failed request to get the last produced offset. (default 10s)
 -ingest-storage.kafka.max-consumer-lag-at-startup duration
 	The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
+-ingest-storage.kafka.ongoing-fetch-concurrency int
+	The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. It is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. When enabled, it must be greater than 0.
+-ingest-storage.kafka.ongoing-records-per-fetch int
+	The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
 -ingest-storage.kafka.producer-max-buffered-bytes int
 	The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
 -ingest-storage.kafka.producer-max-record-size-bytes int
 	The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
--ingest-storage.kafka.records-per-fetch int
-	The number of records to fetch from Kafka in a single request. (default 128)
+-ingest-storage.kafka.startup-fetch-concurrency int
+	The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
+-ingest-storage.kafka.startup-records-per-fetch int
+	The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
 -ingest-storage.kafka.target-consumer-lag-at-startup duration
 	The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
 -ingest-storage.kafka.topic string
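For flag-based deployments, the same setup could be expressed roughly as follows (illustrative values; the `-target=ingester` invocation is an assumption about the deployment):

```
mimir -target=ingester \
  -ingest-storage.kafka.startup-fetch-concurrency=4 \
  -ingest-storage.kafka.startup-records-per-fetch=2500 \
  -ingest-storage.kafka.ongoing-fetch-concurrency=2 \
  -ingest-storage.kafka.ongoing-records-per-fetch=30
```

Setting startup-fetch-concurrency above 0 without also setting ongoing-fetch-concurrency fails validation, per the `Validate` change in `pkg/storage/ingest/config.go` below.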
12 changes: 8 additions & 4 deletions cmd/mimir/help.txt.tmpl
@@ -433,8 +433,6 @@ Usage of ./cmd/mimir/mimir:
 	How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
 -ingest-storage.kafka.dial-timeout duration
 	The maximum time allowed to open a connection to a Kafka broker. (default 2s)
--ingest-storage.kafka.fetch-concurrency int
-	The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1)
 -ingest-storage.kafka.ingestion-concurrency int
 	The number of concurrent ingestion streams to the TSDB head. 0 to disable.
 -ingest-storage.kafka.ingestion-concurrency-batch-size int
@@ -445,12 +443,18 @@ Usage of ./cmd/mimir/mimir:
 	How long to retry a failed request to get the last produced offset. (default 10s)
 -ingest-storage.kafka.max-consumer-lag-at-startup duration
 	The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
+-ingest-storage.kafka.ongoing-fetch-concurrency int
+	The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. It is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. When enabled, it must be greater than 0.
+-ingest-storage.kafka.ongoing-records-per-fetch int
+	The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
 -ingest-storage.kafka.producer-max-buffered-bytes int
 	The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
 -ingest-storage.kafka.producer-max-record-size-bytes int
 	The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
--ingest-storage.kafka.records-per-fetch int
-	The number of records to fetch from Kafka in a single request. (default 128)
+-ingest-storage.kafka.startup-fetch-concurrency int
+	The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
+-ingest-storage.kafka.startup-records-per-fetch int
+	The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
 -ingest-storage.kafka.target-consumer-lag-at-startup duration
 	The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
 -ingest-storage.kafka.topic string
31 changes: 23 additions & 8 deletions docs/sources/mimir/configure/configuration-parameters/index.md
@@ -3860,14 +3860,29 @@ kafka:
 # CLI flag: -ingest-storage.kafka.wait-strong-read-consistency-timeout
 [wait_strong_read_consistency_timeout: <duration> | default = 20s]
 
-# The number of concurrent fetch requests that the ingester sends to Kafka
-# when catching up during startup.
-# CLI flag: -ingest-storage.kafka.fetch-concurrency
-[fetch_concurrency: <int> | default = 1]
-
-# The number of records to fetch from Kafka in a single request.
-# CLI flag: -ingest-storage.kafka.records-per-fetch
-[records_per_fetch: <int> | default = 128]
+# The number of concurrent fetch requests that the ingester makes when reading
+# data from Kafka during startup. 0 to disable.
+# CLI flag: -ingest-storage.kafka.startup-fetch-concurrency
+[startup_fetch_concurrency: <int> | default = 0]
+
+# The number of records per fetch request that the ingester makes when reading
+# data from Kafka during startup. Depends on
+# ingest-storage.kafka.startup-fetch-concurrency being greater than 0.
+# CLI flag: -ingest-storage.kafka.startup-records-per-fetch
+[startup_records_per_fetch: <int> | default = 2500]
+
+# The number of concurrent fetch requests that the ingester makes when reading
+# data continuously from Kafka after startup. It is disabled unless
+# ingest-storage.kafka.startup-fetch-concurrency is greater than 0. When
+# enabled, it must be greater than 0.
+# CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
+[ongoing_fetch_concurrency: <int> | default = 0]
+
+# The number of records per fetch request that the ingester makes when reading
+# data continuously from Kafka after startup. Depends on
+# ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.
+# CLI flag: -ingest-storage.kafka.ongoing-records-per-fetch
+[ongoing_records_per_fetch: <int> | default = 30]
 
 # When enabled, the fetch request MaxBytes field is computed using the
 # compressed size of previous records. When disabled, MaxBytes is computed
27 changes: 23 additions & 4 deletions pkg/storage/ingest/config.go
@@ -93,8 +93,11 @@ type KafkaConfig struct {
 	// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
 	FallbackClientErrorSampleRate int64 `yaml:"-"`
 
-	FetchConcurrency int `yaml:"fetch_concurrency"`
-	RecordsPerFetch  int `yaml:"records_per_fetch"`
+	StartupFetchConcurrency int `yaml:"startup_fetch_concurrency"`
+	StartupRecordsPerFetch  int `yaml:"startup_records_per_fetch"`
+	OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"`
+	OngoingRecordsPerFetch  int `yaml:"ongoing_records_per_fetch"`
+
 	UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
 	IngestionConcurrency              int  `yaml:"ingestion_concurrency"`
 	IngestionConcurrencyBatchSize     int  `yaml:"ingestion_concurrency_batch_size"`
@@ -132,8 +135,12 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
 	f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")
 
 	f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum time allowed for a read request processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")
-	f.IntVar(&cfg.FetchConcurrency, prefix+".fetch-concurrency", 1, "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.")
-	f.IntVar(&cfg.RecordsPerFetch, prefix+".records-per-fetch", 128, "The number of records to fetch from Kafka in a single request.")
+
+	f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
+	f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.")
+	f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. It is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. When enabled, it must be greater than 0.")
+	f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")
+
 	f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
 	f.IntVar(&cfg.IngestionConcurrency, prefix+".ingestion-concurrency", 0, "The number of concurrent ingestion streams to the TSDB head. 0 to disable.")
 	f.IntVar(&cfg.IngestionConcurrencyBatchSize, prefix+".ingestion-concurrency-batch-size", 128, "The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0.")
@@ -172,6 +179,18 @@ func (cfg *KafkaConfig) Validate() error {
 		return ErrInvalidMaxConsumerLagAtStartup
 	}
 
+	if cfg.StartupFetchConcurrency < 0 {
+		return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater than or equal to 0")
+	}
+
+	if cfg.StartupFetchConcurrency > 0 && cfg.OngoingFetchConcurrency <= 0 {
+		return fmt.Errorf("ingest-storage.kafka.ongoing-fetch-concurrency must be greater than 0 when startup-fetch-concurrency is greater than 0")
+	}
+
+	if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 {
+		return fmt.Errorf("ingest-storage.kafka.startup-records-per-fetch and ingest-storage.kafka.ongoing-records-per-fetch must be greater than 0")
+	}
+
 	return nil
 }
 
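Read together, the new checks form a small truth table over the four settings. The following standalone sketch (a paraphrase for illustration; the helper name and package are hypothetical, the real logic lives in `KafkaConfig.Validate`) shows which combinations pass:

```go
package main

import (
	"errors"
	"fmt"
)

// validateFetchConcurrency mirrors the checks added to KafkaConfig.Validate.
func validateFetchConcurrency(startupConc, ongoingConc, startupRecords, ongoingRecords int) error {
	// Startup concurrency may be 0 (disabled) but never negative.
	if startupConc < 0 {
		return errors.New("startup-fetch-concurrency must be greater than or equal to 0")
	}
	// Enabling concurrent fetching at startup requires an ongoing
	// concurrency to fall back to once the ingester has caught up.
	if startupConc > 0 && ongoingConc <= 0 {
		return errors.New("ongoing-fetch-concurrency must be greater than 0 when startup-fetch-concurrency is greater than 0")
	}
	// Records-per-fetch values must always be positive.
	if startupRecords <= 0 || ongoingRecords <= 0 {
		return errors.New("startup-records-per-fetch and ongoing-records-per-fetch must be greater than 0")
	}
	return nil
}

func main() {
	fmt.Println(validateFetchConcurrency(0, 0, 2500, 30)) // <nil>: defaults, feature disabled
	fmt.Println(validateFetchConcurrency(4, 0, 2500, 30)) // error: startup enabled without ongoing
	fmt.Println(validateFetchConcurrency(4, 2, 2500, 30)) // <nil>: both enabled
}
```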