From bcc87ea50a40368eed0573332707f7a1c146431a Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 17 Jul 2023 11:45:31 -0400 Subject: [PATCH 1/5] Add dual-write snapshot and delta timing metrics Also add metadata type metric. --- .../kafka/controller/KafkaController.scala | 3 +- .../main/scala/kafka/server/KafkaBroker.scala | 7 +++- .../scala/kafka/server/SharedServer.scala | 5 ++- .../metrics/ControllerMetadataMetrics.java | 28 ++++++++++++- .../ControllerMetadataMetricsPublisher.java | 6 +++ .../metrics/ControllerMetricsChanges.java | 11 ++++++ .../metrics/QuorumControllerMetrics.java | 39 +++++++++++++++++-- .../migration/KRaftMigrationDriver.java | 16 ++++++-- .../migration/KRaftMigrationZkWriter.java | 9 ++++- ...ontrollerMetadataMetricsPublisherTest.java | 2 +- .../ControllerMetadataMetricsTest.java | 31 +++++++++++++-- .../metrics/QuorumControllerMetricsTest.java | 14 ++++--- .../server/metrics/MetadataTypeMetric.java | 26 +++++++++++++ 13 files changed, 174 insertions(+), 23 deletions(-) create mode 100644 server-common/src/main/java/org/apache/kafka/server/metrics/MetadataTypeMetric.java diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index c3f76e83d122c..8d3f6beff98bf 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, Leade import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.ProducerIdsBlock -import org.apache.kafka.server.metrics.KafkaMetricsGroup +import org.apache.kafka.server.metrics.{KafkaMetricsGroup, MetadataTypeMetric} import org.apache.kafka.server.util.KafkaScheduler import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code @@ -173,6 +173,7 @@ class KafkaController(val config: KafkaConfig, /* single-thread scheduler to clean expired tokens */ private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner") + metricsGroup.newGauge(MetadataTypeMetric.METRIC_NAME, () => MetadataTypeMetric.ZK) metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0) metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount) metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount) diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala index cb66a9eb0895f..c7f1bf928a43e 100644 --- a/core/src/main/scala/kafka/server/KafkaBroker.scala +++ b/core/src/main/scala/kafka/server/KafkaBroker.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.authorizer.Authorizer -import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} +import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, MetadataTypeMetric} import org.apache.kafka.server.util.Scheduler import java.util @@ -101,6 +101,11 @@ trait KafkaBroker extends Logging { } } + metricsGroup.newGauge(MetadataTypeMetric.METRIC_NAME, () => this match { + case _: BrokerServer => MetadataTypeMetric.KRAFT + case _: KafkaServer => MetadataTypeMetric.ZK + case _ => throw new IllegalStateException() + }) metricsGroup.newGauge("BrokerState", () => brokerState.value) metricsGroup.newGauge("ClusterId", () => clusterId) metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index e58b33a8d5719..4703f1f183936 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -242,7 +242,10 @@ class SharedServer( brokerMetrics = BrokerServerMetrics(metrics) } if (sharedServerConfig.processRoles.contains(ControllerRole)) { - controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry())) + controllerServerMetrics = new ControllerMetadataMetrics( + Optional.of(KafkaYammerMetrics.defaultRegistry()), + sharedServerConfig.migrationEnabled + ) } val _raftManager = new KafkaRaftManager[ApiMessageAndVersion]( metaProps, diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java index 84f2bc1f9bbf7..47e5f79a4bd8f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java @@ -39,6 +39,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable { "KafkaController", "FencedBrokerCount"); private final static MetricName ACTIVE_BROKER_COUNT = getMetricName( "KafkaController", "ActiveBrokerCount"); + private final static MetricName MIGRATING_ZK_BROKER_COUNT = getMetricName( + "KafkaController", "MigratingZkBrokerCount"); private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName( "KafkaController", "GlobalTopicCount"); private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName( @@ -55,6 +57,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable { private final Optional registry; private final AtomicInteger fencedBrokerCount = new AtomicInteger(0); private final AtomicInteger activeBrokerCount = new AtomicInteger(0); + private final AtomicInteger migratingZkBrokerCount = new AtomicInteger(0); private final AtomicInteger globalTopicCount = new AtomicInteger(0); private final AtomicInteger globalPartitionCount = new AtomicInteger(0); private final AtomicInteger offlinePartitionCount = new AtomicInteger(0); @@ -67,7 +70,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable { * * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one. */ - public ControllerMetadataMetrics(Optional registry) { + public ControllerMetadataMetrics(Optional registry, boolean zkMigrationEnabled) { this.registry = registry; registry.ifPresent(r -> r.newGauge(FENCED_BROKER_COUNT, new Gauge() { @Override @@ -117,6 +120,15 @@ public Integer value() { return (int) zkMigrationState(); } })); + + if (zkMigrationEnabled) { + registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new Gauge() { + @Override + public Integer value() { + return migratingZkBrokerCount(); + } + })); + } } public void setFencedBrokerCount(int brokerCount) { @@ -143,6 +155,19 @@ public int activeBrokerCount() { return this.activeBrokerCount.get(); } + + public void setMigratingZkBrokerCount(int brokerCount) { + this.migratingZkBrokerCount.set(brokerCount); + } + + public void addToMigratingZkBrokerCount(int brokerCountDelta) { + this.migratingZkBrokerCount.addAndGet(brokerCountDelta); + } + + public int migratingZkBrokerCount() { + return this.migratingZkBrokerCount.get(); + } + public void setGlobalTopicCount(int topicCount) { this.globalTopicCount.set(topicCount); } @@ -212,6 +237,7 @@ public void close() { registry.ifPresent(r -> Arrays.asList( FENCED_BROKER_COUNT, ACTIVE_BROKER_COUNT, + MIGRATING_ZK_BROKER_COUNT, GLOBAL_TOPIC_COUNT, GLOBAL_PARTITION_COUNT, OFFLINE_PARTITION_COUNT, diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java index c72bdd9818f09..7459fe657af1b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java @@ -124,15 +124,21 @@ private void publishSnapshot(MetadataImage newImage) { metrics.setGlobalTopicCount(newImage.topics().topicsById().size()); int fencedBrokers = 0; int activeBrokers = 0; + int zkBrokers = 0; for (BrokerRegistration broker : newImage.cluster().brokers().values()) { if (broker.fenced()) { fencedBrokers++; } else { activeBrokers++; } + if (broker.isMigratingZkBroker()) { + zkBrokers++; + } } metrics.setFencedBrokerCount(fencedBrokers); metrics.setActiveBrokerCount(activeBrokers); + metrics.setMigratingZkBrokerCount(zkBrokers); + int totalPartitions = 0; int offlinePartitions = 0; int partitionsWithoutPreferredLeader = 0; diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java index b3bfb44076cfe..0a6a22a2364d0 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java @@ -43,6 +43,7 @@ static int delta(boolean prev, boolean next) { private int fencedBrokersChange = 0; private int activeBrokersChange = 0; + private int migratingZkBrokersChange = 0; private int globalTopicsChange = 0; private int globalPartitionsChange = 0; private int offlinePartitionsChange = 0; @@ -56,6 +57,10 @@ public int activeBrokersChange() { return activeBrokersChange; } + public int migratingZkBrokersChange() { + return migratingZkBrokersChange; + } + public int globalTopicsChange() { return globalTopicsChange; } @@ -85,6 +90,9 @@ void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) { isFenced = next.fenced(); isActive = !next.fenced(); } + if (prev == null && next != null && next.isMigratingZkBroker()) { + migratingZkBrokersChange += 1; + } fencedBrokersChange += delta(wasFenced, isFenced); activeBrokersChange += delta(wasActive, isActive); } @@ -141,6 +149,9 @@ void apply(ControllerMetadataMetrics metrics) { if (activeBrokersChange != 0) { metrics.addToActiveBrokerCount(activeBrokersChange); } + if (migratingZkBrokersChange != 0) { + metrics.addToMigratingZkBrokerCount(migratingZkBrokersChange); + } if (globalTopicsChange != 0) { metrics.addToGlobalTopicCount(globalTopicsChange); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java index e267ebdfb9a08..16be770ce8c1b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java @@ -23,6 +23,7 @@ import com.yammer.metrics.core.MetricsRegistry; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.metrics.KafkaYammerMetrics; +import org.apache.kafka.server.metrics.MetadataTypeMetric; import java.util.Arrays; import java.util.Optional; @@ -38,6 +39,8 @@ * @link{org.apache.kafka.controller.metrics.ControllerMetadataMetrics}, not here. */ public class QuorumControllerMetrics implements AutoCloseable { + private final static MetricName METADATA_TYPE = getMetricName( + "KafkaController", MetadataTypeMetric.METRIC_NAME); private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName( "KafkaController", "ActiveControllerCount"); private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName( @@ -45,7 +48,11 @@ public class QuorumControllerMetrics implements AutoCloseable { private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName( "ControllerEventManager", "EventQueueProcessingTimeMs"); private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName( - "KafkaController", "ZKWriteBehindLag"); + "KafkaController", "ZkWriteBehindLag"); + private final static MetricName ZK_WRITE_SNAPSHOT_TIME_MS = getMetricName( + "KafkaController", "ZkWriteSnapshotTimeMs"); + private final static MetricName ZK_WRITE_DELTA_TIME_MS = getMetricName( + "KafkaController", "ZkWriteDeltaTimeMs"); private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName( "KafkaController", "LastAppliedRecordOffset"); private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName( @@ -63,6 +70,9 @@ public class QuorumControllerMetrics implements AutoCloseable { private final AtomicLong dualWriteOffset = new AtomicLong(0); private final Consumer eventQueueTimeUpdater; private final Consumer eventQueueProcessingTimeUpdater; + private final Consumer zkWriteSnapshotTimeHandler; + private final Consumer zkWriteDeltaTimeHandler; + private final AtomicLong timedOutHeartbeats = new AtomicLong(0); private Consumer newHistogram(MetricName name, boolean biased) { @@ -77,10 +87,16 @@ private Consumer newHistogram(MetricName name, boolean biased) { public QuorumControllerMetrics( Optional registry, Time time, - boolean zkMigrationState + boolean zkMigrationEnabled ) { this.registry = registry; this.active = false; + registry.ifPresent(r -> r.newGauge(METADATA_TYPE, new Gauge() { + @Override + public Integer value() { + return MetadataTypeMetric.KRAFT; + } + })); registry.ifPresent(r -> r.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge() { @Override public Integer value() { @@ -114,7 +130,7 @@ public Long value() { } })); - if (zkMigrationState) { + if (zkMigrationEnabled) { registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge() { @Override public Long value() { @@ -124,6 +140,11 @@ public Long value() { else return lastCommittedRecordOffset() - dualWriteOffset(); } })); + this.zkWriteSnapshotTimeHandler = newHistogram(ZK_WRITE_SNAPSHOT_TIME_MS, true); + this.zkWriteDeltaTimeHandler = newHistogram(ZK_WRITE_DELTA_TIME_MS, true); + } else { + this.zkWriteSnapshotTimeHandler = __ -> { }; + this.zkWriteDeltaTimeHandler = __ -> { }; } } @@ -143,6 +164,14 @@ public void updateEventQueueProcessingTime(long durationMs) { eventQueueProcessingTimeUpdater.accept(durationMs); } + public void updateZkWriteSnapshotTimeMs(long durationMs) { + zkWriteSnapshotTimeHandler.accept(durationMs); + } + + public void updateZkWriteDeltaTimeMs(long durationMs) { + zkWriteDeltaTimeHandler.accept(durationMs); + } + public void setLastAppliedRecordOffset(long offset) { lastAppliedRecordOffset.set(offset); } @@ -197,7 +226,9 @@ public void close() { LAST_COMMITTED_RECORD_OFFSET, LAST_APPLIED_RECORD_TIMESTAMP, LAST_APPLIED_RECORD_LAG_MS, - ZK_WRITE_BEHIND_LAG + ZK_WRITE_BEHIND_LAG, + ZK_WRITE_SNAPSHOT_TIME_MS, + ZK_WRITE_DELTA_TIME_MS ).forEach(r::removeMetric)); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 740c3e4b1dd58..1012dedd176df 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -486,12 +486,17 @@ public void run() throws Exception { } Map dualWriteCounts = new TreeMap<>(); + long startTime = time.nanoseconds(); if (isSnapshot) { zkMetadataWriter.handleSnapshot(image, countingOperationConsumer( - dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime)); } else { - zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer( - dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + if (zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer( + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation))) { + // Only record delta write time if we changed something. Otherwise, no-op records will skew timings. + controllerMetrics.updateZkWriteDeltaTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime)); + } } if (dualWriteCounts.isEmpty()) { log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta"); @@ -669,8 +674,11 @@ public void run() throws Exception { if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) { log.info("Performing a full metadata sync from KRaft to ZK."); Map dualWriteCounts = new TreeMap<>(); + long startTime = time.nanoseconds(); zkMetadataWriter.handleSnapshot(image, countingOperationConsumer( - dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + long endTime = time.nanoseconds(); + controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(startTime - endTime)); log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts); transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java index 8dd5d6741b712..3cb95ec5bea51 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java @@ -93,27 +93,34 @@ public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer handleAclsSnapshot(image.acls(), operationConsumer); } - public void handleDelta( + public boolean handleDelta( MetadataImage previousImage, MetadataImage image, MetadataDelta delta, KRaftMigrationOperationConsumer operationConsumer ) { + boolean updated = false; if (delta.topicsDelta() != null) { handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, image.topics(), delta.topicsDelta(), operationConsumer); + updated = true; } if (delta.configsDelta() != null) { handleConfigsDelta(image.configs(), delta.configsDelta(), operationConsumer); + updated = true; } if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) { handleClientQuotasDelta(image, delta, operationConsumer); + updated = true; } if (delta.producerIdsDelta() != null) { handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer); + updated = true; } if (delta.aclsDelta() != null) { handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer); + updated = true; } + return updated; } /** diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java index e05123365f6d0..d534a0d4a4763 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java @@ -53,7 +53,7 @@ static class TestEnv implements AutoCloseable { MockFaultHandler faultHandler = new MockFaultHandler("ControllerMetadataMetricsPublisher"); ControllerMetadataMetrics metrics = - new ControllerMetadataMetrics(Optional.empty()); + new ControllerMetadataMetrics(Optional.empty(), false); ControllerMetadataMetricsPublisher publisher = new ControllerMetadataMetricsPublisher(metrics, faultHandler); diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java index 47ffcc3589f08..cd42ce84a6cda 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java @@ -36,7 +36,7 @@ public class ControllerMetadataMetricsTest { public void testMetricNames() { MetricsRegistry registry = new MetricsRegistry(); try { - try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) { + try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry), false)) { ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller:", new HashSet<>(Arrays.asList( "kafka.controller:type=KafkaController,name=ActiveBrokerCount", @@ -56,10 +56,35 @@ public void testMetricNames() { } } + @Test + public void testZkMigrationMetricNames() { + MetricsRegistry registry = new MetricsRegistry(); + try { + try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry), true)) { + ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller:", + new HashSet<>(Arrays.asList( + "kafka.controller:type=KafkaController,name=ActiveBrokerCount", + "kafka.controller:type=KafkaController,name=FencedBrokerCount", + "kafka.controller:type=KafkaController,name=MigratingZkBrokerCount", + "kafka.controller:type=KafkaController,name=GlobalPartitionCount", + "kafka.controller:type=KafkaController,name=GlobalTopicCount", + "kafka.controller:type=KafkaController,name=MetadataErrorCount", + "kafka.controller:type=KafkaController,name=OfflinePartitionsCount", + "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount", + "kafka.controller:type=KafkaController,name=ZkMigrationState" + ))); + } + ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "KafkaController", + Collections.emptySet()); + } finally { + registry.shutdown(); + } + } + @Test public void testMetadataErrorCount() { MetricsRegistry registry = new MetricsRegistry(); - try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) { + try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry), false)) { @SuppressWarnings("unchecked") Gauge metadataErrorCount = (Gauge) registry .allMetrics() @@ -84,7 +109,7 @@ private void testIntGaugeMetric( BiConsumer incrementer ) { MetricsRegistry registry = new MetricsRegistry(); - try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) { + try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry), false)) { assertEquals(0, metricsGetter.apply(metrics)); assertEquals(0, registryGetter.apply(registry)); setter.accept(metrics, 123); diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java index bec023020a418..01bef439eb4e2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java @@ -73,8 +73,10 @@ public void testMetricNamesInMigrationState() { "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset", "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp", "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset", - "kafka.controller:type=KafkaController,name=ZKWriteBehindLag" - ))); + "kafka.controller:type=KafkaController,name=ZkWriteBehindLag", + "kafka.controller:type=KafkaController,name=ZkWriteSnapshotTimeMs", + "kafka.controller:type=KafkaController,name=ZkWriteDeltaTimeMs" + ))); } ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller", Collections.emptySet()); @@ -154,8 +156,8 @@ public void testUpdateZKWriteBehindLag() { metrics.updateDualWriteOffset(0); @SuppressWarnings("unchecked") Gauge zkWriteBehindLag = (Gauge) registry - .allMetrics() - .get(metricName("KafkaController", "ZKWriteBehindLag")); + .allMetrics() + .get(metricName("KafkaController", "ZkWriteBehindLag")); assertEquals(0, zkWriteBehindLag.value()); } finally { registry.shutdown(); @@ -167,8 +169,8 @@ public void testUpdateZKWriteBehindLag() { metrics.setLastCommittedRecordOffset(100); @SuppressWarnings("unchecked") Gauge zkWriteBehindLag = (Gauge) registry - .allMetrics() - .get(metricName("KafkaController", "ZKWriteBehindLag")); + .allMetrics() + .get(metricName("KafkaController", "ZkWriteBehindLag")); assertEquals(10, zkWriteBehindLag.value()); } finally { registry.shutdown(); diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/MetadataTypeMetric.java b/server-common/src/main/java/org/apache/kafka/server/metrics/MetadataTypeMetric.java new file mode 100644 index 0000000000000..00e19359fd3e2 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/MetadataTypeMetric.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +/** + * Integer values to represent Kafka metadata types as defined in KIP-866 + */ +public class MetadataTypeMetric { + public static final String METRIC_NAME = "MetadataType"; + public static final int ZK = 1; + public static final int KRAFT = 2; +} From f78ea2858c4912c6206bc75345502e99df49e15c Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 17 Jul 2023 14:59:23 -0400 Subject: [PATCH 2/5] update test --- .../metrics/ControllerMetricsChanges.java | 8 ++-- .../metrics/ControllerMetricsChangesTest.java | 42 +++++++++++++++---- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java index 0a6a22a2364d0..12956b3d61016 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java @@ -80,21 +80,23 @@ public int partitionsWithoutPreferredLeaderChange() { void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) { boolean wasFenced = false; boolean wasActive = false; + boolean wasZk = false; if (prev != null) { wasFenced = prev.fenced(); wasActive = !prev.fenced(); + wasZk = prev.isMigratingZkBroker(); } boolean isFenced = false; boolean isActive = false; + boolean isZk = false; if (next != null) { isFenced = next.fenced(); isActive = !next.fenced(); - } - if (prev == null && next != null && next.isMigratingZkBroker()) { - migratingZkBrokersChange += 1; + isZk = next.isMigratingZkBroker(); } fencedBrokersChange += delta(wasFenced, isFenced); activeBrokersChange += delta(wasActive, isActive); + migratingZkBrokersChange += delta(wasZk, isZk); } void handleDeletedTopic(TopicImage deletedTopic) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java index ef75dc6117075..2362629ae4aa0 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java @@ -51,13 +51,27 @@ private static BrokerRegistration brokerRegistration( boolean fenced ) { return new BrokerRegistration(brokerId, - 100L, - Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), - Collections.emptyList(), - Collections.emptyMap(), - Optional.empty(), - fenced, - false); + 100L, + Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), + Collections.emptyList(), + Collections.emptyMap(), + Optional.empty(), + fenced, + false); + } + + private static BrokerRegistration zkBrokerRegistration( + int brokerId + ) { + return new BrokerRegistration(brokerId, + 100L, + Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), + Collections.emptyList(), + Collections.emptyMap(), + Optional.empty(), + false, + false, + true); } @Test @@ -103,6 +117,20 @@ public void testHandleBrokerUnfencing() { assertEquals(1, changes.activeBrokersChange()); } + @Test + public void testHandleZkBroker() { + ControllerMetricsChanges changes = new ControllerMetricsChanges(); + changes.handleBrokerChange(null, zkBrokerRegistration(1)); + assertEquals(1, changes.migratingZkBrokersChange()); + changes.handleBrokerChange(null, zkBrokerRegistration(2)); + changes.handleBrokerChange(null, zkBrokerRegistration(3)); + assertEquals(3, changes.migratingZkBrokersChange()); + + changes.handleBrokerChange(zkBrokerRegistration(3), brokerRegistration(3, true)); + changes.handleBrokerChange(brokerRegistration(3, true), brokerRegistration(3, false)); + assertEquals(2, changes.migratingZkBrokersChange()); + } + @Test public void testHandleDeletedTopic() { ControllerMetricsChanges changes = new ControllerMetricsChanges(); From 6736ecc436fd6be3441e69f9ef27aea2f10800ca Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 18 Jul 2023 09:30:50 -0400 Subject: [PATCH 3/5] fixup unit tests --- core/src/main/scala/kafka/controller/KafkaController.scala | 1 + .../kafka/controller/metrics/QuorumControllerMetrics.java | 1 + .../kafka/controller/metrics/QuorumControllerMetricsTest.java | 2 ++ 3 files changed, 4 insertions(+) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 8d3f6beff98bf..0bdd554a90bac 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -85,6 +85,7 @@ object KafkaController extends Logging { // package private for testing private[controller] val MetricNames = Set( + MetadataTypeMetric.METRIC_NAME, ActiveControllerCountMetricName, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java index 16be770ce8c1b..09703e0a09c10 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java @@ -219,6 +219,7 @@ public long timedOutHeartbeats() { @Override public void close() { registry.ifPresent(r -> Arrays.asList( + METADATA_TYPE, ACTIVE_CONTROLLER_COUNT, EVENT_QUEUE_TIME_MS, EVENT_QUEUE_PROCESSING_TIME_MS, diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java index 01bef439eb4e2..fd0179c9cd7e2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java @@ -42,6 +42,7 @@ public void testMetricNamesNotInMigrationState() { new HashSet<>(Arrays.asList( "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs", "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs", + "kafka.controller:type=KafkaController,name=MetadataType", "kafka.controller:type=KafkaController,name=ActiveControllerCount", "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset", "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs", @@ -67,6 +68,7 @@ public void testMetricNamesInMigrationState() { new HashSet<>(Arrays.asList( "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs", "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs", + "kafka.controller:type=KafkaController,name=MetadataType", "kafka.controller:type=KafkaController,name=ActiveControllerCount", "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset", "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs", From f2351bf39ef31b16c659551997b0f8ac92b7f1d9 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 25 Jul 2023 14:32:57 -0400 Subject: [PATCH 4/5] PR feedback --- .../kafka/controller/KafkaController.scala | 10 ++++--- .../main/scala/kafka/server/KafkaBroker.scala | 7 +---- .../scala/kafka/server/SharedServer.scala | 5 +--- .../metrics/ControllerMetadataMetrics.java | 20 +++++++------- .../metrics/QuorumControllerMetrics.java | 10 ------- .../metadata/migration/ZkMigrationState.java | 9 ++++++- ...ontrollerMetadataMetricsPublisherTest.java | 2 +- .../ControllerMetadataMetricsTest.java | 8 +++--- .../server/metrics/MetadataTypeMetric.java | 26 ------------------- 9 files changed, 30 insertions(+), 67 deletions(-) delete mode 100644 server-common/src/main/java/org/apache/kafka/server/metrics/MetadataTypeMetric.java diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 032a177c04490..baba44f943b2f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.api._ import kafka.common._ import kafka.cluster.Broker -import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback} +import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback, ZkMigrationStateMetricName} import kafka.coordinator.transaction.ZkProducerIdManager import kafka.server._ import kafka.server.metadata.ZkFinalizedFeatureCache @@ -44,8 +44,9 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.metadata.LeaderRecoveryState +import org.apache.kafka.metadata.migration.ZkMigrationState import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock} -import org.apache.kafka.server.metrics.{KafkaMetricsGroup, MetadataTypeMetric} +import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.KafkaScheduler import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code @@ -81,10 +82,11 @@ object KafkaController extends Logging { private val ReplicasIneligibleToDeleteCountMetricName = "ReplicasIneligibleToDeleteCount" private val ActiveBrokerCountMetricName = "ActiveBrokerCount" private val FencedBrokerCountMetricName = "FencedBrokerCount" + private val ZkMigrationStateMetricName = "ZkMigrationState" // package private for testing private[controller] val MetricNames = Set( - MetadataTypeMetric.METRIC_NAME, + ZkMigrationStateMetricName, ActiveControllerCountMetricName, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, @@ -173,7 +175,7 @@ class KafkaController(val config: KafkaConfig, /* single-thread scheduler to clean expired tokens */ private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner") - metricsGroup.newGauge(MetadataTypeMetric.METRIC_NAME, () => MetadataTypeMetric.ZK) + metricsGroup.newGauge(ZkMigrationStateMetricName, () => ZkMigrationState.ZK) metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0) metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount) metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount) diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala index c7f1bf928a43e..cb66a9eb0895f 100644 --- a/core/src/main/scala/kafka/server/KafkaBroker.scala +++ b/core/src/main/scala/kafka/server/KafkaBroker.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.authorizer.Authorizer -import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, MetadataTypeMetric} +import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.Scheduler import java.util @@ -101,11 +101,6 @@ trait KafkaBroker extends Logging { } } - metricsGroup.newGauge(MetadataTypeMetric.METRIC_NAME, () => this match { - case _: BrokerServer => MetadataTypeMetric.KRAFT - case _: KafkaServer => MetadataTypeMetric.ZK - case _ => throw new IllegalStateException() - }) metricsGroup.newGauge("BrokerState", () => brokerState.value) metricsGroup.newGauge("ClusterId", () => clusterId) metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 4703f1f183936..e58b33a8d5719 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -242,10 +242,7 @@ class SharedServer( brokerMetrics = BrokerServerMetrics(metrics) } if (sharedServerConfig.processRoles.contains(ControllerRole)) { - controllerServerMetrics = new ControllerMetadataMetrics( - Optional.of(KafkaYammerMetrics.defaultRegistry()), - sharedServerConfig.migrationEnabled - ) + controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry())) } val _raftManager = new KafkaRaftManager[ApiMessageAndVersion]( metaProps, diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java index 47e5f79a4bd8f..37906b34b8bb5 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java @@ -68,9 +68,9 @@ public final class ControllerMetadataMetrics implements AutoCloseable { /** * Create a new ControllerMetadataMetrics object. * - * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one. + * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one. */ - public ControllerMetadataMetrics(Optional registry, boolean zkMigrationEnabled) { + public ControllerMetadataMetrics(Optional registry) { this.registry = registry; registry.ifPresent(r -> r.newGauge(FENCED_BROKER_COUNT, new Gauge() { @Override @@ -121,14 +121,13 @@ public Integer value() { } })); - if (zkMigrationEnabled) { - registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new Gauge() { - @Override - public Integer value() { - return migratingZkBrokerCount(); - } - })); - } + registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new Gauge() { + @Override + public Integer value() { + return migratingZkBrokerCount(); + } + })); + } public void setFencedBrokerCount(int brokerCount) { @@ -155,7 +154,6 @@ public int activeBrokerCount() { return this.activeBrokerCount.get(); } - public void setMigratingZkBrokerCount(int brokerCount) { this.migratingZkBrokerCount.set(brokerCount); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java index 09703e0a09c10..9313f6958c2f9 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java @@ -23,7 +23,6 @@ import com.yammer.metrics.core.MetricsRegistry; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.metrics.KafkaYammerMetrics; -import org.apache.kafka.server.metrics.MetadataTypeMetric; import java.util.Arrays; import java.util.Optional; @@ -39,8 +38,6 @@ * @link{org.apache.kafka.controller.metrics.ControllerMetadataMetrics}, not here. */ public class QuorumControllerMetrics implements AutoCloseable { - private final static MetricName METADATA_TYPE = getMetricName( - "KafkaController", MetadataTypeMetric.METRIC_NAME); private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName( "KafkaController", "ActiveControllerCount"); private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName( @@ -91,12 +88,6 @@ public QuorumControllerMetrics( ) { this.registry = registry; this.active = false; - registry.ifPresent(r -> r.newGauge(METADATA_TYPE, new Gauge() { - @Override - public Integer value() { - return MetadataTypeMetric.KRAFT; - } - })); registry.ifPresent(r -> r.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge() { @Override public Integer value() { @@ -219,7 +210,6 @@ public long timedOutHeartbeats() { @Override public void close() { registry.ifPresent(r -> Arrays.asList( - METADATA_TYPE, ACTIVE_CONTROLLER_COUNT, EVENT_QUEUE_TIME_MS, EVENT_QUEUE_PROCESSING_TIME_MS, diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java index 8d5e584831c65..ff8ebd08b38a2 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java @@ -62,7 +62,14 @@ public enum ZkMigrationState { * will persist indefinitely after the migration. In operational terms, this is the same as the NONE * state. */ - POST_MIGRATION((byte) 3); + POST_MIGRATION((byte) 3), + + /** + * The controller is a ZK controller. No migration has been performed. This state is never persisted + * and is only used by KafkaController in order to have a unified metric that indicates what kind of + * metadata state the controller is in. + */ + ZK((byte) 4); private final byte value; diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java index d534a0d4a4763..e05123365f6d0 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java @@ -53,7 +53,7 @@ static class TestEnv implements AutoCloseable { MockFaultHandler faultHandler = new MockFaultHandler("ControllerMetadataMetricsPublisher"); ControllerMetadataMetrics metrics = - new ControllerMetadataMetrics(Optional.empty(), false); + new ControllerMetadataMetrics(Optional.empty()); ControllerMetadataMetricsPublisher publisher = new ControllerMetadataMetricsPublisher(metrics, faultHandler); diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java index cd42ce84a6cda..c0f3b04f4ff6d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java @@ -36,7 +36,7 @@ public class ControllerMetadataMetricsTest { public void testMetricNames() { MetricsRegistry registry = new MetricsRegistry(); try { - try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry), false)) { + try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) { ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller:", new HashSet<>(Arrays.asList( "kafka.controller:type=KafkaController,name=ActiveBrokerCount", @@ -60,7 +60,7 @@ public void testMetricNames() { public void testZkMigrationMetricNames() { MetricsRegistry registry = new MetricsRegistry(); try { - try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry), true)) { + try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) { ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller:", new HashSet<>(Arrays.asList( "kafka.controller:type=KafkaController,name=ActiveBrokerCount", @@ -84,7 +84,7 @@ public void testZkMigrationMetricNames() { @Test public void testMetadataErrorCount() { MetricsRegistry registry = new MetricsRegistry(); - try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry), false)) { + try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) { @SuppressWarnings("unchecked") Gauge metadataErrorCount = (Gauge) registry .allMetrics() @@ -109,7 +109,7 @@ private void testIntGaugeMetric( BiConsumer incrementer ) { MetricsRegistry registry = new MetricsRegistry(); - try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry), false)) { + try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) { assertEquals(0, metricsGetter.apply(metrics)); assertEquals(0, registryGetter.apply(registry)); setter.accept(metrics, 123); diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/MetadataTypeMetric.java b/server-common/src/main/java/org/apache/kafka/server/metrics/MetadataTypeMetric.java deleted file mode 100644 index 00e19359fd3e2..0000000000000 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/MetadataTypeMetric.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.server.metrics; - -/** - * Integer values to represent Kafka metadata types as defined in KIP-866 - */ -public class MetadataTypeMetric { - public static final String METRIC_NAME = "MetadataType"; - public static final int ZK = 1; - public static final int KRAFT = 2; -} From e9831b58148873503ad7c1a554f97fb422f6116c Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 25 Jul 2023 15:39:29 -0400 Subject: [PATCH 5/5] throw if we actually see ZkMigrationState.ZK --- .../main/java/org/apache/kafka/controller/QuorumController.java | 2 ++ .../apache/kafka/metadata/migration/KRaftMigrationDriver.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 199c679cb35c0..6a608d1dd7995 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1273,6 +1273,8 @@ public static List generateActivationRecords( "has been completed."); } break; + default: + throw new IllegalStateException("Unsupported ZkMigrationState " + featureControl.zkMigrationState()); } } else { if (zkMigrationEnabled) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 1012dedd176df..93379f3aa013c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -572,6 +572,8 @@ public void run() throws Exception { log.error("KRaft controller indicates a completed migration, but the migration driver is somehow active."); transitionTo(MigrationDriverState.INACTIVE); break; + default: + throw new IllegalStateException("Unsupported ZkMigrationState " + zkMigrationState); } } }