diff --git a/clients/src/main/java/org/apache/kafka/raft/internals/LogWriteFailureException.java b/clients/src/main/java/org/apache/kafka/raft/internals/LogWriteFailureException.java new file mode 100644 index 0000000000000..07af20b44ec69 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/raft/internals/LogWriteFailureException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.KafkaException; + +/** + * Representing certain types of fatal exceptions thrown from + * {@link org.apache.kafka.raft} module. The Raft client will catch + * this exception and wrap it with a controlled shutdown to make the state transition smooth. + * The general handling logic is like: + * + * 1. + * + * + */ +public class LogWriteFailureException extends KafkaException { + + private final boolean shutdownNeeded; + + public LogWriteFailureException(String message, boolean shutdownNeeded) { + super(message); + this.shutdownNeeded = shutdownNeeded; + } + + public LogWriteFailureException(String message, Throwable cause, boolean shutdownNeeded) { + super(message, cause); + this.shutdownNeeded = shutdownNeeded; + } + + public boolean shutdownNeeded() { + return shutdownNeeded; + } +} diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 8d179dccacd02..e84e39813c215 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -226,6 +226,7 @@ class TestRaftServer( metadataLog, quorumState, time, + gracefulShutdownTimeoutMs, expirationService, logContext ) @@ -420,7 +421,7 @@ class TestRaftServer( override def initiateShutdown(): Boolean = { if (super.initiateShutdown()) { - client.shutdown(5000).whenComplete { (_, exception) => + client.shutdown(gracefulShutdownTimeoutMs).whenComplete { (_, exception) => if (exception != null) { error("Graceful shutdown of RaftClient failed", exception) } else { @@ -442,6 +443,8 @@ class TestRaftServer( object TestRaftServer extends Logging { + private val gracefulShutdownTimeoutMs = 5000 + case class PendingAppend( offset: Long, appendTimeMs: Long diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 66553f66f3332..2688141e29036 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -58,6 +58,7 @@ import org.apache.kafka.raft.internals.CloseListener; import org.apache.kafka.raft.internals.FuturePurgatory; import org.apache.kafka.raft.internals.KafkaRaftMetrics; +import org.apache.kafka.raft.internals.LogWriteFailureException; import org.apache.kafka.raft.internals.MemoryBatchReader; import org.apache.kafka.raft.internals.RecordsBatchReader; import org.apache.kafka.raft.internals.ThresholdPurgatory; @@ -130,6 +131,7 @@ public class KafkaRaftClient implements RaftClient { private final int electionBackoffMaxMs; private final int fetchMaxWaitMs; private final int appendLingerMs; + private final int gracefulShutdownTimeoutMs; private final KafkaRaftMetrics kafkaRaftMetrics; private final NetworkChannel channel; private final ReplicatedLog log; @@ -152,6 +154,7 @@ public KafkaRaftClient( ReplicatedLog log, QuorumState quorum, Time time, + int gracefulShutdownTimeoutMs, ExpirationService expirationService, LogContext logContext ) { @@ -169,6 +172,7 @@ public KafkaRaftClient( raftConfig.requestTimeoutMs(), 1000, raftConfig.appendLingerMs(), + gracefulShutdownTimeoutMs, logContext, new Random()); } @@ -188,6 +192,7 @@ public KafkaRaftClient( int requestTimeoutMs, int fetchMaxWaitMs, int appendLingerMs, + int gracefulShutdownTimeoutMs, LogContext logContext, Random random ) { @@ -202,6 +207,7 @@ public KafkaRaftClient( this.electionBackoffMaxMs = electionBackoffMaxMs; this.fetchMaxWaitMs = fetchMaxWaitMs; this.appendLingerMs = appendLingerMs; + this.gracefulShutdownTimeoutMs = gracefulShutdownTimeoutMs; this.logger = logContext.logger(KafkaRaftClient.class); this.random = random; this.requestManager = new RequestManager(voterAddresses.keySet(), retryBackoffMs, requestTimeoutMs, random); @@ -434,8 +440,8 @@ private void transitionToUnattached(int epoch) throws IOException { resetConnections(); } - private void transitionToResigned(List preferredSuccessors) { - quorum.transitionToResigned(preferredSuccessors); + private void transitionToResigned() { + quorum.transitionToResigned(quorum.leaderStateOrThrow().nonLeaderVotersByDescendingFetchOffset()); resetConnections(); } @@ -1484,6 +1490,8 @@ private void appendBatch( maybeFireHandleCommit(batch.baseOffset, epoch, batch.records); } }); + } catch (Exception e) { + throw new LogWriteFailureException("Failed to append to the leader", e, true); } finally { batch.release(); } @@ -1544,7 +1552,7 @@ private long pollLeader(long currentTimeMs) { GracefulShutdown shutdown = this.shutdown.get(); if (shutdown != null) { - transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset()); + transitionToResigned(); return 0L; } @@ -1771,24 +1779,33 @@ private boolean maybeCompleteShutdown(long currentTimeMs) { } public void poll() throws IOException { - pollListeners(); - - long currentTimeMs = time.milliseconds(); - if (maybeCompleteShutdown(currentTimeMs)) { - return; - } + try { + pollListeners(); - long pollTimeoutMs = pollCurrentState(currentTimeMs); - kafkaRaftMetrics.updatePollStart(currentTimeMs); + long currentTimeMs = time.milliseconds(); + if (maybeCompleteShutdown(currentTimeMs)) { + return; + } - List inboundMessages = channel.receive(pollTimeoutMs); + long pollTimeoutMs = pollCurrentState(currentTimeMs); + kafkaRaftMetrics.updatePollStart(currentTimeMs); - currentTimeMs = time.milliseconds(); - kafkaRaftMetrics.updatePollEnd(currentTimeMs); + List inboundMessages = channel.receive(pollTimeoutMs); - for (RaftMessage message : inboundMessages) { - handleInboundMessage(message, currentTimeMs); currentTimeMs = time.milliseconds(); + kafkaRaftMetrics.updatePollEnd(currentTimeMs); + + for (RaftMessage message : inboundMessages) { + handleInboundMessage(message, currentTimeMs); + currentTimeMs = time.milliseconds(); + } + } catch (LogWriteFailureException e) { + logger.error("Failed to append records to the log, will resign the leadership now", e); + transitionToResigned(); + if (e.shutdownNeeded() && shutdown.get() == null) { + logger.error("Transit to graceful shutdown state due to fatal error", e.getCause()); + shutdown(gracefulShutdownTimeoutMs); + } } }