diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java index 1a209e95fa..3195ecc347 100644 --- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java +++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java @@ -15,6 +15,7 @@ import kafka.autobalancer.common.AutoBalancerThreadFactory; import kafka.autobalancer.common.Utils; import kafka.autobalancer.common.types.MetricTypes; +import kafka.autobalancer.common.types.MetricVersion; import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.config.StaticAutoBalancerConfig; import kafka.autobalancer.config.StaticAutoBalancerConfigUtils; @@ -23,6 +24,7 @@ import kafka.autobalancer.metricsreporter.metric.BrokerMetrics; import kafka.autobalancer.metricsreporter.metric.MetricSerde; import kafka.autobalancer.metricsreporter.metric.TopicPartitionMetrics; +import kafka.autobalancer.model.BrokerUpdater; import kafka.autobalancer.model.ClusterModel; import kafka.autobalancer.services.AbstractResumableService; import org.apache.kafka.clients.consumer.Consumer; @@ -494,6 +496,10 @@ protected void updateClusterModel(AutoBalancerMetrics metrics) { switch (metrics.metricType()) { case MetricTypes.TOPIC_PARTITION_METRIC: TopicPartitionMetrics partitionMetrics = (TopicPartitionMetrics) metrics; + BrokerUpdater brokerUpdater = clusterModel.brokerUpdater(partitionMetrics.brokerId()); + if (brokerUpdater != null && brokerUpdater.metricVersion() == MetricVersion.V0) { + clusterModel.updateBrokerMetrics(partitionMetrics.brokerId(), new HashMap().entrySet(), partitionMetrics.time()); + } clusterModel.updateTopicPartitionMetrics(partitionMetrics.brokerId(), new TopicPartition(partitionMetrics.topic(), partitionMetrics.partition()), partitionMetrics.getMetricValueMap().entrySet(), partitionMetrics.time());