diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index fa2575d9d8ba2..baba44f943b2f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.api._ import kafka.common._ import kafka.cluster.Broker -import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback} +import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback, ZkMigrationStateMetricName} import kafka.coordinator.transaction.ZkProducerIdManager import kafka.server._ import kafka.server.metadata.ZkFinalizedFeatureCache @@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.metadata.LeaderRecoveryState +import org.apache.kafka.metadata.migration.ZkMigrationState import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.KafkaScheduler @@ -81,9 +82,11 @@ object KafkaController extends Logging { private val ReplicasIneligibleToDeleteCountMetricName = "ReplicasIneligibleToDeleteCount" private val ActiveBrokerCountMetricName = "ActiveBrokerCount" private val FencedBrokerCountMetricName = "FencedBrokerCount" + private val ZkMigrationStateMetricName = "ZkMigrationState" // package private for testing private[controller] val MetricNames = Set( + ZkMigrationStateMetricName, ActiveControllerCountMetricName, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, @@ -172,6 +175,7 @@ class KafkaController(val config: KafkaConfig, /* single-thread scheduler to clean expired tokens */ private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner") + metricsGroup.newGauge(ZkMigrationStateMetricName, () => ZkMigrationState.ZK) metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0) metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount) metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 199c679cb35c0..6a608d1dd7995 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1273,6 +1273,8 @@ public static List generateActivationRecords( "has been completed."); } break; + default: + throw new IllegalStateException("Unsupported ZkMigrationState " + featureControl.zkMigrationState()); } } else { if (zkMigrationEnabled) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java index 84f2bc1f9bbf7..37906b34b8bb5 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java @@ -39,6 +39,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable { "KafkaController", "FencedBrokerCount"); private final static MetricName ACTIVE_BROKER_COUNT = getMetricName( "KafkaController", "ActiveBrokerCount"); + private final static MetricName MIGRATING_ZK_BROKER_COUNT = getMetricName( + "KafkaController", "MigratingZkBrokerCount"); private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName( "KafkaController", "GlobalTopicCount"); private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName( @@ -55,6 +57,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable { private final Optional registry; private final AtomicInteger fencedBrokerCount = new AtomicInteger(0); private final AtomicInteger activeBrokerCount = new AtomicInteger(0); + private final AtomicInteger migratingZkBrokerCount = new AtomicInteger(0); private final AtomicInteger globalTopicCount = new AtomicInteger(0); private final AtomicInteger globalPartitionCount = new AtomicInteger(0); private final AtomicInteger offlinePartitionCount = new AtomicInteger(0); @@ -65,7 +68,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable { /** * Create a new ControllerMetadataMetrics object. * - * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one. + * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one. */ public ControllerMetadataMetrics(Optional registry) { this.registry = registry; @@ -117,6 +120,14 @@ public Integer value() { return (int) zkMigrationState(); } })); + + registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new Gauge() { + @Override + public Integer value() { + return migratingZkBrokerCount(); + } + })); + } public void setFencedBrokerCount(int brokerCount) { @@ -143,6 +154,18 @@ public int activeBrokerCount() { return this.activeBrokerCount.get(); } + public void setMigratingZkBrokerCount(int brokerCount) { + this.migratingZkBrokerCount.set(brokerCount); + } + + public void addToMigratingZkBrokerCount(int brokerCountDelta) { + this.migratingZkBrokerCount.addAndGet(brokerCountDelta); + } + + public int migratingZkBrokerCount() { + return this.migratingZkBrokerCount.get(); + } + public void setGlobalTopicCount(int topicCount) { this.globalTopicCount.set(topicCount); } @@ -212,6 +235,7 @@ public void close() { registry.ifPresent(r -> Arrays.asList( FENCED_BROKER_COUNT, ACTIVE_BROKER_COUNT, + MIGRATING_ZK_BROKER_COUNT, GLOBAL_TOPIC_COUNT, GLOBAL_PARTITION_COUNT, OFFLINE_PARTITION_COUNT, diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java index c72bdd9818f09..7459fe657af1b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java @@ -124,15 +124,21 @@ private void publishSnapshot(MetadataImage newImage) { metrics.setGlobalTopicCount(newImage.topics().topicsById().size()); int fencedBrokers = 0; int activeBrokers = 0; + int zkBrokers = 0; for (BrokerRegistration broker : newImage.cluster().brokers().values()) { if (broker.fenced()) { fencedBrokers++; } else { activeBrokers++; } + if (broker.isMigratingZkBroker()) { + zkBrokers++; + } } metrics.setFencedBrokerCount(fencedBrokers); metrics.setActiveBrokerCount(activeBrokers); + metrics.setMigratingZkBrokerCount(zkBrokers); + int totalPartitions = 0; int offlinePartitions = 0; int partitionsWithoutPreferredLeader = 0; diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java index b3bfb44076cfe..12956b3d61016 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java @@ -43,6 +43,7 @@ static int delta(boolean prev, boolean next) { private int fencedBrokersChange = 0; private int activeBrokersChange = 0; + private int migratingZkBrokersChange = 0; private int globalTopicsChange = 0; private int globalPartitionsChange = 0; private int offlinePartitionsChange = 0; @@ -56,6 +57,10 @@ public int activeBrokersChange() { return activeBrokersChange; } + public int migratingZkBrokersChange() { + return migratingZkBrokersChange; + } + public int globalTopicsChange() { return globalTopicsChange; } @@ -75,18 +80,23 @@ public int partitionsWithoutPreferredLeaderChange() { void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) { boolean wasFenced = false; boolean wasActive = false; + boolean wasZk = false; if (prev != null) { wasFenced = prev.fenced(); wasActive = !prev.fenced(); + wasZk = prev.isMigratingZkBroker(); } boolean isFenced = false; boolean isActive = false; + boolean isZk = false; if (next != null) { isFenced = next.fenced(); isActive = !next.fenced(); + isZk = next.isMigratingZkBroker(); } fencedBrokersChange += delta(wasFenced, isFenced); activeBrokersChange += delta(wasActive, isActive); + migratingZkBrokersChange += delta(wasZk, isZk); } void handleDeletedTopic(TopicImage deletedTopic) { @@ -141,6 +151,9 @@ void apply(ControllerMetadataMetrics metrics) { if (activeBrokersChange != 0) { metrics.addToActiveBrokerCount(activeBrokersChange); } + if (migratingZkBrokersChange != 0) { + metrics.addToMigratingZkBrokerCount(migratingZkBrokersChange); + } if (globalTopicsChange != 0) { metrics.addToGlobalTopicCount(globalTopicsChange); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java index 6c7aa581a7939..225d6d0fb8a51 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java @@ -45,7 +45,11 @@ public class QuorumControllerMetrics implements AutoCloseable { private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName( "ControllerEventManager", "EventQueueProcessingTimeMs"); private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName( - "KafkaController", "ZKWriteBehindLag"); + "KafkaController", "ZkWriteBehindLag"); + private final static MetricName ZK_WRITE_SNAPSHOT_TIME_MS = getMetricName( + "KafkaController", "ZkWriteSnapshotTimeMs"); + private final static MetricName ZK_WRITE_DELTA_TIME_MS = getMetricName( + "KafkaController", "ZkWriteDeltaTimeMs"); private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName( "KafkaController", "LastAppliedRecordOffset"); private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName( @@ -71,6 +75,9 @@ public class QuorumControllerMetrics implements AutoCloseable { private final AtomicLong dualWriteOffset = new AtomicLong(0); private final Consumer eventQueueTimeUpdater; private final Consumer eventQueueProcessingTimeUpdater; + private final Consumer zkWriteSnapshotTimeHandler; + private final Consumer zkWriteDeltaTimeHandler; + private final AtomicLong timedOutHeartbeats = new AtomicLong(0); private final AtomicLong operationsStarted = new AtomicLong(0); private final AtomicLong operationsTimedOut = new AtomicLong(0); @@ -88,7 +95,7 @@ private Consumer newHistogram(MetricName name, boolean biased) { public QuorumControllerMetrics( Optional registry, Time time, - boolean zkMigrationState + boolean zkMigrationEnabled ) { this.registry = registry; this.active = false; @@ -148,7 +155,8 @@ public Long value() { return newActiveControllers(); } })); - if (zkMigrationState) { + + if (zkMigrationEnabled) { registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge() { @Override public Long value() { @@ -158,6 +166,11 @@ public Long value() { else return lastCommittedRecordOffset() - dualWriteOffset(); } })); + this.zkWriteSnapshotTimeHandler = newHistogram(ZK_WRITE_SNAPSHOT_TIME_MS, true); + this.zkWriteDeltaTimeHandler = newHistogram(ZK_WRITE_DELTA_TIME_MS, true); + } else { + this.zkWriteSnapshotTimeHandler = __ -> { }; + this.zkWriteDeltaTimeHandler = __ -> { }; } } @@ -177,6 +190,14 @@ public void updateEventQueueProcessingTime(long durationMs) { eventQueueProcessingTimeUpdater.accept(durationMs); } + public void updateZkWriteSnapshotTimeMs(long durationMs) { + zkWriteSnapshotTimeHandler.accept(durationMs); + } + + public void updateZkWriteDeltaTimeMs(long durationMs) { + zkWriteDeltaTimeHandler.accept(durationMs); + } + public void setLastAppliedRecordOffset(long offset) { lastAppliedRecordOffset.set(offset); } @@ -255,7 +276,9 @@ public void close() { EVENT_QUEUE_OPERATIONS_STARTED_COUNT, EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, NEW_ACTIVE_CONTROLLERS_COUNT, - ZK_WRITE_BEHIND_LAG + ZK_WRITE_BEHIND_LAG, + ZK_WRITE_SNAPSHOT_TIME_MS, + ZK_WRITE_DELTA_TIME_MS ).forEach(r::removeMetric)); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 740c3e4b1dd58..93379f3aa013c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -486,12 +486,17 @@ public void run() throws Exception { } Map dualWriteCounts = new TreeMap<>(); + long startTime = time.nanoseconds(); if (isSnapshot) { zkMetadataWriter.handleSnapshot(image, countingOperationConsumer( - dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime)); } else { - zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer( - dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + if (zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer( + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation))) { + // Only record delta write time if we changed something. Otherwise, no-op records will skew timings. + controllerMetrics.updateZkWriteDeltaTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime)); + } } if (dualWriteCounts.isEmpty()) { log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta"); @@ -567,6 +572,8 @@ public void run() throws Exception { log.error("KRaft controller indicates a completed migration, but the migration driver is somehow active."); transitionTo(MigrationDriverState.INACTIVE); break; + default: + throw new IllegalStateException("Unsupported ZkMigrationState " + zkMigrationState); } } } @@ -669,8 +676,11 @@ public void run() throws Exception { if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) { log.info("Performing a full metadata sync from KRaft to ZK."); Map dualWriteCounts = new TreeMap<>(); + long startTime = time.nanoseconds(); zkMetadataWriter.handleSnapshot(image, countingOperationConsumer( - dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + long endTime = time.nanoseconds(); + controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(startTime - endTime)); log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts); transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java index 8dd5d6741b712..3cb95ec5bea51 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java @@ -93,27 +93,34 @@ public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer handleAclsSnapshot(image.acls(), operationConsumer); } - public void handleDelta( + public boolean handleDelta( MetadataImage previousImage, MetadataImage image, MetadataDelta delta, KRaftMigrationOperationConsumer operationConsumer ) { + boolean updated = false; if (delta.topicsDelta() != null) { handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, image.topics(), delta.topicsDelta(), operationConsumer); + updated = true; } if (delta.configsDelta() != null) { handleConfigsDelta(image.configs(), delta.configsDelta(), operationConsumer); + updated = true; } if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) { handleClientQuotasDelta(image, delta, operationConsumer); + updated = true; } if (delta.producerIdsDelta() != null) { handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer); + updated = true; } if (delta.aclsDelta() != null) { handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer); + updated = true; } + return updated; } /** diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java index 8d5e584831c65..ff8ebd08b38a2 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java @@ -62,7 +62,14 @@ public enum ZkMigrationState { * will persist indefinitely after the migration. In operational terms, this is the same as the NONE * state. */ - POST_MIGRATION((byte) 3); + POST_MIGRATION((byte) 3), + + /** + * The controller is a ZK controller. No migration has been performed. This state is never persisted + * and is only used by KafkaController in order to have a unified metric that indicates what kind of + * metadata state the controller is in. + */ + ZK((byte) 4); private final byte value; diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java index 47ffcc3589f08..9bec14d0c698b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java @@ -41,6 +41,7 @@ public void testMetricNames() { new HashSet<>(Arrays.asList( "kafka.controller:type=KafkaController,name=ActiveBrokerCount", "kafka.controller:type=KafkaController,name=FencedBrokerCount", + "kafka.controller:type=KafkaController,name=MigratingZkBrokerCount", "kafka.controller:type=KafkaController,name=GlobalPartitionCount", "kafka.controller:type=KafkaController,name=GlobalTopicCount", "kafka.controller:type=KafkaController,name=MetadataErrorCount", diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java index ef75dc6117075..2362629ae4aa0 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java @@ -51,13 +51,27 @@ private static BrokerRegistration brokerRegistration( boolean fenced ) { return new BrokerRegistration(brokerId, - 100L, - Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), - Collections.emptyList(), - Collections.emptyMap(), - Optional.empty(), - fenced, - false); + 100L, + Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), + Collections.emptyList(), + Collections.emptyMap(), + Optional.empty(), + fenced, + false); + } + + private static BrokerRegistration zkBrokerRegistration( + int brokerId + ) { + return new BrokerRegistration(brokerId, + 100L, + Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), + Collections.emptyList(), + Collections.emptyMap(), + Optional.empty(), + false, + false, + true); } @Test @@ -103,6 +117,20 @@ public void testHandleBrokerUnfencing() { assertEquals(1, changes.activeBrokersChange()); } + @Test + public void testHandleZkBroker() { + ControllerMetricsChanges changes = new ControllerMetricsChanges(); + changes.handleBrokerChange(null, zkBrokerRegistration(1)); + assertEquals(1, changes.migratingZkBrokersChange()); + changes.handleBrokerChange(null, zkBrokerRegistration(2)); + changes.handleBrokerChange(null, zkBrokerRegistration(3)); + assertEquals(3, changes.migratingZkBrokersChange()); + + changes.handleBrokerChange(zkBrokerRegistration(3), brokerRegistration(3, true)); + changes.handleBrokerChange(brokerRegistration(3, true), brokerRegistration(3, false)); + assertEquals(2, changes.migratingZkBrokersChange()); + } + @Test public void testHandleDeletedTopic() { ControllerMetricsChanges changes = new ControllerMetricsChanges(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java index 9258577a0ddf3..936009e47d94d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java @@ -59,7 +59,9 @@ public void testMetricNames(boolean inMigration) { "kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount" )); if (inMigration) { - expected.add("kafka.controller:type=KafkaController,name=ZKWriteBehindLag"); + expected.add("kafka.controller:type=KafkaController,name=ZkWriteBehindLag"); + expected.add("kafka.controller:type=KafkaController,name=ZkWriteSnapshotTimeMs"); + expected.add("kafka.controller:type=KafkaController,name=ZkWriteDeltaTimeMs"); } ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller", expected); } @@ -144,7 +146,7 @@ public void testLastAppliedRecordMetrics() { @SuppressWarnings("unchecked") Gauge zkWriteBehindLag = (Gauge) registry .allMetrics() - .get(metricName("KafkaController", "ZKWriteBehindLag")); + .get(metricName("KafkaController", "ZkWriteBehindLag")); assertEquals(10L, zkWriteBehindLag.value()); @SuppressWarnings("unchecked") @@ -184,8 +186,8 @@ public void testUpdateZKWriteBehindLag() { metrics.updateDualWriteOffset(0); @SuppressWarnings("unchecked") Gauge zkWriteBehindLag = (Gauge) registry - .allMetrics() - .get(metricName("KafkaController", "ZKWriteBehindLag")); + .allMetrics() + .get(metricName("KafkaController", "ZkWriteBehindLag")); assertEquals(0, zkWriteBehindLag.value()); } finally { registry.shutdown(); @@ -197,8 +199,8 @@ public void testUpdateZKWriteBehindLag() { metrics.setLastCommittedRecordOffset(100); @SuppressWarnings("unchecked") Gauge zkWriteBehindLag = (Gauge) registry - .allMetrics() - .get(metricName("KafkaController", "ZKWriteBehindLag")); + .allMetrics() + .get(metricName("KafkaController", "ZkWriteBehindLag")); assertEquals(10, zkWriteBehindLag.value()); } finally { registry.shutdown();