diff --git a/README.md b/README.md
index 9b85f0835084f..49dd49ef6594e 100644
--- a/README.md
+++ b/README.md
@@ -288,4 +288,4 @@ See [vagrant/README.md](vagrant/README.md).
Apache Kafka is interested in building the community; we would welcome any thoughts or [patches](https://issues.apache.org/jira/browse/KAFKA). You can reach us [on the Apache mailing lists](http://kafka.apache.org/contact.html).
To contribute follow the instructions here:
- * https://kafka.apache.org/contributing.html
+ * https://kafka.apache.org/contributing.html
diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml
index 2cbb5504293b0..73b74933a9469 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -63,7 +63,6 @@
-
@@ -73,7 +72,6 @@
-
@@ -93,13 +91,17 @@
-
+
+
+
+
+
@@ -122,6 +124,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index e58b33a8d5719..892c528885a44 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -25,7 +25,9 @@ import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
+import org.apache.kafka.image.MetadataProvenance
import org.apache.kafka.image.loader.MetadataLoader
+import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -34,7 +36,7 @@ import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, Process
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import java.util
-import java.util.{Collections, Optional}
+import java.util.Optional
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
@@ -106,6 +108,7 @@ class SharedServer(
val snapshotsDisabledReason = new AtomicReference[String](null)
@volatile var snapshotEmitter: SnapshotEmitter = _
@volatile var snapshotGenerator: SnapshotGenerator = _
+ @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _
def isUsed(): Boolean = synchronized {
usedByController || usedByBroker
@@ -259,15 +262,24 @@ class SharedServer(
raftManager = _raftManager
_raftManager.startup()
+ metadataLoaderMetrics = if (brokerMetrics != null) {
+ new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+ elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
+ batchSize => brokerMetrics.updateBatchSize(batchSize),
+ brokerMetrics.lastAppliedImageProvenance)
+ } else {
+ new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+ _ => {},
+ _ => {},
+ new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY))
+ }
val loaderBuilder = new MetadataLoader.Builder().
setNodeId(metaProps.nodeId).
setTime(time).
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
setFaultHandler(metadataLoaderFaultHandler).
- setHighWaterMarkAccessor(() => _raftManager.client.highWatermark())
- if (brokerMetrics != null) {
- loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
- }
+ setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
+ setMetrics(metadataLoaderMetrics)
loader = loaderBuilder.build()
snapshotEmitter = new SnapshotEmitter.Builder().
setNodeId(metaProps.nodeId).
@@ -282,15 +294,15 @@ class SharedServer(
setDisabledReason(snapshotsDisabledReason).
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
build()
- _raftManager.register(loader)
try {
- loader.installPublishers(Collections.singletonList(snapshotGenerator))
+ loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get()
} catch {
case t: Throwable => {
error("Unable to install metadata publishers", t)
throw new RuntimeException("Unable to install metadata publishers.", t)
}
}
+ _raftManager.register(loader)
debug("Completed SharedServer startup.")
started = true
} catch {
@@ -326,6 +338,10 @@ class SharedServer(
CoreUtils.swallow(loader.close(), this)
loader = null
}
+ if (metadataLoaderMetrics != null) {
+ CoreUtils.swallow(metadataLoaderMetrics.close(), this)
+ metadataLoaderMetrics = null
+ }
if (snapshotGenerator != null) {
CoreUtils.swallow(snapshotGenerator.close(), this)
snapshotGenerator = null
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 212909101f270..ff1833241663d 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -23,7 +23,6 @@ import org.apache.kafka.common.metrics.Gauge
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.MetricConfig
import org.apache.kafka.image.MetadataProvenance
-import org.apache.kafka.image.loader.MetadataLoaderMetrics
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import java.util.Collections
@@ -31,7 +30,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
final class BrokerServerMetrics private (
metrics: Metrics
-) extends MetadataLoaderMetrics {
+) extends AutoCloseable {
import BrokerServerMetrics._
private val batchProcessingTimeHistName = KafkaMetricsGroup.explicitMetricName("kafka.server",
@@ -123,15 +122,15 @@ final class BrokerServerMetrics private (
).foreach(metrics.removeMetric)
}
- override def updateBatchProcessingTime(elapsedNs: Long): Unit =
+ def updateBatchProcessingTime(elapsedNs: Long): Unit =
batchProcessingTimeHist.update(NANOSECONDS.toMicros(elapsedNs))
- override def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
+ def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
- override def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
+ def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
lastAppliedImageProvenance.set(provenance)
- override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
+ def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs()
}
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 37bd66cd2ae61..94e6118d7d476 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition, TopicPartitionInfo}
-import org.apache.kafka.controller.QuorumController
+import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils}
import org.apache.kafka.image.ClusterImage
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer._
@@ -1153,7 +1153,7 @@ class KRaftClusterTest {
val controller = cluster.controllers().values().iterator().next()
controller.controller.waitForReadyBrokers(3).get()
TestUtils.retry(60000) {
- val latch = controller.controller.asInstanceOf[QuorumController].pause()
+ val latch = QuorumControllerIntegrationTestUtils.pause(controller.controller.asInstanceOf[QuorumController])
Thread.sleep(1001)
latch.countDown()
assertEquals(0, controller.sharedServer.controllerServerMetrics.fencedBrokerCount())
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 111937b3fa3db..199c679cb35c0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -126,7 +126,6 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -461,6 +460,12 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
private Throwable handleEventException(String name,
OptionalLong startProcessingTimeNs,
Throwable exception) {
+ if (!startProcessingTimeNs.isPresent() &&
+ ControllerExceptions.isTimeoutException(exception)) {
+ // If the event never started, and the exception is a timeout, increment the timed
+ // out metric.
+ controllerMetrics.incrementOperationsTimedOut();
+ }
Throwable externalException =
ControllerExceptions.toExternalException(exception, () -> latestController());
if (!startProcessingTimeNs.isPresent()) {
@@ -492,6 +497,15 @@ private Throwable handleEventException(String name,
return externalException;
}
+ private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) {
+ long now = time.nanoseconds();
+ controllerMetrics.incrementOperationsStarted();
+ if (eventCreatedTimeNs.isPresent()) {
+ controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs.getAsLong()));
+ }
+ return now;
+ }
+
/**
* A controller event for handling internal state changes, such as Raft inputs.
*/
@@ -508,9 +522,8 @@ class ControllerEvent implements EventQueue.Event {
@Override
public void run() throws Exception {
- long now = time.nanoseconds();
- controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
- startProcessingTimeNs = OptionalLong.of(now);
+ startProcessingTimeNs = OptionalLong.of(
+ updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
log.debug("Executing {}.", this);
handler.run();
handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
@@ -527,11 +540,16 @@ public String toString() {
}
}
- private void appendControlEvent(String name, Runnable handler) {
+ void appendControlEvent(String name, Runnable handler) {
ControllerEvent event = new ControllerEvent(name, handler);
queue.append(event);
}
+ void appendControlEventWithDeadline(String name, Runnable handler, long deadlineNs) {
+ ControllerEvent event = new ControllerEvent(name, handler);
+ queue.appendWithDeadline(deadlineNs, event);
+ }
+
/**
* A controller event that reads the committed internal state in order to expose it
* to an API.
@@ -555,9 +573,8 @@ CompletableFuture future() {
@Override
public void run() throws Exception {
- long now = time.nanoseconds();
- controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
- startProcessingTimeNs = OptionalLong.of(now);
+ startProcessingTimeNs = OptionalLong.of(
+ updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
T value = handler.get();
handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
future.complete(value);
@@ -692,12 +709,11 @@ CompletableFuture future() {
@Override
public void run() throws Exception {
- long now = time.nanoseconds();
- if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
- // We exclude deferred events from the event queue time metric to prevent
- // incorrectly including the deferral time in the queue time.
- controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
- }
+ // Deferred events set the DOES_NOT_UPDATE_QUEUE_TIME flag to prevent incorrectly
+ // including their deferral time in the event queue time.
+ startProcessingTimeNs = OptionalLong.of(
+ updateEventStartMetricsAndGetTime(flags.contains(DOES_NOT_UPDATE_QUEUE_TIME) ?
+ OptionalLong.empty() : OptionalLong.of(eventCreatedTimeNs)));
int controllerEpoch = curClaimEpoch;
if (!isActiveController(controllerEpoch)) {
throw ControllerExceptions.newWrongControllerException(latestController());
@@ -706,7 +722,6 @@ public void run() throws Exception {
log.info("Cannot run write operation {} in pre-migration mode. Returning NOT_CONTROLLER.", name);
throw ControllerExceptions.newPreMigrationException(latestController());
}
- startProcessingTimeNs = OptionalLong.of(now);
ControllerResult result = op.generateRecordsAndResult();
if (result.records().isEmpty()) {
op.processBatchEndOffset(writeOffset);
@@ -1063,6 +1078,9 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> {
final String newLeaderName = newLeader.leaderId().isPresent() ?
String.valueOf(newLeader.leaderId().getAsInt()) : "(none)";
+ if (newLeader.leaderId().isPresent()) {
+ controllerMetrics.incrementNewActiveControllers();
+ }
if (isActiveController()) {
if (newLeader.isLeader(nodeId)) {
log.warn("We were the leader in epoch {}, and are still the leader " +
@@ -1308,7 +1326,7 @@ private void updateLastCommittedState(
}
}
- private void renounce() {
+ void renounce() {
try {
if (curClaimEpoch == -1) {
throw new RuntimeException("Cannot renounce leadership because we are not the " +
@@ -2302,20 +2320,12 @@ public void close() throws InterruptedException {
}
// VisibleForTesting
- public CountDownLatch pause() {
- final CountDownLatch latch = new CountDownLatch(1);
- appendControlEvent("pause", () -> {
- try {
- latch.await();
- } catch (InterruptedException e) {
- log.info("Interrupted while waiting for unpause.", e);
- }
- });
- return latch;
+ Time time() {
+ return time;
}
// VisibleForTesting
- Time time() {
- return time;
+ QuorumControllerMetrics controllerMetrics() {
+ return controllerMetrics;
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index e267ebdfb9a08..6c7aa581a7939 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -45,7 +45,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName(
- "KafkaController", "ZKWriteBehindLag");
+ "KafkaController", "ZKWriteBehindLag");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -54,6 +54,14 @@ public class QuorumControllerMetrics implements AutoCloseable {
"KafkaController", "LastAppliedRecordTimestamp");
private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
"KafkaController", "LastAppliedRecordLagMs");
+ private final static MetricName TIMED_OUT_BROKER_HEARTBEAT_COUNT = getMetricName(
+ "KafkaController", "TimedOutBrokerHeartbeatCount");
+ private final static MetricName EVENT_QUEUE_OPERATIONS_STARTED_COUNT = getMetricName(
+ "KafkaController", "EventQueueOperationsStartedCount");
+ private final static MetricName EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT = getMetricName(
+ "KafkaController", "EventQueueOperationsTimedOutCount");
+ private final static MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
+ "KafkaController", "NewActiveControllersCount");
private final Optional registry;
private volatile boolean active;
@@ -64,6 +72,9 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final Consumer eventQueueTimeUpdater;
private final Consumer eventQueueProcessingTimeUpdater;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
+ private final AtomicLong operationsStarted = new AtomicLong(0);
+ private final AtomicLong operationsTimedOut = new AtomicLong(0);
+ private final AtomicLong newActiveControllers = new AtomicLong(0);
private Consumer newHistogram(MetricName name, boolean biased) {
if (registry.isPresent()) {
@@ -113,7 +124,30 @@ public Long value() {
return time.milliseconds() - lastAppliedRecordTimestamp();
}
}));
-
+ registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge() {
+ @Override
+ public Long value() {
+ return timedOutHeartbeats();
+ }
+ }));
+ registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge() {
+ @Override
+ public Long value() {
+ return operationsStarted();
+ }
+ }));
+ registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge() {
+ @Override
+ public Long value() {
+ return operationsTimedOut();
+ }
+ }));
+ registry.ifPresent(r -> r.newGauge(NEW_ACTIVE_CONTROLLERS_COUNT, new Gauge() {
+ @Override
+ public Long value() {
+ return newActiveControllers();
+ }
+ }));
if (zkMigrationState) {
registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge() {
@Override
@@ -176,17 +210,37 @@ public long dualWriteOffset() {
}
public void incrementTimedOutHeartbeats() {
- timedOutHeartbeats.addAndGet(1);
- }
-
- public void setTimedOutHeartbeats(long heartbeats) {
- timedOutHeartbeats.set(heartbeats);
+ timedOutHeartbeats.incrementAndGet();
}
public long timedOutHeartbeats() {
return timedOutHeartbeats.get();
}
+ public void incrementOperationsStarted() {
+ operationsStarted.incrementAndGet();
+ }
+
+ public long operationsStarted() {
+ return operationsStarted.get();
+ }
+
+ public void incrementOperationsTimedOut() {
+ operationsTimedOut.incrementAndGet();
+ }
+
+ public long operationsTimedOut() {
+ return operationsTimedOut.get();
+ }
+
+ public void incrementNewActiveControllers() {
+ newActiveControllers.incrementAndGet();
+ }
+
+ public long newActiveControllers() {
+ return newActiveControllers.get();
+ }
+
@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(
@@ -197,6 +251,10 @@ public void close() {
LAST_COMMITTED_RECORD_OFFSET,
LAST_APPLIED_RECORD_TIMESTAMP,
LAST_APPLIED_RECORD_LAG_MS,
+ TIMED_OUT_BROKER_HEARTBEAT_COUNT,
+ EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
+ EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
+ NEW_ACTIVE_CONTROLLERS_COUNT,
ZK_WRITE_BEHIND_LAG
).forEach(r::removeMetric));
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 768fcb2574b2f..c2c066418b952 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -22,6 +22,7 @@
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.image.writer.ImageReWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
@@ -35,14 +36,17 @@
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.Snapshots;
import org.slf4j.Logger;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -69,28 +73,7 @@ public static class Builder {
private Time time = Time.SYSTEM;
private LogContext logContext = null;
private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e);
- private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
- private volatile long lastAppliedOffset = -1L;
-
- @Override
- public void updateBatchProcessingTime(long elapsedNs) { }
-
- @Override
- public void updateBatchSize(int size) { }
-
- @Override
- public void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
- this.lastAppliedOffset = provenance.lastContainedOffset();
- }
-
- @Override
- public long lastAppliedOffset() {
- return lastAppliedOffset;
- }
-
- @Override
- public void close() throws Exception { }
- };
+ private MetadataLoaderMetrics metrics = null;
private Supplier highWaterMarkAccessor = null;
public Builder setNodeId(int nodeId) {
@@ -113,13 +96,13 @@ public Builder setFaultHandler(FaultHandler faultHandler) {
return this;
}
- public Builder setMetadataLoaderMetrics(MetadataLoaderMetrics metrics) {
- this.metrics = metrics;
+ public Builder setHighWaterMarkAccessor(Supplier highWaterMarkAccessor) {
+ this.highWaterMarkAccessor = highWaterMarkAccessor;
return this;
}
- public Builder setHighWaterMarkAccessor(Supplier highWaterMarkAccessor) {
- this.highWaterMarkAccessor = highWaterMarkAccessor;
+ public Builder setMetrics(MetadataLoaderMetrics metrics) {
+ this.metrics = metrics;
return this;
}
@@ -130,6 +113,12 @@ public MetadataLoader build() {
if (highWaterMarkAccessor == null) {
throw new RuntimeException("You must set the high water mark accessor.");
}
+ if (metrics == null) {
+ metrics = new MetadataLoaderMetrics(Optional.empty(),
+ __ -> { },
+ __ -> { },
+ new AtomicReference<>(MetadataProvenance.EMPTY));
+ }
return new MetadataLoader(
time,
logContext,
@@ -221,6 +210,11 @@ private MetadataLoader(
new ShutdownEvent());
}
+ // VisibleForTesting
+ MetadataLoaderMetrics metrics() {
+ return metrics;
+ }
+
private boolean stillNeedToCatchUp(String where, long offset) {
if (!catchingUp) {
log.trace("{}: we are not in the initial catching up state.", where);
@@ -349,6 +343,9 @@ public void handleCommit(BatchReader reader) {
}
}
metrics.updateLastAppliedImageProvenance(image.provenance());
+ if (delta.featuresDelta() != null) {
+ metrics.setCurrentMetadataVersion(image.features().metadataVersion());
+ }
if (uninitializedPublishers.isEmpty()) {
scheduleInitializeNewPublishers(0);
}
@@ -406,7 +403,7 @@ LogDeltaManifest loadLogDelta(
MetadataProvenance provenance =
new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
long elapsedNs = time.nanoseconds() - startNs;
- metrics.updateBatchProcessingTime(elapsedNs);
+ metrics.updateBatchProcessingTimeNs(elapsedNs);
return new LogDeltaManifest(provenance,
currentLeaderAndEpoch,
numBatches,
@@ -418,24 +415,30 @@ LogDeltaManifest loadLogDelta(
public void handleLoadSnapshot(SnapshotReader reader) {
eventQueue.append(() -> {
try {
+ long numLoaded = metrics.incrementHandleLoadSnapshotCount();
+ String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
+ log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.",
+ snapshotName, numLoaded);
MetadataDelta delta = new MetadataDelta.Builder().
setImage(image).
build();
SnapshotManifest manifest = loadSnapshot(delta, reader);
- log.info("handleLoadSnapshot: generated a metadata delta from a snapshot at offset {} " +
- "in {} us.", manifest.provenance().lastContainedOffset(),
+ log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " +
+ "and this snapshot in {} us.", snapshotName,
+ image.provenance().lastContainedOffset(),
NANOSECONDS.toMicros(manifest.elapsedNs()));
try {
image = delta.apply(manifest.provenance());
} catch (Throwable e) {
faultHandler.handleFault("Error generating new metadata image from " +
- "snapshot at offset " + reader.lastContainedLogOffset(), e);
+ "snapshot " + snapshotName, e);
return;
}
if (stillNeedToCatchUp("handleLoadSnapshot", manifest.provenance().lastContainedOffset())) {
return;
}
- log.info("handleLoadSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
+ log.info("handleLoadSnapshot({}): publishing new snapshot image to {} publisher(s).",
+ snapshotName, publishers.size());
for (MetadataPublisher publisher : publishers.values()) {
try {
publisher.onMetadataUpdate(delta, image, manifest);
@@ -446,6 +449,7 @@ public void handleLoadSnapshot(SnapshotReader reader) {
}
}
metrics.updateLastAppliedImageProvenance(image.provenance());
+ metrics.setCurrentMetadataVersion(image.features().metadataVersion());
if (uninitializedPublishers.isEmpty()) {
scheduleInitializeNewPublishers(0);
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
deleted file mode 100644
index 654bc9dd505d9..0000000000000
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.image.loader;
-
-import org.apache.kafka.image.MetadataProvenance;
-
-
-/**
- * An interface for the metadata loader metrics.
- */
-public interface MetadataLoaderMetrics extends AutoCloseable {
- /**
- * Update the batch processing time histogram.
- */
- void updateBatchProcessingTime(long elapsedNs);
-
- /**
- * Update the batch size histogram.
- */
- void updateBatchSize(int size);
-
- /**
- * Set the provenance of the last image which has been processed by all publishers.
- */
- void updateLastAppliedImageProvenance(MetadataProvenance provenance);
-
- /**
- * Retrieve the last offset which has been processed by all publishers.
- */
- long lastAppliedOffset();
-}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
new file mode 100644
index 0000000000000..351ac4fc5c354
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * These are the metrics which are managed by the MetadataLoader class.
+ */
+public final class MetadataLoaderMetrics implements AutoCloseable {
+ private final static MetricName CURRENT_METADATA_VERSION = getMetricName(
+ "MetadataLoader", "CurrentMetadataVersion");
+ private final static MetricName HANDLE_LOAD_SNAPSHOT_COUNT = getMetricName(
+ "MetadataLoader", "HandleLoadSnapshotCount");
+
+ private final Optional registry;
+ private final AtomicReference currentMetadataVersion =
+ new AtomicReference<>(MetadataVersion.MINIMUM_KRAFT_VERSION);
+ private final AtomicLong handleLoadSnapshotCount = new AtomicLong(0);
+ private final Consumer batchProcessingTimeNsUpdater;
+ private final Consumer batchSizesUpdater;
+ private final AtomicReference lastAppliedProvenance;
+
+ /**
+ * Create a new LoaderMetrics object.
+ *
+ * @param registry The metrics registry, or Optional.empty if this is a
+ * test and we don't have one.
+ * @param batchProcessingTimeNsUpdater Updates the batch processing time histogram.
+ * @param batchSizesUpdater Updates the batch sizes histogram.
+ */
+ public MetadataLoaderMetrics(
+ Optional registry,
+ Consumer batchProcessingTimeNsUpdater,
+ Consumer batchSizesUpdater,
+ AtomicReference lastAppliedProvenance
+ ) {
+ this.registry = registry;
+ this.batchProcessingTimeNsUpdater = batchProcessingTimeNsUpdater;
+ this.batchSizesUpdater = batchSizesUpdater;
+ this.lastAppliedProvenance = lastAppliedProvenance;
+ registry.ifPresent(r -> r.newGauge(CURRENT_METADATA_VERSION, new Gauge() {
+ @Override
+ public Integer value() {
+ return Integer.valueOf(currentMetadataVersion().featureLevel());
+ }
+ }));
+ registry.ifPresent(r -> r.newGauge(HANDLE_LOAD_SNAPSHOT_COUNT, new Gauge() {
+ @Override
+ public Long value() {
+ return handleLoadSnapshotCount();
+ }
+ }));
+ }
+
+ /**
+ * Update the batch processing time histogram.
+ */
+ public void updateBatchProcessingTimeNs(long elapsedNs) {
+ batchProcessingTimeNsUpdater.accept(elapsedNs);
+ }
+
+ /**
+ * Update the batch size histogram.
+ */
+ public void updateBatchSize(int size) {
+ batchSizesUpdater.accept(size);
+ }
+
+ /**
+ * Set the provenance of the last image which has been processed by all publishers.
+ */
+ public void updateLastAppliedImageProvenance(MetadataProvenance lastAppliedProvenance) {
+ this.lastAppliedProvenance.set(lastAppliedProvenance);
+ }
+
+ /**
+ * Retrieve the last offset which has been processed by all publishers.
+ */
+ public long lastAppliedOffset() {
+ return this.lastAppliedProvenance.get().lastContainedOffset();
+ }
+
+ public void setCurrentMetadataVersion(MetadataVersion metadataVersion) {
+ this.currentMetadataVersion.set(metadataVersion);
+ }
+
+ public MetadataVersion currentMetadataVersion() {
+ return this.currentMetadataVersion.get();
+ }
+
+ public long incrementHandleLoadSnapshotCount() {
+ return this.handleLoadSnapshotCount.incrementAndGet();
+ }
+
+ public long handleLoadSnapshotCount() {
+ return this.handleLoadSnapshotCount.get();
+ }
+
+ @Override
+ public void close() {
+ registry.ifPresent(r -> Arrays.asList(
+ CURRENT_METADATA_VERSION,
+ HANDLE_LOAD_SNAPSHOT_COUNT
+ ).forEach(r::removeMetric));
+ }
+
+ private static MetricName getMetricName(String type, String name) {
+ return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
index ec46bcc06faec..8ab224f91f2a4 100644
--- a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
@@ -18,9 +18,11 @@
package org.apache.kafka.image.publisher;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RaftSnapshotWriter;
import org.apache.kafka.raft.RaftClient;
@@ -43,9 +45,16 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
private final static int DEFAULT_BATCH_SIZE = 1024;
public static class Builder {
+ private Time time = Time.SYSTEM;
private int nodeId = 0;
private RaftClient raftClient = null;
private int batchSize = DEFAULT_BATCH_SIZE;
+ private SnapshotEmitterMetrics metrics = null;
+
+ public Builder setTime(Time time) {
+ this.time = time;
+ return this;
+ }
public Builder setNodeId(int nodeId) {
this.nodeId = nodeId;
@@ -62,11 +71,21 @@ public Builder setBatchSize(int batchSize) {
return this;
}
+ public Builder setMetrics(SnapshotEmitterMetrics metrics) {
+ this.metrics = metrics;
+ return this;
+ }
+
public SnapshotEmitter build() {
if (raftClient == null) throw new RuntimeException("You must set the raftClient.");
- return new SnapshotEmitter(nodeId,
+ if (metrics == null) metrics = new SnapshotEmitterMetrics(
+ Optional.empty(),
+ time);
+ return new SnapshotEmitter(time,
+ nodeId,
raftClient,
- batchSize);
+ batchSize,
+ metrics);
}
}
@@ -75,6 +94,11 @@ public SnapshotEmitter build() {
*/
private final Logger log;
+ /**
+ * The clock object.
+ */
+ private final Time time;
+
/**
* The RaftClient to use.
*/
@@ -85,14 +109,27 @@ public SnapshotEmitter build() {
*/
private final int batchSize;
+ /**
+ * The metrics to use.
+ */
+ private final SnapshotEmitterMetrics metrics;
+
private SnapshotEmitter(
- int nodeId,
- RaftClient raftClient,
- int batchSize
+ Time time,
+ int nodeId,
+ RaftClient raftClient,
+ int batchSize,
+ SnapshotEmitterMetrics metrics
) {
+ this.time = time;
this.log = new LogContext("[SnapshotEmitter id=" + nodeId + "] ").logger(SnapshotEmitter.class);
this.raftClient = raftClient;
this.batchSize = batchSize;
+ this.metrics = metrics;
+ }
+
+ SnapshotEmitterMetrics metrics() {
+ return metrics;
}
@Override
@@ -112,6 +149,9 @@ public void maybeEmit(MetadataImage image) {
setMetadataVersion(image.features().metadataVersion()).
build());
writer.close(true);
+ metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds());
+ metrics.setLatestSnapshotGeneratedBytes(writer.frozenSize().getAsLong());
+ log.info("Successfully wrote {}", provenance.snapshotName());
} catch (Throwable e) {
log.error("Encountered error while writing {}", provenance.snapshotName(), e);
throw e;
@@ -119,6 +159,5 @@ public void maybeEmit(MetadataImage image) {
Utils.closeQuietly(writer, "RaftSnapshotWriter");
Utils.closeQuietly(snapshotWriter.get(), "SnapshotWriter");
}
- log.info("Successfully wrote {}", provenance.snapshotName());
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java
new file mode 100644
index 0000000000000..5e59942c960c3
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These are the metrics which are managed by the SnapshotEmitter class.
+ */
+public final class SnapshotEmitterMetrics implements AutoCloseable {
+ private final static MetricName LATEST_SNAPSHOT_GENERATED_BYTES = getMetricName(
+ "SnapshotEmitter", "LatestSnapshotGeneratedBytes");
+ private final static MetricName LATEST_SNAPSHOT_GENERATED_AGE_MS = getMetricName(
+ "SnapshotEmitter", "LatestSnapshotGeneratedAgeMs");
+
+ private final Optional<MetricsRegistry> registry;
+ private final Time time;
+ private final AtomicLong latestSnapshotGeneratedBytes;
+ private final AtomicLong latestSnapshotGeneratedTimeMs;
+
+ /**
+ * Create a new LoaderMetrics object.
+ *
+ * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one.
+ */
+ public SnapshotEmitterMetrics(
+ Optional<MetricsRegistry> registry,
+ Time time
+ ) {
+ this.registry = registry;
+ this.time = time;
+ this.latestSnapshotGeneratedBytes = new AtomicLong(0L);
+ this.latestSnapshotGeneratedTimeMs = new AtomicLong(time.milliseconds());
+ registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_BYTES, new Gauge<Long>() {
+ @Override
+ public Long value() {
+ return latestSnapshotGeneratedBytes();
+ }
+ }));
+ registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_AGE_MS, new Gauge<Long>() {
+ @Override
+ public Long value() {
+ return latestSnapshotGeneratedAgeMs();
+ }
+ }));
+ }
+
+ public void setLatestSnapshotGeneratedBytes(long value) {
+ this.latestSnapshotGeneratedBytes.set(value);
+ }
+
+ public long latestSnapshotGeneratedBytes() {
+ return this.latestSnapshotGeneratedBytes.get();
+ }
+
+ public void setLatestSnapshotGeneratedTimeMs(long timeMs) {
+ this.latestSnapshotGeneratedTimeMs.set(timeMs);
+ }
+
+ public long latestSnapshotGeneratedTimeMs() {
+ return this.latestSnapshotGeneratedTimeMs.get();
+ }
+
+ public long latestSnapshotGeneratedAgeMs() {
+ return time.milliseconds() - this.latestSnapshotGeneratedTimeMs.get();
+ }
+
+ @Override
+ public void close() {
+ registry.ifPresent(r -> Arrays.asList(
+ LATEST_SNAPSHOT_GENERATED_BYTES,
+ LATEST_SNAPSHOT_GENERATED_AGE_MS
+ ).forEach(r::removeMetric));
+ }
+
+ private static MetricName getMetricName(String type, String name) {
+ return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/writer/RaftSnapshotWriter.java b/metadata/src/main/java/org/apache/kafka/image/writer/RaftSnapshotWriter.java
index e1faad90dc3a2..eec5cd90c762a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/writer/RaftSnapshotWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/image/writer/RaftSnapshotWriter.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.OptionalLong;
/**
@@ -31,6 +32,7 @@ public class RaftSnapshotWriter implements ImageWriter {
private final SnapshotWriter snapshotWriter;
private final int batchSize;
private List records;
+ private OptionalLong frozenSize = OptionalLong.empty();
public RaftSnapshotWriter(
SnapshotWriter snapshotWriter,
@@ -59,11 +61,18 @@ public void close(boolean complete) {
if (!records.isEmpty()) {
snapshotWriter.append(records);
}
- snapshotWriter.freeze();
+ frozenSize = OptionalLong.of(snapshotWriter.freeze());
}
} finally {
records = null;
snapshotWriter.close();
}
}
+
+ /**
+ * @return the frozen size of the snapshot, or OptionalLong.empty if the snapshot was not frozen.
+ */
+ public OptionalLong frozenSize() {
+ return frozenSize;
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
new file mode 100644
index 0000000000000..aec2a4a513f0b
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+/**
+ * Utility functions for use in QuorumController integration tests.
+ */
+public class QuorumControllerIntegrationTestUtils {
+ private final static Logger log = LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class);
+
+ BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
+ return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
+ }
+
+ /**
+ * Create a broker features collection for use in a registration request. We only set MV here.
+ *
+ * @param minVersion The minimum supported MV.
+ * @param maxVersion The maximum supported MV.
+ */
+ static BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
+ MetadataVersion minVersion,
+ MetadataVersion maxVersion
+ ) {
+ BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
+ features.add(new BrokerRegistrationRequestData.Feature()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setMinSupportedVersion(minVersion.featureLevel())
+ .setMaxSupportedVersion(maxVersion.featureLevel()));
+ return features;
+ }
+
+ /**
+ * Register the given number of brokers.
+ *
+ * @param controller The active controller.
+ * @param numBrokers The number of brokers to register. We will start at 0 and increment.
+ *
+ * @return A map from broker IDs to broker epochs.
+ */
+ static Map<Integer, Long> registerBrokersAndUnfence(
+ QuorumController controller,
+ int numBrokers
+ ) throws Exception {
+ Map<Integer, Long> brokerEpochs = new HashMap<>();
+ for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
+ BrokerRegistrationReply reply = controller.registerBroker(ANONYMOUS_CONTEXT,
+ new BrokerRegistrationRequestData()
+ .setBrokerId(brokerId)
+ .setRack(null)
+ .setClusterId(controller.clusterId())
+ .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
+ .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
+ .setListeners(new ListenerCollection(
+ Arrays.asList(
+ new Listener()
+ .setName("PLAINTEXT")
+ .setHost("localhost")
+ .setPort(9092 + brokerId)
+ ).iterator()
+ )
+ )
+ ).get();
+ brokerEpochs.put(brokerId, reply.epoch());
+
+ // Send heartbeat to unfence
+ controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
+ new BrokerHeartbeatRequestData()
+ .setWantFence(false)
+ .setBrokerEpoch(brokerEpochs.get(brokerId))
+ .setBrokerId(brokerId)
+ .setCurrentMetadataOffset(100000L)
+ ).get();
+ }
+
+ return brokerEpochs;
+ }
+
+ /**
+ * Send broker heartbeats for the provided brokers.
+ *
+ * @param controller The active controller.
+ * @param brokers The broker IDs to send heartbeats for.
+ * @param brokerEpochs A map from broker ID to broker epoch.
+ */
+ static void sendBrokerHeartbeat(
+ QuorumController controller,
+ List<Integer> brokers,
+ Map<Integer, Long> brokerEpochs
+ ) throws Exception {
+ if (brokers.isEmpty()) {
+ return;
+ }
+ for (Integer brokerId : brokers) {
+ BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
+ new BrokerHeartbeatRequestData()
+ .setWantFence(false)
+ .setBrokerEpoch(brokerEpochs.get(brokerId))
+ .setBrokerId(brokerId)
+ .setCurrentMetadataOffset(100000)
+ ).get();
+ assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply);
+ }
+ }
+
+ /**
+ * Create some topics directly on the controller.
+ *
+ * @param controller The active controller.
+ * @param prefix The prefix to use for topic names.
+ * @param numTopics The number of topics to create.
+ * @param replicationFactor The replication factor to use.
+ */
+ static void createTopics(
+ QuorumController controller,
+ String prefix,
+ int numTopics,
+ int replicationFactor
+ ) throws Exception {
+ HashSet<String> describable = new HashSet<>();
+ for (int i = 0; i < numTopics; i++) {
+ describable.add(prefix + i);
+ }
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ for (int i = 0; i < numTopics; i++) {
+ request.topics().add(
+ new CreatableTopic().
+ setName(prefix + i).
+ setNumPartitions(-1).
+ setReplicationFactor((short) replicationFactor));
+ }
+ CreateTopicsResponseData response =
+ controller.createTopics(ANONYMOUS_CONTEXT, request, describable).get();
+ for (int i = 0; i < numTopics; i++) {
+ CreatableTopicResult result = response.topics().find(prefix + i);
+ assertEquals((short) 0, result.errorCode());
+ }
+ }
+
+ /**
+ * Add an event to the controller event queue that will pause it temporarily.
+ *
+ * @param controller The controller.
+ * @return The latch that can be used to unpause the controller.
+ */
+ public static CountDownLatch pause(QuorumController controller) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ controller.appendControlEvent("pause", () -> {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ log.info("Interrupted while waiting for unpause.", e);
+ }
+ });
+ return latch;
+ }
+
+ /**
+ * Force the current controller to renounce.
+ *
+ * @param controller The controller.
+ */
+ static void forceRenounce(QuorumController controller) throws Exception {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ controller.appendControlEvent("forceRenounce", () -> {
+ controller.renounce();
+ future.complete(null);
+ });
+ future.get();
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
new file mode 100644
index 0000000000000..949d63fa375ed
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.forceRenounce;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class QuorumControllerMetricsIntegrationTest {
+ private final static Logger log = LoggerFactory.getLogger(QuorumControllerMetricsIntegrationTest.class);
+
+ static class MockControllerMetrics extends QuorumControllerMetrics {
+ final AtomicBoolean closed = new AtomicBoolean(false);
+
+ MockControllerMetrics() {
+ super(Optional.empty(), Time.SYSTEM, true);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ closed.set(true);
+ }
+ }
+
+ /**
+ * Test that closing the QuorumController closes the metrics object.
+ */
+ @Test
+ public void testClosingQuorumControllerClosesMetrics() throws Throwable {
+ MockControllerMetrics metrics = new MockControllerMetrics();
+ try (
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+ build();
+ QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+ setControllerBuilderInitializer(controllerBuilder -> {
+ controllerBuilder.setMetrics(metrics);
+ }).
+ build()
+ ) {
+ assertEquals(1, controlEnv.activeController().controllerMetrics().newActiveControllers());
+ }
+ assertTrue(metrics.closed.get(), "metrics were not closed");
+ }
+
+ /**
+ * Test that failing over to a new controller increments NewActiveControllersCount on both the
+ * active and inactive controllers.
+ */
+ @Test
+ public void testFailingOverIncrementsNewActiveControllerCount() throws Throwable {
+ try (
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+ build();
+ QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+ build()
+ ) {
+ controlEnv.activeController(); // wait for a controller to become active.
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ for (QuorumController controller : controlEnv.controllers()) {
+ assertEquals(1, controller.controllerMetrics().newActiveControllers());
+ }
+ });
+ forceRenounce(controlEnv.activeController());
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ for (QuorumController controller : controlEnv.controllers()) {
+ assertEquals(2, controller.controllerMetrics().newActiveControllers());
+ }
+ });
+ }
+ }
+
+ /**
+ * Test the heartbeat and general operation timeout metrics.
+ * These are incremented on the active controller only.
+ */
+ @Test
+ public void testTimeoutMetrics() throws Throwable {
+ try (
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+ build();
+ QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+ build()
+ ) {
+ QuorumController active = controlEnv.activeController();
+ Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(active, 3);
+ assertEquals(0L, active.controllerMetrics().timedOutHeartbeats());
+ assertEquals(0L, active.controllerMetrics().operationsTimedOut());
+
+ // We pause the controller so that the heartbeat event will definitely be expired
+ // rather than processed.
+ CountDownLatch latch = pause(active);
+ ControllerRequestContext expiredTimeoutContext = new ControllerRequestContext(
+ new RequestHeaderData(),
+ KafkaPrincipal.ANONYMOUS,
+ OptionalLong.of(active.time().nanoseconds()));
+ CompletableFuture<BrokerHeartbeatReply> replyFuture =
+ active.processBrokerHeartbeat(expiredTimeoutContext,
+ new BrokerHeartbeatRequestData()
+ .setWantFence(false)
+ .setBrokerEpoch(brokerEpochs.get(0))
+ .setBrokerId(0)
+ .setCurrentMetadataOffset(100000));
+ latch.countDown(); // Unpause the controller.
+ assertEquals(TimeoutException.class,
+ assertThrows(ExecutionException.class, () -> replyFuture.get()).
+ getCause().getClass());
+ assertEquals(1L, active.controllerMetrics().timedOutHeartbeats());
+ assertEquals(1L, active.controllerMetrics().operationsTimedOut());
+
+ // Inject a new timed out operation.
+ CountDownLatch latch2 = pause(active);
+ active.appendControlEventWithDeadline("fakeTimeoutOperation",
+ () -> { },
+ active.time().nanoseconds());
+ latch2.countDown();
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ // The fake timeout increments operationsTimedOut but not timedOutHeartbeats.
+ assertEquals(1L, active.controllerMetrics().timedOutHeartbeats());
+ assertEquals(2L, active.controllerMetrics().operationsTimedOut());
+ });
+ for (QuorumController controller : controlEnv.controllers()) {
+ // Inactive controllers don't set these metrics.
+ if (!controller.isActive()) {
+ assertEquals(false, controller.controllerMetrics().active());
+ assertEquals(0L, controller.controllerMetrics().timedOutHeartbeats());
+ assertEquals(0L, controller.controllerMetrics().operationsTimedOut());
+ }
+ }
+ }
+ }
+
+ /**
+ * Test the event queue operations started metric.
+ */
+ @Test
+ public void testEventQueueOperationsStartedMetric() throws Throwable {
+ try (
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+ build();
+ QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+ build()
+ ) {
+ QuorumController active = controlEnv.activeController();
+ Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(active, 3);
+
+ // Test that a new operation increments operationsStarted. We retry this if needed
+ // to handle the case where another operation is performed in between loading
+ // expectedOperationsStarted and running the new control event.
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ long expectedOperationsStarted = active.controllerMetrics().operationsStarted() + 1;
+ CompletableFuture<Long> actualOperationsStarted = new CompletableFuture<>();
+ active.appendControlEvent("checkOperationsStarted", () -> {
+ actualOperationsStarted.complete(active.controllerMetrics().operationsStarted());
+ });
+ assertEquals(expectedOperationsStarted, actualOperationsStarted.get());
+ });
+ }
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index be39b9f74eee7..261d3c91b2889 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -36,7 +36,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -88,7 +87,6 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
-import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
@@ -124,6 +122,10 @@
import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA;
import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -140,39 +142,6 @@ public class QuorumControllerTest {
static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap");
- static class MockControllerMetrics extends QuorumControllerMetrics {
- final AtomicBoolean closed = new AtomicBoolean(false);
-
- MockControllerMetrics() {
- super(Optional.empty(), Time.SYSTEM, false);
- }
-
- @Override
- public void close() {
- super.close();
- closed.set(true);
- }
- }
-
- /**
- * Test creating a new QuorumController and closing it.
- */
- @Test
- public void testCreateAndClose() throws Throwable {
- MockControllerMetrics metrics = new MockControllerMetrics();
- try (
- LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
- build();
- QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
- setControllerBuilderInitializer(controllerBuilder -> {
- controllerBuilder.setMetrics(metrics);
- }).
- build()
- ) {
- }
- assertTrue(metrics.closed.get(), "metrics were not closed");
- }
-
/**
* Test setting some configuration values and reading them back.
*/
@@ -610,22 +579,6 @@ public void testUnregisterBroker() throws Throwable {
}
}
- private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
- return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
- }
-
- private BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
- MetadataVersion minVersion,
- MetadataVersion maxVersion
- ) {
- BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
- features.add(new BrokerRegistrationRequestData.Feature()
- .setName(MetadataVersion.FEATURE_NAME)
- .setMinSupportedVersion(minVersion.featureLevel())
- .setMaxSupportedVersion(maxVersion.featureLevel()));
- return features;
- }
-
private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(
MetadataVersion minVersion,
MetadataVersion maxVersion
@@ -782,7 +735,7 @@ public void testTimeouts() throws Throwable {
build()
) {
QuorumController controller = controlEnv.activeController();
- CountDownLatch countDownLatch = controller.pause();
+ CountDownLatch countDownLatch = pause(controller);
long now = controller.time().nanoseconds();
ControllerRequestContext context0 = new ControllerRequestContext(
new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
@@ -846,7 +799,7 @@ public void testEarlyControllerResults() throws Throwable {
build()
) {
QuorumController controller = controlEnv.activeController();
- CountDownLatch countDownLatch = controller.pause();
+ CountDownLatch countDownLatch = pause(controller);
CompletableFuture createFuture =
controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
setTimeoutMs(120000), Collections.emptySet());
@@ -891,7 +844,7 @@ public void testMissingInMemorySnapshot() throws Exception {
) {
QuorumController controller = controlEnv.activeController();
- Map brokerEpochs = registerBrokers(controller, numBrokers);
+ Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(controller, numBrokers);
// Create a lot of partitions
List partitions = IntStream
@@ -980,62 +933,6 @@ public void testMissingInMemorySnapshot() throws Exception {
}
}
- private Map registerBrokers(QuorumController controller, int numBrokers) throws Exception {
- Map brokerEpochs = new HashMap<>();
- for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
- BrokerRegistrationReply reply = controller.registerBroker(ANONYMOUS_CONTEXT,
- new BrokerRegistrationRequestData()
- .setBrokerId(brokerId)
- .setRack(null)
- .setClusterId(controller.clusterId())
- .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
- .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
- .setListeners(
- new ListenerCollection(
- Arrays.asList(
- new Listener()
- .setName("PLAINTEXT")
- .setHost("localhost")
- .setPort(9092 + brokerId)
- ).iterator()
- )
- )
- ).get();
- brokerEpochs.put(brokerId, reply.epoch());
-
- // Send heartbeat to unfence
- controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
- new BrokerHeartbeatRequestData()
- .setWantFence(false)
- .setBrokerEpoch(brokerEpochs.get(brokerId))
- .setBrokerId(brokerId)
- .setCurrentMetadataOffset(100000L)
- ).get();
- }
-
- return brokerEpochs;
- }
-
- private void sendBrokerHeartbeat(
- QuorumController controller,
- List brokers,
- Map brokerEpochs
- ) throws Exception {
- if (brokers.isEmpty()) {
- return;
- }
- for (Integer brokerId : brokers) {
- BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
- new BrokerHeartbeatRequestData()
- .setWantFence(false)
- .setBrokerEpoch(brokerEpochs.get(brokerId))
- .setBrokerId(brokerId)
- .setCurrentMetadataOffset(100000)
- ).get();
- assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply);
- }
- }
-
@Test
public void testConfigResourceExistenceChecker() throws Throwable {
try (
@@ -1048,7 +945,7 @@ public void testConfigResourceExistenceChecker() throws Throwable {
build()
) {
QuorumController active = controlEnv.activeController();
- registerBrokers(active, 5);
+ registerBrokersAndUnfence(active, 5);
active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
setTopics(new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").
@@ -1508,7 +1405,8 @@ public void testActivationRecordsNonEmptyLog() {
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), true);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
- assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState()); }
+ assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());
+ }
@Test
public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index bec023020a418..9258577a0ddf3 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -29,52 +29,39 @@
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public class QuorumControllerMetricsTest {
- @Test
- public void testMetricNamesNotInMigrationState() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testMetricNames(boolean inMigration) {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try {
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
- ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
- new HashSet<>(Arrays.asList(
- "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
- "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
- "kafka.controller:type=KafkaController,name=ActiveControllerCount",
- "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
- "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
- "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
- "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
- "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset"
- )));
- }
- ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
- Collections.emptySet());
- } finally {
- registry.shutdown();
- }
- }
-
- @Test
- public void testMetricNamesInMigrationState() {
- MetricsRegistry registry = new MetricsRegistry();
- MockTime time = new MockTime();
- try {
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
- ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
- new HashSet<>(Arrays.asList(
- "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
- "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
- "kafka.controller:type=KafkaController,name=ActiveControllerCount",
- "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
- "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
- "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
- "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
- "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
- "kafka.controller:type=KafkaController,name=ZKWriteBehindLag"
- )));
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(
+ Optional.of(registry),
+ time,
+ inMigration)) {
+ HashSet expected = new HashSet<>(Arrays.asList(
+ "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
+ "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
+ "kafka.controller:type=KafkaController,name=ActiveControllerCount",
+ "kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
+ "kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
+ "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
+ "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+ "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
+ "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+ "kafka.controller:type=KafkaController,name=NewActiveControllersCount",
+ "kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount"
+ ));
+ if (inMigration) {
+ expected.add("kafka.controller:type=KafkaController,name=ZKWriteBehindLag");
+ }
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller", expected);
}
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
Collections.emptySet());
@@ -112,10 +99,23 @@ public void testLastAppliedRecordMetrics() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
time.sleep(1000);
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
metrics.setLastAppliedRecordOffset(100);
metrics.setLastAppliedRecordTimestamp(500);
metrics.setLastCommittedRecordOffset(50);
+ metrics.updateDualWriteOffset(40L);
+ for (int i = 0; i < 2; i++) {
+ metrics.incrementTimedOutHeartbeats();
+ }
+ for (int i = 0; i < 3; i++) {
+ metrics.incrementOperationsStarted();
+ }
+ for (int i = 0; i < 4; i++) {
+ metrics.incrementOperationsTimedOut();
+ }
+ for (int i = 0; i < 5; i++) {
+ metrics.incrementNewActiveControllers();
+ }
@SuppressWarnings("unchecked")
Gauge lastAppliedRecordOffset = (Gauge) registry
@@ -140,6 +140,36 @@ public void testLastAppliedRecordMetrics() {
.allMetrics()
.get(metricName("KafkaController", "LastCommittedRecordOffset"));
assertEquals(50, lastCommittedRecordOffset.value());
+
+ @SuppressWarnings("unchecked")
+ Gauge zkWriteBehindLag = (Gauge) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "ZKWriteBehindLag"));
+ assertEquals(10L, zkWriteBehindLag.value());
+
+ @SuppressWarnings("unchecked")
+ Gauge timedOutBrokerHeartbeats = (Gauge) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "TimedOutBrokerHeartbeatCount"));
+ assertEquals(2L, timedOutBrokerHeartbeats.value());
+
+ @SuppressWarnings("unchecked")
+ Gauge operationsStarted = (Gauge) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "EventQueueOperationsStartedCount"));
+ assertEquals(3L, operationsStarted.value());
+
+ @SuppressWarnings("unchecked")
+ Gauge operationsTimedOut = (Gauge) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "EventQueueOperationsTimedOutCount"));
+ assertEquals(4L, operationsTimedOut.value());
+
+ @SuppressWarnings("unchecked")
+ Gauge newActiveControllers = (Gauge) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "NewActiveControllersCount"));
+ assertEquals(5L, newActiveControllers.value());
} finally {
registry.shutdown();
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
new file mode 100644
index 0000000000000..d76b1c81f217a
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.SnapshotWriter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+public class FakeSnapshotWriter implements SnapshotWriter {
+ private final OffsetAndEpoch snapshotId;
+ private List> batches = new ArrayList<>();
+ private boolean frozen = false;
+ private boolean closed = false;
+
+ public List> batches() {
+ List> result = new ArrayList<>();
+ for (List batch : batches) {
+ result.add(Collections.unmodifiableList(batch));
+ }
+ return Collections.unmodifiableList(result);
+ }
+
+ public FakeSnapshotWriter() {
+ this(new OffsetAndEpoch(100L, 10));
+ }
+
+ public FakeSnapshotWriter(OffsetAndEpoch snapshotId) {
+ this.snapshotId = snapshotId;
+ }
+
+ @Override
+ public OffsetAndEpoch snapshotId() {
+ return snapshotId;
+ }
+
+ @Override
+ public long lastContainedLogOffset() {
+ return snapshotId().offset() - 1;
+ }
+
+ @Override
+ public int lastContainedLogEpoch() {
+ return snapshotId().epoch();
+ }
+
+ @Override
+ public boolean isFrozen() {
+ return frozen;
+ }
+
+ @Override
+ public void append(List batch) {
+ if (frozen) {
+ throw new IllegalStateException("Append not supported. Snapshot is already frozen.");
+ }
+ batches.add(batch);
+ }
+
+ @Override
+ public long freeze() {
+ frozen = true;
+ return batches.size() * 100;
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 1eefa15a8c3d6..372700b1fb623 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -37,6 +37,7 @@
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
@@ -55,6 +56,7 @@
import static java.util.Arrays.asList;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -251,6 +253,13 @@ public void testPublisherCannotBeInstalledMoreThanOnce(
)
);
loader.handleLoadSnapshot(snapshotReader);
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ assertEquals(1L, loader.metrics().handleLoadSnapshotCount());
+ });
+ } else {
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ assertEquals(0L, loader.metrics().handleLoadSnapshotCount());
+ });
}
loader.waitForAllEventsToBeHandled();
if (sameObject) {
@@ -328,6 +337,8 @@ public void testLoadEmptySnapshot() throws Exception {
assertEquals(300L, loader.lastAppliedOffset());
assertEquals(new SnapshotManifest(new MetadataProvenance(300, 100, 4000), 3000000L),
publishers.get(0).latestSnapshotManifest);
+ assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION,
+ loader.metrics().currentMetadataVersion());
}
assertTrue(publishers.get(0).closed);
assertEquals(MetadataVersion.IBP_3_0_IV1,
@@ -587,14 +598,27 @@ public void testReloadSnapshot() throws Exception {
loadTestSnapshot(loader, 200);
assertEquals(200L, loader.lastAppliedOffset());
+ assertEquals(IBP_3_3_IV1.featureLevel(),
+ loader.metrics().currentMetadataVersion().featureLevel());
assertFalse(publishers.get(0).latestDelta.image().isEmpty());
loadTestSnapshot2(loader, 400);
assertEquals(400L, loader.lastAppliedOffset());
+ assertEquals(IBP_3_3_IV2.featureLevel(),
+ loader.metrics().currentMetadataVersion().featureLevel());
// Make sure the topic in the initial snapshot was overwritten by loading the new snapshot.
assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar"));
+
+ loader.handleCommit(new MockBatchReader(500, asList(
+ MockBatchReader.newBatch(500, 100, asList(
+ new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(IBP_3_5_IV0.featureLevel()), (short) 0))))));
+ loader.waitForAllEventsToBeHandled();
+ assertEquals(IBP_3_5_IV0.featureLevel(),
+ loader.metrics().currentMetadataVersion().featureLevel());
}
faultHandler.maybeRethrowFirstException();
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
new file mode 100644
index 0000000000000..5e3c6c6f5712f
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
+import org.apache.kafka.image.MetadataProvenance;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class MetadataLoaderMetricsTest {
+ private static class FakeMetadataLoaderMetrics implements AutoCloseable {
+ final AtomicLong batchProcessingTimeNs = new AtomicLong(0L);
+ final AtomicInteger batchSize = new AtomicInteger(0);
+ final AtomicReference provenance =
+ new AtomicReference<>(MetadataProvenance.EMPTY);
+ final MetadataLoaderMetrics metrics;
+
+ FakeMetadataLoaderMetrics() {
+ this(Optional.empty());
+ }
+
+ FakeMetadataLoaderMetrics(MetricsRegistry registry) {
+ this(Optional.of(registry));
+ }
+
+ FakeMetadataLoaderMetrics(Optional registry) {
+ metrics = new MetadataLoaderMetrics(
+ registry,
+ n -> batchProcessingTimeNs.set(n),
+ n -> batchSize.set(n),
+ provenance);
+ }
+
+ @Override
+ public void close() {
+ metrics.close();
+ }
+ }
+
+ @Test
+ public void testMetricNames() {
+ MetricsRegistry registry = new MetricsRegistry();
+ try {
+ try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+ new HashSet<>(Arrays.asList(
+ "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+ "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+ )));
+ }
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+ Collections.emptySet());
+ } finally {
+ registry.shutdown();
+ }
+ }
+
+ @Test
+ public void testUpdateBatchProcessingTimeNs() {
+ MetricsRegistry registry = new MetricsRegistry();
+ try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+ fakeMetrics.metrics.updateBatchProcessingTimeNs(123L);
+ assertEquals(123L, fakeMetrics.batchProcessingTimeNs.get());
+ }
+ }
+
+ @Test
+ public void testUpdateBatchSize() {
+ MetricsRegistry registry = new MetricsRegistry();
+ try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+ fakeMetrics.metrics.updateBatchSize(50);
+ assertEquals(50, fakeMetrics.batchSize.get());
+ }
+ }
+
+ @Test
+ public void testUpdateLastAppliedImageProvenance() {
+ MetricsRegistry registry = new MetricsRegistry();
+ try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+ MetadataProvenance provenance = new MetadataProvenance(1L, 2, 3L);
+ fakeMetrics.metrics.updateLastAppliedImageProvenance(provenance);
+ assertEquals(provenance, fakeMetrics.provenance.get());
+ }
+ }
+
+ @Test
+ public void testManagedMetrics() {
+ MetricsRegistry registry = new MetricsRegistry();
+ try {
+ try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+ fakeMetrics.metrics.setCurrentMetadataVersion(IBP_3_3_IV2);
+ fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
+ fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
+
+ @SuppressWarnings("unchecked")
+ Gauge currentMetadataVersion = (Gauge) registry
+ .allMetrics()
+ .get(metricName("MetadataLoader", "CurrentMetadataVersion"));
+ assertEquals(IBP_3_3_IV2.featureLevel(),
+ currentMetadataVersion.value().shortValue());
+
+ @SuppressWarnings("unchecked")
+ Gauge loadSnapshotCount = (Gauge) registry
+ .allMetrics()
+ .get(metricName("MetadataLoader", "HandleLoadSnapshotCount"));
+ assertEquals(2L, loadSnapshotCount.value().longValue());
+ }
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+ Collections.emptySet());
+ } finally {
+ registry.shutdown();
+ }
+ }
+
+ private static MetricName metricName(String type, String name) {
+ String mBeanName = String.format("kafka.server:type=%s,name=%s", type, name);
+ return new MetricName("kafka.server", type, name, null, mBeanName);
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
index ca72aa058ef9d..be4285f6bbf3e 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.image.publisher;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.image.FakeSnapshotWriter;
import org.apache.kafka.image.MetadataImageTest;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
@@ -26,7 +28,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
@@ -43,7 +44,7 @@
@Timeout(value = 40)
public class SnapshotEmitterTest {
static class MockRaftClient implements RaftClient {
- TreeMap writers = new TreeMap<>();
+ TreeMap writers = new TreeMap<>();
@Override
public void initialize() {
@@ -103,7 +104,7 @@ public Optional> createSnapshot(
if (writers.containsKey(snapshotId)) {
return Optional.empty();
}
- MockSnapshotWriter writer = new MockSnapshotWriter(snapshotId);
+ FakeSnapshotWriter writer = new FakeSnapshotWriter(snapshotId);
writers.put(snapshotId, writer);
return Optional.of(writer);
}
@@ -124,72 +125,24 @@ public void close() throws Exception {
}
}
- static class MockSnapshotWriter implements SnapshotWriter {
- private final OffsetAndEpoch snapshotId;
- private boolean frozen = false;
- private boolean closed = false;
- private final List> batches;
-
- MockSnapshotWriter(OffsetAndEpoch snapshotId) {
- this.snapshotId = snapshotId;
- this.batches = new ArrayList<>();
- }
-
- @Override
- public OffsetAndEpoch snapshotId() {
- return snapshotId;
- }
-
- @Override
- public long lastContainedLogOffset() {
- return snapshotId.offset() - 1;
- }
-
- @Override
- public int lastContainedLogEpoch() {
- return snapshotId.epoch();
- }
-
- @Override
- public boolean isFrozen() {
- return frozen;
- }
-
- @Override
- public void append(List records) {
- batches.add(records);
- }
-
- List> batches() {
- List> results = new ArrayList<>();
- batches.forEach(batch -> results.add(new ArrayList<>(batch)));
- return results;
- }
-
- @Override
- public void freeze() {
- frozen = true;
- }
-
- @Override
- public void close() {
- closed = true;
- }
-
- boolean isClosed() {
- return closed;
- }
- }
-
@Test
public void testEmit() throws Exception {
MockRaftClient mockRaftClient = new MockRaftClient();
+ MockTime time = new MockTime(0, 10000L, 20000L);
SnapshotEmitter emitter = new SnapshotEmitter.Builder().
+ setTime(time).
setBatchSize(2).
setRaftClient(mockRaftClient).
build();
+ assertEquals(0L, emitter.metrics().latestSnapshotGeneratedAgeMs());
+ assertEquals(0L, emitter.metrics().latestSnapshotGeneratedBytes());
+ time.sleep(30000L);
+ assertEquals(30000L, emitter.metrics().latestSnapshotGeneratedAgeMs());
+ assertEquals(0L, emitter.metrics().latestSnapshotGeneratedBytes());
emitter.maybeEmit(MetadataImageTest.IMAGE1);
- MockSnapshotWriter writer = mockRaftClient.writers.get(
+ assertEquals(0L, emitter.metrics().latestSnapshotGeneratedAgeMs());
+ assertEquals(1400L, emitter.metrics().latestSnapshotGeneratedBytes());
+ FakeSnapshotWriter writer = mockRaftClient.writers.get(
MetadataImageTest.IMAGE1.provenance().snapshotId());
assertNotNull(writer);
assertEquals(MetadataImageTest.IMAGE1.highestOffsetAndEpoch().offset(),
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetricsTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetricsTest.java
new file mode 100644
index 0000000000000..9f79631eef3d0
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetricsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(value = 40)
+public class SnapshotEmitterMetricsTest {
+ private static final Logger log = LoggerFactory.getLogger(SnapshotEmitterMetricsTest.class);
+
+ static class SnapshotEmitterMetricsTestContext implements AutoCloseable {
+ final MetricsRegistry registry;
+ final MockTime time;
+ final SnapshotEmitterMetrics metrics;
+
+ SnapshotEmitterMetricsTestContext() {
+ this.registry = new MetricsRegistry();
+ this.time = new MockTime(0, 10000L, 0L);
+ this.metrics = new SnapshotEmitterMetrics(Optional.of(registry), time);
+ }
+
+ @SuppressWarnings("unchecked") // suppress warning about Gauge typecast
+ long readLongGauge(String name) {
+ MetricName metricName = new MetricName(
+ "kafka.server",
+ "SnapshotEmitter",
+ name,
+ null,
+ "kafka.server:type=SnapshotEmitter,name=" + name
+ );
+ return ((Gauge) registry.allMetrics().get(metricName)).value();
+ }
+
+ @Override
+ public void close() {
+ try {
+ registry.shutdown();
+ } catch (Exception e) {
+ log.error("Error closing registry", e);
+ }
+ }
+ }
+
+ @Test
+ public void testMetricNames() {
+ try (SnapshotEmitterMetricsTestContext ctx = new SnapshotEmitterMetricsTestContext()) {
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(ctx.registry, "kafka.server:",
+ new HashSet<>(Arrays.asList(
+ "kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes",
+ "kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs"
+ )));
+ ctx.metrics.close();
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(ctx.registry, "KafkaController",
+ Collections.emptySet());
+ }
+ }
+
+ @Test
+ public void testLatestSnapshotGeneratedBytesMetric() {
+ try (SnapshotEmitterMetricsTestContext ctx = new SnapshotEmitterMetricsTestContext()) {
+ assertEquals(0L, ctx.metrics.latestSnapshotGeneratedBytes());
+ ctx.metrics.setLatestSnapshotGeneratedBytes(12345L);
+ assertEquals(12345L, ctx.metrics.latestSnapshotGeneratedBytes());
+ assertEquals(12345L, ctx.readLongGauge("LatestSnapshotGeneratedBytes"));
+ }
+ }
+
+ @Test
+ public void testLatestSnapshotGeneratedAgeMsMetric() {
+ try (SnapshotEmitterMetricsTestContext ctx = new SnapshotEmitterMetricsTestContext()) {
+ assertEquals(10000L, ctx.metrics.latestSnapshotGeneratedTimeMs());
+ assertEquals(0L, ctx.metrics.latestSnapshotGeneratedAgeMs());
+ ctx.time.sleep(20000L);
+ assertEquals(10000L, ctx.metrics.latestSnapshotGeneratedTimeMs());
+ assertEquals(20000L, ctx.metrics.latestSnapshotGeneratedAgeMs());
+ assertEquals(20000L, ctx.readLongGauge("LatestSnapshotGeneratedAgeMs"));
+ }
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java b/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java
index 4cdacfe7e1e8b..0137aeb077224 100644
--- a/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java
@@ -17,15 +17,11 @@
package org.apache.kafka.image.writer;
-import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.snapshot.SnapshotWriter;
+import org.apache.kafka.image.FakeSnapshotWriter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import static java.util.Collections.emptyList;
import static org.apache.kafka.metadata.RecordTestUtils.testRecord;
@@ -36,70 +32,29 @@
@Timeout(value = 40)
public class RaftSnapshotWriterTest {
- static class MockSnapshotWriter implements SnapshotWriter {
- boolean frozen = false;
- boolean closed = false;
- List> batches = new ArrayList<>();
-
- @Override
- public OffsetAndEpoch snapshotId() {
- return new OffsetAndEpoch(100L, 10);
- }
-
- @Override
- public long lastContainedLogOffset() {
- return snapshotId().offset();
- }
-
- @Override
- public int lastContainedLogEpoch() {
- return snapshotId().epoch();
- }
-
- @Override
- public boolean isFrozen() {
- return frozen;
- }
-
- @Override
- public void append(List batch) {
- batches.add(batch);
- }
-
- @Override
- public void freeze() {
- frozen = true;
- }
-
- @Override
- public void close() {
- closed = true;
- }
- }
-
@Test
public void testFreezeAndClose() {
- MockSnapshotWriter snapshotWriter = new MockSnapshotWriter();
+ FakeSnapshotWriter snapshotWriter = new FakeSnapshotWriter();
RaftSnapshotWriter writer = new RaftSnapshotWriter(snapshotWriter, 2);
writer.write(testRecord(0));
writer.write(testRecord(1));
writer.write(testRecord(2));
writer.close(true);
- assertTrue(snapshotWriter.frozen);
- assertTrue(snapshotWriter.closed);
+ assertTrue(snapshotWriter.isFrozen());
+ assertTrue(snapshotWriter.isClosed());
assertEquals(Arrays.asList(
Arrays.asList(testRecord(0), testRecord(1)),
- Arrays.asList(testRecord(2))), snapshotWriter.batches);
+ Arrays.asList(testRecord(2))), snapshotWriter.batches());
}
@Test
public void testCloseWithoutFreeze() {
- MockSnapshotWriter snapshotWriter = new MockSnapshotWriter();
+ FakeSnapshotWriter snapshotWriter = new FakeSnapshotWriter();
RaftSnapshotWriter writer = new RaftSnapshotWriter(snapshotWriter, 2);
writer.write(testRecord(0));
writer.close();
- assertFalse(snapshotWriter.frozen);
- assertTrue(snapshotWriter.closed);
- assertEquals(emptyList(), snapshotWriter.batches);
+ assertFalse(snapshotWriter.isFrozen());
+ assertTrue(snapshotWriter.isClosed());
+ assertEquals(emptyList(), snapshotWriter.batches());
}
}
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
index badefd321ed3f..535c176c7280c 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
@@ -35,7 +35,7 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
private final FileChannel channel;
private final OffsetAndEpoch snapshotId;
private final Optional replicatedLog;
- private boolean frozen = false;
+ private long frozenSize;
private FileRawSnapshotWriter(
Path tempSnapshotPath,
@@ -47,6 +47,7 @@ private FileRawSnapshotWriter(
this.channel = channel;
this.snapshotId = snapshotId;
this.replicatedLog = replicatedLog;
+ this.frozenSize = -1L;
}
@Override
@@ -56,6 +57,9 @@ public OffsetAndEpoch snapshotId() {
@Override
public long sizeInBytes() {
+ if (frozenSize >= 0) {
+ return frozenSize;
+ }
try {
return channel.size();
} catch (IOException e) {
@@ -99,7 +103,7 @@ public void append(MemoryRecords records) {
@Override
public boolean isFrozen() {
- return frozen;
+ return frozenSize >= 0;
}
@Override
@@ -107,8 +111,8 @@ public void freeze() {
try {
checkIfFrozen("Freeze");
+ frozenSize = channel.size();
channel.close();
- frozen = true;
if (!tempSnapshotPath.toFile().setReadOnly()) {
throw new IllegalStateException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
@@ -148,12 +152,12 @@ public String toString() {
"FileRawSnapshotWriter(path=%s, snapshotId=%s, frozen=%s)",
tempSnapshotPath,
snapshotId,
- frozen
+ isFrozen()
);
}
void checkIfFrozen(String operation) {
- if (frozen) {
+ if (isFrozen()) {
throw new IllegalStateException(
String.format(
"%s is not supported. Snapshot is already frozen: id = %s; temp path = %s",
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
index eeacf608a9ff0..ae6202426b81d 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
@@ -192,11 +192,12 @@ public void append(List records) {
}
@Override
- public void freeze() {
+ public long freeze() {
finalizeSnapshotWithFooter();
appendBatches(accumulator.drain());
snapshot.freeze();
accumulator.close();
+ return snapshot.sizeInBytes();
}
@Override
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
index 537335c058abb..244cc5478f46e 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
@@ -71,8 +71,10 @@ public interface SnapshotWriter extends AutoCloseable {
* Freezes the snapshot by flushing all pending writes and marking it as immutable.
*
* Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
+ *
+ * @return The size of the snapshot in bytes.
*/
- void freeze();
+ long freeze();
/**
* Closes the snapshot writer.
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index bc1d0ca21fc34..f5b1ea15c3dc1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -48,6 +48,7 @@
import java.util.OptionalLong;
import java.util.OptionalInt;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1832,12 +1833,12 @@ private static SnapshotWriter snapshotWriter(RaftClientTestContext conte
private final static class MemorySnapshotWriter implements RawSnapshotWriter {
private final OffsetAndEpoch snapshotId;
private ByteBuffer data;
- private boolean frozen;
+ private AtomicLong frozenPosition;
public MemorySnapshotWriter(OffsetAndEpoch snapshotId) {
this.snapshotId = snapshotId;
this.data = ByteBuffer.allocate(0);
- this.frozen = false;
+ this.frozenPosition = new AtomicLong(-1L);
}
@Override
@@ -1847,16 +1848,13 @@ public OffsetAndEpoch snapshotId() {
@Override
public long sizeInBytes() {
- if (frozen) {
- throw new RuntimeException("Snapshot is already frozen " + snapshotId);
- }
-
- return data.position();
+ long position = frozenPosition.get();
+ return (position < 0) ? data.position() : position;
}
@Override
public void append(UnalignedMemoryRecords records) {
- if (frozen) {
+ if (isFrozen()) {
throw new RuntimeException("Snapshot is already frozen " + snapshotId);
}
append(records.buffer());
@@ -1864,7 +1862,7 @@ public void append(UnalignedMemoryRecords records) {
@Override
public void append(MemoryRecords records) {
- if (frozen) {
+ if (isFrozen()) {
throw new RuntimeException("Snapshot is already frozen " + snapshotId);
}
append(records.buffer());
@@ -1885,16 +1883,14 @@ private void append(ByteBuffer buffer) {
@Override
public boolean isFrozen() {
- return frozen;
+ return frozenPosition.get() >= 0;
}
@Override
public void freeze() {
- if (frozen) {
+ if (!frozenPosition.compareAndSet(-1L, data.position())) {
throw new RuntimeException("Snapshot is already frozen " + snapshotId);
}
-
- frozen = true;
data.flip();
}
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
index 0b5cc66a0c157..103eb3781c696 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
@@ -46,7 +46,7 @@ public OffsetAndEpoch snapshotId() {
@Override
public long sizeInBytes() {
- ensureNotFrozenOrClosed();
+ ensureOpen();
return data.position();
}