Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;

/**
Expand Down Expand Up @@ -121,7 +120,7 @@ public interface ConsumerRebalanceListener {
/**
* A callback method the user can implement to provide handling of offset commits to a customized store.
* This method will be called during a rebalance operation when the consumer has to give up some partitions.
* It can also be called when consumer is being closed ({@link KafkaConsumer#close(Duration)})
* It can also be called when consumer is being closed ({@link KafkaConsumer#close(CloseOptions option)})
* or is unsubscribing ({@link KafkaConsumer#unsubscribe()}).
* It is recommended that offsets should be committed in this callback to either Kafka or a
* custom offset store to prevent duplicate data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
// the request is now relevant. Partitions will be processed in the order
// they appear in the request.
//
// Version 4 adds IsolationLevel. Starting in version 4, the reqestor must be
// Version 4 adds IsolationLevel. Starting in version 4, the requestor must be
// able to handle Kafka log message format version 2.
//
// Version 5 adds LogStartOffset to indicate the earliest available offset of
Expand Down
4 changes: 2 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3145,7 +3145,7 @@ private long pollLeader(long currentTimeMs) {
return 0L;
}

long timeUtilVoterChangeExpires = state.maybeExpirePendingOperation(currentTimeMs);
long timeUntilVoterChangeExpires = state.maybeExpirePendingOperation(currentTimeMs);

long timeUntilFlush = maybeAppendBatches(
state,
Expand All @@ -3163,7 +3163,7 @@ private long pollLeader(long currentTimeMs) {
timeUntilNextBeginQuorumSend,
Math.min(
timeUntilCheckQuorumExpires,
timeUtilVoterChangeExpires
timeUntilVoterChangeExpires
)
)
);
Expand Down