Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.internals;

import org.apache.kafka.common.KafkaException;

/**
 * Thrown when a write to the Raft log fails.
 *
 * <p>{@link org.apache.kafka.raft.KafkaRaftClient} raises this exception when appending a
 * batch as leader fails, wrapping the underlying error as the cause. The client's poll loop
 * catches it and handles it as follows:
 *
 * <ol>
 *   <li>The failure is logged.</li>
 *   <li>The client resigns its leadership.</li>
 *   <li>If {@link #shutdownNeeded()} is {@code true} and no shutdown is already in
 *       progress, a graceful shutdown of the client is initiated.</li>
 * </ol>
 */
public class LogWriteFailureException extends KafkaException {

    // Exceptions are Serializable; pin the version instead of relying on the
    // compiler-generated default, which varies with class details.
    private static final long serialVersionUID = 1L;

    /** Whether the handler should start a shutdown after dealing with this failure. */
    private final boolean shutdownNeeded;

    /**
     * Creates an exception without an underlying cause.
     *
     * @param message description of the failed write
     * @param shutdownNeeded {@code true} if the handler should shut down after this failure
     */
    public LogWriteFailureException(String message, boolean shutdownNeeded) {
        super(message);
        this.shutdownNeeded = shutdownNeeded;
    }

    /**
     * Creates an exception that wraps the error which caused the write to fail.
     *
     * @param message description of the failed write
     * @param cause the underlying error
     * @param shutdownNeeded {@code true} if the handler should shut down after this failure
     */
    public LogWriteFailureException(String message, Throwable cause, boolean shutdownNeeded) {
        super(message, cause);
        this.shutdownNeeded = shutdownNeeded;
    }

    /**
     * Returns the flag supplied at construction time.
     *
     * @return {@code true} if the handler should shut down after this failure
     */
    public boolean shutdownNeeded() {
        return shutdownNeeded;
    }
}
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class TestRaftServer(
metadataLog,
quorumState,
time,
gracefulShutdownTimeoutMs,
expirationService,
logContext
)
Expand Down Expand Up @@ -420,7 +421,7 @@ class TestRaftServer(

override def initiateShutdown(): Boolean = {
if (super.initiateShutdown()) {
client.shutdown(5000).whenComplete { (_, exception) =>
client.shutdown(gracefulShutdownTimeoutMs).whenComplete { (_, exception) =>
if (exception != null) {
error("Graceful shutdown of RaftClient failed", exception)
} else {
Expand All @@ -442,6 +443,8 @@ class TestRaftServer(

object TestRaftServer extends Logging {

private val gracefulShutdownTimeoutMs = 5000

case class PendingAppend(
offset: Long,
appendTimeMs: Long
Expand Down
49 changes: 33 additions & 16 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.kafka.raft.internals.CloseListener;
import org.apache.kafka.raft.internals.FuturePurgatory;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.LogWriteFailureException;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.raft.internals.RecordsBatchReader;
import org.apache.kafka.raft.internals.ThresholdPurgatory;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private final int electionBackoffMaxMs;
private final int fetchMaxWaitMs;
private final int appendLingerMs;
private final int gracefulShutdownTimeoutMs;
private final KafkaRaftMetrics kafkaRaftMetrics;
private final NetworkChannel channel;
private final ReplicatedLog log;
Expand All @@ -152,6 +154,7 @@ public KafkaRaftClient(
ReplicatedLog log,
QuorumState quorum,
Time time,
int gracefulShutdownTimeoutMs,
ExpirationService expirationService,
LogContext logContext
) {
Expand All @@ -169,6 +172,7 @@ public KafkaRaftClient(
raftConfig.requestTimeoutMs(),
1000,
raftConfig.appendLingerMs(),
gracefulShutdownTimeoutMs,
logContext,
new Random());
}
Expand All @@ -188,6 +192,7 @@ public KafkaRaftClient(
int requestTimeoutMs,
int fetchMaxWaitMs,
int appendLingerMs,
int gracefulShutdownTimeoutMs,
LogContext logContext,
Random random
) {
Expand All @@ -202,6 +207,7 @@ public KafkaRaftClient(
this.electionBackoffMaxMs = electionBackoffMaxMs;
this.fetchMaxWaitMs = fetchMaxWaitMs;
this.appendLingerMs = appendLingerMs;
this.gracefulShutdownTimeoutMs = gracefulShutdownTimeoutMs;
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
this.requestManager = new RequestManager(voterAddresses.keySet(), retryBackoffMs, requestTimeoutMs, random);
Expand Down Expand Up @@ -434,8 +440,8 @@ private void transitionToUnattached(int epoch) throws IOException {
resetConnections();
}

private void transitionToResigned(List<Integer> preferredSuccessors) {
quorum.transitionToResigned(preferredSuccessors);
private void transitionToResigned() {
quorum.transitionToResigned(quorum.leaderStateOrThrow().nonLeaderVotersByDescendingFetchOffset());
resetConnections();
}

Expand Down Expand Up @@ -1484,6 +1490,8 @@ private void appendBatch(
maybeFireHandleCommit(batch.baseOffset, epoch, batch.records);
}
});
} catch (Exception e) {
throw new LogWriteFailureException("Failed to append to the leader", e, true);
} finally {
batch.release();
}
Expand Down Expand Up @@ -1544,7 +1552,7 @@ private long pollLeader(long currentTimeMs) {

GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
transitionToResigned();
return 0L;
}

Expand Down Expand Up @@ -1771,24 +1779,33 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
}

public void poll() throws IOException {
pollListeners();

long currentTimeMs = time.milliseconds();
if (maybeCompleteShutdown(currentTimeMs)) {
return;
}
try {
pollListeners();

long pollTimeoutMs = pollCurrentState(currentTimeMs);
kafkaRaftMetrics.updatePollStart(currentTimeMs);
long currentTimeMs = time.milliseconds();
if (maybeCompleteShutdown(currentTimeMs)) {
return;
}

List<RaftMessage> inboundMessages = channel.receive(pollTimeoutMs);
long pollTimeoutMs = pollCurrentState(currentTimeMs);
kafkaRaftMetrics.updatePollStart(currentTimeMs);

currentTimeMs = time.milliseconds();
kafkaRaftMetrics.updatePollEnd(currentTimeMs);
List<RaftMessage> inboundMessages = channel.receive(pollTimeoutMs);

for (RaftMessage message : inboundMessages) {
handleInboundMessage(message, currentTimeMs);
currentTimeMs = time.milliseconds();
kafkaRaftMetrics.updatePollEnd(currentTimeMs);

for (RaftMessage message : inboundMessages) {
handleInboundMessage(message, currentTimeMs);
currentTimeMs = time.milliseconds();
}
} catch (LogWriteFailureException e) {
logger.error("Failed to append records to the log, will resign the leadership now", e);
transitionToResigned();
if (e.shutdownNeeded() && shutdown.get() == null) {
logger.error("Transit to graceful shutdown state due to fatal error", e.getCause());
shutdown(gracefulShutdownTimeoutMs);
}
}
}

Expand Down