diff --git a/build.gradle b/build.gradle
index 54068f2977f0..e1f773075a9a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -897,6 +897,7 @@ project(':core') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':metadata').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
+ testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation(libs.apacheda) {
@@ -1179,6 +1180,7 @@ project(':metadata') {
testImplementation libs.slf4jlog4j
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
+ testImplementation project(':server-common').sourceSets.test.output
generator project(':generator')
}
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 28b325b093da..4042cba402fd 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -54,6 +54,7 @@
+
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 211d23ff60ae..4b07a26cba5c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -232,6 +232,7 @@
+
@@ -276,6 +277,9 @@
+
+
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f6ca0d02fe31..bec3da1637a9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,6 +39,8 @@
+
controllerBuilder.setAuthorizer(a)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 07a311837204..e7cf8f8f1fab 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -29,9 +29,11 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.metadata.fault.MetadataFaultHandler
import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.ProcessExitingFaultHandler
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import java.nio.file.Paths
@@ -106,7 +108,9 @@ class KafkaRaftServer(
controllerQuorumVotersFuture,
KafkaRaftServer.configSchema,
raftManager.apiVersions,
- bootstrapMetadata
+ bootstrapMetadata,
+ new MetadataFaultHandler(),
+ new ProcessExitingFaultHandler(),
))
} else {
None
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index c961d71bbe58..42120324f5f8 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -40,6 +40,7 @@
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,6 +116,8 @@ public void close() {
public static class Builder {
private TestKitNodes nodes;
private Map configProps = new HashMap<>();
+ private MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
+ private MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
public Builder(TestKitNodes nodes) {
this.nodes = nodes;
@@ -190,7 +193,9 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
connectFutureManager.future,
KafkaRaftServer.configSchema(),
raftManager.apiVersions(),
- bootstrapMetadata
+ bootstrapMetadata,
+ metadataFaultHandler,
+ fatalFaultHandler
);
controllers.put(node.id(), controller);
controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
@@ -273,7 +278,8 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
throw e;
}
return new KafkaClusterTestKit(executorService, nodes, controllers,
- brokers, raftManagers, connectFutureManager, baseDirectory);
+ brokers, raftManagers, connectFutureManager, baseDirectory,
+ metadataFaultHandler, fatalFaultHandler);
}
private String listeners(int node) {
@@ -314,14 +320,20 @@ static private void setupNodeDirectories(File baseDirectory,
private final Map> raftManagers;
private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
private final File baseDirectory;
-
- private KafkaClusterTestKit(ExecutorService executorService,
- TestKitNodes nodes,
- Map controllers,
- Map brokers,
- Map> raftManagers,
- ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
- File baseDirectory) {
+ private final MockFaultHandler metadataFaultHandler;
+ private final MockFaultHandler fatalFaultHandler;
+
+ private KafkaClusterTestKit(
+ ExecutorService executorService,
+ TestKitNodes nodes,
+ Map controllers,
+ Map brokers,
+ Map> raftManagers,
+ ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
+ File baseDirectory,
+ MockFaultHandler metadataFaultHandler,
+ MockFaultHandler fatalFaultHandler
+ ) {
this.executorService = executorService;
this.nodes = nodes;
this.controllers = controllers;
@@ -329,6 +341,8 @@ private KafkaClusterTestKit(ExecutorService executorService,
this.raftManagers = raftManagers;
this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
this.baseDirectory = baseDirectory;
+ this.metadataFaultHandler = metadataFaultHandler;
+ this.fatalFaultHandler = fatalFaultHandler;
}
public void format() throws Exception {
@@ -520,6 +534,8 @@ public void close() throws Exception {
executorService.shutdownNow();
executorService.awaitTermination(5, TimeUnit.MINUTES);
}
+ metadataFaultHandler.maybeRethrowFirstException();
+ fatalFaultHandler.maybeRethrowFirstException();
}
private void waitForAllFutures(List>> futureEntries)
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index a2393cdccbcf..9894df9c5f74 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.MockFaultHandler
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.junit.jupiter.api.Assertions._
@@ -188,6 +189,8 @@ abstract class QuorumTestHarness extends Logging {
}
}
+ val faultHandler = new MockFaultHandler("quorumTestHarnessFaultHandler")
+
// Note: according to the junit documentation: "JUnit Jupiter does not guarantee the execution
// order of multiple @BeforeEach methods that are declared within a single test class or test
// interface." Therefore, if you have things you would like to do before each test case runs, it
@@ -308,6 +311,8 @@ abstract class QuorumTestHarness extends Logging {
configSchema = KafkaRaftServer.configSchema,
raftApiVersions = raftManager.apiVersions,
bootstrapMetadata = BootstrapMetadata.create(metadataVersion, bootstrapRecords.asJava),
+ metadataFaultHandler = faultHandler,
+ fatalFaultHandler = faultHandler,
)
controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
if (e != null) {
@@ -374,6 +379,7 @@ abstract class QuorumTestHarness extends Logging {
}
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
Configuration.setConfiguration(null)
+ faultHandler.maybeRethrowFirstException()
}
// Trigger session expiry by reusing the session id in another client
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 0290e0040c2d..a4cc1d92cba4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -91,6 +91,7 @@
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
@@ -149,6 +150,8 @@ public final class QuorumController implements Controller {
static public class Builder {
private final int nodeId;
private final String clusterId;
+ private FaultHandler fatalFaultHandler = null;
+ private FaultHandler metadataFaultHandler = null;
private Time time = Time.SYSTEM;
private String threadNamePrefix = null;
private LogContext logContext = null;
@@ -175,6 +178,16 @@ public Builder(int nodeId, String clusterId) {
this.clusterId = clusterId;
}
+ public Builder setFatalFaultHandler(FaultHandler fatalFaultHandler) {
+ this.fatalFaultHandler = fatalFaultHandler;
+ return this;
+ }
+
+ public Builder setMetadataFaultHandler(FaultHandler metadataFaultHandler) {
+ this.metadataFaultHandler = metadataFaultHandler;
+ return this;
+ }
+
public int nodeId() {
return nodeId;
}
@@ -287,6 +300,10 @@ public QuorumController build() throws Exception {
throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
} else if (quorumFeatures == null) {
throw new IllegalStateException("You must specify the quorum features");
+ } else if (fatalFaultHandler == null) {
+ throw new IllegalStateException("You must specify a fatal fault handler.");
+ } else if (metadataFaultHandler == null) {
+ throw new IllegalStateException("You must specify a metadata fault handler.");
}
if (threadNamePrefix == null) {
@@ -304,6 +321,8 @@ public QuorumController build() throws Exception {
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
return new QuorumController(
+ fatalFaultHandler,
+ metadataFaultHandler,
logContext,
nodeId,
clusterId,
@@ -426,12 +445,18 @@ private Throwable handleEventException(String name,
exception.getClass().getSimpleName(), deltaUs);
return exception;
}
- log.warn("{}: failed with unknown server exception {} at epoch {} in {} us. " +
- "Renouncing leadership and reverting to the last committed offset {}.",
- name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
- lastCommittedOffset, exception);
- raftClient.resign(curClaimEpoch);
- renounce();
+ if (isActiveController()) {
+ log.warn("{}: failed with unknown server exception {} at epoch {} in {} us. " +
+ "Renouncing leadership and reverting to the last committed offset {}.",
+ name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
+ lastCommittedOffset, exception);
+ renounce();
+ } else {
+ log.warn("{}: failed with unknown server exception {} in {} us. " +
+ "The controller is already in standby mode.",
+ name, exception.getClass().getSimpleName(), deltaUs,
+ exception);
+ }
return new UnknownServerException(exception);
}
@@ -702,7 +727,7 @@ public void run() throws Exception {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
int controllerEpoch = curClaimEpoch;
- if (controllerEpoch == -1) {
+ if (!isActiveController()) {
throw newNotControllerException();
}
startProcessingTimeNs = OptionalLong.of(now);
@@ -728,9 +753,26 @@ public void run() throws Exception {
"reaches offset {}", this, resultAndOffset.offset());
}
} else {
- // If the operation returned a batch of records, those records need to be
- // written before we can return our result to the user. Here, we hand off
- // the batch of records to the raft client. They will be written out
+ // Start by trying to apply the record to our in-memory state. This should always
+ // succeed; if it does not, that's a fatal error. It is important to do this before
+ // scheduling the record for Raft replication.
+ int i = 1;
+ for (ApiMessageAndVersion message : result.records()) {
+ try {
+ replay(message.message(), Optional.empty());
+ } catch (Throwable e) {
+ String failureMessage = String.format("Unable to apply %s record, which was " +
+ "%d of %d record(s) in the batch following last writeOffset %d.",
+ message.message().getClass().getSimpleName(), i, result.records().size(),
+ writeOffset);
+ fatalFaultHandler.handleFault(failureMessage, e);
+ }
+ i++;
+ }
+
+ // If the operation returned a batch of records, and those records could be applied,
+ // they need to be written before we can return our result to the user. Here, we
+ // hand off the batch of records to the raft client. They will be written out
// asynchronously.
final long offset;
if (result.isAtomic()) {
@@ -741,9 +783,6 @@ public void run() throws Exception {
op.processBatchEndOffset(offset);
updateWriteOffset(offset);
resultAndOffset = ControllerResultAndOffset.of(offset, result);
- for (ApiMessageAndVersion message : result.records()) {
- replay(message.message(), Optional.empty(), offset);
- }
snapshotRegistry.getOrCreateSnapshot(offset);
log.debug("Read-write operation {} will be completed when the log " +
@@ -789,9 +828,9 @@ private CompletableFuture prependWriteEvent(String name,
return event.future();
}
- private CompletableFuture appendWriteEvent(String name,
- OptionalLong deadlineNs,
- ControllerWriteOperation op) {
+ CompletableFuture appendWriteEvent(String name,
+ OptionalLong deadlineNs,
+ ControllerWriteOperation op) {
ControllerWriteEvent event = new ControllerWriteEvent<>(name, op);
if (deadlineNs.isPresent()) {
queue.appendWithDeadline(deadlineNs.getAsLong(), event);
@@ -841,11 +880,20 @@ public void handleCommit(BatchReader reader) {
"offset {} and epoch {}.", offset, epoch);
}
}
- for (ApiMessageAndVersion messageAndVersion : messages) {
- replay(messageAndVersion.message(), Optional.empty(), offset);
+ int i = 1;
+ for (ApiMessageAndVersion message : messages) {
+ try {
+ replay(message.message(), Optional.empty());
+ } catch (Throwable e) {
+ String failureMessage = String.format("Unable to apply %s record on standby " +
+ "controller, which was %d of %d record(s) in the batch with baseOffset %d.",
+ message.message().getClass().getSimpleName(), i, messages.size(),
+ batch.baseOffset());
+ metadataFaultHandler.handleFault(failureMessage, e);
+ }
+ i++;
}
}
-
updateLastCommittedState(offset, epoch, batch.appendTimestamp());
processedRecordsSize += batch.sizeInBytes();
}
@@ -862,13 +910,9 @@ public void handleSnapshot(SnapshotReader reader) {
appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
try {
if (isActiveController()) {
- throw new IllegalStateException(
- String.format(
- "Asked to load snapshot (%s) when it is the active controller (%d)",
- reader.snapshotId(),
- curClaimEpoch
- )
- );
+ fatalFaultHandler.handleFault(String.format("Asked to load snapshot " +
+ "(%s) when it is the active controller (%d)", reader.snapshotId(),
+ curClaimEpoch));
}
log.info("Starting to replay snapshot ({}), from last commit offset ({}) and epoch ({})",
reader.snapshotId(), lastCommittedOffset, lastCommittedEpoch);
@@ -882,26 +926,28 @@ public void handleSnapshot(SnapshotReader reader) {
if (log.isDebugEnabled()) {
if (log.isTraceEnabled()) {
- log.trace(
- "Replaying snapshot ({}) batch with last offset of {}: {}",
- reader.snapshotId(),
- offset,
- messages
- .stream()
- .map(ApiMessageAndVersion::toString)
- .collect(Collectors.joining(", "))
- );
+ log.trace("Replaying snapshot ({}) batch with last offset of {}: {}",
+ reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString).
+ collect(Collectors.joining(", ")));
} else {
- log.debug(
- "Replaying snapshot ({}) batch with last offset of {}",
- reader.snapshotId(),
- offset
- );
+ log.debug("Replaying snapshot ({}) batch with last offset of {}",
+ reader.snapshotId(), offset);
}
}
- for (ApiMessageAndVersion messageAndVersion : messages) {
- replay(messageAndVersion.message(), Optional.of(reader.snapshotId()), offset);
+ int i = 1;
+ for (ApiMessageAndVersion message : messages) {
+ try {
+ replay(message.message(), Optional.of(reader.snapshotId()));
+ } catch (Throwable e) {
+ String failureMessage = String.format("Unable to apply %s record " +
+ "from snapshot %s on standby controller, which was %d of " +
+ "%d record(s) in the batch with baseOffset %d.",
+ message.message().getClass().getSimpleName(), reader.snapshotId(),
+ i, messages.size(), batch.baseOffset());
+ metadataFaultHandler.handleFault(failureMessage, e);
+ }
+ i++;
}
}
updateLastCommittedState(
@@ -968,10 +1014,14 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
if (exception != null) {
log.error("Failed to bootstrap metadata.", exception);
appendRaftEvent("bootstrapMetadata[" + curClaimEpoch + "]", () -> {
- log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " +
- "metadata. Reverting to last committed offset {}.",
- curClaimEpoch, lastCommittedOffset);
- renounce();
+ if (isActiveController()) {
+ log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " +
+ "metadata. Reverting to last committed offset {}.",
+ curClaimEpoch, lastCommittedOffset);
+ renounce();
+ } else {
+ log.warn("Unable to bootstrap metadata on standby controller.");
+ }
});
}
});
@@ -998,10 +1048,12 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
});
} else if (isActiveController()) {
appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
- log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
- "log event. Reverting to last committed offset {}.", curClaimEpoch,
- lastCommittedOffset);
- renounce();
+ if (isActiveController()) {
+ log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
+ "log event. Reverting to last committed offset {}.", curClaimEpoch,
+ lastCommittedOffset);
+ renounce();
+ }
});
}
}
@@ -1078,26 +1130,34 @@ private void updateLastCommittedState(long offset, int epoch, long timestamp) {
}
private void renounce() {
- curClaimEpoch = -1;
- controllerMetrics.setActive(false);
- purgatory.failAll(newNotControllerException());
-
- if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
- newBytesSinceLastSnapshot = 0;
- snapshotRegistry.revertToSnapshot(lastCommittedOffset);
- authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
- } else {
- resetState();
- raftClient.unregister(metaLogListener);
- metaLogListener = new QuorumMetaLogListener();
- raftClient.register(metaLogListener);
+ try {
+ if (curClaimEpoch == -1) {
+ throw new RuntimeException("Cannot renounce leadership because we are not the " +
+ "current leader.");
+ }
+ raftClient.resign(curClaimEpoch);
+ curClaimEpoch = -1;
+ controllerMetrics.setActive(false);
+ purgatory.failAll(newNotControllerException());
+
+ if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+ newBytesSinceLastSnapshot = 0;
+ snapshotRegistry.revertToSnapshot(lastCommittedOffset);
+ authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
+ } else {
+ resetState();
+ raftClient.unregister(metaLogListener);
+ metaLogListener = new QuorumMetaLogListener();
+ raftClient.register(metaLogListener);
+ }
+ updateWriteOffset(-1);
+ clusterControl.deactivate();
+ cancelMaybeFenceReplicas();
+ cancelMaybeBalancePartitionLeaders();
+ cancelNextWriteNoOpRecord();
+ } catch (Throwable e) {
+ fatalFaultHandler.handleFault("exception while renouncing leadership", e);
}
-
- updateWriteOffset(-1);
- clusterControl.deactivate();
- cancelMaybeFenceReplicas();
- cancelMaybeBalancePartitionLeaders();
- cancelNextWriteNoOpRecord();
}
private void scheduleDeferredWriteEvent(String name, long deadlineNs,
@@ -1246,70 +1306,60 @@ private void handleFeatureControlChange() {
}
@SuppressWarnings("unchecked")
- private void replay(ApiMessage message, Optional snapshotId, long offset) {
- try {
- MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
- switch (type) {
- case REGISTER_BROKER_RECORD:
- clusterControl.replay((RegisterBrokerRecord) message);
- break;
- case UNREGISTER_BROKER_RECORD:
- clusterControl.replay((UnregisterBrokerRecord) message);
- break;
- case TOPIC_RECORD:
- replicationControl.replay((TopicRecord) message);
- break;
- case PARTITION_RECORD:
- replicationControl.replay((PartitionRecord) message);
- break;
- case CONFIG_RECORD:
- configurationControl.replay((ConfigRecord) message);
- break;
- case PARTITION_CHANGE_RECORD:
- replicationControl.replay((PartitionChangeRecord) message);
- break;
- case FENCE_BROKER_RECORD:
- clusterControl.replay((FenceBrokerRecord) message);
- break;
- case UNFENCE_BROKER_RECORD:
- clusterControl.replay((UnfenceBrokerRecord) message);
- break;
- case REMOVE_TOPIC_RECORD:
- replicationControl.replay((RemoveTopicRecord) message);
- break;
- case FEATURE_LEVEL_RECORD:
- featureControl.replay((FeatureLevelRecord) message);
- handleFeatureControlChange();
- break;
- case CLIENT_QUOTA_RECORD:
- clientQuotaControlManager.replay((ClientQuotaRecord) message);
- break;
- case PRODUCER_IDS_RECORD:
- producerIdControlManager.replay((ProducerIdsRecord) message);
- break;
- case BROKER_REGISTRATION_CHANGE_RECORD:
- clusterControl.replay((BrokerRegistrationChangeRecord) message);
- break;
- case ACCESS_CONTROL_ENTRY_RECORD:
- aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
- break;
- case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
- aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
- break;
- case NO_OP_RECORD:
- // NoOpRecord is an empty record and doesn't need to be replayed
- break;
- default:
- throw new RuntimeException("Unhandled record type " + type);
- }
- } catch (Exception e) {
- if (snapshotId.isPresent()) {
- log.error("Error replaying record {} from snapshot {} at last offset {}.",
- message.toString(), snapshotId.get(), offset, e);
- } else {
- log.error("Error replaying record {} at last offset {}.",
- message.toString(), offset, e);
- }
+ private void replay(ApiMessage message, Optional snapshotId) {
+ MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+ switch (type) {
+ case REGISTER_BROKER_RECORD:
+ clusterControl.replay((RegisterBrokerRecord) message);
+ break;
+ case UNREGISTER_BROKER_RECORD:
+ clusterControl.replay((UnregisterBrokerRecord) message);
+ break;
+ case TOPIC_RECORD:
+ replicationControl.replay((TopicRecord) message);
+ break;
+ case PARTITION_RECORD:
+ replicationControl.replay((PartitionRecord) message);
+ break;
+ case CONFIG_RECORD:
+ configurationControl.replay((ConfigRecord) message);
+ break;
+ case PARTITION_CHANGE_RECORD:
+ replicationControl.replay((PartitionChangeRecord) message);
+ break;
+ case FENCE_BROKER_RECORD:
+ clusterControl.replay((FenceBrokerRecord) message);
+ break;
+ case UNFENCE_BROKER_RECORD:
+ clusterControl.replay((UnfenceBrokerRecord) message);
+ break;
+ case REMOVE_TOPIC_RECORD:
+ replicationControl.replay((RemoveTopicRecord) message);
+ break;
+ case FEATURE_LEVEL_RECORD:
+ featureControl.replay((FeatureLevelRecord) message);
+ handleFeatureControlChange();
+ break;
+ case CLIENT_QUOTA_RECORD:
+ clientQuotaControlManager.replay((ClientQuotaRecord) message);
+ break;
+ case PRODUCER_IDS_RECORD:
+ producerIdControlManager.replay((ProducerIdsRecord) message);
+ break;
+ case BROKER_REGISTRATION_CHANGE_RECORD:
+ clusterControl.replay((BrokerRegistrationChangeRecord) message);
+ break;
+ case ACCESS_CONTROL_ENTRY_RECORD:
+ aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
+ break;
+ case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+ aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
+ break;
+ case NO_OP_RECORD:
+ // NoOpRecord is an empty record and doesn't need to be replayed
+ break;
+ default:
+ throw new RuntimeException("Unhandled record type " + type);
}
}
@@ -1344,8 +1394,24 @@ private void resetState() {
updateLastCommittedState(-1, -1, -1);
}
+ /**
+ * Handles faults that should normally be fatal to the process.
+ */
+ private final FaultHandler fatalFaultHandler;
+
+ /**
+ * Handles faults in metadata handling that are normally not fatal.
+ */
+ private final FaultHandler metadataFaultHandler;
+
+ /**
+ * The slf4j log context, used to create new loggers.
+ */
private final LogContext logContext;
+ /**
+ * The slf4j logger.
+ */
private final Logger log;
/**
@@ -1530,28 +1596,34 @@ private enum ImbalanceSchedule {
private final BootstrapMetadata bootstrapMetadata;
- private QuorumController(LogContext logContext,
- int nodeId,
- String clusterId,
- KafkaEventQueue queue,
- Time time,
- KafkaConfigSchema configSchema,
- RaftClient raftClient,
- QuorumFeatures quorumFeatures,
- short defaultReplicationFactor,
- int defaultNumPartitions,
- ReplicaPlacer replicaPlacer,
- long snapshotMaxNewRecordBytes,
- OptionalLong leaderImbalanceCheckIntervalNs,
- OptionalLong maxIdleIntervalNs,
- long sessionTimeoutNs,
- ControllerMetrics controllerMetrics,
- Optional createTopicPolicy,
- Optional alterConfigPolicy,
- ConfigurationValidator configurationValidator,
- Optional authorizer,
- Map staticConfig,
- BootstrapMetadata bootstrapMetadata) {
+ private QuorumController(
+ FaultHandler fatalFaultHandler,
+ FaultHandler metadataFaultHandler,
+ LogContext logContext,
+ int nodeId,
+ String clusterId,
+ KafkaEventQueue queue,
+ Time time,
+ KafkaConfigSchema configSchema,
+ RaftClient raftClient,
+ QuorumFeatures quorumFeatures,
+ short defaultReplicationFactor,
+ int defaultNumPartitions,
+ ReplicaPlacer replicaPlacer,
+ long snapshotMaxNewRecordBytes,
+ OptionalLong leaderImbalanceCheckIntervalNs,
+ OptionalLong maxIdleIntervalNs,
+ long sessionTimeoutNs,
+ ControllerMetrics controllerMetrics,
+ Optional createTopicPolicy,
+ Optional alterConfigPolicy,
+ ConfigurationValidator configurationValidator,
+ Optional authorizer,
+ Map staticConfig,
+ BootstrapMetadata bootstrapMetadata
+ ) {
+ this.fatalFaultHandler = fatalFaultHandler;
+ this.metadataFaultHandler = metadataFaultHandler;
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
new file mode 100644
index 000000000000..c57ce46fb359
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+
+/**
+ * A fault that we encountered while we replayed cluster metadata.
+ */
+public class MetadataFaultException extends RuntimeException {
+ public MetadataFaultException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MetadataFaultException(String message) {
+ super(message);
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
new file mode 100644
index 000000000000..e9f71b80e675
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles faults in Kafka metadata management.
+ */
+public class MetadataFaultHandler implements FaultHandler {
+ private static final Logger log = LoggerFactory.getLogger(MetadataFaultHandler.class);
+
+ @Override
+ public void handleFault(String failureMessage, Throwable cause) {
+ FaultHandler.logFailureMessage(log, failureMessage, cause);
+ throw new MetadataFaultException("Encountered metadata fault: " + failureMessage, cause);
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a62b1f682f06..5e395cebcb03 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -45,6 +45,7 @@
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.config.ConfigResource;
@@ -1181,6 +1182,30 @@ public void testQuorumControllerCompletesAuthorizerInitialLoad() throws Throwabl
}
}
+ @Test
+ public void testFatalMetadataReplayErrorOnActive() throws Throwable {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
+ try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
+ })) {
+ QuorumController active = controlEnv.activeController();
+ CompletableFuture future = active.appendWriteEvent("errorEvent",
+ OptionalLong.empty(), () -> {
+ return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(
+ new ConfigRecord().
+ setName(null).
+ setResourceName(null).
+ setResourceType((byte) 255).
+ setValue(null), (short) 0)), null);
+ });
+ assertThrows(ExecutionException.class, () -> future.get());
+ assertEquals(NullPointerException.class,
+ controlEnv.fatalFaultHandler().firstException().getCause().getClass());
+ controlEnv.fatalFaultHandler().setIgnore(true);
+ controlEnv.metadataFaultHandler().setIgnore(true);
+ }
+ }
+ }
+
private static void assertInitialLoadFuturesNotComplete(List authorizers) {
for (int i = 0; i < authorizers.size(); i++) {
assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 4cc45a9774b3..40dd21c88d33 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -34,6 +34,7 @@
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
private final List controllers;
private final LocalLogManagerTestEnv logEnv;
+ private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
+ private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
public QuorumControllerTestEnv(
LocalLogManagerTestEnv logEnv,
@@ -84,6 +87,8 @@ public QuorumControllerTestEnv(
sessionTimeoutMillis.ifPresent(timeout -> {
builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
});
+ builder.setFatalFaultHandler(fatalFaultHandler);
+ builder.setMetadataFaultHandler(metadataFaultHandler);
builderConsumer.accept(builder);
this.controllers.add(builder.build());
}
@@ -117,6 +122,14 @@ public List controllers() {
return controllers;
}
+ public MockFaultHandler fatalFaultHandler() {
+ return fatalFaultHandler;
+ }
+
+ public MockFaultHandler metadataFaultHandler() {
+ return metadataFaultHandler;
+ }
+
@Override
public void close() throws InterruptedException {
for (QuorumController controller : controllers) {
@@ -125,5 +138,7 @@ public void close() throws InterruptedException {
for (QuorumController controller : controllers) {
controller.close();
}
+ fatalFaultHandler.maybeRethrowFirstException();
+ metadataFaultHandler.maybeRethrowFirstException();
}
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
new file mode 100644
index 000000000000..4c03eacc32f3
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+
+
+/**
+ * Handle a server fault.
+ */
+public interface FaultHandler {
+ /**
+ * Handle a fault.
+ *
+ * @param failureMessage The failure message to log.
+ */
+ default void handleFault(String failureMessage) {
+ handleFault(failureMessage, null);
+ }
+
+ /**
+ * Handle a fault.
+ *
+ * @param failureMessage The failure message to log.
+ * @param cause The exception that caused the problem, or null.
+ */
+ void handleFault(String failureMessage, Throwable cause);
+
+ /**
+ * Log a failure message about a fault.
+ *
+ * @param log The log4j logger.
+ * @param failureMessage The failure message.
+ * @param cause The exception which caused the failure, or null.
+ */
+ static void logFailureMessage(Logger log, String failureMessage, Throwable cause) {
+ if (cause == null) {
+ log.error("Encountered fatal fault: {}", failureMessage);
+ } else {
+ log.error("Encountered fatal fault: {}", failureMessage, cause);
+ }
+ }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
new file mode 100644
index 000000000000..e3b9f25a3bea
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.common.utils.Exit;
+
+
+/**
+ * This is a fault handler which exits the Java process.
+ */
+public class ProcessExitingFaultHandler implements FaultHandler {
+ private static final Logger log = LoggerFactory.getLogger(ProcessExitingFaultHandler.class);
+
+ @Override
+ public void handleFault(String failureMessage, Throwable cause) {
+ FaultHandler.logFailureMessage(log, failureMessage, cause);
+ Exit.exit(1);
+ }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
new file mode 100644
index 000000000000..39b3ed078471
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a fault handler suitable for use in JUnit tests. It will store the result of the first
+ * call to handleFault that was made.
+ */
+public class MockFaultHandler implements FaultHandler {
+ private static final Logger log = LoggerFactory.getLogger(MockFaultHandler.class);
+
+ private final String name;
+ private MockFaultHandlerException firstException = null;
+ private boolean ignore = false;
+
+ public MockFaultHandler(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public synchronized void handleFault(String failureMessage, Throwable cause) {
+ FaultHandler.logFailureMessage(log, failureMessage, cause);
+ MockFaultHandlerException e = (cause == null) ?
+ new MockFaultHandlerException(name + ": " + failureMessage) :
+ new MockFaultHandlerException(name + ": " + failureMessage +
+ ": " + cause.getMessage(), cause);
+ if (firstException == null) {
+ firstException = e;
+ }
+ throw e;
+ }
+
+ public synchronized void maybeRethrowFirstException() {
+ if (firstException != null && !ignore) {
+ throw firstException;
+ }
+ }
+
+ public synchronized MockFaultHandlerException firstException() {
+ return firstException;
+ }
+
+ public synchronized void setIgnore(boolean ignore) {
+ this.ignore = ignore;
+ }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
new file mode 100644
index 000000000000..ef9b11bdeb53
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+
+/**
+ * An exception thrown by MockFaultHandler.
+ */
+public class MockFaultHandlerException extends RuntimeException {
+ public MockFaultHandlerException(String failureMessage, Throwable cause) {
+ super(failureMessage, cause);
+ // If a cause exception was provided, set our the stack trace its stack trace. This is
+ // useful in junit tests where a limited number of stack frames are printed, and usually
+ // the stack frames of cause exceptions get trimmed.
+ if (cause != null) {
+ setStackTrace(cause.getStackTrace());
+ }
+ }
+
+ public MockFaultHandlerException(String failureMessage) {
+ this(failureMessage, null);
+ }
+}