Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class ControllerServer(

val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)

quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time)
quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time, config.migrationEnabled)

new QuorumController.Builder(config.nodeId, sharedServer.metaProps.clusterId).
setTime(time).
Expand Down Expand Up @@ -268,7 +268,8 @@ class ControllerServer(
() => {}
),
quorumFeatures,
configSchema
configSchema,
quorumControllerMetrics
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public QuorumController build() throws Exception {
logContext = new LogContext(String.format("[QuorumController id=%d] ", nodeId));
}
if (controllerMetrics == null) {
controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time);
controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time, zkMigrationEnabled);
}

KafkaEventQueue queue = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName(
"KafkaController", "ZKWriteBehindLag");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
Expand All @@ -58,6 +60,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final AtomicLong dualWriteOffset = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
Expand All @@ -73,7 +76,8 @@ private Consumer<Long> newHistogram(MetricName name, boolean biased) {

public QuorumControllerMetrics(
Optional<MetricsRegistry> registry,
Time time
Time time,
boolean zkMigrationState
) {
this.registry = registry;
this.active = false;
Expand Down Expand Up @@ -109,6 +113,18 @@ public Long value() {
return time.milliseconds() - lastAppliedRecordTimestamp();
}
}));

if (zkMigrationState) {
registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge<Long>() {
@Override
public Long value() {
// not in dual-write mode: set metric value to 0
if (dualWriteOffset() == 0) return 0L;
// in dual write mode
else return lastCommittedRecordOffset() - dualWriteOffset();
}
}));
}
}

public void setActive(boolean active) {
Expand Down Expand Up @@ -151,6 +167,14 @@ public long lastAppliedRecordTimestamp() {
return lastAppliedRecordTimestamp.get();
}

/**
 * Records the offset of the KRaft metadata most recently dual-written to ZooKeeper
 * during a ZK-to-KRaft migration. This value feeds the ZKWriteBehindLag gauge.
 *
 * @param offset the highest KRaft metadata offset persisted to ZK; 0 means
 *               dual-write mode has not started yet
 */
public void updateDualWriteOffset(long offset) {
    dualWriteOffset.set(offset);
}

/**
 * @return the offset of the KRaft metadata most recently dual-written to ZooKeeper,
 *         or 0 if no dual write has been recorded yet
 */
public long dualWriteOffset() {
    return dualWriteOffset.get();
}

/**
 * Bumps the counter of broker heartbeats that timed out before the controller
 * processed them.
 */
public void incrementTimedOutHeartbeats() {
    timedOutHeartbeats.incrementAndGet();
}
Expand All @@ -172,7 +196,8 @@ public void close() {
LAST_APPLIED_RECORD_OFFSET,
LAST_COMMITTED_RECORD_OFFSET,
LAST_APPLIED_RECORD_TIMESTAMP,
LAST_APPLIED_RECORD_LAG_MS
LAST_APPLIED_RECORD_LAG_MS,
ZK_WRITE_BEHIND_LAG
).forEach(r::removeMetric));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
Expand Down Expand Up @@ -97,6 +98,7 @@ public long nextPollTimeMs() {
private final ZkRecordConsumer zkRecordConsumer;
private final KafkaEventQueue eventQueue;
private final PollTimeSupplier pollTimeSupplier;
private final QuorumControllerMetrics controllerMetrics;
private final FaultHandler faultHandler;
private final QuorumFeatures quorumFeatures;
private final RecordRedactor recordRedactor;
Expand All @@ -119,6 +121,7 @@ public KRaftMigrationDriver(
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
KafkaConfigSchema configSchema,
QuorumControllerMetrics controllerMetrics,
Time time
) {
this.nodeId = nodeId;
Expand All @@ -127,6 +130,7 @@ public KRaftMigrationDriver(
this.propagator = propagator;
this.time = time;
LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
this.controllerMetrics = controllerMetrics;
this.log = logContext.logger(KRaftMigrationDriver.class);
this.migrationState = MigrationDriverState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
Expand All @@ -149,9 +153,10 @@ public KRaftMigrationDriver(
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
KafkaConfigSchema configSchema
KafkaConfigSchema configSchema,
QuorumControllerMetrics controllerMetrics
) {
this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, Time.SYSTEM);
this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, controllerMetrics, Time.SYSTEM);
}


Expand Down Expand Up @@ -497,6 +502,9 @@ public void run() throws Exception {
// Persist the offset of the metadata that was written to ZK
ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch());
// Update the dual-write offset metric with the highest offset written to ZK
controllerMetrics.updateDualWriteOffset(image.highestOffsetAndEpoch().offset());

applyMigrationOperation("Updating ZK migration state after " + metadataType,
state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static class MockControllerMetrics extends QuorumControllerMetrics {
final AtomicBoolean closed = new AtomicBoolean(false);

MockControllerMetrics() {
super(Optional.empty(), Time.SYSTEM);
super(Optional.empty(), Time.SYSTEM, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@

public class QuorumControllerMetricsTest {
@Test
public void testMetricNames() {
public void testMetricNamesNotInMigrationState() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
new HashSet<>(Arrays.asList(
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
Expand All @@ -57,11 +57,37 @@ public void testMetricNames() {
}
}

@Test
public void testMetricNamesInMigrationState() {
    MetricsRegistry registry = new MetricsRegistry();
    MockTime time = new MockTime();
    try {
        // With migration enabled, the ZKWriteBehindLag gauge must be registered in
        // addition to the standard controller metrics.
        // (Fixed: the expected set previously listed LastAppliedRecordOffset twice.)
        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
            ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
                new HashSet<>(Arrays.asList(
                    "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
                    "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
                    "kafka.controller:type=KafkaController,name=ActiveControllerCount",
                    "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
                    "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
                    "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
                    "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
                    "kafka.controller:type=KafkaController,name=ZKWriteBehindLag"
                )));
        }
        // Closing the metrics object must deregister everything it registered,
        // including the migration-only gauge.
        ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
            Collections.emptySet());
    } finally {
        registry.shutdown();
    }
}

@Test
public void testUpdateEventQueueTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
metrics.updateEventQueueTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
} finally {
Expand All @@ -73,7 +99,7 @@ public void testUpdateEventQueueTime() {
public void testUpdateEventQueueProcessingTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
metrics.updateEventQueueProcessingTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
} finally {
Expand All @@ -86,7 +112,7 @@ public void testLastAppliedRecordMetrics() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
time.sleep(1000);
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
metrics.setLastAppliedRecordOffset(100);
metrics.setLastAppliedRecordTimestamp(500);
metrics.setLastCommittedRecordOffset(50);
Expand Down Expand Up @@ -119,6 +145,36 @@ public void testLastAppliedRecordMetrics() {
}
}

@Test
public void testUpdateZKWriteBehindLag() {
    MockTime time = new MockTime();

    // Case 1: dualWriteOffset == 0 means we are not yet in dual-write mode,
    // so the gauge must report 0 rather than the raw committed offset.
    MetricsRegistry registry = new MetricsRegistry();
    try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
        metrics.updateDualWriteOffset(0);
        @SuppressWarnings("unchecked")
        Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
            .allMetrics()
            .get(metricName("KafkaController", "ZKWriteBehindLag"));
        assertEquals(0, zkWriteBehindLag.value());
    } finally {
        registry.shutdown();
    }

    // Case 2: in dual-write mode the gauge reports the distance between the
    // last committed KRaft offset and the last offset dual-written to ZK.
    // Use a fresh registry here: the previous one was already shut down above,
    // so registering new metrics against it would be invalid.
    registry = new MetricsRegistry();
    try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
        metrics.updateDualWriteOffset(90);
        metrics.setLastCommittedRecordOffset(100);
        @SuppressWarnings("unchecked")
        Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
            .allMetrics()
            .get(metricName("KafkaController", "ZKWriteBehindLag"));
        assertEquals(10, zkWriteBehindLag.value());
    } finally {
        registry.shutdown();
    }
}

private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterImage;
Expand Down Expand Up @@ -63,9 +64,11 @@
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -88,6 +91,22 @@ public class KRaftMigrationDriverTest {
apiVersions,
QuorumFeatures.defaultFeatureMap(),
controllerNodes);

/**
 * QuorumControllerMetrics stand-in that records whether close() was invoked,
 * letting tests verify the migration driver releases its metrics on shutdown.
 */
static class MockControllerMetrics extends QuorumControllerMetrics {
    // Flipped to true once close() runs; inspected by tests.
    final AtomicBoolean closed = new AtomicBoolean(false);

    MockControllerMetrics() {
        // No metrics registry, system clock, migration gauge disabled.
        super(Optional.empty(), Time.SYSTEM, false);
    }

    @Override
    public void close() {
        super.close();
        closed.set(true);
    }
}
MockControllerMetrics metrics = new MockControllerMetrics();

Time mockTime = new MockTime(1) {
public long nanoseconds() {
// We poll the event for each 1 sec, make it happen for each 10 ms to speed up the test
Expand Down Expand Up @@ -216,9 +235,9 @@ public void testOnlySendNeededRPCsToBrokers() throws Exception {
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {

MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

Expand Down Expand Up @@ -302,6 +321,7 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi
faultHandler,
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
Expand Down Expand Up @@ -348,9 +368,9 @@ public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate()
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {

MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

Expand Down Expand Up @@ -387,15 +407,16 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception {
new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient());
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
faultHandler,
quorumFeatures,
KafkaConfigSchema.EMPTY,
mockTime
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
faultHandler,
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
Expand Down Expand Up @@ -467,6 +488,7 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
verifier.verify(driver, migrationClient, topicClient, configClient);
Expand Down