clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -40,13 +40,16 @@
import java.util.stream.Collectors;

public class FetchRequest extends AbstractRequest {

public static final int CONSUMER_REPLICA_ID = -1;

// default values for older versions where a request level limit did not exist
public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE;
public static final long INVALID_LOG_START_OFFSET = -1L;

+public static final int ORDINARY_CONSUMER_ID = -1;
+public static final int DEBUGGING_CONSUMER_ID = -2;
+public static final int FUTURE_LOCAL_REPLICA_ID = -3;

private final FetchRequestData data;
private volatile LinkedHashMap<TopicIdPartition, PartitionData> fetchData = null;
private volatile List<TopicIdPartition> toForget = null;
@@ -429,6 +432,29 @@ public static FetchRequest parse(ByteBuffer buffer, short version) {
return new FetchRequest(new FetchRequestData(new ByteBufferAccessor(buffer), version), version);
}

+    // Broker ids are non-negative int.
+    public static boolean isValidBrokerId(int brokerId) {
+        return brokerId >= 0;
+    }
+
+    public static boolean isConsumer(int replicaId) {
+        return replicaId < 0 && replicaId != FUTURE_LOCAL_REPLICA_ID;
+    }
+
+    public static String describeReplicaId(int replicaId) {
+        switch (replicaId) {
+            case ORDINARY_CONSUMER_ID: return "consumer";
+            case DEBUGGING_CONSUMER_ID: return "debug consumer";
+            case FUTURE_LOCAL_REPLICA_ID: return "future local replica";
+            default: {
+                if (isValidBrokerId(replicaId))
+                    return "replica [" + replicaId + "]";
+                else
+                    return "invalid replica [" + replicaId + "]";
+            }
+        }
+    }

@Override
public FetchRequestData data() {
return data;
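The helpers added above take over the role of the constants and checks that previously lived in the now-deleted kafka.api.Request object (next file). A minimal usage sketch in Scala, assuming nothing beyond the constants and static methods introduced in this hunk:

    import org.apache.kafka.common.requests.FetchRequest

    val replicaIds = Seq(1, FetchRequest.ORDINARY_CONSUMER_ID,
      FetchRequest.DEBUGGING_CONSUMER_ID, FetchRequest.FUTURE_LOCAL_REPLICA_ID)

    replicaIds.foreach { id =>
      // isConsumer is true for the negative consumer sentinels (-1, -2) but not
      // for the future local replica placeholder (-3) or a real broker id (>= 0).
      println(s"$id: consumer=${FetchRequest.isConsumer(id)}, " +
        s"described as '${FetchRequest.describeReplicaId(id)}'")
    }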
41 changes: 0 additions & 41 deletions core/src/main/scala/kafka/api/Request.scala

This file was deleted.

6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata}
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}

import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -1189,7 +1189,7 @@ class Partition(val topicPartition: TopicPartition,
* @param minOneMessage whether to ensure that at least one complete message is returned
* @param updateFetchState true if the Fetch should update replica state (only applies to follower fetches)
* @return [[LogReadInfo]] containing the fetched records or the diverging epoch if present
-* @throws NotLeaderOrFollowerException if this node is not the current leader and [[FetchParams.fetchOnlyLeader]]
+* @throws NotLeaderOrFollowerException if this node is not the current leader and `FetchParams.fetchOnlyLeader`
* is enabled, or if this is a follower fetch with an older request version
* and the replicaId is not recognized among the current valid replicas
* @throws FencedLeaderEpochException if the leader epoch in the `Fetch` request is lower than the current
@@ -1198,7 +1198,7 @@
* leader epoch, or if this is a follower fetch and the replicaId is not
* recognized among the current valid replicas
* @throws OffsetOutOfRangeException if the fetch offset is smaller than the log start offset or larger than
-* the log end offset (or high watermark depending on [[FetchParams.isolation]]),
+* the log end offset (or high watermark depending on `FetchParams.isolation`),
* or if the end offset for the last fetched epoch in [[FetchRequest.PartitionData]]
* cannot be determined from the local epoch cache (e.g. if it is larger than
* any cached epoch value)
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -28,7 +28,7 @@ import com.yammer.metrics.core.Gauge
import kafka.common.OffsetAndMetadata
import kafka.internals.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{FetchLogEnd, ReplicaManager, RequestLocal}
+import kafka.server.{ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
import kafka.utils._
@@ -46,7 +46,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation}
import org.apache.kafka.server.util.KafkaScheduler

import scala.collection._
@@ -599,7 +599,7 @@ class GroupMetadataManager(brokerId: Int,
while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) {
val fetchDataInfo = log.read(currOffset,
maxLength = config.loadBufferSize,
-isolation = FetchLogEnd,
+isolation = FetchIsolation.LOG_END,
minOneMessage = true)

readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -21,7 +21,7 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.server.{Defaults, FetchLogEnd, ReplicaManager, RequestLocal}
+import kafka.server.{Defaults, ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool}
import kafka.utils.Implicits._
@@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler

@@ -438,7 +438,7 @@ class TransactionStateManager(brokerId: Int,
idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
val fetchDataInfo = log.read(currOffset,
maxLength = config.transactionLogLoadBufferSize,
-isolation = FetchLogEnd,
+isolation = FetchIsolation.LOG_END,
minOneMessage = true)

readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
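Both coordinator loaders above swap the removed kafka.server.FetchLogEnd case object for the Java-side FetchIsolation.LOG_END constant; the read itself is unchanged. A minimal sketch of the updated call pattern, with a hypothetical helper standing in for the loader loops (parameter names are the ones visible in the hunks):

    import kafka.log.UnifiedLog
    import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation}

    // LOG_END reads up to the log end offset rather than stopping at the high
    // watermark, and minOneMessage = true returns at least one record even if
    // it is larger than maxLength.
    def readBatch(log: UnifiedLog, currOffset: Long, loadBufferSize: Int): FetchDataInfo =
      log.read(currOffset,
        maxLength = loadBufferSize,
        isolation = FetchIsolation.LOG_END,
        minOneMessage = true)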
26 changes: 13 additions & 13 deletions core/src/main/scala/kafka/log/LocalLog.scala
@@ -23,18 +23,17 @@ import java.text.NumberFormat
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern
import kafka.metrics.KafkaMetricsGroup

-import kafka.server.FetchDataInfo
import kafka.utils.Logging
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.LogFileUtils.offsetFromFileName
-import org.apache.kafka.server.log.internals.{AbortedTxn, LogConfig, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition}
+import org.apache.kafka.server.log.internals.{AbortedTxn, FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition}
import org.apache.kafka.server.util.Scheduler

+import java.util.{Collections, Optional}
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, immutable}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -429,7 +428,7 @@ class LocalLog(@volatile private var _dir: File,
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
-FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
+new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
}
}
@@ -454,10 +453,10 @@
def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)

-FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
-  records = fetchInfo.records,
-  firstEntryIncomplete = fetchInfo.firstEntryIncomplete,
-  abortedTransactions = Some(abortedTransactions.toList))
+new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+  fetchInfo.records,
+  fetchInfo.firstEntryIncomplete,
+  Optional.of(abortedTransactions.toList.asJava))
}

private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
@@ -1007,12 +1006,13 @@ object LocalLog extends Logging {

private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
includeAbortedTxns: Boolean): FetchDataInfo = {
-val abortedTransactions =
-  if (includeAbortedTxns) Some(List.empty[FetchResponseData.AbortedTransaction])
-  else None
-FetchDataInfo(fetchOffsetMetadata,
+val abortedTransactions: Optional[java.util.List[FetchResponseData.AbortedTransaction]] =
+  if (includeAbortedTxns) Optional.of(Collections.emptyList())
+  else Optional.empty()
+new FetchDataInfo(fetchOffsetMetadata,
MemoryRecords.EMPTY,
-abortedTransactions = abortedTransactions)
+false,
+abortedTransactions)
}

private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
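The LocalLog changes above (and the LogSegment ones below) follow one mechanical pattern: the Scala FetchDataInfo case class, built with named parameters, default arguments and a scala.Option, becomes the Java class, constructed positionally with a java.util.Optional. A small sketch of that pattern using only names that appear in this file's hunks; the helper itself is hypothetical:

    import java.util.{Collections, Optional}
    import org.apache.kafka.common.message.FetchResponseData
    import org.apache.kafka.common.record.MemoryRecords
    import org.apache.kafka.server.log.internals.{FetchDataInfo, LogOffsetMetadata}

    def emptyInfo(offsetMetadata: LogOffsetMetadata): FetchDataInfo = {
      // Before: FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, abortedTransactions = Some(List.empty))
      // After: all arguments are positional, firstEntryIncomplete can no longer be
      // defaulted, and the scala.Option becomes a java.util.Optional.
      val abortedTxns: Optional[java.util.List[FetchResponseData.AbortedTransaction]] =
        Optional.of(Collections.emptyList())
      new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, false, abortedTxns)
    }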
9 changes: 4 additions & 5 deletions core/src/main/scala/kafka/log/LogSegment.scala
@@ -25,14 +25,13 @@ import java.util.concurrent.TimeUnit
import kafka.common.LogSegmentOffsetOverflowException
import kafka.metrics.KafkaMetricsGroup
import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.FetchDataInfo
import kafka.utils._
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult, FetchDataInfo}

import java.util.Optional
import scala.compat.java8.OptionConverters._
@@ -314,13 +313,13 @@ class LogSegment private[log] (val log: FileRecords,

// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
-return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
+return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

-FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
-  firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
+new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
+  adjustedMaxSize < startOffsetAndSize.size, Optional.empty())
}

def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Optional[Long] =
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -28,7 +28,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
+import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
@@ -1254,11 +1254,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
minOneMessage: Boolean): FetchDataInfo = {
checkLogStartOffset(startOffset)
val maxOffsetMetadata = isolation match {
-case FetchLogEnd => localLog.logEndOffsetMetadata
-case FetchHighWatermark => fetchHighWatermarkMetadata
-case FetchTxnCommitted => fetchLastStableOffsetMetadata
+case FetchIsolation.LOG_END => localLog.logEndOffsetMetadata
+case FetchIsolation.HIGH_WATERMARK => fetchHighWatermarkMetadata
+case FetchIsolation.TXN_COMMITTED => fetchLastStableOffsetMetadata
}
-localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted)
+localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchIsolation.TXN_COMMITTED)
}

private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
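The match above keeps its old meaning: LOG_END reads to the log end offset, HIGH_WATERMARK stops at the high watermark, TXN_COMMITTED stops at the last stable offset. Which value a fetch uses depends on who is fetching; the sketch below restates that mapping, but the helper name and its placement are hypothetical — only the enum constants and FetchRequest.isConsumer come from this diff:

    import org.apache.kafka.common.IsolationLevel
    import org.apache.kafka.common.requests.FetchRequest
    import org.apache.kafka.server.log.internals.FetchIsolation

    def isolationFor(replicaId: Int, isolationLevel: IsolationLevel): FetchIsolation =
      if (!FetchRequest.isConsumer(replicaId))
        FetchIsolation.LOG_END          // followers may read past the high watermark
      else if (isolationLevel == IsolationLevel.READ_COMMITTED)
        FetchIsolation.TXN_COMMITTED    // read_committed consumers stop at the last stable offset
      else
        FetchIsolation.HIGH_WATERMARK   // other consumers stop at the high watermark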
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -18,15 +18,15 @@ package kafka.raft

import kafka.log.{LogOffsetSnapshot, ProducerStateManagerConfig, SnapshotGenerated, UnifiedLog}
import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, RequestLocal}
+import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Records}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel}
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}

@@ -52,8 +52,8 @@ final class KafkaMetadataLog private (

override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo = {
val isolation = readIsolation match {
-case Isolation.COMMITTED => FetchHighWatermark
-case Isolation.UNCOMMITTED => FetchLogEnd
+case Isolation.COMMITTED => FetchIsolation.HIGH_WATERMARK
+case Isolation.UNCOMMITTED => FetchIsolation.LOG_END
case _ => throw new IllegalArgumentException(s"Unhandled read isolation $readIsolation")
}

9 changes: 4 additions & 5 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -18,14 +18,13 @@
package kafka.server

import java.util.concurrent.TimeUnit

import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.server.log.internals.LogOffsetMetadata
+import org.apache.kafka.server.log.internals.{FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata}

import scala.collection._

@@ -81,9 +80,9 @@ class DelayedFetch(
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader)

val endOffset = params.isolation match {
-case FetchLogEnd => offsetSnapshot.logEndOffset
-case FetchHighWatermark => offsetSnapshot.highWatermark
-case FetchTxnCommitted => offsetSnapshot.lastStableOffset
+case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset
+case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark
+case FetchIsolation.TXN_COMMITTED => offsetSnapshot.lastStableOffset
}

// Go directly to the check for Case G if the message offsets are the same. If the log segment