From 555744da7040f1ad91decc3cf2b813285af60aa2 Mon Sep 17 00:00:00 2001
From: Colin Patrick McCabe
Date: Thu, 4 Aug 2022 22:49:45 -0700
Subject: [PATCH] KAFKA-14124: improve quorum controller fault handling (#12447)

Before trying to commit a batch of records to the __cluster_metadata log, the active
controller should try to apply them to its current in-memory state. If this application
process fails, the active controller process should exit, allowing another node to take
leadership. This will prevent most bad metadata records from ending up in the log and
help to surface errors during testing.

Similarly, if the active controller attempts to renounce leadership and the renunciation
process itself fails, the process should exit. This will help avoid bugs where the active
controller continues in an undefined state.

In contrast, standby controllers that experience metadata application errors should
continue on, in order to avoid a scenario where a bad record brings down the whole
controller cluster. The intended effect of these changes is to make it harder to commit a
bad record to the metadata log, but to tolerate such a record as gracefully as possible
if it does get committed.

This PR introduces the FaultHandler interface to implement these concepts. In JUnit
tests, we use a FaultHandler implementation which does not exit the process. This allows
us to avoid terminating the Gradle test runner, which would be very disruptive. It also
ensures that the tests surface these exceptions, which we previously were not doing (the
mock fault handler stores the first exception it receives).

In addition to the above, this PR fixes a bug where RaftClient#resign was not being
called from the renounce() function. This bug could have resulted in the Raft layer not
being informed that an active controller had resigned.
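As a rough illustration of the split described above, here is a minimal, self-contained
Java sketch (not code from this patch; the class, method names, and the nested
FaultHandler stand-in are all hypothetical): the active controller routes a record
application failure to a fatal handler, while a standby uses a non-fatal metadata
handler so that one bad record cannot take down the whole quorum.

import java.util.List;

public class ReplayFaultSketch {
    // Stand-in for the FaultHandler interface this patch adds to server-common.
    interface FaultHandler {
        void handleFault(String failureMessage, Throwable cause);
    }

    static void replayBatch(List<String> records,
                            boolean isActive,
                            FaultHandler fatalFaultHandler,
                            FaultHandler metadataFaultHandler) {
        int i = 1;
        for (String record : records) {
            try {
                applyToInMemoryState(record);
            } catch (Throwable e) {
                String failureMessage = String.format(
                    "Unable to apply record %d of %d.", i, records.size());
                // Active controller: the fatal handler exits the process
                // (as ProcessExitingFaultHandler does). Standby: the metadata
                // handler logs and throws a non-fatal exception, and the
                // standby process keeps running.
                if (isActive) {
                    fatalFaultHandler.handleFault(failureMessage, e);
                } else {
                    metadataFaultHandler.handleFault(failureMessage, e);
                }
            }
            i++;
        }
    }

    private static void applyToInMemoryState(String record) {
        // Hypothetical stand-in for QuorumController.replay(), which
        // dispatches on MetadataRecordType.
    }
}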
Reviewers: David Arthur --- build.gradle | 2 + checkstyle/import-control-core.xml | 1 + checkstyle/import-control.xml | 4 + checkstyle/suppressions.xml | 2 + .../scala/kafka/server/ControllerServer.scala | 10 +- .../scala/kafka/server/KafkaRaftServer.scala | 6 +- .../kafka/testkit/KafkaClusterTestKit.java | 36 +- .../kafka/server/QuorumTestHarness.scala | 6 + .../kafka/controller/QuorumController.java | 382 +++++++++++------- .../fault/MetadataFaultException.java | 32 ++ .../metadata/fault/MetadataFaultHandler.java | 36 ++ .../controller/QuorumControllerTest.java | 25 ++ .../controller/QuorumControllerTestEnv.java | 15 + .../kafka/server/fault/FaultHandler.java | 58 +++ .../fault/ProcessExitingFaultHandler.java | 37 ++ .../kafka/server/fault/MockFaultHandler.java | 65 +++ .../fault/MockFaultHandlerException.java | 38 ++ 17 files changed, 586 insertions(+), 169 deletions(-) create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java create mode 100644 server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java create mode 100644 server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java create mode 100644 server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java create mode 100644 server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java diff --git a/build.gradle b/build.gradle index 54068f2977f0..e1f773075a9a 100644 --- a/build.gradle +++ b/build.gradle @@ -897,6 +897,7 @@ project(':core') { testImplementation project(':clients').sourceSets.test.output testImplementation project(':metadata').sourceSets.test.output testImplementation project(':raft').sourceSets.test.output + testImplementation project(':server-common').sourceSets.test.output testImplementation libs.bcpkix testImplementation libs.mockitoCore testImplementation(libs.apacheda) { @@ -1179,6 +1180,7 @@ project(':metadata') { testImplementation libs.slf4jlog4j testImplementation project(':clients').sourceSets.test.output testImplementation project(':raft').sourceSets.test.output + testImplementation project(':server-common').sourceSets.test.output generator project(':generator') } diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index 28b325b093da..4042cba402fd 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -54,6 +54,7 @@ + diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 211d23ff60ae..4b07a26cba5c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -232,6 +232,7 @@ + @@ -276,6 +277,9 @@ + + + diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index f6ca0d02fe31..bec3da1637a9 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -39,6 +39,8 @@ + controllerBuilder.setAuthorizer(a) diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index 07a311837204..e7cf8f8f1fab 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -29,9 +29,11 @@ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.utils.{AppInfoParser, Time} import org.apache.kafka.common.{KafkaException, Uuid} import org.apache.kafka.controller.BootstrapMetadata +import 
org.apache.kafka.metadata.fault.MetadataFaultHandler import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde} import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} +import org.apache.kafka.server.fault.ProcessExitingFaultHandler import org.apache.kafka.server.metrics.KafkaYammerMetrics import java.nio.file.Paths @@ -106,7 +108,9 @@ class KafkaRaftServer( controllerQuorumVotersFuture, KafkaRaftServer.configSchema, raftManager.apiVersions, - bootstrapMetadata + bootstrapMetadata, + new MetadataFaultHandler(), + new ProcessExitingFaultHandler(), )) } else { None diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index c961d71bbe58..42120324f5f8 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -40,6 +40,7 @@ import org.apache.kafka.raft.RaftConfig; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.fault.MockFaultHandler; import org.apache.kafka.test.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,6 +116,8 @@ public void close() { public static class Builder { private TestKitNodes nodes; private Map configProps = new HashMap<>(); + private MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler"); + private MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler"); public Builder(TestKitNodes nodes) { this.nodes = nodes; @@ -190,7 +193,9 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS connectFutureManager.future, KafkaRaftServer.configSchema(), raftManager.apiVersions(), - bootstrapMetadata + bootstrapMetadata, + metadataFaultHandler, + fatalFaultHandler ); controllers.put(node.id(), controller); controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> { @@ -273,7 +278,8 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS throw e; } return new KafkaClusterTestKit(executorService, nodes, controllers, - brokers, raftManagers, connectFutureManager, baseDirectory); + brokers, raftManagers, connectFutureManager, baseDirectory, + metadataFaultHandler, fatalFaultHandler); } private String listeners(int node) { @@ -314,14 +320,20 @@ static private void setupNodeDirectories(File baseDirectory, private final Map> raftManagers; private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager; private final File baseDirectory; - - private KafkaClusterTestKit(ExecutorService executorService, - TestKitNodes nodes, - Map controllers, - Map brokers, - Map> raftManagers, - ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager, - File baseDirectory) { + private final MockFaultHandler metadataFaultHandler; + private final MockFaultHandler fatalFaultHandler; + + private KafkaClusterTestKit( + ExecutorService executorService, + TestKitNodes nodes, + Map controllers, + Map brokers, + Map> raftManagers, + ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager, + File baseDirectory, + MockFaultHandler metadataFaultHandler, + MockFaultHandler fatalFaultHandler + ) { this.executorService = executorService; this.nodes = nodes; this.controllers = controllers; @@ -329,6 +341,8 @@ private KafkaClusterTestKit(ExecutorService executorService, this.raftManagers = 
raftManagers; this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager; this.baseDirectory = baseDirectory; + this.metadataFaultHandler = metadataFaultHandler; + this.fatalFaultHandler = fatalFaultHandler; } public void format() throws Exception { @@ -520,6 +534,8 @@ public void close() throws Exception { executorService.shutdownNow(); executorService.awaitTermination(5, TimeUnit.MINUTES); } + metadataFaultHandler.maybeRethrowFirstException(); + fatalFaultHandler.maybeRethrowFirstException(); } private void waitForAllFutures(List>> futureEntries) diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index a2393cdccbcf..9894df9c5f74 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -36,6 +36,7 @@ import org.apache.kafka.controller.BootstrapMetadata import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec} import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} +import org.apache.kafka.server.fault.MockFaultHandler import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper} import org.junit.jupiter.api.Assertions._ @@ -188,6 +189,8 @@ abstract class QuorumTestHarness extends Logging { } } + val faultHandler = new MockFaultHandler("quorumTestHarnessFaultHandler") + // Note: according to the junit documentation: "JUnit Jupiter does not guarantee the execution // order of multiple @BeforeEach methods that are declared within a single test class or test // interface." Therefore, if you have things you would like to do before each test case runs, it @@ -308,6 +311,8 @@ abstract class QuorumTestHarness extends Logging { configSchema = KafkaRaftServer.configSchema, raftApiVersions = raftManager.apiVersions, bootstrapMetadata = BootstrapMetadata.create(metadataVersion, bootstrapRecords.asJava), + metadataFaultHandler = faultHandler, + fatalFaultHandler = faultHandler, ) controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => { if (e != null) { @@ -374,6 +379,7 @@ abstract class QuorumTestHarness extends Logging { } System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) Configuration.setConfiguration(null) + faultHandler.maybeRethrowFirstException() } // Trigger session expiry by reusing the session id in another client diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 0290e0040c2d..a4cc1d92cba4 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -91,6 +91,7 @@ import org.apache.kafka.server.authorizer.AclDeleteResult; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.fault.FaultHandler; import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.CreateTopicPolicy; import org.apache.kafka.snapshot.SnapshotReader; @@ -149,6 +150,8 @@ public final class QuorumController implements Controller { static public class Builder { private final int nodeId; private final String clusterId; + private FaultHandler fatalFaultHandler = null; + private FaultHandler 
metadataFaultHandler = null; private Time time = Time.SYSTEM; private String threadNamePrefix = null; private LogContext logContext = null; @@ -175,6 +178,16 @@ public Builder(int nodeId, String clusterId) { this.clusterId = clusterId; } + public Builder setFatalFaultHandler(FaultHandler fatalFaultHandler) { + this.fatalFaultHandler = fatalFaultHandler; + return this; + } + + public Builder setMetadataFaultHandler(FaultHandler metadataFaultHandler) { + this.metadataFaultHandler = metadataFaultHandler; + return this; + } + public int nodeId() { return nodeId; } @@ -287,6 +300,10 @@ public QuorumController build() throws Exception { throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool."); } else if (quorumFeatures == null) { throw new IllegalStateException("You must specify the quorum features"); + } else if (fatalFaultHandler == null) { + throw new IllegalStateException("You must specify a fatal fault handler."); + } else if (metadataFaultHandler == null) { + throw new IllegalStateException("You must specify a metadata fault handler."); } if (threadNamePrefix == null) { @@ -304,6 +321,8 @@ public QuorumController build() throws Exception { try { queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController"); return new QuorumController( + fatalFaultHandler, + metadataFaultHandler, logContext, nodeId, clusterId, @@ -426,12 +445,18 @@ private Throwable handleEventException(String name, exception.getClass().getSimpleName(), deltaUs); return exception; } - log.warn("{}: failed with unknown server exception {} at epoch {} in {} us. " + - "Renouncing leadership and reverting to the last committed offset {}.", - name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs, - lastCommittedOffset, exception); - raftClient.resign(curClaimEpoch); - renounce(); + if (isActiveController()) { + log.warn("{}: failed with unknown server exception {} at epoch {} in {} us. " + + "Renouncing leadership and reverting to the last committed offset {}.", + name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs, + lastCommittedOffset, exception); + renounce(); + } else { + log.warn("{}: failed with unknown server exception {} in {} us. " + + "The controller is already in standby mode.", + name, exception.getClass().getSimpleName(), deltaUs, + exception); + } return new UnknownServerException(exception); } @@ -702,7 +727,7 @@ public void run() throws Exception { long now = time.nanoseconds(); controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs)); int controllerEpoch = curClaimEpoch; - if (controllerEpoch == -1) { + if (!isActiveController()) { throw newNotControllerException(); } startProcessingTimeNs = OptionalLong.of(now); @@ -728,9 +753,26 @@ public void run() throws Exception { "reaches offset {}", this, resultAndOffset.offset()); } } else { - // If the operation returned a batch of records, those records need to be - // written before we can return our result to the user. Here, we hand off - // the batch of records to the raft client. They will be written out + // Start by trying to apply the record to our in-memory state. This should always + // succeed; if it does not, that's a fatal error. It is important to do this before + // scheduling the record for Raft replication. 
+ int i = 1; + for (ApiMessageAndVersion message : result.records()) { + try { + replay(message.message(), Optional.empty()); + } catch (Throwable e) { + String failureMessage = String.format("Unable to apply %s record, which was " + + "%d of %d record(s) in the batch following last writeOffset %d.", + message.message().getClass().getSimpleName(), i, result.records().size(), + writeOffset); + fatalFaultHandler.handleFault(failureMessage, e); + } + i++; + } + + // If the operation returned a batch of records, and those records could be applied, + // they need to be written before we can return our result to the user. Here, we + // hand off the batch of records to the raft client. They will be written out // asynchronously. final long offset; if (result.isAtomic()) { @@ -741,9 +783,6 @@ public void run() throws Exception { op.processBatchEndOffset(offset); updateWriteOffset(offset); resultAndOffset = ControllerResultAndOffset.of(offset, result); - for (ApiMessageAndVersion message : result.records()) { - replay(message.message(), Optional.empty(), offset); - } snapshotRegistry.getOrCreateSnapshot(offset); log.debug("Read-write operation {} will be completed when the log " + @@ -789,9 +828,9 @@ private CompletableFuture prependWriteEvent(String name, return event.future(); } - private CompletableFuture appendWriteEvent(String name, - OptionalLong deadlineNs, - ControllerWriteOperation op) { + CompletableFuture appendWriteEvent(String name, + OptionalLong deadlineNs, + ControllerWriteOperation op) { ControllerWriteEvent event = new ControllerWriteEvent<>(name, op); if (deadlineNs.isPresent()) { queue.appendWithDeadline(deadlineNs.getAsLong(), event); @@ -841,11 +880,20 @@ public void handleCommit(BatchReader reader) { "offset {} and epoch {}.", offset, epoch); } } - for (ApiMessageAndVersion messageAndVersion : messages) { - replay(messageAndVersion.message(), Optional.empty(), offset); + int i = 1; + for (ApiMessageAndVersion message : messages) { + try { + replay(message.message(), Optional.empty()); + } catch (Throwable e) { + String failureMessage = String.format("Unable to apply %s record on standby " + + "controller, which was %d of %d record(s) in the batch with baseOffset %d.", + message.message().getClass().getSimpleName(), i, messages.size(), + batch.baseOffset()); + metadataFaultHandler.handleFault(failureMessage, e); + } + i++; } } - updateLastCommittedState(offset, epoch, batch.appendTimestamp()); processedRecordsSize += batch.sizeInBytes(); } @@ -862,13 +910,9 @@ public void handleSnapshot(SnapshotReader reader) { appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> { try { if (isActiveController()) { - throw new IllegalStateException( - String.format( - "Asked to load snapshot (%s) when it is the active controller (%d)", - reader.snapshotId(), - curClaimEpoch - ) - ); + fatalFaultHandler.handleFault(String.format("Asked to load snapshot " + + "(%s) when it is the active controller (%d)", reader.snapshotId(), + curClaimEpoch)); } log.info("Starting to replay snapshot ({}), from last commit offset ({}) and epoch ({})", reader.snapshotId(), lastCommittedOffset, lastCommittedEpoch); @@ -882,26 +926,28 @@ public void handleSnapshot(SnapshotReader reader) { if (log.isDebugEnabled()) { if (log.isTraceEnabled()) { - log.trace( - "Replaying snapshot ({}) batch with last offset of {}: {}", - reader.snapshotId(), - offset, - messages - .stream() - .map(ApiMessageAndVersion::toString) - .collect(Collectors.joining(", ")) - ); + log.trace("Replaying 
snapshot ({}) batch with last offset of {}: {}", + reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString). + collect(Collectors.joining(", "))); } else { - log.debug( - "Replaying snapshot ({}) batch with last offset of {}", - reader.snapshotId(), - offset - ); + log.debug("Replaying snapshot ({}) batch with last offset of {}", + reader.snapshotId(), offset); } } - for (ApiMessageAndVersion messageAndVersion : messages) { - replay(messageAndVersion.message(), Optional.of(reader.snapshotId()), offset); + int i = 1; + for (ApiMessageAndVersion message : messages) { + try { + replay(message.message(), Optional.of(reader.snapshotId())); + } catch (Throwable e) { + String failureMessage = String.format("Unable to apply %s record " + + "from snapshot %s on standby controller, which was %d of " + + "%d record(s) in the batch with baseOffset %d.", + message.message().getClass().getSimpleName(), reader.snapshotId(), + i, messages.size(), batch.baseOffset()); + metadataFaultHandler.handleFault(failureMessage, e); + } + i++; } } updateLastCommittedState( @@ -968,10 +1014,14 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) { if (exception != null) { log.error("Failed to bootstrap metadata.", exception); appendRaftEvent("bootstrapMetadata[" + curClaimEpoch + "]", () -> { - log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " + - "metadata. Reverting to last committed offset {}.", - curClaimEpoch, lastCommittedOffset); - renounce(); + if (isActiveController()) { + log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " + + "metadata. Reverting to last committed offset {}.", + curClaimEpoch, lastCommittedOffset); + renounce(); + } else { + log.warn("Unable to bootstrap metadata on standby controller."); + } }); } }); @@ -998,10 +1048,12 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) { }); } else if (isActiveController()) { appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () -> { - log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " + - "log event. Reverting to last committed offset {}.", curClaimEpoch, - lastCommittedOffset); - renounce(); + if (isActiveController()) { + log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " + + "log event. 
Reverting to last committed offset {}.", curClaimEpoch, + lastCommittedOffset); + renounce(); + } }); } } @@ -1078,26 +1130,34 @@ private void updateLastCommittedState(long offset, int epoch, long timestamp) { } private void renounce() { - curClaimEpoch = -1; - controllerMetrics.setActive(false); - purgatory.failAll(newNotControllerException()); - - if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) { - newBytesSinceLastSnapshot = 0; - snapshotRegistry.revertToSnapshot(lastCommittedOffset); - authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl())); - } else { - resetState(); - raftClient.unregister(metaLogListener); - metaLogListener = new QuorumMetaLogListener(); - raftClient.register(metaLogListener); + try { + if (curClaimEpoch == -1) { + throw new RuntimeException("Cannot renounce leadership because we are not the " + + "current leader."); + } + raftClient.resign(curClaimEpoch); + curClaimEpoch = -1; + controllerMetrics.setActive(false); + purgatory.failAll(newNotControllerException()); + + if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) { + newBytesSinceLastSnapshot = 0; + snapshotRegistry.revertToSnapshot(lastCommittedOffset); + authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl())); + } else { + resetState(); + raftClient.unregister(metaLogListener); + metaLogListener = new QuorumMetaLogListener(); + raftClient.register(metaLogListener); + } + updateWriteOffset(-1); + clusterControl.deactivate(); + cancelMaybeFenceReplicas(); + cancelMaybeBalancePartitionLeaders(); + cancelNextWriteNoOpRecord(); + } catch (Throwable e) { + fatalFaultHandler.handleFault("exception while renouncing leadership", e); } - - updateWriteOffset(-1); - clusterControl.deactivate(); - cancelMaybeFenceReplicas(); - cancelMaybeBalancePartitionLeaders(); - cancelNextWriteNoOpRecord(); } private void scheduleDeferredWriteEvent(String name, long deadlineNs, @@ -1246,70 +1306,60 @@ private void handleFeatureControlChange() { } @SuppressWarnings("unchecked") - private void replay(ApiMessage message, Optional snapshotId, long offset) { - try { - MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); - switch (type) { - case REGISTER_BROKER_RECORD: - clusterControl.replay((RegisterBrokerRecord) message); - break; - case UNREGISTER_BROKER_RECORD: - clusterControl.replay((UnregisterBrokerRecord) message); - break; - case TOPIC_RECORD: - replicationControl.replay((TopicRecord) message); - break; - case PARTITION_RECORD: - replicationControl.replay((PartitionRecord) message); - break; - case CONFIG_RECORD: - configurationControl.replay((ConfigRecord) message); - break; - case PARTITION_CHANGE_RECORD: - replicationControl.replay((PartitionChangeRecord) message); - break; - case FENCE_BROKER_RECORD: - clusterControl.replay((FenceBrokerRecord) message); - break; - case UNFENCE_BROKER_RECORD: - clusterControl.replay((UnfenceBrokerRecord) message); - break; - case REMOVE_TOPIC_RECORD: - replicationControl.replay((RemoveTopicRecord) message); - break; - case FEATURE_LEVEL_RECORD: - featureControl.replay((FeatureLevelRecord) message); - handleFeatureControlChange(); - break; - case CLIENT_QUOTA_RECORD: - clientQuotaControlManager.replay((ClientQuotaRecord) message); - break; - case PRODUCER_IDS_RECORD: - producerIdControlManager.replay((ProducerIdsRecord) message); - break; - case BROKER_REGISTRATION_CHANGE_RECORD: - clusterControl.replay((BrokerRegistrationChangeRecord) message); - break; - case ACCESS_CONTROL_ENTRY_RECORD: - 
aclControlManager.replay((AccessControlEntryRecord) message, snapshotId); - break; - case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: - aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId); - break; - case NO_OP_RECORD: - // NoOpRecord is an empty record and doesn't need to be replayed - break; - default: - throw new RuntimeException("Unhandled record type " + type); - } - } catch (Exception e) { - if (snapshotId.isPresent()) { - log.error("Error replaying record {} from snapshot {} at last offset {}.", - message.toString(), snapshotId.get(), offset, e); - } else { - log.error("Error replaying record {} at last offset {}.", - message.toString(), offset, e); - } + private void replay(ApiMessage message, Optional snapshotId) { + MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); + switch (type) { + case REGISTER_BROKER_RECORD: + clusterControl.replay((RegisterBrokerRecord) message); + break; + case UNREGISTER_BROKER_RECORD: + clusterControl.replay((UnregisterBrokerRecord) message); + break; + case TOPIC_RECORD: + replicationControl.replay((TopicRecord) message); + break; + case PARTITION_RECORD: + replicationControl.replay((PartitionRecord) message); + break; + case CONFIG_RECORD: + configurationControl.replay((ConfigRecord) message); + break; + case PARTITION_CHANGE_RECORD: + replicationControl.replay((PartitionChangeRecord) message); + break; + case FENCE_BROKER_RECORD: + clusterControl.replay((FenceBrokerRecord) message); + break; + case UNFENCE_BROKER_RECORD: + clusterControl.replay((UnfenceBrokerRecord) message); + break; + case REMOVE_TOPIC_RECORD: + replicationControl.replay((RemoveTopicRecord) message); + break; + case FEATURE_LEVEL_RECORD: + featureControl.replay((FeatureLevelRecord) message); + handleFeatureControlChange(); + break; + case CLIENT_QUOTA_RECORD: + clientQuotaControlManager.replay((ClientQuotaRecord) message); + break; + case PRODUCER_IDS_RECORD: + producerIdControlManager.replay((ProducerIdsRecord) message); + break; + case BROKER_REGISTRATION_CHANGE_RECORD: + clusterControl.replay((BrokerRegistrationChangeRecord) message); + break; + case ACCESS_CONTROL_ENTRY_RECORD: + aclControlManager.replay((AccessControlEntryRecord) message, snapshotId); + break; + case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: + aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId); + break; + case NO_OP_RECORD: + // NoOpRecord is an empty record and doesn't need to be replayed + break; + default: + throw new RuntimeException("Unhandled record type " + type); } } @@ -1344,8 +1394,24 @@ private void resetState() { updateLastCommittedState(-1, -1, -1); } + /** + * Handles faults that should normally be fatal to the process. + */ + private final FaultHandler fatalFaultHandler; + + /** + * Handles faults in metadata handling that are normally not fatal. + */ + private final FaultHandler metadataFaultHandler; + + /** + * The slf4j log context, used to create new loggers. + */ private final LogContext logContext; + /** + * The slf4j logger. 
+ */ private final Logger log; /** @@ -1530,28 +1596,34 @@ private enum ImbalanceSchedule { private final BootstrapMetadata bootstrapMetadata; - private QuorumController(LogContext logContext, - int nodeId, - String clusterId, - KafkaEventQueue queue, - Time time, - KafkaConfigSchema configSchema, - RaftClient raftClient, - QuorumFeatures quorumFeatures, - short defaultReplicationFactor, - int defaultNumPartitions, - ReplicaPlacer replicaPlacer, - long snapshotMaxNewRecordBytes, - OptionalLong leaderImbalanceCheckIntervalNs, - OptionalLong maxIdleIntervalNs, - long sessionTimeoutNs, - ControllerMetrics controllerMetrics, - Optional createTopicPolicy, - Optional alterConfigPolicy, - ConfigurationValidator configurationValidator, - Optional authorizer, - Map staticConfig, - BootstrapMetadata bootstrapMetadata) { + private QuorumController( + FaultHandler fatalFaultHandler, + FaultHandler metadataFaultHandler, + LogContext logContext, + int nodeId, + String clusterId, + KafkaEventQueue queue, + Time time, + KafkaConfigSchema configSchema, + RaftClient raftClient, + QuorumFeatures quorumFeatures, + short defaultReplicationFactor, + int defaultNumPartitions, + ReplicaPlacer replicaPlacer, + long snapshotMaxNewRecordBytes, + OptionalLong leaderImbalanceCheckIntervalNs, + OptionalLong maxIdleIntervalNs, + long sessionTimeoutNs, + ControllerMetrics controllerMetrics, + Optional createTopicPolicy, + Optional alterConfigPolicy, + ConfigurationValidator configurationValidator, + Optional authorizer, + Map staticConfig, + BootstrapMetadata bootstrapMetadata + ) { + this.fatalFaultHandler = fatalFaultHandler; + this.metadataFaultHandler = metadataFaultHandler; this.logContext = logContext; this.log = logContext.logger(QuorumController.class); this.nodeId = nodeId; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java new file mode 100644 index 000000000000..c57ce46fb359 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.fault; + + +/** + * A fault that we encountered while we replayed cluster metadata. 
+ */ +public class MetadataFaultException extends RuntimeException { + public MetadataFaultException(String message, Throwable cause) { + super(message, cause); + } + + public MetadataFaultException(String message) { + super(message); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java new file mode 100644 index 000000000000..e9f71b80e675 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.fault; + +import org.apache.kafka.server.fault.FaultHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Handles faults in Kafka metadata management. + */ +public class MetadataFaultHandler implements FaultHandler { + private static final Logger log = LoggerFactory.getLogger(MetadataFaultHandler.class); + + @Override + public void handleFault(String failureMessage, Throwable cause) { + FaultHandler.logFailureMessage(log, failureMessage, cause); + throw new MetadataFaultException("Encountered metadata fault: " + failureMessage, cause); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index a62b1f682f06..5e395cebcb03 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.RequestHeaderData; +import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.config.ConfigResource; @@ -1181,6 +1182,30 @@ public void testQuorumControllerCompletesAuthorizerInitialLoad() throws Throwabl } } + @Test + public void testFatalMetadataReplayErrorOnActive() throws Throwable { + try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) { + try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> { + })) { + QuorumController active = controlEnv.activeController(); + CompletableFuture future = active.appendWriteEvent("errorEvent", + OptionalLong.empty(), () -> { + return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion( + new ConfigRecord(). + setName(null). + setResourceName(null). + setResourceType((byte) 255). 
+ setValue(null), (short) 0)), null); + }); + assertThrows(ExecutionException.class, () -> future.get()); + assertEquals(NullPointerException.class, + controlEnv.fatalFaultHandler().firstException().getCause().getClass()); + controlEnv.fatalFaultHandler().setIgnore(true); + controlEnv.metadataFaultHandler().setIgnore(true); + } + } + } + private static void assertInitialLoadFuturesNotComplete(List authorizers) { for (int i = 0; i < authorizers.size(); i++) { assertFalse(authorizers.get(i).initialLoadFuture().isDone(), diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index 4cc45a9774b3..40dd21c88d33 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -34,6 +34,7 @@ import org.apache.kafka.metalog.LocalLogManagerTestEnv; import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.fault.MockFaultHandler; import org.apache.kafka.test.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,8 @@ public class QuorumControllerTestEnv implements AutoCloseable { private final List controllers; private final LocalLogManagerTestEnv logEnv; + private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler"); + private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler"); public QuorumControllerTestEnv( LocalLogManagerTestEnv logEnv, @@ -84,6 +87,8 @@ public QuorumControllerTestEnv( sessionTimeoutMillis.ifPresent(timeout -> { builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)); }); + builder.setFatalFaultHandler(fatalFaultHandler); + builder.setMetadataFaultHandler(metadataFaultHandler); builderConsumer.accept(builder); this.controllers.add(builder.build()); } @@ -117,6 +122,14 @@ public List controllers() { return controllers; } + public MockFaultHandler fatalFaultHandler() { + return fatalFaultHandler; + } + + public MockFaultHandler metadataFaultHandler() { + return metadataFaultHandler; + } + @Override public void close() throws InterruptedException { for (QuorumController controller : controllers) { @@ -125,5 +138,7 @@ public void close() throws InterruptedException { for (QuorumController controller : controllers) { controller.close(); } + fatalFaultHandler.maybeRethrowFirstException(); + metadataFaultHandler.maybeRethrowFirstException(); } } diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java new file mode 100644 index 000000000000..4c03eacc32f3 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.fault; + +import org.slf4j.Logger; + + +/** + * Handle a server fault. + */ +public interface FaultHandler { + /** + * Handle a fault. + * + * @param failureMessage The failure message to log. + */ + default void handleFault(String failureMessage) { + handleFault(failureMessage, null); + } + + /** + * Handle a fault. + * + * @param failureMessage The failure message to log. + * @param cause The exception that caused the problem, or null. + */ + void handleFault(String failureMessage, Throwable cause); + + /** + * Log a failure message about a fault. + * + * @param log The log4j logger. + * @param failureMessage The failure message. + * @param cause The exception which caused the failure, or null. + */ + static void logFailureMessage(Logger log, String failureMessage, Throwable cause) { + if (cause == null) { + log.error("Encountered fatal fault: {}", failureMessage); + } else { + log.error("Encountered fatal fault: {}", failureMessage, cause); + } + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java new file mode 100644 index 000000000000..e3b9f25a3bea --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.fault; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kafka.common.utils.Exit; + + +/** + * This is a fault handler which exits the Java process. 
+ */ +public class ProcessExitingFaultHandler implements FaultHandler { + private static final Logger log = LoggerFactory.getLogger(ProcessExitingFaultHandler.class); + + @Override + public void handleFault(String failureMessage, Throwable cause) { + FaultHandler.logFailureMessage(log, failureMessage, cause); + Exit.exit(1); + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java new file mode 100644 index 000000000000..39b3ed078471 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.fault; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This is a fault handler suitable for use in JUnit tests. It will store the result of the first + * call to handleFault that was made. + */ +public class MockFaultHandler implements FaultHandler { + private static final Logger log = LoggerFactory.getLogger(MockFaultHandler.class); + + private final String name; + private MockFaultHandlerException firstException = null; + private boolean ignore = false; + + public MockFaultHandler(String name) { + this.name = name; + } + + @Override + public synchronized void handleFault(String failureMessage, Throwable cause) { + FaultHandler.logFailureMessage(log, failureMessage, cause); + MockFaultHandlerException e = (cause == null) ? + new MockFaultHandlerException(name + ": " + failureMessage) : + new MockFaultHandlerException(name + ": " + failureMessage + + ": " + cause.getMessage(), cause); + if (firstException == null) { + firstException = e; + } + throw e; + } + + public synchronized void maybeRethrowFirstException() { + if (firstException != null && !ignore) { + throw firstException; + } + } + + public synchronized MockFaultHandlerException firstException() { + return firstException; + } + + public synchronized void setIgnore(boolean ignore) { + this.ignore = ignore; + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java new file mode 100644 index 000000000000..ef9b11bdeb53 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.fault; + + +/** + * An exception thrown by MockFaultHandler. + */ +public class MockFaultHandlerException extends RuntimeException { + public MockFaultHandlerException(String failureMessage, Throwable cause) { + super(failureMessage, cause); + // If a cause exception was provided, set our the stack trace its stack trace. This is + // useful in junit tests where a limited number of stack frames are printed, and usually + // the stack frames of cause exceptions get trimmed. + if (cause != null) { + setStackTrace(cause.getStackTrace()); + } + } + + public MockFaultHandlerException(String failureMessage) { + this(failureMessage, null); + } +}
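For reference, a short hypothetical JUnit usage of the MockFaultHandler added above
(the test class and method names are illustrative, not part of this patch): the handler
records the first fault instead of exiting the process, and a test can either surface it
via maybeRethrowFirstException or opt out with setIgnore.

import org.apache.kafka.server.fault.MockFaultHandler;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;

public class MockFaultHandlerUsageTest {
    @Test
    public void testFaultIsRecordedAndCanBeIgnored() {
        MockFaultHandler handler = new MockFaultHandler("exampleHandler");
        try {
            handler.handleFault("simulated replay failure",
                new RuntimeException("bad record"));
        } catch (RuntimeException expected) {
            // handleFault stores the first fault and then throws a
            // MockFaultHandlerException, so the caller sees it immediately.
        }
        assertNotNull(handler.firstException());
        handler.setIgnore(true);              // opt out of the rethrow
        handler.maybeRethrowFirstException(); // now a no-op
    }
}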