Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13986; Brokers should include node.id in fetches to metadata quorum #12498

Merged
merged 2 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture
import kafka.log.UnifiedLog
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{KafkaConfig, MetaProperties}
import kafka.server.KafkaRaftServer.ControllerRole
import kafka.utils.timer.SystemTimer
import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
Expand Down Expand Up @@ -181,12 +180,7 @@ class KafkaRaftManager[T](
val expirationTimer = new SystemTimer("raft-expiration-executor")
val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))

val nodeId = if (config.processRoles.contains(ControllerRole)) {
OptionalInt.of(config.nodeId)
} else {
OptionalInt.empty()
}
val nodeId = OptionalInt.of(config.nodeId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We did this to address this issue: https://issues.apache.org/jira/browse/KAFKA-13168. Do you think this is not needed anymore because we have this check in KafkaConfig?

        // nodeId must not appear in controller.quorum.voters
        require(!voterAddressSpecsByNodeId.containsKey(nodeId),
          s"If ${KafkaConfig.ProcessRolesProp} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is what I was thinking. Because we require the voter set to be consistently defined on all nodes, it seems like it is good enough to ensure that any node which starts with just the "broker" role is not among the voters. Do you agree with that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. It is best we can do at the moment. We can revisit this when we have replicaUuid and storageId from KIP-853.


val client = new KafkaRaftClient(
recordSerde,
Expand Down
13 changes: 13 additions & 0 deletions core/src/test/java/kafka/test/ClusterInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

public interface ClusterInstance {

Expand All @@ -50,6 +51,18 @@ default boolean isKRaftTest() {
*/
ClusterConfig config();

/**
* Return the set of all controller IDs configured for this test. For kraft, this
* will return only the nodes which have the "controller" role enabled in `process.roles`.
* For zookeeper, this will return all broker IDs since they are all eligible controllers.
*/
Set<Integer> controllerIds();

/**
* Return the set of all broker IDs configured for this test.
*/
Set<Integer> brokerIds();

/**
* The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
* unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -192,6 +193,20 @@ public ClusterConfig config() {
return clusterConfig;
}

@Override
public Set<Integer> controllerIds() {
return controllers()
.map(controllerServer -> controllerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public Set<Integer> brokerIds() {
return brokers()
.map(brokerServer -> brokerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public KafkaClusterTestKit getUnderlying() {
return clusterReference.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -257,6 +258,18 @@ public ClusterConfig config() {
return config;
}

@Override
public Set<Integer> controllerIds() {
return brokerIds();
}

@Override
public Set<Integer> brokerIds() {
return servers()
.map(brokerServer -> brokerServer.config().nodeId())
.collect(Collectors.toSet());
}

@Override
public IntegrationTestHarness getUnderlying() {
return clusterReference.get();
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,23 @@ class RaftManagerTest {
}

@Test
def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {
def testNodeIdPresentIfBrokerRoleOnly(): Unit = {
val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "broker", "1")
assertFalse(raftManager.client.nodeId.isPresent)
assertEquals(1, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}

@Test
def testNodeIdPresentIfControllerRoleOnly(): Unit = {
val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller", "1")
assertTrue(raftManager.client.nodeId.getAsInt == 1)
assertEquals(1, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}

@Test
def testNodeIdPresentIfColocated(): Unit = {
val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller,broker", "1")
assertTrue(raftManager.client.nodeId.getAsInt == 1)
assertEquals(1, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
assertTrue(leaderState.logEndOffset > 0)

val voterData = partitionData.currentVoters.asScala
assertEquals(cluster.controllerIds().asScala, voterData.map(_.replicaId).toSet);

val observerData = partitionData.observers.asScala
assertEquals(1, voterData.size)
assertEquals(0, observerData.size)
voterData.foreach { state =>
assertTrue(0 < state.replicaId)
assertEquals(cluster.brokerIds().asScala, observerData.map(_.replicaId).toSet);

(voterData ++ observerData).foreach { state =>
assertTrue(0 < state.logEndOffset)
assertEquals(-1, state.lastFetchTimestamp)
assertEquals(-1, state.lastCaughtUpTimestamp)
Expand Down