From dd9d71ead12ad1d226965c4111ee6f0dc4d289a9 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Tue, 18 Jun 2024 11:12:49 +0800 Subject: [PATCH] fix(auto_balancer): fix metrics compatibility for version V0 Signed-off-by: Shichao Nie --- core/src/main/java/kafka/autobalancer/LoadRetriever.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java index 1a209e95fa..3195ecc347 100644 --- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java +++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java @@ -15,6 +15,7 @@ import kafka.autobalancer.common.AutoBalancerThreadFactory; import kafka.autobalancer.common.Utils; import kafka.autobalancer.common.types.MetricTypes; +import kafka.autobalancer.common.types.MetricVersion; import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.config.StaticAutoBalancerConfig; import kafka.autobalancer.config.StaticAutoBalancerConfigUtils; @@ -23,6 +24,7 @@ import kafka.autobalancer.metricsreporter.metric.BrokerMetrics; import kafka.autobalancer.metricsreporter.metric.MetricSerde; import kafka.autobalancer.metricsreporter.metric.TopicPartitionMetrics; +import kafka.autobalancer.model.BrokerUpdater; import kafka.autobalancer.model.ClusterModel; import kafka.autobalancer.services.AbstractResumableService; import org.apache.kafka.clients.consumer.Consumer; @@ -494,6 +496,10 @@ protected void updateClusterModel(AutoBalancerMetrics metrics) { switch (metrics.metricType()) { case MetricTypes.TOPIC_PARTITION_METRIC: TopicPartitionMetrics partitionMetrics = (TopicPartitionMetrics) metrics; + BrokerUpdater brokerUpdater = clusterModel.brokerUpdater(partitionMetrics.brokerId()); + if (brokerUpdater != null && brokerUpdater.metricVersion() == MetricVersion.V0) { + clusterModel.updateBrokerMetrics(partitionMetrics.brokerId(), new HashMap().entrySet(), partitionMetrics.time()); + } clusterModel.updateTopicPartitionMetrics(partitionMetrics.brokerId(), new TopicPartition(partitionMetrics.topic(), partitionMetrics.partition()), partitionMetrics.getMetricValueMap().entrySet(), partitionMetrics.time());