Skip to content

Commit

Permalink
KAFKA-14124: improve quorum controller fault handling (#12447)
Browse files Browse the repository at this point in the history
Before trying to commit a batch of records to the __cluster_metadata log, the active controller
should try to apply them to its current in-memory state. If this application process fails, the
active controller process should exit, allowing another node to take leadership. This will prevent
most bad metadata records from ending up in the log and help to surface errors during testing.

Similarly, if the active controller attempts to renounce leadership, and the renunciation process
itself fails, the process should exit. This will help avoid bugs where the active controller
continues in an undefined state.

In contrast, standby controllers that experience metadata application errors should continue on, in
order to avoid a scenario where a bad record brings down the whole controller cluster.  The
intended effect of these changes is to make it harder to commit a bad record to the metadata log,
but to continue to ride out the bad record as well as possible if such a record does get committed.

This PR introduces the FaultHandler interface to implement these concepts. In junit tests, we use a
FaultHandler implementation which does not exit the process. This allows us to avoid terminating
the gradle test runner, which would be very disruptive. It also allows us to ensure that the test
surfaces these exceptions, which we previously were not doing (the mock fault handler stores the
exception).

In addition to the above, this PR fixes a bug where RaftClient#resign was not being called from the
renounce() function. This bug could have resulted in the raft layer not being informed of an active
controller resigning.

Reviewers: David Arthur <mumrah@gmail.com>
  • Loading branch information
cmccabe authored Aug 5, 2022
1 parent add7cd8 commit 555744d
Show file tree
Hide file tree
Showing 17 changed files with 586 additions and 169 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,7 @@ project(':core') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':metadata').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation(libs.apacheda) {
Expand Down Expand Up @@ -1179,6 +1180,7 @@ project(':metadata') {
testImplementation libs.slf4jlog4j
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
generator project(':generator')
}

Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
</subpackage>

<subpackage name="tools">
Expand Down
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.policy"/>
<allow pkg="org.apache.kafka.snapshot" />
Expand Down Expand Up @@ -276,6 +277,9 @@
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
</subpackage>
<subpackage name="fault">
<allow pkg="org.apache.kafka.server.fault" />
</subpackage>
</subpackage>

<subpackage name="metalog">
Expand Down
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>

<!-- Clients -->
<suppress id="dontUseSystemExit"
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util
import java.util.OptionalLong
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}

import kafka.cluster.Broker.ServerInfo
import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
import kafka.network.{DataPlaneAcceptor, SocketServer}
Expand All @@ -46,6 +45,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}

Expand All @@ -65,7 +65,9 @@ class ControllerServer(
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val configSchema: KafkaConfigSchema,
val raftApiVersions: ApiVersions,
val bootstrapMetadata: BootstrapMetadata
val bootstrapMetadata: BootstrapMetadata,
val metadataFaultHandler: FaultHandler,
val fatalFaultHandler: FaultHandler,
) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._

Expand Down Expand Up @@ -204,7 +206,9 @@ class ControllerServer(
setAlterConfigPolicy(alterConfigPolicy.asJava).
setConfigurationValidator(new ControllerConfigurationValidator()).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata)
setBootstrapMetadata(bootstrapMetadata).
setMetadataFaultHandler(metadataFaultHandler).
setFatalFaultHandler(fatalFaultHandler)
}
authorizer match {
case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.metadata.fault.MetadataFaultHandler
import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.fault.ProcessExitingFaultHandler
import org.apache.kafka.server.metrics.KafkaYammerMetrics

import java.nio.file.Paths
Expand Down Expand Up @@ -106,7 +108,9 @@ class KafkaRaftServer(
controllerQuorumVotersFuture,
KafkaRaftServer.configSchema,
raftManager.apiVersions,
bootstrapMetadata
bootstrapMetadata,
new MetadataFaultHandler(),
new ProcessExitingFaultHandler(),
))
} else {
None
Expand Down
36 changes: 26 additions & 10 deletions core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -115,6 +116,8 @@ public void close() {
public static class Builder {
private TestKitNodes nodes;
private Map<String, String> configProps = new HashMap<>();
private MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
private MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");

public Builder(TestKitNodes nodes) {
this.nodes = nodes;
Expand Down Expand Up @@ -190,7 +193,9 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
connectFutureManager.future,
KafkaRaftServer.configSchema(),
raftManager.apiVersions(),
bootstrapMetadata
bootstrapMetadata,
metadataFaultHandler,
fatalFaultHandler
);
controllers.put(node.id(), controller);
controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
Expand Down Expand Up @@ -273,7 +278,8 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
throw e;
}
return new KafkaClusterTestKit(executorService, nodes, controllers,
brokers, raftManagers, connectFutureManager, baseDirectory);
brokers, raftManagers, connectFutureManager, baseDirectory,
metadataFaultHandler, fatalFaultHandler);
}

private String listeners(int node) {
Expand Down Expand Up @@ -314,21 +320,29 @@ static private void setupNodeDirectories(File baseDirectory,
private final Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers;
private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
private final File baseDirectory;

private KafkaClusterTestKit(ExecutorService executorService,
TestKitNodes nodes,
Map<Integer, ControllerServer> controllers,
Map<Integer, BrokerServer> brokers,
Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
File baseDirectory) {
private final MockFaultHandler metadataFaultHandler;
private final MockFaultHandler fatalFaultHandler;

private KafkaClusterTestKit(
ExecutorService executorService,
TestKitNodes nodes,
Map<Integer, ControllerServer> controllers,
Map<Integer, BrokerServer> brokers,
Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
File baseDirectory,
MockFaultHandler metadataFaultHandler,
MockFaultHandler fatalFaultHandler
) {
this.executorService = executorService;
this.nodes = nodes;
this.controllers = controllers;
this.brokers = brokers;
this.raftManagers = raftManagers;
this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
this.baseDirectory = baseDirectory;
this.metadataFaultHandler = metadataFaultHandler;
this.fatalFaultHandler = fatalFaultHandler;
}

public void format() throws Exception {
Expand Down Expand Up @@ -520,6 +534,8 @@ public void close() throws Exception {
executorService.shutdownNow();
executorService.awaitTermination(5, TimeUnit.MINUTES);
}
metadataFaultHandler.maybeRethrowFirstException();
fatalFaultHandler.maybeRethrowFirstException();
}

private void waitForAllFutures(List<Entry<String, Future<?>>> futureEntries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.fault.MockFaultHandler
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -188,6 +189,8 @@ abstract class QuorumTestHarness extends Logging {
}
}

val faultHandler = new MockFaultHandler("quorumTestHarnessFaultHandler")

// Note: according to the junit documentation: "JUnit Jupiter does not guarantee the execution
// order of multiple @BeforeEach methods that are declared within a single test class or test
// interface." Therefore, if you have things you would like to do before each test case runs, it
Expand Down Expand Up @@ -308,6 +311,8 @@ abstract class QuorumTestHarness extends Logging {
configSchema = KafkaRaftServer.configSchema,
raftApiVersions = raftManager.apiVersions,
bootstrapMetadata = BootstrapMetadata.create(metadataVersion, bootstrapRecords.asJava),
metadataFaultHandler = faultHandler,
fatalFaultHandler = faultHandler,
)
controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
if (e != null) {
Expand Down Expand Up @@ -374,6 +379,7 @@ abstract class QuorumTestHarness extends Logging {
}
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
Configuration.setConfiguration(null)
faultHandler.maybeRethrowFirstException()
}

// Trigger session expiry by reusing the session id in another client
Expand Down
Loading

0 comments on commit 555744d

Please sign in to comment.