diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 09242bfc4bf0c..2510f1e607c28 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -40,13 +40,16 @@ import java.util.stream.Collectors; public class FetchRequest extends AbstractRequest { - public static final int CONSUMER_REPLICA_ID = -1; // default values for older versions where a request level limit did not exist public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE; public static final long INVALID_LOG_START_OFFSET = -1L; + public static final int ORDINARY_CONSUMER_ID = -1; + public static final int DEBUGGING_CONSUMER_ID = -2; + public static final int FUTURE_LOCAL_REPLICA_ID = -3; + private final FetchRequestData data; private volatile LinkedHashMap fetchData = null; private volatile List toForget = null; @@ -429,6 +432,29 @@ public static FetchRequest parse(ByteBuffer buffer, short version) { return new FetchRequest(new FetchRequestData(new ByteBufferAccessor(buffer), version), version); } + // Broker ids are non-negative int. + public static boolean isValidBrokerId(int brokerId) { + return brokerId >= 0; + } + + public static boolean isConsumer(int replicaId) { + return replicaId < 0 && replicaId != FUTURE_LOCAL_REPLICA_ID; + } + + public static String describeReplicaId(int replicaId) { + switch (replicaId) { + case ORDINARY_CONSUMER_ID: return "consumer"; + case DEBUGGING_CONSUMER_ID: return "debug consumer"; + case FUTURE_LOCAL_REPLICA_ID: return "future local replica"; + default: { + if (isValidBrokerId(replicaId)) + return "replica [" + replicaId + "]"; + else + return "invalid replica [" + replicaId + "]"; + } + } + } + @Override public FetchRequestData data() { return data; diff --git a/core/src/main/scala/kafka/api/Request.scala b/core/src/main/scala/kafka/api/Request.scala deleted file mode 100644 index 6c405a45b03ed..0000000000000 --- a/core/src/main/scala/kafka/api/Request.scala +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.api - -object Request { - val OrdinaryConsumerId: Int = -1 - val DebuggingConsumerId: Int = -2 - val FutureLocalReplicaId: Int = -3 - - // Broker ids are non-negative int. - def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0 - - def isConsumer(replicaId: Int): Boolean = { - replicaId < 0 && replicaId != FutureLocalReplicaId - } - - def describeReplicaId(replicaId: Int): String = { - replicaId match { - case OrdinaryConsumerId => "consumer" - case DebuggingConsumerId => "debug consumer" - case FutureLocalReplicaId => "future local replica" - case id if isValidBrokerId(id) => s"replica [$id]" - case id => s"invalid replica [$id]" - } - } -} diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index fe8792189baef..271c86ad758ab 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -44,7 +44,7 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata} +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata} import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ @@ -1189,7 +1189,7 @@ class Partition(val topicPartition: TopicPartition, * @param minOneMessage whether to ensure that at least one complete message is returned * @param updateFetchState true if the Fetch should update replica state (only applies to follower fetches) * @return [[LogReadInfo]] containing the fetched records or the diverging epoch if present - * @throws NotLeaderOrFollowerException if this node is not the current leader and [[FetchParams.fetchOnlyLeader]] + * @throws NotLeaderOrFollowerException if this node is not the current leader and `FetchParams.fetchOnlyLeader` * is enabled, or if this is a follower fetch with an older request version * and the replicaId is not recognized among the current valid replicas * @throws FencedLeaderEpochException if the leader epoch in the `Fetch` request is lower than the current @@ -1198,7 +1198,7 @@ class Partition(val topicPartition: TopicPartition, * leader epoch, or if this is a follower fetch and the replicaId is not * recognized among the current valid replicas * @throws OffsetOutOfRangeException if the fetch offset is smaller than the log start offset or larger than - * the log end offset (or high watermark depending on [[FetchParams.isolation]]), + * the log end offset (or high watermark depending on `FetchParams.isolation`), * or if the end offset for the last fetched epoch in [[FetchRequest.PartitionData]] * cannot be determined from the local epoch cache (e.g. if it is larger than * any cached epoch value) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index b20139a111f44..53820b004e86b 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -28,7 +28,7 @@ import com.yammer.metrics.core.Gauge import kafka.common.OffsetAndMetadata import kafka.internals.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} import kafka.metrics.KafkaMetricsGroup -import kafka.server.{FetchLogEnd, ReplicaManager, RequestLocal} +import kafka.server.{ReplicaManager, RequestLocal} import kafka.utils.CoreUtils.inLock import kafka.utils.Implicits._ import kafka.utils._ @@ -46,7 +46,7 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0} -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation} import org.apache.kafka.server.util.KafkaScheduler import scala.collection._ @@ -599,7 +599,7 @@ class GroupMetadataManager(brokerId: Int, while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) { val fetchDataInfo = log.read(currOffset, maxLength = config.loadBufferSize, - isolation = FetchLogEnd, + isolation = FetchIsolation.LOG_END, minOneMessage = true) readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0 diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index ae65f8f0d01bb..e2388e9c8854c 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -21,7 +21,7 @@ import java.util.Properties import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.server.{Defaults, FetchLogEnd, ReplicaManager, RequestLocal} +import kafka.server.{Defaults, ReplicaManager, RequestLocal} import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.{Logging, Pool} import kafka.utils.Implicits._ @@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation} import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.util.Scheduler @@ -438,7 +438,7 @@ class TransactionStateManager(brokerId: Int, idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) { val fetchDataInfo = log.read(currOffset, maxLength = config.transactionLogLoadBufferSize, - isolation = FetchLogEnd, + isolation = FetchIsolation.LOG_END, minOneMessage = true) readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0 diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index a2f5fbae8afc1..1214d64afc162 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -23,8 +23,6 @@ import java.text.NumberFormat import java.util.concurrent.atomic.AtomicLong import java.util.regex.Pattern import kafka.metrics.KafkaMetricsGroup - -import kafka.server.FetchDataInfo import kafka.utils.Logging import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException} @@ -32,9 +30,10 @@ import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.log.internals.LogFileUtils.offsetFromFileName -import org.apache.kafka.server.log.internals.{AbortedTxn, LogConfig, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition} +import org.apache.kafka.server.log.internals.{AbortedTxn, FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition} import org.apache.kafka.server.util.Scheduler +import java.util.{Collections, Optional} import scala.jdk.CollectionConverters._ import scala.collection.{Seq, immutable} import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -429,7 +428,7 @@ class LocalLog(@volatile private var _dir: File, // okay we are beyond the end of the last segment with no data fetched although the start offset is in range, // this can happen when all messages with offset larger than start offsets have been deleted. // In this case, we will return the empty set with log end offset metadata - FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) + new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) } } } @@ -454,10 +453,10 @@ class LocalLog(@volatile private var _dir: File, def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction) collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator) - FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata, - records = fetchInfo.records, - firstEntryIncomplete = fetchInfo.firstEntryIncomplete, - abortedTransactions = Some(abortedTransactions.toList)) + new FetchDataInfo(fetchInfo.fetchOffsetMetadata, + fetchInfo.records, + fetchInfo.firstEntryIncomplete, + Optional.of(abortedTransactions.toList.asJava)) } private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long, @@ -1007,12 +1006,13 @@ object LocalLog extends Logging { private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, includeAbortedTxns: Boolean): FetchDataInfo = { - val abortedTransactions = - if (includeAbortedTxns) Some(List.empty[FetchResponseData.AbortedTransaction]) - else None - FetchDataInfo(fetchOffsetMetadata, + val abortedTransactions: Optional[java.util.List[FetchResponseData.AbortedTransaction]] = + if (includeAbortedTxns) Optional.of(Collections.emptyList()) + else Optional.empty() + new FetchDataInfo(fetchOffsetMetadata, MemoryRecords.EMPTY, - abortedTransactions = abortedTransactions) + false, + abortedTransactions) } private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = { diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 53b51cb16ac63..54a04f8aa7e00 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -25,14 +25,13 @@ import java.util.concurrent.TimeUnit import kafka.common.LogSegmentOffsetOverflowException import kafka.metrics.KafkaMetricsGroup import kafka.server.epoch.LeaderEpochFileCache -import kafka.server.FetchDataInfo import kafka.utils._ import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} -import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult} +import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult, FetchDataInfo} import java.util.Optional import scala.compat.java8.OptionConverters._ @@ -314,13 +313,13 @@ class LogSegment private[log] (val log: FileRecords, // return a log segment but with zero size in the case below if (adjustedMaxSize == 0) - return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY) + return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY) // calculate the length of the message set to read based on whether or not they gave us a maxOffset val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize) - FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize), - firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size) + new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize), + adjustedMaxSize < startOffsetAndSize.size, Optional.empty()) } def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Optional[Long] = diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 6debe37ee15d6..78c01a8c21878 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -28,7 +28,7 @@ import kafka.log.remote.RemoteLogManager import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.LeaderEpochFileCache -import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, OffsetAndEpoch, PartitionMetadataFile, RequestLocal} +import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, OffsetAndEpoch, PartitionMetadataFile, RequestLocal} import kafka.utils._ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic @@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 -import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo} +import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo} import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.util.Scheduler @@ -1254,11 +1254,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, minOneMessage: Boolean): FetchDataInfo = { checkLogStartOffset(startOffset) val maxOffsetMetadata = isolation match { - case FetchLogEnd => localLog.logEndOffsetMetadata - case FetchHighWatermark => fetchHighWatermarkMetadata - case FetchTxnCommitted => fetchLastStableOffsetMetadata + case FetchIsolation.LOG_END => localLog.logEndOffsetMetadata + case FetchIsolation.HIGH_WATERMARK => fetchHighWatermarkMetadata + case FetchIsolation.TXN_COMMITTED => fetchLastStableOffsetMetadata } - localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted) + localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchIsolation.TXN_COMMITTED) } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 6fcbc9f769eba..6c58755bba4cf 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -18,7 +18,7 @@ package kafka.raft import kafka.log.{LogOffsetSnapshot, ProducerStateManagerConfig, SnapshotGenerated, UnifiedLog} import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp} -import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, RequestLocal} +import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.config.{AbstractConfig, TopicConfig} import org.apache.kafka.common.errors.InvalidConfigurationException @@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch} -import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel} +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel} import org.apache.kafka.server.util.Scheduler import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} @@ -52,8 +52,8 @@ final class KafkaMetadataLog private ( override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo = { val isolation = readIsolation match { - case Isolation.COMMITTED => FetchHighWatermark - case Isolation.UNCOMMITTED => FetchLogEnd + case Isolation.COMMITTED => FetchIsolation.HIGH_WATERMARK + case Isolation.UNCOMMITTED => FetchIsolation.LOG_END case _ => throw new IllegalArgumentException(s"Unhandled read isolation $readIsolation") } diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index b61bd8bc8a018..2a89881ef6ea1 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -18,14 +18,13 @@ package kafka.server import java.util.concurrent.TimeUnit - import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} -import org.apache.kafka.server.log.internals.LogOffsetMetadata +import org.apache.kafka.server.log.internals.{FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata} import scala.collection._ @@ -81,9 +80,9 @@ class DelayedFetch( val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader) val endOffset = params.isolation match { - case FetchLogEnd => offsetSnapshot.logEndOffset - case FetchHighWatermark => offsetSnapshot.highWatermark - case FetchTxnCommitted => offsetSnapshot.lastStableOffset + case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset + case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark + case FetchIsolation.TXN_COMMITTED => offsetSnapshot.lastStableOffset } // Go directly to the check for Case G if the message offsets are the same. If the log segment diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala deleted file mode 100644 index f3b0c40ea3b48..0000000000000 --- a/core/src/main/scala/kafka/server/FetchDataInfo.scala +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.api.Request -import org.apache.kafka.common.IsolationLevel -import org.apache.kafka.common.message.FetchResponseData -import org.apache.kafka.common.record.{MemoryRecords, Records} -import org.apache.kafka.common.replica.ClientMetadata -import org.apache.kafka.common.requests.FetchRequest -import org.apache.kafka.server.log.internals.LogOffsetMetadata - -sealed trait FetchIsolation -case object FetchLogEnd extends FetchIsolation -case object FetchHighWatermark extends FetchIsolation -case object FetchTxnCommitted extends FetchIsolation - -object FetchIsolation { - def apply( - request: FetchRequest - ): FetchIsolation = { - apply(request.replicaId, request.isolationLevel) - } - - def apply( - replicaId: Int, - isolationLevel: IsolationLevel - ): FetchIsolation = { - if (!Request.isConsumer(replicaId)) - FetchLogEnd - else if (isolationLevel == IsolationLevel.READ_COMMITTED) - FetchTxnCommitted - else - FetchHighWatermark - } -} - -case class FetchParams( - requestVersion: Short, - replicaId: Int, - maxWaitMs: Long, - minBytes: Int, - maxBytes: Int, - isolation: FetchIsolation, - clientMetadata: Option[ClientMetadata] -) { - def isFromFollower: Boolean = Request.isValidBrokerId(replicaId) - def isFromConsumer: Boolean = Request.isConsumer(replicaId) - def fetchOnlyLeader: Boolean = isFromFollower || (isFromConsumer && clientMetadata.isEmpty) - def hardMaxBytesLimit: Boolean = requestVersion <= 2 - - override def toString: String = { - s"FetchParams(requestVersion=$requestVersion" + - s", replicaId=$replicaId" + - s", maxWaitMs=$maxWaitMs" + - s", minBytes=$minBytes" + - s", maxBytes=$maxBytes" + - s", isolation=$isolation" + - s", clientMetadata= $clientMetadata" + - ")" - } -} - -object FetchDataInfo { - def empty(fetchOffset: Long): FetchDataInfo = { - FetchDataInfo( - fetchOffsetMetadata = new LogOffsetMetadata(fetchOffset), - records = MemoryRecords.EMPTY, - firstEntryIncomplete = false, - abortedTransactions = None - ) - } -} - -case class FetchDataInfo( - fetchOffsetMetadata: LogOffsetMetadata, - records: Records, - firstEntryIncomplete: Boolean = false, - abortedTransactions: Option[List[FetchResponseData.AbortedTransaction]] = None -) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 42768fa98f9fb..5de1e8754dd61 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -69,7 +69,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0} -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData} import org.apache.kafka.server.record.BrokerCompressionType import java.lang.{Long => JLong} @@ -840,8 +840,8 @@ class KafkaApis(val requestChannel: RequestChannel, val partitions = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] val reassigningPartitions = mutable.Set[TopicIdPartition]() responsePartitionData.foreach { case (tp, data) => - val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull - val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) + val abortedTransactions = data.abortedTransactions.orElse(null) + val lastStableOffset: Long = data.lastStableOffset.orElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) if (data.isReassignmentFetch) reassigningPartitions.add(tp) val partitionData = new FetchResponseData.PartitionData() .setPartitionIndex(tp.partition) @@ -851,8 +851,8 @@ class KafkaApis(val requestChannel: RequestChannel, .setLogStartOffset(data.logStartOffset) .setAbortedTransactions(abortedTransactions) .setRecords(data.records) - .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID)) - data.divergingEpoch.foreach(partitionData.setDivergingEpoch) + .setPreferredReadReplica(data.preferredReadReplica.orElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID)) + data.divergingEpoch.ifPresent(partitionData.setDivergingEpoch(_)) partitions.put(tp, partitionData) } erroneous.foreach { case (tp, data) => partitions.put(tp, data) } @@ -961,26 +961,26 @@ class KafkaApis(val requestChannel: RequestChannel, val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes) val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes) - val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) { + val clientMetadata: Optional[ClientMetadata] = if (versionId >= 11) { // Fetch API version 11 added preferred replica logic - Some(new DefaultClientMetadata( + Optional.of(new DefaultClientMetadata( fetchRequest.rackId, clientId, request.context.clientAddress, request.context.principal, request.context.listenerName.value)) } else { - None + Optional.empty() } - val params = FetchParams( - requestVersion = versionId, - replicaId = fetchRequest.replicaId, - maxWaitMs = fetchRequest.maxWait, - minBytes = fetchMinBytes, - maxBytes = fetchMaxBytes, - isolation = FetchIsolation(fetchRequest), - clientMetadata = clientMetadata + val params = new FetchParams( + versionId, + fetchRequest.replicaId, + fetchRequest.maxWait, + fetchMinBytes, + fetchMaxBytes, + FetchIsolation.of(fetchRequest), + clientMetadata ) // call the replica manager to fetch messages from the local replica diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala index 109fdd73847b4..6702e180f47b2 100644 --- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.api.Request import kafka.cluster.BrokerEndPoint import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} import kafka.server.QuotaFactory.UnboundedQuota @@ -29,6 +28,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.server.log.internals.{FetchIsolation, FetchParams, FetchPartitionData} import java.util import java.util.Optional @@ -75,8 +75,8 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { partitionData = responsePartitionData.map { case (tp, data) => - val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull - val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) + val abortedTransactions = data.abortedTransactions.orElse(null) + val lastStableOffset: Long = data.lastStableOffset.orElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) tp.topicPartition -> new FetchResponseData.PartitionData() .setPartitionIndex(tp.topicPartition.partition) .setErrorCode(data.error.code) @@ -90,14 +90,14 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, val fetchData = request.fetchData(topicNames.asJava) - val fetchParams = FetchParams( - requestVersion = request.version, - maxWaitMs = 0L, // timeout is 0 so that the callback will be executed immediately - replicaId = Request.FutureLocalReplicaId, - minBytes = request.minBytes, - maxBytes = request.maxBytes, - isolation = FetchLogEnd, - clientMetadata = None + val fetchParams = new FetchParams( + request.version, + FetchRequest.FUTURE_LOCAL_REPLICA_ID, + 0L, // timeout is 0 so that the callback will be executed immediately + request.minBytes, + request.maxBytes, + FetchIsolation.LOG_END, + Optional.empty() ) replicaManager.fetchMessages( diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 062d90b7f79e2..574eaaf40621a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -17,7 +17,7 @@ package kafka.server import java.io.File -import java.util.Optional +import java.util.{Optional, OptionalInt, OptionalLong} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.Lock @@ -53,7 +53,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView -import org.apache.kafka.common.replica.{ClientMetadata, _} +import org.apache.kafka.common.replica._ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ @@ -61,7 +61,7 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.server.common.MetadataVersion._ -import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel, LogOffsetMetadata, RecordValidationException} +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, RecordValidationException} import org.apache.kafka.server.util.Scheduler import java.nio.file.{Files, Paths} @@ -117,19 +117,19 @@ case class LogReadResult(info: FetchDataInfo, case Some(e) => Errors.forException(e) } - def toFetchPartitionData(isReassignmentFetch: Boolean): FetchPartitionData = FetchPartitionData( + def toFetchPartitionData(isReassignmentFetch: Boolean): FetchPartitionData = new FetchPartitionData( this.error, this.highWatermark, this.leaderLogStartOffset, this.info.records, - this.divergingEpoch, - this.lastStableOffset, + this.divergingEpoch.asJava, + if (this.lastStableOffset.isDefined) OptionalLong.of(this.lastStableOffset.get) else OptionalLong.empty(), this.info.abortedTransactions, - this.preferredReadReplica, + if (this.preferredReadReplica.isDefined) OptionalInt.of(this.preferredReadReplica.get) else OptionalInt.empty(), isReassignmentFetch) def withEmptyFetchInfo: LogReadResult = - copy(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)) + copy(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)) override def toString = { "LogReadResult(" + @@ -148,16 +148,6 @@ case class LogReadResult(info: FetchDataInfo, } -case class FetchPartitionData(error: Errors = Errors.NONE, - highWatermark: Long, - logStartOffset: Long, - records: Records, - divergingEpoch: Option[FetchResponseData.EpochEndOffset], - lastStableOffset: Option[Long], - abortedTransactions: Option[List[FetchResponseData.AbortedTransaction]], - preferredReadReplica: Option[Int], - isReassignmentFetch: Boolean) - /** * Trait to represent the state of hosted partitions. We create a concrete (active) Partition * instance when the broker receives a LeaderAndIsr request from the controller or a metadata @@ -1116,7 +1106,7 @@ class ReplicaManager(val config: KafkaConfig, throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.") // If we are the leader, determine the preferred read-replica - val preferredReadReplica = params.clientMetadata.flatMap( + val preferredReadReplica = params.clientMetadata.asScala.flatMap( metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs)) if (preferredReadReplica.isDefined) { @@ -1126,7 +1116,7 @@ class ReplicaManager(val config: KafkaConfig, } // If a preferred read-replica is set, skip the read val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false) - LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), + LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), divergingEpoch = None, highWatermark = offsetSnapshot.highWatermark.messageOffset, leaderLogStartOffset = offsetSnapshot.logStartOffset, @@ -1149,11 +1139,11 @@ class ReplicaManager(val config: KafkaConfig, val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) { // If the partition is being throttled, simply return an empty set. - FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) + new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) } else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) { // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make // progress in such cases and don't need to report a `RecordTooLargeException` - FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) + new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) } else { readInfo.fetchedData } @@ -1181,7 +1171,7 @@ class ReplicaManager(val config: KafkaConfig, _: KafkaStorageException | _: OffsetOutOfRangeException | _: InconsistentTopicIdException) => - LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), + LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), divergingEpoch = None, highWatermark = UnifiedLog.UnknownOffset, leaderLogStartOffset = UnifiedLog.UnknownOffset, @@ -1194,11 +1184,11 @@ class ReplicaManager(val config: KafkaConfig, brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark() brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark() - val fetchSource = Request.describeReplicaId(params.replicaId) + val fetchSource = FetchRequest.describeReplicaId(params.replicaId) error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " + s"on partition $tp: $fetchInfo", e) - LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), + LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), divergingEpoch = None, highWatermark = UnifiedLog.UnknownOffset, leaderLogStartOffset = UnifiedLog.UnknownOffset, @@ -1238,7 +1228,7 @@ class ReplicaManager(val config: KafkaConfig, currentTimeMs: Long): Option[Int] = { partition.leaderIdIfLocal.flatMap { leaderReplicaId => // Don't look up preferred for follower fetches via normal replication - if (Request.isValidBrokerId(replicaId)) + if (FetchRequest.isValidBrokerId(replicaId)) None else { replicaSelectorOpt.flatMap { replicaSelector => diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index fb687e4f8e8c9..599384530b729 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -18,8 +18,7 @@ package kafka.tools import joptsimple.OptionParser -import kafka.api._ -import kafka.utils.{IncludeList, _} +import kafka.utils._ import org.apache.kafka.clients._ import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions, TopicDescription} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} @@ -28,10 +27,11 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{NetworkReceive, Selectable, Selector} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.AbstractRequest.Builder -import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetsRequest, FetchRequest => JFetchRequest} +import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, FetchResponse, ListOffsetsRequest} import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{Node, TopicPartition, Uuid} + import java.net.SocketTimeoutException import java.text.SimpleDateFormat import java.util @@ -39,7 +39,6 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import java.util.regex.{Pattern, PatternSyntaxException} import java.util.{Date, Optional, Properties} - import scala.collection.Seq import scala.jdk.CollectionConverters._ @@ -396,7 +395,7 @@ private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions: extends ShutdownableThread(name) { private val fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(), Time.SYSTEM, fetcherId, - s"broker-${Request.DebuggingConsumerId}-fetcher-$fetcherId") + s"broker-${FetchRequest.DEBUGGING_CONSUMER_ID}-fetcher-$fetcherId") private val topicNames = topicIds.map(_.swap) @@ -405,13 +404,13 @@ private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions: val fetcherBarrier = replicaBuffer.getFetcherBarrier() val verificationBarrier = replicaBuffer.getVerificationBarrier() - val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData] + val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] for (topicPartition <- topicPartitions) - requestMap.put(topicPartition, new JFetchRequest.PartitionData(topicIds.getOrElse(topicPartition.topic, Uuid.ZERO_UUID), replicaBuffer.getOffset(topicPartition), + requestMap.put(topicPartition, new FetchRequest.PartitionData(topicIds.getOrElse(topicPartition.topic, Uuid.ZERO_UUID), replicaBuffer.getOffset(topicPartition), 0L, fetchSize, Optional.empty())) - val fetchRequestBuilder = JFetchRequest.Builder. - forReplica(ApiKeys.FETCH.latestVersion, Request.DebuggingConsumerId, maxWait, minBytes, requestMap) + val fetchRequestBuilder = FetchRequest.Builder. + forReplica(ApiKeys.FETCH.latestVersion, FetchRequest.DEBUGGING_CONSUMER_ID, maxWait, minBytes, requestMap) debug("Issuing fetch request ") diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 1b1608dec45ae..3b4f893eb552c 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1339,7 +1339,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumer.poll(Duration.ofMillis(50L)) brokers.forall { broker => broker.metadataCache.getPartitionInfo(newTopic, 0) match { - case Some(partitionState) => Request.isValidBrokerId(partitionState.leader) + case Some(partitionState) => FetchRequest.isValidBrokerId(partitionState.leader) case _ => false } } diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 9d9ad717df3a7..1e5e1e5b28462 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -17,7 +17,6 @@ package kafka.server import java.util.Optional - import scala.collection.Seq import kafka.cluster.Partition import kafka.log.LogOffsetSnapshot @@ -27,7 +26,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.FetchRequest -import org.apache.kafka.server.log.internals.LogOffsetMetadata +import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata} import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ import org.mockito.ArgumentMatchers.{any, anyInt} @@ -172,14 +171,14 @@ class DelayedFetchTest { replicaId: Int, maxWaitMs: Int ): FetchParams = { - FetchParams( - requestVersion = ApiKeys.FETCH.latestVersion, - replicaId = replicaId, - maxWaitMs = maxWaitMs, - minBytes = 1, - maxBytes = maxBytes, - isolation = FetchLogEnd, - clientMetadata = None + new FetchParams( + ApiKeys.FETCH.latestVersion, + replicaId, + maxWaitMs, + 1, + maxBytes, + FetchIsolation.LOG_END, + Optional.empty() ) } @@ -200,7 +199,7 @@ class DelayedFetchTest { private def buildReadResult(error: Errors): LogReadResult = { LogReadResult( exception = if (error != Errors.NONE) Some(error.exception) else None, - info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), + info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), divergingEpoch = None, highWatermark = -1L, leaderLogStartOffset = -1L, diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 792b3550fd9c3..c0ca8f26777c0 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -20,13 +20,13 @@ package kafka import java.util.Properties import java.util.concurrent.atomic._ import kafka.log._ -import kafka.server.{BrokerTopicStats, FetchLogEnd} +import kafka.server.BrokerTopicStats import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.FileRecords import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel} +import org.apache.kafka.server.log.internals.{FetchIsolation, LogConfig, LogDirFailureChannel} /** * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it @@ -135,7 +135,7 @@ object StressTestLog { try { log.read(currentOffset, maxLength = 1, - isolation = FetchLogEnd, + isolation = FetchIsolation.LOG_END, minOneMessage = true).records match { case read: FileRecords if read.sizeInBytes > 0 => { val first = read.batches.iterator.next() diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 667d083c448b0..942e665ef0626 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, LogConfig, LogDirFailureChannel} +import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogConfig, LogDirFailureChannel} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers @@ -387,14 +387,14 @@ class PartitionLockTest extends Logging { val maxBytes = 1 while (fetchOffset < numRecords) { - val fetchParams = FetchParams( - requestVersion = ApiKeys.FETCH.latestVersion, - replicaId = followerId, - maxWaitMs = 0, - minBytes = 1, - maxBytes = maxBytes, - isolation = FetchLogEnd, - clientMetadata = None + val fetchParams = new FetchParams( + ApiKeys.FETCH.latestVersion, + followerId, + 0L, + 1, + maxBytes, + FetchIsolation.LOG_END, + Optional.empty() ) val fetchPartitionData = new FetchRequest.PartitionData( diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index e37f9254a270c..55c59f8220720 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -54,7 +54,7 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 -import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, LogDirFailureChannel} +import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogDirFailureChannel} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.KafkaScheduler import org.junit.jupiter.params.ParameterizedTest @@ -70,14 +70,14 @@ object PartitionTest { minBytes: Int = 1, maxBytes: Int = Int.MaxValue ): FetchParams = { - FetchParams( - requestVersion = ApiKeys.FETCH.latestVersion, - replicaId = replicaId, - maxWaitMs = maxWaitMs, - minBytes = minBytes, - maxBytes = maxBytes, - isolation = FetchLogEnd, - clientMetadata = None + new FetchParams( + ApiKeys.FETCH.latestVersion, + replicaId, + maxWaitMs, + minBytes, + maxBytes, + FetchIsolation.LOG_END, + Optional.empty() ) } @@ -86,16 +86,16 @@ object PartitionTest { minBytes: Int = 1, maxBytes: Int = Int.MaxValue, clientMetadata: Option[ClientMetadata] = None, - isolation: FetchIsolation = FetchHighWatermark + isolation: FetchIsolation = FetchIsolation.HIGH_WATERMARK ): FetchParams = { - FetchParams( - requestVersion = ApiKeys.FETCH.latestVersion, - replicaId = FetchRequest.CONSUMER_REPLICA_ID, - maxWaitMs = maxWaitMs, - minBytes = minBytes, - maxBytes = maxBytes, - isolation = isolation, - clientMetadata = clientMetadata + new FetchParams( + ApiKeys.FETCH.latestVersion, + FetchRequest.CONSUMER_REPLICA_ID, + maxWaitMs, + minBytes, + maxBytes, + isolation, + clientMetadata.asJava ) } } @@ -2900,7 +2900,7 @@ class PartitionTest extends AbstractPartitionTest { lastFetchedEpoch: Option[Int] = None, fetchTimeMs: Long = time.milliseconds(), topicId: Uuid = Uuid.ZERO_UUID, - isolation: FetchIsolation = FetchHighWatermark + isolation: FetchIsolation = FetchIsolation.HIGH_WATERMARK ): LogReadInfo = { val fetchParams = consumerFetchParams( maxBytes = maxBytes, diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 55674e8873f7d..d10c5b630fc84 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -27,7 +27,7 @@ import javax.management.ObjectName import kafka.cluster.Partition import kafka.common.OffsetAndMetadata import kafka.log.{LogAppendInfo, UnifiedLog} -import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal} +import kafka.server.{HostedPartition, KafkaConfig, ReplicaManager, RequestLocal} import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription @@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion._ -import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata} +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, FetchIsolation, LogOffsetMetadata} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.KafkaScheduler import org.junit.jupiter.api.Assertions._ @@ -808,7 +808,7 @@ class GroupMetadataManagerTest { verify(logMock).logStartOffset verify(logMock).read(ArgumentMatchers.eq(startOffset), maxLength = anyInt(), - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true)) verify(replicaManager).getLog(groupTopicPartition) verify(replicaManager, times(2)).getLogEndOffset(groupTopicPartition) @@ -889,14 +889,14 @@ class GroupMetadataManagerTest { .thenReturn(segment2End) when(logMock.read(ArgumentMatchers.eq(segment1End), maxLength = anyInt(), - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true))) - .thenReturn(FetchDataInfo(new LogOffsetMetadata(segment1End), fileRecordsMock)) + .thenReturn(new FetchDataInfo(new LogOffsetMetadata(segment1End), fileRecordsMock)) when(logMock.read(ArgumentMatchers.eq(segment2End), maxLength = anyInt(), - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true))) - .thenReturn(FetchDataInfo(new LogOffsetMetadata(segment2End), fileRecordsMock)) + .thenReturn(new FetchDataInfo(new LogOffsetMetadata(segment2End), fileRecordsMock)) when(fileRecordsMock.sizeInBytes()) .thenReturn(segment1Records.sizeInBytes) .thenReturn(segment2Records.sizeInBytes) @@ -2375,9 +2375,9 @@ class GroupMetadataManagerTest { when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.read(ArgumentMatchers.eq(startOffset), maxLength = anyInt(), - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true))) - .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), mockRecords)) + .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), mockRecords)) when(replicaManager.getLog(groupMetadataTopicPartition)).thenReturn(Some(logMock)) when(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).thenReturn(Some[Long](18)) groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L) @@ -2532,9 +2532,9 @@ class GroupMetadataManagerTest { when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.read(ArgumentMatchers.eq(startOffset), maxLength = anyInt(), - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true))) - .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) + .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 24798bd3df75a..02852d6b94337 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -23,7 +23,7 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._ import kafka.log.UnifiedLog -import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, MetadataCache, RequestLocal} +import kafka.server.{KafkaConfig, MetadataCache, RequestLocal} import kafka.utils.{Pool, TestUtils} import org.apache.kafka.clients.{ClientResponse, NetworkClient} import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME @@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecor import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch} import org.apache.kafka.common.{Node, TopicPartition} -import org.apache.kafka.server.log.internals.{LogConfig, LogOffsetMetadata} +import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.{ArgumentCaptor, ArgumentMatchers} @@ -467,9 +467,9 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.read(ArgumentMatchers.eq(startOffset), maxLength = anyInt, - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true))) - .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) + .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes) val bufferCaptor: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer]) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 12ca515283245..a15d51e555a37 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.locks.ReentrantLock import javax.management.ObjectName import kafka.log.UnifiedLog -import kafka.server.{FetchDataInfo, FetchLogEnd, ReplicaManager, RequestLocal} +import kafka.server.{ReplicaManager, RequestLocal} import kafka.utils.{Pool, TestUtils} import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition @@ -33,7 +33,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.MockTime -import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogOffsetMetadata} +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata} import org.apache.kafka.server.util.MockScheduler import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -155,9 +155,9 @@ class TransactionStateManagerTest { when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.read(ArgumentMatchers.eq(startOffset), maxLength = anyInt(), - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true)) - ).thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) + ).thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset)) txnMetadata1.state = PrepareCommit @@ -839,9 +839,9 @@ class TransactionStateManagerTest { when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.read(ArgumentMatchers.eq(startOffset), maxLength = anyInt(), - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true)) - ).thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY)) + ).thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY)) when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset)) transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _) => ()) @@ -853,7 +853,7 @@ class TransactionStateManagerTest { verify(logMock).logStartOffset verify(logMock).read(ArgumentMatchers.eq(startOffset), maxLength = anyInt(), - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true)) verify(replicaManager, times(2)).getLogEndOffset(topicPartition) assertEquals(0, transactionManager.loadingPartitions.size) @@ -1016,9 +1016,9 @@ class TransactionStateManagerTest { when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.read(ArgumentMatchers.eq(startOffset), maxLength = anyInt(), - isolation = ArgumentMatchers.eq(FetchLogEnd), + isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END), minOneMessage = ArgumentMatchers.eq(true))) - .thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) + .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes) diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index df77aaaed5fde..148d2b3560bea 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -17,12 +17,12 @@ package kafka.log -import kafka.server.{BrokerTopicStats, FetchLogEnd} +import kafka.server.BrokerTopicStats import kafka.utils._ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel} +import org.apache.kafka.server.log.internals.{FetchIsolation, LogConfig, LogDirFailureChannel} import org.apache.kafka.server.record.BrokerCompressionType import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api._ @@ -76,7 +76,7 @@ class BrokerCompressionTest { def readBatch(offset: Int): RecordBatch = { val fetchInfo = log.read(offset, maxLength = 4096, - isolation = FetchLogEnd, + isolation = FetchIsolation.LOG_END, minOneMessage = true) fetchInfo.records.batches.iterator.next() } diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala index ae79e42a98393..d9e0bafea276f 100644 --- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala @@ -22,13 +22,13 @@ import java.nio.channels.ClosedChannelException import java.nio.charset.StandardCharsets import java.util.regex.Pattern import java.util.Collections -import kafka.server.{FetchDataInfo, KafkaConfig} +import kafka.server.KafkaConfig import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord} import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel, LogOffsetMetadata} +import org.apache.kafka.server.log.internals.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata} import org.apache.kafka.server.util.Scheduler import org.junit.jupiter.api.Assertions.{assertFalse, _} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala index a66f6fe5a072c..a6961324f45c1 100644 --- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala @@ -19,12 +19,12 @@ package kafka.log import java.util.Properties import java.util.concurrent.{Callable, Executors} -import kafka.server.{BrokerTopicStats, FetchHighWatermark} +import kafka.server.BrokerTopicStats import kafka.utils.TestUtils import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.SimpleRecord import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel} +import org.apache.kafka.server.log.internals.{FetchIsolation, LogConfig, LogDirFailureChannel} import org.apache.kafka.server.util.KafkaScheduler import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -93,7 +93,7 @@ class LogConcurrencyTest { val readInfo = log.read( startOffset = fetchOffset, maxLength = 1, - isolation = FetchHighWatermark, + isolation = FetchIsolation.HIGH_WATERMARK, minOneMessage = true ) readInfo.records.batches().forEach { batch => diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 4e870d553f812..464af9c6aa11f 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import java.nio.file.{Files, NoSuchFileException, Paths} import java.util.Properties import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} -import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig} +import kafka.server.{BrokerTopicStats, KafkaConfig} import kafka.server.metadata.MockConfigRepository import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.TopicPartition @@ -32,7 +32,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 -import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LogConfig, LogDirFailureChannel, OffsetIndex, SnapshotFile} +import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, OffsetIndex, SnapshotFile} import org.apache.kafka.server.util.Scheduler import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} import org.junit.jupiter.api.function.Executable diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 1e75ecbcff53d..22ebbfc81dda9 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -21,7 +21,7 @@ import com.yammer.metrics.core.{Gauge, MetricName} import kafka.log.remote.RemoteIndexCache import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.metadata.{ConfigRepository, MockConfigRepository} -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchLogEnd} +import kafka.server.BrokerTopicStats import kafka.utils._ import org.apache.directory.api.util.FileUtils import org.apache.kafka.common.config.TopicConfig @@ -38,7 +38,7 @@ import java.io._ import java.nio.file.Files import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future} import java.util.{Collections, Properties} -import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel} +import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation, LogConfig, LogDirFailureChannel} import org.apache.kafka.server.metrics.KafkaYammerMetrics import scala.collection.{Map, mutable} @@ -517,7 +517,7 @@ class LogManagerTest { } private def readLog(log: UnifiedLog, offset: Long, maxLength: Int = 1024): FetchDataInfo = { - log.read(offset, maxLength, isolation = FetchLogEnd, minOneMessage = true) + log.read(offset, maxLength, isolation = FetchIsolation.LOG_END, minOneMessage = true) } /** diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index f78c519e2b65b..a3cac4c27d35e 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -22,12 +22,12 @@ import kafka.log.remote.RemoteLogManager import java.io.File import java.util.Properties import kafka.server.checkpoints.LeaderEpochCheckpointFile -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd} +import kafka.server.BrokerTopicStats import kafka.utils.TestUtils import org.apache.kafka.common.Uuid import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, LazyIndex, LogConfig, LogDirFailureChannel, TransactionIndex} +import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, FetchDataInfo, FetchIsolation, LazyIndex, LogConfig, LogDirFailureChannel, TransactionIndex} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} import java.nio.file.Files @@ -234,7 +234,7 @@ object LogTestUtils { def readLog(log: UnifiedLog, startOffset: Long, maxLength: Int, - isolation: FetchIsolation = FetchLogEnd, + isolation: FetchIsolation = FetchIsolation.LOG_END, minOneMessage: Boolean = true): FetchDataInfo = { log.read(startOffset, maxLength, isolation, minOneMessage) } diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index ca56a20371a75..f86102d0b318c 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -26,7 +26,7 @@ import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException import kafka.log.remote.RemoteLogManager import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} -import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, PartitionMetadataFile} +import kafka.server.{BrokerTopicStats, KafkaConfig, PartitionMetadataFile} import kafka.utils._ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid} @@ -38,7 +38,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} -import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, LogConfig, LogOffsetMetadata, RecordValidationException} +import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, FetchIsolation, LogConfig, LogOffsetMetadata, RecordValidationException} import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.{KafkaScheduler, Scheduler} @@ -92,7 +92,7 @@ class UnifiedLogTest { val readInfo = log.read( startOffset = fetchOffset, maxLength = 2048, - isolation = FetchHighWatermark, + isolation = FetchIsolation.HIGH_WATERMARK, minOneMessage = false) assertEquals(expectedSize, readInfo.records.sizeInBytes) assertEquals(expectedOffsets, readInfo.records.records.asScala.map(_.offset)) @@ -285,9 +285,9 @@ class UnifiedLogTest { assertTrue(readInfo.records.sizeInBytes > 0) val upperBoundOffset = isolation match { - case FetchLogEnd => log.logEndOffset - case FetchHighWatermark => log.highWatermark - case FetchTxnCommitted => log.lastStableOffset + case FetchIsolation.LOG_END => log.logEndOffset + case FetchIsolation.HIGH_WATERMARK => log.highWatermark + case FetchIsolation.TXN_COMMITTED => log.lastStableOffset } for (record <- readInfo.records.records.asScala) @@ -324,7 +324,7 @@ class UnifiedLogTest { )), leaderEpoch = 0) (log.logStartOffset until log.logEndOffset).foreach { offset => - assertNonEmptyFetch(log, offset, FetchLogEnd) + assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END) } } @@ -345,11 +345,11 @@ class UnifiedLogTest { def assertHighWatermarkBoundedFetches(): Unit = { (log.logStartOffset until log.highWatermark).foreach { offset => - assertNonEmptyFetch(log, offset, FetchHighWatermark) + assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK) } (log.highWatermark to log.logEndOffset).foreach { offset => - assertEmptyFetch(log, offset, FetchHighWatermark) + assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK) } } @@ -441,11 +441,11 @@ class UnifiedLogTest { def assertLsoBoundedFetches(): Unit = { (log.logStartOffset until log.lastStableOffset).foreach { offset => - assertNonEmptyFetch(log, offset, FetchTxnCommitted) + assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED) } (log.lastStableOffset to log.logEndOffset).foreach { offset => - assertEmptyFetch(log, offset, FetchTxnCommitted) + assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED) } } @@ -2941,7 +2941,7 @@ class UnifiedLogTest { val readInfo = log.read( startOffset = currentLogEndOffset, maxLength = Int.MaxValue, - isolation = FetchTxnCommitted, + isolation = FetchIsolation.TXN_COMMITTED, minOneMessage = false) if (readInfo.records.sizeInBytes() > 0) @@ -3371,13 +3371,12 @@ class UnifiedLogTest { // now check that a fetch includes the aborted transaction val fetchDataInfo = log.read(0L, maxLength = 2048, - isolation = FetchTxnCommitted, + isolation = FetchIsolation.TXN_COMMITTED, minOneMessage = true) - assertEquals(1, fetchDataInfo.abortedTransactions.size) - assertTrue(fetchDataInfo.abortedTransactions.isDefined) - assertEquals(new FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0), - fetchDataInfo.abortedTransactions.get.head) + assertTrue(fetchDataInfo.abortedTransactions.isPresent) + assertEquals(1, fetchDataInfo.abortedTransactions.get.size) + assertEquals(new FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0), fetchDataInfo.abortedTransactions.get.get(0)) } @Test diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 81c2c9ffbe2f4..679a8a50a3b5e 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets import java.util import java.util.Arrays.asList import java.util.concurrent.{CompletableFuture, TimeUnit} -import java.util.{Collections, Optional, Properties} +import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties} import kafka.api.LeaderAndIsr import kafka.cluster.Broker import kafka.controller.{ControllerContext, KafkaController} @@ -92,7 +92,7 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartiti import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} -import org.apache.kafka.server.log.internals.AppendOrigin +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchParams, FetchPartitionData} class KafkaApisTest { private val requestChannel: RequestChannel = mock(classOf[RequestChannel]) @@ -2883,8 +2883,8 @@ class KafkaApisTest { val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8))) - callback(Seq(tidp -> FetchPartitionData(Errors.NONE, hw, 0, records, - None, None, None, Option.empty, isReassignmentFetch = false))) + callback(Seq(tidp -> new FetchPartitionData(Errors.NONE, hw, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false))) }) val fetchData = Map(tidp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, @@ -4007,8 +4007,8 @@ class KafkaApisTest { any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]() )).thenAnswer(invocation => { val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] - callback(Seq(tidp0 -> FetchPartitionData(Errors.NONE, hw, 0, records, - None, None, None, Option.empty, isReassignmentFetch = isReassigning))) + callback(Seq(tidp0 -> new FetchPartitionData(Errors.NONE, hw, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), isReassigning))) }) val fetchMetadata = new JFetchMetadata(0, 0) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 09939f43fddad..55470fba7f9d0 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -16,7 +16,6 @@ */ package kafka.server -import kafka.api.Request import kafka.cluster.{BrokerEndPoint, Partition} import kafka.log.{LogManager, UnifiedLog} import kafka.server.AbstractFetcherThread.ResultWithPartitions @@ -32,13 +31,14 @@ import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.log.internals.{FetchIsolation, FetchParams, FetchPartitionData} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers.{any, anyBoolean} import org.mockito.Mockito.{doNothing, mock, never, times, verify, when} import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} -import java.util.{Collections, Optional} +import java.util.{Collections, Optional, OptionalInt, OptionalLong} import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ @@ -136,16 +136,16 @@ class ReplicaAlterLogDirsThreadTest { val fencedRequestData = new FetchRequest.PartitionData(topicId, 0L, 0L, config.replicaFetchMaxBytes, Optional.of(leaderEpoch - 1)) - val fencedResponseData = FetchPartitionData( - error = Errors.FENCED_LEADER_EPOCH, - highWatermark = -1, - logStartOffset = -1, - records = MemoryRecords.EMPTY, - divergingEpoch = None, - lastStableOffset = None, - abortedTransactions = None, - preferredReadReplica = None, - isReassignmentFetch = false) + val fencedResponseData = new FetchPartitionData( + Errors.FENCED_LEADER_EPOCH, + -1, + -1, + MemoryRecords.EMPTY, + Optional.empty(), + OptionalLong.empty(), + Optional.empty(), + OptionalInt.empty(), + false) mockFetchFromCurrentLog(tid1p0, fencedRequestData, config, replicaManager, fencedResponseData) val endPoint = new BrokerEndPoint(0, "localhost", 1000) @@ -177,16 +177,16 @@ class ReplicaAlterLogDirsThreadTest { val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L, config.replicaFetchMaxBytes, Optional.of(leaderEpoch)) - val responseData = FetchPartitionData( - error = Errors.NONE, - highWatermark = 0L, - logStartOffset = 0L, - records = MemoryRecords.EMPTY, - divergingEpoch = None, - lastStableOffset = None, - abortedTransactions = None, - preferredReadReplica = None, - isReassignmentFetch = false) + val responseData = new FetchPartitionData( + Errors.NONE, + 0L, + 0L, + MemoryRecords.EMPTY, + Optional.empty(), + OptionalLong.empty(), + Optional.empty(), + OptionalInt.empty(), + false) mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, responseData) thread.doWork() @@ -235,16 +235,16 @@ class ReplicaAlterLogDirsThreadTest { val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L, config.replicaFetchMaxBytes, Optional.of(leaderEpoch)) - val responseData = FetchPartitionData( - error = Errors.NONE, - highWatermark = 0L, - logStartOffset = 0L, - records = MemoryRecords.EMPTY, - divergingEpoch = None, - lastStableOffset = None, - abortedTransactions = None, - preferredReadReplica = None, - isReassignmentFetch = false) + val responseData = new FetchPartitionData( + Errors.NONE, + 0L, + 0L, + MemoryRecords.EMPTY, + Optional.empty(), + OptionalLong.empty(), + Optional.empty(), + OptionalInt.empty(), + false) mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, responseData) val endPoint = new BrokerEndPoint(0, "localhost", 1000) @@ -276,25 +276,22 @@ class ReplicaAlterLogDirsThreadTest { val callbackCaptor: ArgumentCaptor[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] = ArgumentCaptor.forClass(classOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]) - val expectedFetchParams = FetchParams( - requestVersion = ApiKeys.FETCH.latestVersion, - replicaId = Request.FutureLocalReplicaId, - maxWaitMs = 0L, - minBytes = 0, - maxBytes = config.replicaFetchResponseMaxBytes, - isolation = FetchLogEnd, - clientMetadata = None + val expectedFetchParams = new FetchParams( + ApiKeys.FETCH.latestVersion, + FetchRequest.FUTURE_LOCAL_REPLICA_ID, + 0L, + 0, + config.replicaFetchResponseMaxBytes, + FetchIsolation.LOG_END, + Optional.empty() ) - println(expectedFetchParams) - when(replicaManager.fetchMessages( params = ArgumentMatchers.eq(expectedFetchParams), fetchInfos = ArgumentMatchers.eq(Seq(topicIdPartition -> requestData)), quota = ArgumentMatchers.eq(UnboundedQuota), responseCallback = callbackCaptor.capture(), )).thenAnswer(_ => { - println("Did we get the callback?") callbackCaptor.getValue.apply(Seq((topicIdPartition, responseData))) }) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala index f2db6eeee6ee6..9b9c8bad89ead 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala @@ -38,7 +38,7 @@ import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.PartitionRegistration -import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel} +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.mockito.Mockito @@ -227,14 +227,14 @@ class ReplicaManagerConcurrencyTest { } } - val fetchParams = FetchParams( - requestVersion = ApiKeys.FETCH.latestVersion, - replicaId = replicaId, - maxWaitMs = random.nextInt(100), - minBytes = 1, - maxBytes = 1024 * 1024, - isolation = FetchIsolation(replicaId, IsolationLevel.READ_UNCOMMITTED), - clientMetadata = Some(clientMetadata) + val fetchParams = new FetchParams( + ApiKeys.FETCH.latestVersion, + replicaId, + random.nextInt(100), + 1, + 1024 * 1024, + FetchIsolation.of(replicaId, IsolationLevel.READ_UNCOMMITTED), + Optional.of(clientMetadata) ) replicaManager.fetchMessages( diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 91dc64cf8ad87..58d7c7652d714 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState -import org.apache.kafka.server.log.internals.{LogDirFailureChannel, LogOffsetMetadata} +import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation, FetchParams, LogDirFailureChannel, LogOffsetMetadata} import org.apache.kafka.server.util.KafkaScheduler import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} @@ -171,14 +171,14 @@ class ReplicaManagerQuotasTest { val fetchPartitionStatus = FetchPartitionStatus( new LogOffsetMetadata(50L, 0L, 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty())) - val fetchParams = FetchParams( - requestVersion = ApiKeys.FETCH.latestVersion, - replicaId = 1, - maxWaitMs = 600, - minBytes = 1, - maxBytes = 1000, - isolation = FetchLogEnd, - clientMetadata = None + val fetchParams = new FetchParams( + ApiKeys.FETCH.latestVersion, + 1, + 600, + 1, + 1000, + FetchIsolation.LOG_END, + Optional.empty() ) new DelayedFetch( @@ -222,14 +222,14 @@ class ReplicaManagerQuotasTest { val fetchPartitionStatus = FetchPartitionStatus( new LogOffsetMetadata(50L, 0L, 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty())) - val fetchParams = FetchParams( - requestVersion = ApiKeys.FETCH.latestVersion, - replicaId = FetchRequest.CONSUMER_REPLICA_ID, - maxWaitMs = 600, - minBytes = 1, - maxBytes = 1000, - isolation = FetchHighWatermark, - clientMetadata = None + val fetchParams = new FetchParams( + ApiKeys.FETCH.latestVersion, + FetchRequest.CONSUMER_REPLICA_ID, + 600L, + 1, + 1000, + FetchIsolation.HIGH_WATERMARK, + Optional.empty() ) new DelayedFetch( @@ -265,7 +265,7 @@ class ReplicaManagerQuotasTest { maxLength = AdditionalMatchers.geq(1), isolation = any[FetchIsolation], minOneMessage = anyBoolean)).thenReturn( - FetchDataInfo( + new FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.withRecords(CompressionType.NONE, record) )) @@ -275,7 +275,7 @@ class ReplicaManagerQuotasTest { maxLength = ArgumentMatchers.eq(0), isolation = any[FetchIsolation], minOneMessage = anyBoolean)).thenReturn( - FetchDataInfo( + new FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.EMPTY )) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index fc9c320fbb8ec..a6b25aaf324d0 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -24,7 +24,7 @@ import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.stream.IntStream -import java.util.{Collections, Optional, Properties} +import java.util.{Collections, Optional, OptionalLong, Properties} import kafka.api._ import kafka.cluster.{BrokerEndPoint, Partition} import kafka.log._ @@ -35,7 +35,6 @@ import kafka.utils.timer.MockTimer import kafka.utils.{MockTime, Pool, TestUtils} import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.errors.KafkaStorageException -import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.message.LeaderAndIsrRequestData import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset @@ -58,7 +57,7 @@ import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, C import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 -import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel, LogOffsetMetadata} +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata} import org.apache.kafka.server.util.MockScheduler import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -71,6 +70,7 @@ import org.mockito.ArgumentMatchers.{any, anyInt, anyString} import org.mockito.Mockito.{mock, never, reset, times, verify, when} import scala.collection.{Map, Seq, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 import scala.jdk.CollectionConverters._ class ReplicaManagerTest { @@ -584,8 +584,8 @@ class ReplicaManagerTest { var fetchData = consumerFetchResult.assertFired assertEquals(Errors.NONE, fetchData.error) assertTrue(fetchData.records.batches.asScala.isEmpty) - assertEquals(Some(0), fetchData.lastStableOffset) - assertEquals(Some(List.empty[FetchResponseData.AbortedTransaction]), fetchData.abortedTransactions) + assertEquals(OptionalLong.of(0), fetchData.lastStableOffset) + assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions) // delayed fetch should timeout and return nothing consumerFetchResult = fetchPartitionAsConsumer( @@ -602,8 +602,8 @@ class ReplicaManagerTest { fetchData = consumerFetchResult.assertFired assertEquals(Errors.NONE, fetchData.error) assertTrue(fetchData.records.batches.asScala.isEmpty) - assertEquals(Some(0), fetchData.lastStableOffset) - assertEquals(Some(List.empty[FetchResponseData.AbortedTransaction]), fetchData.abortedTransactions) + assertEquals(OptionalLong.of(0), fetchData.lastStableOffset) + assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions) // now commit the transaction val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) @@ -640,8 +640,8 @@ class ReplicaManagerTest { fetchData = consumerFetchResult.assertFired assertEquals(Errors.NONE, fetchData.error) - assertEquals(Some(numRecords + 1), fetchData.lastStableOffset) - assertEquals(Some(List.empty[FetchResponseData.AbortedTransaction]), fetchData.abortedTransactions) + assertEquals(OptionalLong.of(numRecords + 1), fetchData.lastStableOffset) + assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions) assertEquals(numRecords + 1, fetchData.records.batches.asScala.size) } finally { replicaManager.shutdown(checkpointHW = false) @@ -721,12 +721,12 @@ class ReplicaManagerTest { val fetchData = fetchResult.assertFired assertEquals(Errors.NONE, fetchData.error) - assertEquals(Some(numRecords + 1), fetchData.lastStableOffset) + assertEquals(OptionalLong.of(numRecords + 1), fetchData.lastStableOffset) assertEquals(numRecords + 1, fetchData.records.records.asScala.size) - assertTrue(fetchData.abortedTransactions.isDefined) + assertTrue(fetchData.abortedTransactions.isPresent) assertEquals(1, fetchData.abortedTransactions.get.size) - val abortedTransaction = fetchData.abortedTransactions.get.head + val abortedTransaction = fetchData.abortedTransactions.get.get(0) assertEquals(0L, abortedTransaction.firstOffset) assertEquals(producerId, abortedTransaction.producerId) } finally { @@ -885,7 +885,7 @@ class ReplicaManagerTest { ) assertEquals(Errors.NONE, divergingEpochResult.assertFired.error) - assertTrue(divergingEpochResult.assertFired.divergingEpoch.isDefined) + assertTrue(divergingEpochResult.assertFired.divergingEpoch.isPresent) assertEquals(0L, followerReplica.stateSnapshot.logStartOffset) assertEquals(0L, followerReplica.stateSnapshot.logEndOffset) } finally { @@ -1083,7 +1083,7 @@ class ReplicaManagerTest { // the response contains high watermark on the leader before it is updated based // on this fetch request assertEquals(0, tp0Status.get.highWatermark) - assertEquals(Some(0), tp0Status.get.lastStableOffset) + assertEquals(OptionalLong.of(0), tp0Status.get.lastStableOffset) assertEquals(Errors.NONE, tp0Status.get.error) assertTrue(tp0Status.get.records.batches.iterator.hasNext) @@ -1222,7 +1222,7 @@ class ReplicaManagerTest { // We expect to select the leader, which means we return None val preferredReadReplica: Option[Int] = replicaManager.findPreferredReadReplica( - partition, metadata, Request.OrdinaryConsumerId, 1L, System.currentTimeMillis) + partition, metadata, FetchRequest.ORDINARY_CONSUMER_ID, 1L, System.currentTimeMillis) assertFalse(preferredReadReplica.isDefined) } @@ -1274,7 +1274,7 @@ class ReplicaManagerTest { assertTrue(consumerResult.hasFired) // But only leader will compute preferred replica - assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty) + assertTrue(!consumerResult.assertFired.preferredReadReplica.isPresent) } finally { replicaManager.shutdown() } @@ -1330,7 +1330,7 @@ class ReplicaManagerTest { assertTrue(consumerResult.hasFired) // Returns a preferred replica (should just be the leader, which is None) - assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined) + assertFalse(consumerResult.assertFired.preferredReadReplica.isPresent) } finally { replicaManager.shutdown() } @@ -1460,7 +1460,7 @@ class ReplicaManagerTest { assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getSelectionCount) // Only leader will compute preferred replica - assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty) + assertTrue(!consumerResult.assertFired.preferredReadReplica.isPresent) } finally replicaManager.shutdown(checkpointHW = false) } @@ -1527,7 +1527,7 @@ class ReplicaManagerTest { assertEquals(0, replicaManager.delayedFetchPurgatory.watched) // Returns a preferred replica - assertTrue(consumerResult.assertFired.preferredReadReplica.isDefined) + assertTrue(consumerResult.assertFired.preferredReadReplica.isPresent) } finally replicaManager.shutdown(checkpointHW = false) } @@ -2239,13 +2239,13 @@ class ReplicaManagerTest { clientMetadata: Option[ClientMetadata] = None, ): CallbackResult[FetchPartitionData] = { val isolation = isolationLevel match { - case IsolationLevel.READ_COMMITTED => FetchTxnCommitted - case IsolationLevel.READ_UNCOMMITTED => FetchHighWatermark + case IsolationLevel.READ_COMMITTED => FetchIsolation.TXN_COMMITTED + case IsolationLevel.READ_UNCOMMITTED => FetchIsolation.HIGH_WATERMARK } fetchPartition( replicaManager, - replicaId = Request.OrdinaryConsumerId, + replicaId = FetchRequest.ORDINARY_CONSUMER_ID, partition, partitionData, minBytes, @@ -2272,7 +2272,7 @@ class ReplicaManagerTest { partitionData, minBytes = minBytes, maxBytes = maxBytes, - isolation = FetchLogEnd, + isolation = FetchIsolation.LOG_END, clientMetadata = None, maxWaitMs = maxWaitMs ) @@ -2322,17 +2322,17 @@ class ReplicaManagerTest { minBytes: Int = 1, maxBytes: Int = 1024 * 1024, quota: ReplicaQuota = UnboundedQuota, - isolation: FetchIsolation = FetchLogEnd, + isolation: FetchIsolation = FetchIsolation.LOG_END, clientMetadata: Option[ClientMetadata] = None ): Unit = { - val params = FetchParams( - requestVersion = requestVersion, - replicaId = replicaId, - maxWaitMs = maxWaitMs, - minBytes = minBytes, - maxBytes = maxBytes, - isolation = isolation, - clientMetadata = clientMetadata + val params = new FetchParams( + requestVersion, + replicaId, + maxWaitMs, + minBytes, + maxBytes, + isolation, + clientMetadata.asJava ) replicaManager.fetchMessages( diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index ddacb506778aa..980c8159c218a 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -23,7 +23,7 @@ import java.util import java.util.Properties import kafka.log.{LogTestUtils, ProducerStateManagerConfig, UnifiedLog} import kafka.raft.{KafkaMetadataLog, MetadataLogConfig} -import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer} +import kafka.server.{BrokerTopicStats, KafkaRaftServer} import kafka.tools.DumpLogSegments.TimeIndexDumpErrors import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.Uuid @@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch} import org.apache.kafka.server.common.ApiMessageAndVersion -import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel} +import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel} import org.apache.kafka.snapshot.RecordsSnapshotWriter import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -480,7 +480,7 @@ class DumpLogSegmentsTest { val logReadInfo = log.read( startOffset = 0, maxLength = Int.MaxValue, - isolation = FetchLogEnd, + isolation = FetchIsolation.LOG_END, minOneMessage = true ) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 04c6e3cd060ef..c864c6329790c 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -61,7 +61,7 @@ import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, RequestContext, RequestHeader} +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, FetchRequest, RequestContext, RequestHeader} import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer} @@ -1218,7 +1218,7 @@ object TestUtils extends Logging { waitUntilTrue( () => brokers.forall { broker => broker.metadataCache.getPartitionInfo(topic, partition) match { - case Some(partitionState) => Request.isValidBrokerId(partitionState.leader) + case Some(partitionState) => FetchRequest.isValidBrokerId(partitionState.leader) case _ => false } }, diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/FetchDataInfo.java b/storage/src/main/java/org/apache/kafka/server/log/internals/FetchDataInfo.java new file mode 100644 index 0000000000000..a07bc7710904c --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/FetchDataInfo.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Records; + +import java.util.List; +import java.util.Optional; + +public class FetchDataInfo { + public final LogOffsetMetadata fetchOffsetMetadata; + public final Records records; + public final boolean firstEntryIncomplete; + public final Optional> abortedTransactions; + + public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, + Records records) { + this(fetchOffsetMetadata, records, false, Optional.empty()); + } + + public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, + Records records, + boolean firstEntryIncomplete, + Optional> abortedTransactions) { + this.fetchOffsetMetadata = fetchOffsetMetadata; + this.records = records; + this.firstEntryIncomplete = firstEntryIncomplete; + this.abortedTransactions = abortedTransactions; + } + + public static FetchDataInfo empty(long fetchOffset) { + return new FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY); + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java b/storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java new file mode 100644 index 0000000000000..47457af686d32 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.requests.FetchRequest; + +public enum FetchIsolation { + LOG_END, + HIGH_WATERMARK, + TXN_COMMITTED; + + public static FetchIsolation of(FetchRequest request) { + return of(request.replicaId(), request.isolationLevel()); + } + + public static FetchIsolation of(int replicaId, IsolationLevel isolationLevel) { + if (!FetchRequest.isConsumer(replicaId)) { + return LOG_END; + } else if (isolationLevel == IsolationLevel.READ_COMMITTED) { + return TXN_COMMITTED; + } else { + return HIGH_WATERMARK; + } + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java b/storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java new file mode 100644 index 0000000000000..3175b6e85fbb3 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.replica.ClientMetadata; +import org.apache.kafka.common.requests.FetchRequest; + +import java.util.Objects; +import java.util.Optional; + +public class FetchParams { + public final short requestVersion; + public final int replicaId; + public final long maxWaitMs; + public final int minBytes; + public final int maxBytes; + public final FetchIsolation isolation; + public final Optional clientMetadata; + + public FetchParams(short requestVersion, + int replicaId, + long maxWaitMs, + int minBytes, + int maxBytes, + FetchIsolation isolation, + Optional clientMetadata) { + Objects.requireNonNull(isolation); + Objects.requireNonNull(clientMetadata); + this.requestVersion = requestVersion; + this.replicaId = replicaId; + this.maxWaitMs = maxWaitMs; + this.minBytes = minBytes; + this.maxBytes = maxBytes; + this.isolation = isolation; + this.clientMetadata = clientMetadata; + } + + public boolean isFromFollower() { + return FetchRequest.isValidBrokerId(replicaId); + } + + public boolean isFromConsumer() { + return FetchRequest.isConsumer(replicaId); + } + + public boolean fetchOnlyLeader() { + return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent()); + } + + public boolean hardMaxBytesLimit() { + return requestVersion <= 2; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FetchParams that = (FetchParams) o; + return requestVersion == that.requestVersion + && replicaId == that.replicaId + && maxWaitMs == that.maxWaitMs + && minBytes == that.minBytes + && maxBytes == that.maxBytes + && isolation.equals(that.isolation) + && clientMetadata.equals(that.clientMetadata); + } + + @Override + public int hashCode() { + int result = requestVersion; + result = 31 * result + replicaId; + result = 31 * result + Long.hashCode(32); + result = 31 * result + minBytes; + result = 31 * result + maxBytes; + result = 31 * result + isolation.hashCode(); + result = 31 * result + clientMetadata.hashCode(); + return result; + } + + @Override + public String toString() { + return "FetchParams(" + + "requestVersion=" + requestVersion + + ", replicaId=" + replicaId + + ", maxWaitMs=" + maxWaitMs + + ", minBytes=" + minBytes + + ", maxBytes=" + maxBytes + + ", isolation=" + isolation + + ", clientMetadata=" + clientMetadata + + ')'; + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/FetchPartitionData.java b/storage/src/main/java/org/apache/kafka/server/log/internals/FetchPartitionData.java new file mode 100644 index 0000000000000..c101f90976209 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/FetchPartitionData.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.util.OptionalInt; +import java.util.OptionalLong; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.Records; + +import java.util.List; +import java.util.Optional; + +public class FetchPartitionData { + public final Errors error; + public final long highWatermark; + public final long logStartOffset; + public final Records records; + public final Optional divergingEpoch; + public final OptionalLong lastStableOffset; + public final Optional> abortedTransactions; + public final OptionalInt preferredReadReplica; + public final boolean isReassignmentFetch; + + public FetchPartitionData(Errors error, + long highWatermark, + long logStartOffset, + Records records, + Optional divergingEpoch, + OptionalLong lastStableOffset, + Optional> abortedTransactions, + OptionalInt preferredReadReplica, + boolean isReassignmentFetch) { + this.error = error; + this.highWatermark = highWatermark; + this.logStartOffset = logStartOffset; + this.records = records; + this.divergingEpoch = divergingEpoch; + this.lastStableOffset = lastStableOffset; + this.abortedTransactions = abortedTransactions; + this.preferredReadReplica = preferredReadReplica; + this.isReassignmentFetch = isReassignmentFetch; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index f0915c8b88eea..14478f7b140b2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.integration.utils; -import kafka.api.Request; import kafka.server.KafkaServer; import kafka.server.MetadataCache; import org.apache.kafka.clients.admin.Admin; @@ -35,6 +34,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState; +import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; @@ -949,7 +949,7 @@ private static void waitUntilMetadataIsPropagated(final List server } final UpdateMetadataPartitionState metadataPartitionState = partitionInfo.get(); - if (!Request.isValidBrokerId(metadataPartitionState.leader())) { + if (!FetchRequest.isValidBrokerId(metadataPartitionState.leader())) { invalidBrokerIds.add(server); } }