Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import kafka.api._
import kafka.common._
import kafka.cluster.Broker
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback}
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback, ZkMigrationStateMetricName}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
Expand All @@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
Expand Down Expand Up @@ -81,9 +82,11 @@ object KafkaController extends Logging {
private val ReplicasIneligibleToDeleteCountMetricName = "ReplicasIneligibleToDeleteCount"
private val ActiveBrokerCountMetricName = "ActiveBrokerCount"
private val FencedBrokerCountMetricName = "FencedBrokerCount"
private val ZkMigrationStateMetricName = "ZkMigrationState"

// package private for testing
private[controller] val MetricNames = Set(
ZkMigrationStateMetricName,
ActiveControllerCountMetricName,
OfflinePartitionsCountMetricName,
PreferredReplicaImbalanceCountMetricName,
Expand Down Expand Up @@ -172,6 +175,7 @@ class KafkaController(val config: KafkaConfig,
/* single-thread scheduler to clean expired tokens */
private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner")

metricsGroup.newGauge(ZkMigrationStateMetricName, () => ZkMigrationState.ZK)
metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0)
metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount)
metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,8 @@ public static List<ApiMessageAndVersion> generateActivationRecords(
"has been completed.");
}
break;
default:
throw new IllegalStateException("Unsupported ZkMigrationState " + featureControl.zkMigrationState());
}
} else {
if (zkMigrationEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "FencedBrokerCount");
private final static MetricName ACTIVE_BROKER_COUNT = getMetricName(
"KafkaController", "ActiveBrokerCount");
private final static MetricName MIGRATING_ZK_BROKER_COUNT = getMetricName(
"KafkaController", "MigratingZkBrokerCount");
private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
Expand All @@ -55,6 +57,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
private final AtomicInteger migratingZkBrokerCount = new AtomicInteger(0);
private final AtomicInteger globalTopicCount = new AtomicInteger(0);
private final AtomicInteger globalPartitionCount = new AtomicInteger(0);
private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
Expand All @@ -65,7 +68,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
/**
* Create a new ControllerMetadataMetrics object.
*
* @param registry The metrics registry, or Optional.empty if this is a test and we don't have one.
* @param registry The metrics registry, or Optional.empty if this is a test and we don't have one.
*/
public ControllerMetadataMetrics(Optional<MetricsRegistry> registry) {
this.registry = registry;
Expand Down Expand Up @@ -117,6 +120,14 @@ public Integer value() {
return (int) zkMigrationState();
}
}));

registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return migratingZkBrokerCount();
}
}));

}

public void setFencedBrokerCount(int brokerCount) {
Expand All @@ -143,6 +154,18 @@ public int activeBrokerCount() {
return this.activeBrokerCount.get();
}

public void setMigratingZkBrokerCount(int brokerCount) {
this.migratingZkBrokerCount.set(brokerCount);
}

public void addToMigratingZkBrokerCount(int brokerCountDelta) {
this.migratingZkBrokerCount.addAndGet(brokerCountDelta);
}

public int migratingZkBrokerCount() {
return this.migratingZkBrokerCount.get();
}

public void setGlobalTopicCount(int topicCount) {
this.globalTopicCount.set(topicCount);
}
Expand Down Expand Up @@ -212,6 +235,7 @@ public void close() {
registry.ifPresent(r -> Arrays.asList(
FENCED_BROKER_COUNT,
ACTIVE_BROKER_COUNT,
MIGRATING_ZK_BROKER_COUNT,
GLOBAL_TOPIC_COUNT,
GLOBAL_PARTITION_COUNT,
OFFLINE_PARTITION_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,21 @@ private void publishSnapshot(MetadataImage newImage) {
metrics.setGlobalTopicCount(newImage.topics().topicsById().size());
int fencedBrokers = 0;
int activeBrokers = 0;
int zkBrokers = 0;
for (BrokerRegistration broker : newImage.cluster().brokers().values()) {
if (broker.fenced()) {
fencedBrokers++;
} else {
activeBrokers++;
}
if (broker.isMigratingZkBroker()) {
zkBrokers++;
}
}
metrics.setFencedBrokerCount(fencedBrokers);
metrics.setActiveBrokerCount(activeBrokers);
metrics.setMigratingZkBrokerCount(zkBrokers);

int totalPartitions = 0;
int offlinePartitions = 0;
int partitionsWithoutPreferredLeader = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ static int delta(boolean prev, boolean next) {

private int fencedBrokersChange = 0;
private int activeBrokersChange = 0;
private int migratingZkBrokersChange = 0;
private int globalTopicsChange = 0;
private int globalPartitionsChange = 0;
private int offlinePartitionsChange = 0;
Expand All @@ -56,6 +57,10 @@ public int activeBrokersChange() {
return activeBrokersChange;
}

public int migratingZkBrokersChange() {
return migratingZkBrokersChange;
}

public int globalTopicsChange() {
return globalTopicsChange;
}
Expand All @@ -75,18 +80,23 @@ public int partitionsWithoutPreferredLeaderChange() {
void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) {
boolean wasFenced = false;
boolean wasActive = false;
boolean wasZk = false;
if (prev != null) {
wasFenced = prev.fenced();
wasActive = !prev.fenced();
wasZk = prev.isMigratingZkBroker();
}
boolean isFenced = false;
boolean isActive = false;
boolean isZk = false;
if (next != null) {
isFenced = next.fenced();
isActive = !next.fenced();
isZk = next.isMigratingZkBroker();
}
fencedBrokersChange += delta(wasFenced, isFenced);
activeBrokersChange += delta(wasActive, isActive);
migratingZkBrokersChange += delta(wasZk, isZk);
}

void handleDeletedTopic(TopicImage deletedTopic) {
Expand Down Expand Up @@ -141,6 +151,9 @@ void apply(ControllerMetadataMetrics metrics) {
if (activeBrokersChange != 0) {
metrics.addToActiveBrokerCount(activeBrokersChange);
}
if (migratingZkBrokersChange != 0) {
metrics.addToMigratingZkBrokerCount(migratingZkBrokersChange);
}
if (globalTopicsChange != 0) {
metrics.addToGlobalTopicCount(globalTopicsChange);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName(
"KafkaController", "ZKWriteBehindLag");
"KafkaController", "ZkWriteBehindLag");
private final static MetricName ZK_WRITE_SNAPSHOT_TIME_MS = getMetricName(
"KafkaController", "ZkWriteSnapshotTimeMs");
private final static MetricName ZK_WRITE_DELTA_TIME_MS = getMetricName(
"KafkaController", "ZkWriteDeltaTimeMs");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
Expand All @@ -71,6 +75,9 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong dualWriteOffset = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final Consumer<Long> zkWriteSnapshotTimeHandler;
private final Consumer<Long> zkWriteDeltaTimeHandler;

private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
private final AtomicLong operationsTimedOut = new AtomicLong(0);
Expand All @@ -88,7 +95,7 @@ private Consumer<Long> newHistogram(MetricName name, boolean biased) {
public QuorumControllerMetrics(
Optional<MetricsRegistry> registry,
Time time,
boolean zkMigrationState
boolean zkMigrationEnabled
) {
this.registry = registry;
this.active = false;
Expand Down Expand Up @@ -148,7 +155,8 @@ public Long value() {
return newActiveControllers();
}
}));
if (zkMigrationState) {

if (zkMigrationEnabled) {
registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge<Long>() {
@Override
public Long value() {
Expand All @@ -158,6 +166,11 @@ public Long value() {
else return lastCommittedRecordOffset() - dualWriteOffset();
}
}));
this.zkWriteSnapshotTimeHandler = newHistogram(ZK_WRITE_SNAPSHOT_TIME_MS, true);
this.zkWriteDeltaTimeHandler = newHistogram(ZK_WRITE_DELTA_TIME_MS, true);
} else {
this.zkWriteSnapshotTimeHandler = __ -> { };
this.zkWriteDeltaTimeHandler = __ -> { };
}
}

Expand All @@ -177,6 +190,14 @@ public void updateEventQueueProcessingTime(long durationMs) {
eventQueueProcessingTimeUpdater.accept(durationMs);
}

public void updateZkWriteSnapshotTimeMs(long durationMs) {
zkWriteSnapshotTimeHandler.accept(durationMs);
}

public void updateZkWriteDeltaTimeMs(long durationMs) {
zkWriteDeltaTimeHandler.accept(durationMs);
}

public void setLastAppliedRecordOffset(long offset) {
lastAppliedRecordOffset.set(offset);
}
Expand Down Expand Up @@ -255,7 +276,9 @@ public void close() {
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
NEW_ACTIVE_CONTROLLERS_COUNT,
ZK_WRITE_BEHIND_LAG
ZK_WRITE_BEHIND_LAG,
ZK_WRITE_SNAPSHOT_TIME_MS,
ZK_WRITE_DELTA_TIME_MS
).forEach(r::removeMetric));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,12 +486,17 @@ public void run() throws Exception {
}

Map<String, Integer> dualWriteCounts = new TreeMap<>();
long startTime = time.nanoseconds();
if (isSnapshot) {
zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime));
} else {
zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer(
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
if (zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer(
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation))) {
// Only record delta write time if we changed something. Otherwise, no-op records will skew timings.
controllerMetrics.updateZkWriteDeltaTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime));
}
}
if (dualWriteCounts.isEmpty()) {
log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta");
Expand Down Expand Up @@ -567,6 +572,8 @@ public void run() throws Exception {
log.error("KRaft controller indicates a completed migration, but the migration driver is somehow active.");
transitionTo(MigrationDriverState.INACTIVE);
break;
default:
throw new IllegalStateException("Unsupported ZkMigrationState " + zkMigrationState);
}
}
}
Expand Down Expand Up @@ -669,8 +676,11 @@ public void run() throws Exception {
if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) {
log.info("Performing a full metadata sync from KRaft to ZK.");
Map<String, Integer> dualWriteCounts = new TreeMap<>();
long startTime = time.nanoseconds();
zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
long endTime = time.nanoseconds();
controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(startTime - endTime));
log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts);
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,27 +93,34 @@ public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer
handleAclsSnapshot(image.acls(), operationConsumer);
}

public void handleDelta(
public boolean handleDelta(
MetadataImage previousImage,
MetadataImage image,
MetadataDelta delta,
KRaftMigrationOperationConsumer operationConsumer
) {
boolean updated = false;
if (delta.topicsDelta() != null) {
handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, image.topics(), delta.topicsDelta(), operationConsumer);
updated = true;
}
if (delta.configsDelta() != null) {
handleConfigsDelta(image.configs(), delta.configsDelta(), operationConsumer);
updated = true;
}
if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
handleClientQuotasDelta(image, delta, operationConsumer);
updated = true;
}
if (delta.producerIdsDelta() != null) {
handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer);
updated = true;
}
if (delta.aclsDelta() != null) {
handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer);
updated = true;
}
return updated;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ public enum ZkMigrationState {
* will persist indefinitely after the migration. In operational terms, this is the same as the NONE
* state.
*/
POST_MIGRATION((byte) 3);
POST_MIGRATION((byte) 3),

/**
* The controller is a ZK controller. No migration has been performed. This state is never persisted
* and is only used by KafkaController in order to have a unified metric that indicates what kind of
* metadata state the controller is in.
*/
ZK((byte) 4);

private final byte value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void testMetricNames() {
new HashSet<>(Arrays.asList(
"kafka.controller:type=KafkaController,name=ActiveBrokerCount",
"kafka.controller:type=KafkaController,name=FencedBrokerCount",
"kafka.controller:type=KafkaController,name=MigratingZkBrokerCount",
"kafka.controller:type=KafkaController,name=GlobalPartitionCount",
"kafka.controller:type=KafkaController,name=GlobalTopicCount",
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
Expand Down
Loading