Skip to content

Commit

Permalink
KAFKA-13986; Brokers should include node.id in fetches to metadata qu…
Browse files Browse the repository at this point in the history
…orum (#12498)

Currently we do not set the replicaId in fetches from brokers to the metadata quorum. It is useful to do so since that allows us to debug replication using the `DescribeQuorum` API.

Reviewers: dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
  • Loading branch information
Jason Gustafson committed Aug 11, 2022
1 parent 42a6078 commit 4422958
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 15 deletions.
8 changes: 1 addition & 7 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture
import kafka.log.UnifiedLog
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{KafkaConfig, MetaProperties}
import kafka.server.KafkaRaftServer.ControllerRole
import kafka.utils.timer.SystemTimer
import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
Expand Down Expand Up @@ -181,12 +180,7 @@ class KafkaRaftManager[T](
val expirationTimer = new SystemTimer("raft-expiration-executor")
val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))

val nodeId = if (config.processRoles.contains(ControllerRole)) {
OptionalInt.of(config.nodeId)
} else {
OptionalInt.empty()
}
val nodeId = OptionalInt.of(config.nodeId)

val client = new KafkaRaftClient(
recordSerde,
Expand Down
13 changes: 13 additions & 0 deletions core/src/test/java/kafka/test/ClusterInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

public interface ClusterInstance {

Expand All @@ -50,6 +51,18 @@ default boolean isKRaftTest() {
*/
ClusterConfig config();

/**
* Return the set of all controller IDs configured for this test. For kraft, this
* will return only the nodes which have the "controller" role enabled in `process.roles`.
* For zookeeper, this will return all broker IDs since they are all eligible controllers.
*/
Set<Integer> controllerIds();

/**
* Return the set of all broker IDs configured for this test.
*/
Set<Integer> brokerIds();

/**
* The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
* unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -192,6 +193,20 @@ public ClusterConfig config() {
return clusterConfig;
}

@Override
public Set<Integer> controllerIds() {
return controllers()
.map(controllerServer -> controllerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public Set<Integer> brokerIds() {
return brokers()
.map(brokerServer -> brokerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public KafkaClusterTestKit getUnderlying() {
return clusterReference.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -257,6 +258,18 @@ public ClusterConfig config() {
return config;
}

@Override
public Set<Integer> controllerIds() {
return brokerIds();
}

@Override
public Set<Integer> brokerIds() {
return servers()
.map(brokerServer -> brokerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public IntegrationTestHarness getUnderlying() {
return clusterReference.get();
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,23 @@ class RaftManagerTest {
}

@Test
def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {
def testNodeIdPresentIfBrokerRoleOnly(): Unit = {
val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "broker", "1")
assertFalse(raftManager.client.nodeId.isPresent)
assertEquals(1, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}

@Test
def testNodeIdPresentIfControllerRoleOnly(): Unit = {
val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller", "1")
assertTrue(raftManager.client.nodeId.getAsInt == 1)
assertEquals(1, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}

@Test
def testNodeIdPresentIfColocated(): Unit = {
val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller,broker", "1")
assertTrue(raftManager.client.nodeId.getAsInt == 1)
assertEquals(1, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
assertTrue(leaderState.logEndOffset > 0)

val voterData = partitionData.currentVoters.asScala
assertEquals(cluster.controllerIds().asScala, voterData.map(_.replicaId).toSet);

val observerData = partitionData.observers.asScala
assertEquals(1, voterData.size)
assertEquals(0, observerData.size)
voterData.foreach { state =>
assertTrue(0 < state.replicaId)
assertEquals(cluster.brokerIds().asScala, observerData.map(_.replicaId).toSet);

(voterData ++ observerData).foreach { state =>
assertTrue(0 < state.logEndOffset)
assertEquals(-1, state.lastFetchTimestamp)
assertEquals(-1, state.lastCaughtUpTimestamp)
Expand Down

0 comments on commit 4422958

Please sign in to comment.