Skip to content

Configurations

Adem Efe Gencer edited this page Jan 13, 2021 · 44 revisions

Contents

Configurations inherited from Kafka clients

The following configurations are inherited from the open source Kafka client configurations. They will be used by all the clients in Cruise Control to communicate with the Kafka cluster.

Name Type Required? Default Value Descriptions
bootstrap.servers String Y The bootstrap.servers of the Kafka cluster that Cruise Control should be managing. This configuration is also used by the SampleStore and CruiseControlMetricsReporterSampler if they are used.
metadata.max.age.ms Long N 300,000 The maximum time to cache the metadata of the Kafka cluster before it has to be refreshed. This configuration is used by all the clients communicating with the Kafka cluster.
client.id String N kafka-cruise-control The client id to be used when communicate to brokers for metadata refresh.
send.buffer.bytes Integer N 131072 The socket send buffer size. This configuration is used by all the clients communicating with the Kafka cluster.
receive.buffer.bytes Integer N 32768 The socket receive buffer size. This configuration is used by all the clients communicating with the Kafka cluster.
reconnect.backoff.ms Integer N 50 The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the client. This configuration is used by all the clients communicating with the Kafka cluster.
connections.max.idle.ms Integer N 540,000 Close idle connections after the number of milliseconds specified by this config. This configuration is used by all the clients communicating with the Kafka cluster.
request.timeout.ms Integer N 30,000 The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. This configuration is used by all the clients communicating with the Kafka cluster.
security.protocol String N PLAINTEXT Security protocol used to communicate with brokers.
ssl.protocol String N TLS The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.
ssl.provider String N The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
ssl.cipher.suites String N A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.
ssl.enabled.protocols String N TLSv1.2,TLSv1.1,TLSv1 The list of protocols enabled for SSL connections.
ssl.keystore.type String N JKS The file format of the key store file. This is optional for client.
ssl.keystore.location String N The location of the key store file. This is optional for client and can be used for two-way authentication for client.
ssl.keystore.password String N The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
ssl.key.password String N The password of the private key in the key store file. This is optional for client.
ssl.truststore.type String N JKS The location of the key store file. This is optional for client and can be used for two-way authentication for client.
ssl.keystore.password String N The store password for the key store file. ",+ "This is optional for client and only needed if ssl.keystore.location is configured.
ssl.key.password String N The password of the private key in the key store file. ",+ "This is optional for client.
ssl.truststore.type String N JKS The file format of the trust store file.
ssl.truststore.location String N The location of the trust store file.
ssl.truststore.password String N The password for the trust store file.
ssl.keymanager.algorithm String N SunX509 The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
ssl.trustmanager.algorithm String N SunX509 The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
ssl.endpoint.identification.algorithm String N The endpoint identification algorithm to validate server hostname using server certificate.
ssl.secure.random.implementation String N The SecureRandom PRNG implementation to use for SSL cryptography operations.
zookeeper.security.enabled Boolean N false Specify if zookeeper is secured, true or false
network.client.provider.class Class N com.linkedin.kafka.cruisecontrol.common.KafkaNetworkClientProvider The network client provider class to generate a network client with given properties.

Cruise Control Configurations

Load Monitor Configurations

Name Type Required? Default Value Descriptions
num.metric.fetchers Integer N 1 The number of metric fetchers to fetch from the Kafka cluster.
metric.sampler.class Class N com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler The class name of the metric sampler
sampling.allow.cpu.capacity.estimation Boolean N true The flag to indicate whether sampling process allows CPU capacity estimation of brokers used for CPU utilization estimation.
metric.sampler.partition.assignor.class Class N com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor The class used to assign the partitions to the metric samplers.
metric.sampling.interval.ms Integer N 60,000 The interval of metric sampling.
partition.metrics.window.ms Integer Y 3,600,000 The size of the window in milliseconds to aggregate the Kafka partition metrics. The window must be greater than the metric.sampling.interval.ms.
num.partition.metrics.windows Integer Y 5 The maximum number of partition window the load monitor would keep. Each window covers a time window defined by partition.metrics.window.ms.
skip.loading.samples Boolean N false Specify if sample loading will be skipped upon startup.
min.samples.per.partition.metrics.window Integer N 3 The minimum number of metric samples a valid partition window should have. If a partition does not have enough samples in a partition window, the topic of the partition will be removed from the window due to in sufficient data.
broker.metrics.window.ms Integer Y 3,600,000 The size of the window in milliseconds to aggregate the Kafka broker metrics. The window must be greater than the metric.sampling.interval.ms.
num.broker.metrics.windows Integer Y 5 The maximum number of broker window the load monitor would keep. Each window covers a time window defined by broker.metrics.window.ms.
min.samples.per.broker.metrics.window Integer N 3 The minimum number of metric samples a valid broker window should have. If a broker does not have enough samples in a broker window, this broker will be removed from the window due to in sufficient data.
broker.capacity.config.resolver.class Class N com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigFileResolver The broker capacity configuration resolver class name. The broker capacity configuration resolver is responsible for getting the broker capacity. The default implementation is a file based solution.
monitor.state.update.interval.ms Long N 30,000 The load monitor interval to refresh the monitor state.
min.valid.partition.ratio Double N 0.995 The minimum percentage of the total partitions required to be monitored in order to generate a valid load model. Because the topic and partitions in a Kafka cluster are dynamically changing. The load monitor will exclude some of the topics that does not have sufficient metric samples. This configuration defines the minimum required percentage of the partitions that must be included in the load model.
leader.network.inbound.weight.for.cpu.util Double N 0.6 Kafka Cruise Control uses the following model to derive replica level CPU utilization: REPLICA_CPU_UTIL = a * LEADER_BYTES_IN_RATE + b * LEADER_BYTES_OUT_RATE + c * FOLLOWER_BYTES_IN_RATE. This configuration will be used as the weight for LEADER_BYTES_IN_RATE.
leader.network.outbound.weight.for.cpu.util Double N 0.1 Kafka Cruise Control uses the following model to derive replica level CPU utilization: REPLICA_CPU_UTIL = a * LEADER_BYTES_IN_RATE + b * LEADER_BYTES_OUT_RATE + c * FOLLOWER_BYTES_IN_RATE. This configuration will be used as the weight for LEADER_BYTES_OUT_RATE.
follower.network.inbound.weight.for.cpu.util Double N 0.3 Kafka Cruise Control uses the following model to derive replica level CPU utilization: REPLICA_CPU_UTIL = a * LEADER_BYTES_IN_RATE + b * LEADER_BYTES_OUT_RATE + c * FOLLOWER_BYTES_IN_RATE. This configuration will be used as the weight for FOLLOWER_BYTES_IN_RATE.
sample.store.class Class N com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore The sample store class name. User may configure a sample store that persist the metric samples that have already been aggregated into Kafka Cruise Control. Later on the persisted samples can be reloaded from the sample store to Kafka Cruise Control.
max.allowed.extrapolations.per.partition Integer N 5 The maximum allowed number of extrapolations for each partition. A partition will be considered as invalid if the total number extrapolations in all the windows goes above this number.
max.allowed.extrapolations.per.broker Integer N 5 The maximum allowed number of extrapolations for each broker. A broker will be considered as invalid if the total number extrapolations in all the windows goes above this number.
partition.metric.sample.aggregator.completeness.cache.size Integer N 5 The metric sample aggregator caches the completeness metadata for fast query. The completeness describes the confidence level of the data in the metric sample aggregator. It is primarily measured by the validity of the metrics samples in different windows. This configuration configures The number of completeness cache slots to maintain.
broker.metric.sample.aggregator.completeness.cache.size Integer N 5 The metric sample aggregator caches the completeness metadata for fast query. The completeness describes the confidence level of the data in the metric sample aggregator. It is primarily measured by the validity of the metrics samples in different windows. This configuration configures The number of completeness cache slots to maintain.

Analyzer Configurations

Name Type Required? Default Value Descriptions
default.goals List Y com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal The list of inter-broker goals that will be used by default if no goal list is provided. This list of goals will also be used for proposal pre-computation.
goals List Y com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal A set of case insensitive inter-broker goals. Inter-broker goals facilitate in distributing the load across brokers.
hard.goals List N com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal A list of case insensitive, inter-broker hard goals. Hard goals will be enforced to execute if Cruise Control runs in non-kafka-assigner mode and skip_hard_goal_check parameter is not set in request.
cpu.balance.threshold Double N 1.1 The maximum allowed extent of unbalance for CPU utilization. For example, 1.10 means the highest CPU usage of a broker should not be above 1.10x of average CPU utilization of all the brokers.
disk.balance.threshold Double N 1.1 The maximum allowed extent of unbalance for disk utilization. For example, 1.10 means the highest disk usage of a broker should not be above 1.10x of average disk utilization of all the brokers.
network.inbound.balance.threshold Double N 1.1 The maximum allowed extent of unbalance for network inbound usage. For example, 1.10 means the highest network inbound usage of a broker should not be above 1.10x of average network inbound usage of all the brokers.
network.outbound.balance.threshold Double N 1.1 The maximum allowed extent of unbalance for network outbound usage. For example, 1.10 means the highest network inbound usage of a broker should not be above 1.10x of average network inbound usage of all the brokers.
replica.count.balance.threshold Double N 1.1 The maximum allowed extent of unbalance for replica distribution. For example, 1.10 means the highest replica count of a broker should not be above 1.10x of average replica count of all brokers.
disk.capacity.threshold Double N 0.8 The maximum percentage of the total broker.disk.capacity that is allowed to be used on a broker. The analyzer will enforce a hard goal that the disk usage of a broker cannot be higher than (broker.disk.capacity * disk.capacity.threshold).
cpu.capacity.threshold Double N 0.7 The maximum percentage of the total broker.cpu.capacity that is allowed to be used on a broker. The analyzer will enforce a hard goal that the CPU utilization of a broker cannot be higher than (broker.cpu.capacity * cpu.capacity.threshold).
network.inbound.capacity.threshold Double N 0.8 The maximum percentage of the total broker.network.inbound.capacity that is allowed to be used on a broker. The analyzer will enforce a hard goal that the disk usage of a broker cannot be higher than (broker.network.inbound.capacity * network.inbound.capacity.threshold).
network.outbound.capacity.threshold Double N 0.8 The maximum percentage of the total broker.network.outbound.capacity that is allowed to be used on a broker. The analyzer will enforce a hard goal that the disk usage of a broker cannot be higher than (broker.network.outbound.capacity * network.outbound.capacity.threshold).
cpu.low.utilization.threshold Double N 0.0 The threshold to define the utilization of CPU is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. Such a cluster is overprovisioned in terms of its CPU utilization. The threshold is in percentage.
disk.low.utilization.threshold Double N 0.0 The threshold to define the utilization of disk is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. Such a cluster is overprovisioned in terms of its disk utilization. The threshold is in percentage.
network.inbound.low.utilization.threshold Double N 0.0 The threshold to define the utilization of network inbound rate is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. Such a cluster is overprovisioned in terms of its network inbound rate. The threshold is in percentage.
network.outbound.low.utilization.threshold Double N 0.0 The threshold to define the utilization of network outbound rate is low enough that rebalance is not worthwhile. The cluster will only be in a low utilization state when all the brokers are below the low utilization threshold. Such a cluster is overprovisioned in terms of its network outbound rate. The threshold is in percentage.
max.proposal.candidates Integer N 10 Kafka cruise control precomputes the optimization proposal candidates continuously in the background. This config sets the maximum number of candidate proposals to precompute for each cluster workload model. The more proposal candidates are generated, the more likely a better optimization proposal will be found, but more CPU will be used as well.
proposal.expiration.ms Integer N 900,000 Kafka cruise control will cache one of the best proposal among all the optimization proposal candidates it recently computed. This configuration defines when will the cached proposal be invalidated and needs a recomputation. If proposal.expiration.ms is set to 0, cruise control will continuously compute the proposal candidates.
max.replicas.per.broker Integer N 10,000 The maximum number of replicas allowed to reside on a broker. The analyzer will enforce a hard goal that the number of replica on a broker cannot be higher than this config.
num.proposal.precompute.threads Integer N 1 The number of thread used to precompute the optimization proposal candidates. The more threads are used, the more memory and CPU resource will be used.
leader.replica.count.balance.threshold Double N 1.1 The maximum allowed extent of unbalance for leader replica distribution. For example, 1.10 means the highest leader replica count of a broker should not be above 1.10x of average leader replica count of all alive brokers.
topic.replica.count.balance.threshold Double N 3.0 The maximum allowed extent of unbalance for replica distribution from each topic. For example, 1.80 means the highest topic replica count of a broker should not be above 1.80x of average replica count of all brokers for the same topic.
topic.replica.count.balance.min.gap Integer N 2 The minimum allowed gap between a balance limit and the average replica count for each topic. A balance limit is set via topic.replica.count.balance.threshold config. If the difference between the computed limit and the average replica count for the relevant topic is smaller than the value specified by this config, the limit is adjusted accordingly.
topic.replica.count.balance.max.gap Integer N 40 The maximum allowed gap between a balance limit and the average replica count for each topic. A balance limit is set via topic.replica.count.balance.threshold config. If the difference between the computed limit and the average replica count for the relevant topic is greater than the value specified by this config, the limit is adjusted accordingly.
default.replica.movement.strategies List N [com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] The list of replica movement strategies that will be used by default if no replica movement strategy list is provided.
goal.balancedness.priority.weight Double N 1.1 The impact of having one level higher goal priority on the relative balancedness score. For example, 1.1 means that a goal with higher priority will have the 1.1x balancedness weight of the lower priority goal (assuming the same goal.balancedness.strictness.weight values for both goals).
goal.balancedness.strictness.weight Double N 1.5 The impact of strictness (i.e. hard or soft goal) on the relative balancedness score. For example, 1.5 means that a hard goal will have the 1.5x balancedness weight of a soft goal (assuming goal.balancedness.priority.weight is 1).
goal.violation.distribution.threshold.multiplier Double N 1.0 The multiplier applied to the threshold of distribution goals used for detecting and fixing anomalies. For example, 2.50 means the threshold for each distribution goal (i.e. Replica Distribution, Leader Replica Distribution, Resource Distribution, and Topic Replica Distribution Goals) will be 2.50x of the value used in manual goal optimization requests (e.g. rebalance).
overprovisioned.min.extra.racks Integer N 2 The minimum number of extra racks to consider a cluster as overprovisioned such that the cluster has at least the configured number of extra alive racks in addition to the number of racks needed to place replica of each partition to a separate rack -- e.g. a cluster with 7 racks is overprovisioned in terms of its number of racks if the maximum replication factor is 4 and this config is 3.
overprovisioned.min.brokers Integer N 3 The minimum number of alive brokers for the cluster to be eligible in overprovisioned consideration -- i.e. the cluster has at least the configured number of alive brokers
overprovisioned.max.replicas.per.broker Long N 800 The maximum number of replicas that should reside on each broker to consider a cluster as overprovisioned after balancing its replica distribution.

Executor Configurations

Name Type Required? Default Value Descriptions
zookeeper.connect String Y The zookeeper path used by the Kafka cluster (see https://kafka.apache.org/documentation/#zookeeper.connect).
num.concurrent.partition.movements.per.broker Integer N 5 The maximum number of partitions the executor will move to or out of a broker at the same time. e.g. setting the value to 10 means that the executor will at most allow 10 partitions move out of a broker and 10 partitions move into a broker at any given point. This is to avoid overwhelming the cluster by partition movements.
max.num.cluster.movements Integer N 1250 The maximum number of allowed movements (e.g. partition, leadership) in cluster. This global limit cannot be exceeded regardless of the per-broker replica movement concurrency. When determining this limit, ensure that the (number-of-allowed-movements * maximum-size-of-each-request) is smaller than the default zNode size limit.
num.concurrent.intra.broker.partition.movements Integer N 2 The maximum number of partitions the executor will move across disks within a broker at the same time. e.g. setting the value to 10 means that the executor will at most allow 10 partitions to move across disks within a broker at any given point. This is to avoid overwhelming the cluster by intra-broker partition movements.
num.concurrent.leader.movements Integer N 1000 The maximum number of leader movements the executor will take as one batch. This is mainly because the ZNode has a 1 MB size upper limit. And it will also reduce the controller burden.
execution.progress.check.interval.ms Integer N 10,000 The interval in milliseconds that the " +,"executor will check on the execution progress.
metric.anomaly.analyzer.metrics String N "" The metric ids that the metric anomaly detector should detect if they are violated.
topics.excluded.from.partition.movement String N "" The topics that should be excluded from the partition movement. It is a regex. Notice that this regex will be ignored when decommission a broker is invoked.
default.replication.throttle Long N null The replication throttle applied to replicas being moved, in bytes per second.
replica.movement.strategies List N [com.linkedin.kafka.cruisecontrol.executor.strategy.PostponeUrpReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeLargeReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeSmallReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] A list of supported strategies used to determine execution order for generated partition movement tasks.
default.replica.movement.strategies List N [com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] The list of replica movement strategies that will be used by default if no replica movement strategy list is provided.
executor.notifier.class Class N class com.linkedin.kafka.cruisecontrol.executor.ExecutorNoopNotifier The executor notifier class to trigger an alert when an execution finishes or is stopped (by a user or by Cruise Control).
demotion.history.retention.time.ms Long N 1209600000 The maximum time in milliseconds to retain the demotion history of brokers.
removal.history.retention.time.ms Long N 1209600000 The maximum time in milliseconds to retain the removal history of brokers.
logdir.response.timeout.ms Long N 10000 Timeout in ms for broker logdir to respond
leader.movement.timeout.ms Long N 180000 The maximum time to wait for a leader movement to finish. A leader movement will be marked as failed if it takes longer than this time to finish.
task.execution.alerting.threshold.ms Long N 90000 Threshold of execution time to alert a replica/leader movement task. If the task's execution time exceeds this threshold and the data movement rate is lower than the threshold set for inter-broker/intra-broker replica, alert will be sent out by notifier set via executor.notifier.class.
inter.broker.replica.movement.rate.alerting.threshold Double N 0.1 Threshold of data movement rate(in MB/s) for inter-broker replica movement task. If the task's data movement rate is lower than this and the task's execution time exceeds the threshold set via task.execution.alerting.threshold.ms, alert will be sent out by notifier set via executor.notifier.class.
intra.broker.replica.movement.rate.alerting.threshold Double N 0.2 Threshold of data movement rate(in MB/s) for intra-broker replica movement task. If the task's data movement rate is lower than this and the task's execution time exceeds the threshold set via task.execution.alerting.threshold.ms, alert will be sent out by notifier set via executor.notifier.class.
concurrency.adjuster.interval.ms Long N 360000 The interval of concurrency auto adjustment.
concurrency.adjuster.max.partition.movements.per.broker Integer N 12 The maximum number of partitions the concurrency auto adjustment will allow the executor to move in or out of a broker at the same time. It enforces a cap on the maximum concurrent inter-broker partition movements to avoid overwhelming the cluster. It must be greater than num.concurrent.partition.movements.per.broker and not more than max.num.cluster.movements.
concurrency.adjuster.max.leadership.movements Integer N 1100 The maximum number of leadership movements the concurrency auto adjustment will allow the executor to perform in one batch to avoid overwhelming the cluster. It cannot be (1) smaller than num.concurrent.leader.movements and (2) greater than max.num.cluster.movements.
concurrency.adjuster.enabled Boolean N false The flag to indicate whether the concurrency of all supported movements will be auto-adjusted based on dynamically changing broker metrics. It enables concurrency adjuster for all supported concurrency types, regardless of whether the particular concurrency type is disabled.
concurrency.adjuster.inter.broker.replica.enabled Boolean N false Enable concurrency adjuster for inter-broker replica reassignments.
concurrency.adjuster.leadership.enabled Boolean N false Enable concurrency adjuster for leadership reassignments.
concurrency.adjuster.limit.log.flush.time.ms Double N 2000.0 The limit on the 99.9th percentile broker metric value of log flush time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements.
concurrency.adjuster.limit.follower.fetch.local.time.ms Double N 500.0 The limit on the 99.9th percentile broker metric value of follower fetch local time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements.
concurrency.adjuster.limit.produce.local.time.ms Double N 1000.0 The limit on the 99.9th percentile broker metric value of produce local time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements.
concurrency.adjuster.limit.consumer.fetch.local.time.ms Double N 500.0 The limit on the 99.9th percentile broker metric value of consumer fetch local time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements.
concurrency.adjuster.limit.request.queue.size Double N 1000.0 The limit on the broker metric value of request queue size. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements.
concurrency.adjuster.additive.increase.inter.broker.replica Integer N 1 The fixed number by which the concurrency cap on inter-broker replica movements will be increased by the concurrency adjuster (if enabled) when all considered metrics are within the concurrency adjuster limit.
concurrency.adjuster.additive.increase.leadership Integer N 100 The fixed number by which the concurrency cap on leadership movements will be increased by the concurrency adjuster (if enabled) when all considered metrics are within the concurrency adjuster limit.
concurrency.adjuster.multiplicative.decrease.inter.broker.replica Integer N 2 The fixed number by which the concurrency cap on inter-broker replica movements will be divided by the concurrency adjuster (if enabled) when any considered metric exceeds the concurrency adjuster limit.
concurrency.adjuster.multiplicative.decrease.leadership Integer N 2 The fixed number by which the concurrency cap on leadership movements will be divided by the concurrency adjuster (if enabled) when any considered metric exceeds the concurrency adjuster limit.
min.execution.progress.check.interval.ms Double N 5000 The minimum execution progress check interval that users can dynamically set the execution progress check interval to.
slow.task.alerting.backoff.ms Double N 60000 The minimum interval between slow task alerts. This backoff helps bundling slow tasks to report rather than individually reporting them upon detection.

AnomalyDetector Configurations

Name Type Required? Default Value Description
metric.anomaly.finder.class List N com.linkedin.kafka.cruisecontrol.detector.NoopMetricAnomalyFinder A list of metric anomaly finder classes to find the current state to identify metric anomalies.
num.cached.recent.anomaly.states Integer N 10 The number of recent anomaly states cached for different anomaly types presented via the anomaly substate response of the state endpoint.
broker.failures.class Class N com.linkedin.kafka.cruisecontrol.detector.BrokerFailures The name of class that extends broker failures.
goal.violations.class Class N com.linkedin.kafka.cruisecontrol.detector.GoalViolations The name of class that extends goal violations.
disk.failures.class Class N com.linkedin.kafka.cruisecontrol.detector.DiskFailures The name of class that extends disk failures anomaly.
metric.anomaly.class Class N com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomaly The name of class that extends metric anomaly.
self.healing.goals List N [] The list of goals to be used for self-healing relevant anomalies. If empty, uses the default.goals for self healing.
anomaly.notifier.class Class N com.linkedin.kafka.cruisecontrol.detector.notifier.NoopNotifier The notifier class to trigger an alert when an anomaly is violated.
anomaly.detection.goals List N com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal The goals that anomaly detector should detect if they are violated.
self.healing.exclude.recently.demoted.brokers Boolean N false True if recently demoted brokers are excluded from optimizations during self healing, false otherwise.
self.healing.exclude.recently.removed.brokers Boolean N true True if recently removed brokers are excluded from optimizations during self healing, false otherwise.
failed.brokers.zk.path String N /CruiseControlBrokerList The zk path to store the failed broker list. This is to persist the broker failure time in case Cruise Control failed and restarted when some brokers are down.
fixable.failed.broker.count.threshold Short N 10 The upper boundary of concurrently failed broker count that are taken as fixable. If too many brokers are failing at the same time, it is often due to something more fundamental going wrong and removing replicas off failed brokers cannot alleviate the situation.
fixable.failed.broker.percentage.threshold Double N 0.4 The upper boundary of concurrently failed broker percentage that are taken as fixable. If large portion of brokers are failing at the same time, it is often due to something more fundamental going wrong and removing replicas off failed brokers cannot alleviate the situation.
anomaly.detection.interval.ms Long N 300000 The interval in millisecond that the detectors will run to detect the anomalies.
goal.violation.detection.interval.ms Long N value of anomaly.detection.interval.ms The interval in millisecond that goal violation detector will run to detect goal violations. If this interval time is not specified, goal violation detector will run with interval specified in anomaly.detection.interval.ms.
metric.anomaly.detection.interval.ms Long N value of anomaly.detection.interval.ms The interval in millisecond that metric anomaly detector will run to detect metric anomalies. If this interval time is not specified, metric anomaly detector will run with interval specified in anomaly.detection.interval.ms.
disk.failure.detection.interval.ms Long N value of anomaly.detection.interval.ms The interval in millisecond that disk failure detector will run to detect disk failures. If this interval time is not specified, disk failure detector will run with interval specified in anomaly.detection.interval.ms.
broker.failure.detection.backoff.ms Long N 300000 The backoff time in millisecond before broker failure detector triggers another broker failure detection if currently detected broker failure is not ready to fix.
anomaly.detection.allow.capacity.estimation Boolean N true The flag to indicate whether anomaly detection threads allow capacity estimation in the generated cluster model they use.
topic.anomaly.detection.interval.ms Long N value of anomaly.detection.interval.ms The interval in millisecond that topic anomaly detector will run to detect topic anomalies. If this interval time is not specified, topic anomaly detector will run with interval specified in anomaly.detection.interval.ms.
topic.anomaly.finder.class List N com.linkedin.kafka.cruisecontrol.detector.NoopTopicAnomalyFinder A list of topic anomaly finder classes to find the current state to identify topic anomalies.
maintenance.event.reader.class Class N com.linkedin.kafka.cruisecontrol.detector.NoopMaintenanceEventReader A maintenance event reader class to retrieve maintenance events from the user-defined store.
maintenance.event.class Class N com.linkedin.kafka.cruisecontrol.detector.MaintenanceEvent The name of class that extends maintenance event.
maintenance.event.enable.idempotence Boolean N true The flag to indicate whether maintenance event detector will drop the duplicate maintenance events detected within the configured retention period.
maintenance.event.idempotence.retention.ms Long N 180000 The maximum time in ms to store events retrieved from the MaintenanceEventReader. Relevant only if idempotency is enabled (see maintenance.event.enable.idempotence).
maintenance.event.max.idempotence.cache.size Integer N 25 The maximum number of maintenance events cached by the MaintenanceEventDetector within the past maintenance.event.idempotence.retention.ms ms. Relevant only if idempotency is enabled (see maintenance.event.enable.idempotence).
maintenance.event.stop.ongoing.execution Boolean N true The flag to indicate whether a maintenance event will gracefully stop the ongoing execution (if any) and wait until the execution stops before starting a fix for the anomaly.

UserTaskManager Configurations

Name Type Required? Default Value Description
completed.user.task.retention.time.ms Integer Y The maximum time in milliseconds to store the response and access details of a completed user task.
max.cached.completed.user.tasks Integer N 100 The maximum number of completed user tasks for which the response and access details will be cached.
max.active.user.tasks Integer N 5 The maximum number of user tasks for concurrently running in async endpoints across all users.
completed.kafka.monitor.user.task.retention.time.ms Long N null The maximum time in milliseconds to store the response and access details of a completed kafka monitoring user task. If this config is missing, the value set in config completed.user.task.retention.time.ms will be used.
completed.cruise.control.monitor.user.task.retention.time.ms Long N null The maximum time in milliseconds to store the response and access details of a completed cruise control monitoring user task. If this config is missing, the value set in config completed.user.task.retention.time.ms will be used.
completed.kafka.admin.user.task.retention.time.ms Long N null The maximum time in milliseconds to store the response and access details of a completed kafka administration user task. If this config is missing, the value set in config completed.user.task.retention.time.ms will be used.
completed.cruise.control.admin.user.task.retention.time.ms Long N null The maximum time in milliseconds to store the response and access details of a completed cruise control administration user task. If this config is missing, the value set in config completed.user.task.retention.time.ms will be used.
max.cached.completed.kafka.monitor.user.tasks Integer N null The maximum number of completed kafka monitoring user tasks for which the response and access details will be cached. If this config is missing, the value set in config max.cached.completed.user.tasks will be used.
max.cached.completed.cruise.control.monitor.user.tasks Integer N null The maximum number of completed cruise control monitoring user tasks for which the response and access details will be cached. If this config is missing, the value set in config max.cached.completed.user.tasks will be used.
max.cached.completed.kafka.admin.user.tasks Integer N null The maximum number of completed kafka administration user tasks for which the response and access details will be cached. If this config is missing, the value set in config max.cached.completed.user.tasks will be used.
max.cached.completed.cruise.control.admin.user.tasks Integer N null The maximum number of completed cruise control administration user tasks for which the response and access details will be cached. If this config is missing, the value set in config max.cached.completed.user.tasks will be used.

Servlet Configurations

Name Type Required? Default Value Description
topic.config.provider.class Class N com.linkedin.kafka.cruisecontrol.config.KafkaTopicConfigProvider The provider class that reports the active configuration of topics.
webserver.http.port Int N 9090 Cruise Control Webserver bind port.
webserver.http.address String N 127.0.0.1 Cruise Control Webserver bind ip address.
webserver.http.cors.enabled Boolean N false CORS enablement flag. true if enabled, false otherwise
webserver.http.cors.origin String N * Value for the Access-Control-Allow-Origin header.
webserver.http.cors.allowmethods String N OPTIONS, GET, POST Value for the Access-Control-Request-Method header.
webserver.http.cors.exposeheaders String N User-Task-ID Value for the Access-Control-Expose-Headers header.
webserver.api.urlprefix String N /kafkacruisecontrol/* REST API default url prefix
webserver.ui.diskpath String N ./cruise-control-ui/dist/ Location where the Cruise Control frontend is deployed
webserver.ui.urlprefix String N /* URL Path where UI is served from
webserver.request.maxBlockTimeMs Long N 10000 Time after which request is converted to Async
webserver.session.maxExpiryTimeMs Long N 60000 Default Session Expiry Period
webserver.session.path String N / Default Session Path (for cookies)
webserver.accesslog.enabled Boolean N true true if access log is enabled
webserver.accesslog.path String N access.log HTTP Request log path
webserver.accesslog.retention.days Integer N 7 HTTP Request log retention days
two.step.verification.enabled Boolean N false Enable two-step verification for processing POST requests.
two.step.purgatory.retention.time.ms Long N 1209600000 The maximum time in milliseconds to retain the requests in two-step (verification) purgatory.
two.step.purgatory.max.requests Integer N 25 The maximum number of requests in two-step (verification) purgatory.
request.reason.required Boolean N false Require specifying reason via for non-dryrun rebalance/add_broker/remove_broker/demote_broker/fix_offline_replicas/topic_configuration request.

Configurations under development and testing

We are still trying to improve cruise control. And following are some configurations that are for development and experiment.

Name Type Required? Default Value Description
use.linear.regression.model Boolean N false Whether to use the linear regression model to predict the broker CPU utilization.
linear.regression.model.cpu.util.bucket.size Integer N 5 The CPU utilization bucket size for linear regression model training data. The unit is percents.
linear.regression.model.required.samples.per.bucket Integer N 100 The number of training samples required in each CPU utilization bucket specified by linear.regression.model.cpu.util.bucket
linear.regression.model.min.num.cpu.util.buckets Integer N 5 The minimum number of full CPU utilization buckets required to generate a linear regression model.

Configurations of pluggable classes

CruiseControlMetricsReporterSampler configurations

Name Type Required? Default Value Description
metric.reporter.sampler.bootstrap.servers String N The same as bootstrap.servers config from Cruise Control The Kafka cluster to consume the interested metrics collected by CruiseControlMetricsReporter.
metric.reporter.topic String N "__CruiseControlMetrics" The exact topic name from which the sampler should be consuming the interested metrics from.
metric.reporter.sampler.group.id String N 60,000 The consumer group id to use for the consumers to consume from the Kafka cluster.

PrometheusMetricSampler configurations

Name Type Required? Default Value Description
prometheus.server.endpoint String Y The HTTP endpoint of the Prometheus server which is to be used as a source for sampling metrics.
prometheus.query.resolution.step.ms Integer N 60,000 The resolution of the Prometheus query made to the server. If this is set to 30 seconds for a 2 minutes query interval, the query returns with 4 values, which are then aggregated into the metric sample.
prometheus.query.supplier Class N com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.DefaultPrometheusQuerySupplier The class that supplies the Prometheus queries corresponding to Kafka raw metrics. If there are no customizations done when configuring Prometheus node exporter, the default class should work fine.

KafkaSampleStore configurations

Name Type Required? Default Value Description
partition.metric.sample.store.topic String Y The topic in which Cruise Control will store its processed metric samples as a backup. When Cruise Control is rebooted, it will load the metrics from this topic to populate the load monitor.
broker.metric.sample.store.topic String Y The topic in which Cruise Control will store its broker metric samples as a backup. When Cruise Control is rebooted, it will load the broker metric samples from this topic to train its cluster model.
num.sample.loading.threads Integer N 8 The number of threads to load from the sample store topics
sample.store.topic.replication.factor Integer N 2 The config for the replication factor of Kafka sample store topics
partition.sample.store.topic.partition.count Integer N 32 The config for the number of partition for Kafka partition sample store topic
broker.sample.store.topic.partition.count Integer N 32 The config for the number of partition for Kafka broker sample store topic
min.partition.sample.store.topic.retention.time.ms Integer N 3600000 The config for the minimal retention time for Kafka partition sample store topic
min.broker.sample.store.topic.retention.time.ms Integer N 3600000 The config for the minimal retention time for Kafka broker sample store topic
skip.sample.store.topic.rack.awareness.check Boolean N false The config to skip rack awareness sanity check for sample store topics

MaintenanceEventTopicReader configurations

Name Type Required? Default Value Description
maintenance.event.topic String N __MaintenanceEvent The name of the Kafka topic to consume maintenance events from
maintenance.event.topic.replication.factor Short N min(2, #alive-brokers) The replication factor of the maintenance event topic
maintenance.event.topic.partition.count Integer N 8 The partition count of the maintenance event topic
maintenance.event.topic.retention.ms Long N 21600000 The retention of the maintenance event topic

BrokerCapacityConfigurationFileResolver configurations

Name Type Required? Default Value Description
capacity.config.file String Y config/capacityJBOD.json The path to the configuration JSON file that provides the capacity of the brokers.

Populating the Capacity Config File

Overview: Broker capacity resolution is handled by a pluggable component. The default capacity resolver implementation requires users to manually populate a capacity file with broker capacities. If users have access to an external resolver for broker capacities, they can also customize their resolver to use this source for the capacity information. The rest of this section describes how to use one of these options.

Option-1 (default): The following steps allow users to set the initial broker capacities or update capacities upon addition of new brokers out-of-the-box:

  1. Check cruisecontrol.properties file to verify that the capacity.config.file config points to the file you are modifying. By default, this config is set to use config/capacityJBOD.json. If you have
    • a non-JBOD deployment, where each broker has the same number of cores, you may use config/capacity.json, or
    • a non-JBOD deployment, where brokers may have varying number of cores, you may use config/capacityCores.json and modify this file.
  2. Ensure that the capacity file picked in step-1 contains either a default capacity for missing broker ids, or explicitly specifies capacities for brokers with specific ids provided by user.
  3. Finally, bounce your Cruise Control instance.

Option-2 (implement your own resolver): BrokerCapacityConfigResolver is a pluggable component. Hence, you can write your own pluggable capacity resolver to dynamically resolve the broker capacities if you have a source to provide the capacity information. Using this approach, you may avoid restarting your CC instance upon a (1) capacity change or (2) addition of a broker with a non-default capacity. See:

  1. The relevant interface,
  2. Its default implementation by BrokerCapacityConfigFileResolver, and
  3. The relevant configuration to set the config resolver to be used.

SelfHealingNotifier configurations

Name Type Required? Default Value Description
broker.failure.alert.threshold.ms Long N 900,000 Defines the threshold to mark a broker as dead. If a non-empty broker leaves the cluster at time T and did not join the cluster before T + broker.failure.alert.threshold.ms, the broker is defined as dead broker since T. An alert will be triggered in this case.
broker.failure.self.healing.threshold.ms Long N 1,800,000 If self-healing is enabled and a broker is dead at T,,self-healing will be triggered at T + broker.failure.self.healing.threshold.ms.
self.healing.enabled Boolean N false Whether to enable Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled.
self.healing.broker.failure.enabled Boolean N <self.healing.enabled> Whether enable self-healing for detected broker failure or not. If disabled, the SelfHealingNotifier will only log the anomaly, but not take a fix action.
self.healing.goal.violation.enabled Boolean N <self.healing.enabled> Whether enable self-healing for detected goal violation or not. If disabled, the SelfHealingNotifier will only log the anomaly, but not take a fix action.
self.healing.metric.anomaly.enabled Boolean N <self.healing.enabled> Whether enable self-healing for detected metric anomaly or not. If disabled, the SelfHealingNotifier will only log the anomaly, but not take a fix action.
self.healing.disk.failure.enabled Boolean N <self.healing.enabled> Whether enable self-healing for detected disk failure or not. If disabled, the SelfHealingNotifier will only log the anomaly, but not take a fix action.
self.healing.topic.anomaly.enabled Boolean N <self.healing.enabled> Whether enable self-healing for detected topic anomaly or not. If disabled, the SelfHealingNotifier will only log the anomaly, but not take a fix action.
self.healing.maintenance.event.enabled Boolean N <self.healing.enabled> Whether enable self-healing for detected maintenance event or not. If disabled, the SelfHealingNotifier will only log the anomaly, but not take a fix action.

CruiseControlMetricsReporter configurations

Name Type Required? Default Value Description
cruise.control.metrics.topic String N "__CruiseControlMetrics" The topic to which CruiseControlMetricsReporter will produce the interested metrics. The metrics can be consumed by com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler to derive the partition level workload.
cruise.control.metrics.reporter.bootstrap.servers String Y The Kafka cluster to which CruiseControlMetricsReporter should produce the interested metrics. It is usually just the hosting Kafka cluster where the metrics reporter is running, but users can choose to produce to another cluster if they want to.
cruise.control.metrics.reporter.metrics.reporting.interval.ms Long N 60,000 The interval of collecting and sending the interested metrics.
cruise.control.metrics.reporter.kubernetes.mode Boolean N false Whether the CruiseControlMetricsReporter should report metrics using methods that are aware of container boundaries.
cruise.control.metrics.topic.auto.create Boolean N false Whether the metrics reporter should enforce the creation of the topic at launch.
cruise.control.metrics.topic.auto.create.timeout.ms Long N 10000 Timeout on the Cruise Control metrics topic creation.
cruise.control.metrics.topic.auto.create.retries Integer N 5 The number of retries the metrics reporter will attempt for the topic creation.
cruise.control.metrics.topic.num.partitions Integer Y NA The number of partitions of Cruise Control metrics topic.
cruise.control.metrics.topic.replication.factor Short Y NA The replication factor of Cruise Control metrics topic.
cruise.control.metrics.topic.retention.ms Long N 18000000 The retention time in milliseconds for metric messages in the Cruise Control metrics topic.
cruise.control.metrics.topic.min.insync.replicas Integer N Kafka cluster default The minimum number of insync replicas for the Cruise Control metrics topic.

PercentileMetricAnomalyFinderConfig configurations

Name Type Required? Default Value Description
metric.anomaly.percentile.upper.threshold Double N 95.0 The upper percentile threshold for the metric anomaly detector to identify an increase in the metric values of a broker as a metric anomaly.
metric.anomaly.percentile.lower.threshold Double N 2.0 The lower percentile threshold for the metric anomaly detector to identify an increase in the metric values of a broker as a metric anomaly.

SlowBrokerFinder configurations

Name Type Required? Default Value Description
self.healing.slow.broker.removal.enabled Boolean N false Whether to allow broker removal as self-healing operation for detected slow broker anomaly.
slow.broker.bytes.in.rate.detection.threshold Double N 1024.0 The bytes in rate threshold in unit of kilobytes per second to determine whether to include broker in slow broker detection.
slow.broker.log.flush.time.threshold.ms Double N 150.0 The log flush time threshold in unit of millisecond to determine whether to detect a broker as slow broker.
slow.broker.metric.history.percentile.threshold Double N 90.0 The percentile threshold used to compare latest metric value against historical value in slow broker detection.
slow.broker.metric.history.margin Double N 3.0 The margin used to compare latest metric value against historical value in slow broker detection.
slow.broker.peer.metric.percentile.threshold Double N 50.0 The percentile threshold used to compare last metric value against peers' latest value in slow broker detection.
slow.broker.peer.metric.margin Double N 10.0 The margin used to compare last metric value against peers' latest value in slow broker detection.
slow.broker.demotion.score Integer N 5 The score threshold to trigger a demotion for slow broker.
slow.broker.decommission.score Integer N 50 The score threshold to trigger a removal for slow broker.
slow.broker.self.healing.unfixable.ratio Double N 0.1 The maximum ratio of slow broker in the cluster to trigger self-healing operation.

Besides the above configurations, CruiseControlMetricsReporter takes all the configurations for vanilla KafkaProducer with a prefix of "cruise.control.metrics.reporter."

Resolving environment variables as config values

It is often required to resolve an environment variable as a config value. Such case is when a password is set by the environment to avoid putting them into config files.

For instance the following config will resolve the SSL_KEYSTORE_PASSWORD environment variable and set its value as the actual value of the webserver.ssl.keystore.password config:

webserver.ssl.keystore.password=${env:SSL_KEYSTORE_PASSWORD}