diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 485f2bdce31d0..fd37367a7fc03 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -70,9 +70,9 @@ object ApiVersion { // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests, // and KafkaStorageException for fetch requests. KAFKA_1_1_IV0, - // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 + // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over) KAFKA_2_0_IV0, - // Introduced ApiVersionsRequest V2 via KIP-219 + // Several request versions were bumped due to KIP-219 (Improve quota communication) KAFKA_2_0_IV1, // Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) KAFKA_2_1_IV0 @@ -257,4 +257,4 @@ case object KAFKA_2_1_IV0 extends DefaultApiVersion { val subVersion = "IV0" val recordVersion = RecordVersion.V2 val id: Int = 18 -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala index 1ee713b10bef3..ed04dcf096fb8 100644 --- a/core/src/main/scala/kafka/server/QuotaFactory.scala +++ b/core/src/main/scala/kafka/server/QuotaFactory.scala @@ -37,7 +37,8 @@ object QuotaFactory extends Logging { object UnboundedQuota extends ReplicaQuota { override def isThrottled(topicPartition: TopicPartition): Boolean = false - override def isQuotaExceeded(): Boolean = false + override def isQuotaExceeded: Boolean = false + def record(value: Long): Unit = () } case class QuotaManagers(fetch: ClientQuotaManager, diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index ce6e350d0bda7..27defd3b50a10 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -47,7 +47,7 @@ class ReplicaFetcherThread(name: String, replicaMgr: ReplicaManager, metrics: Metrics, time: Time, - quota: ReplicationQuotaManager, + quota: ReplicaQuota, leaderEndpointBlockingSend: Option[BlockingSend] = None) extends AbstractFetcherThread(name = name, clientId = name, @@ -63,20 +63,34 @@ class ReplicaFetcherThread(name: String, private val logContext = new LogContext(s"[ReplicaFetcher replicaId=$replicaId, leaderId=${sourceBroker.id}, " + s"fetcherId=$fetcherId] ") this.logIdent = logContext.logPrefix + private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse( new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId, s"broker-$replicaId-fetcher-$fetcherId", logContext)) - private val fetchRequestVersion: Short = - if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7 + + // Visible for testing + private[server] val fetchRequestVersion: Short = + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 8 + else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1 else 0 - private val offsetForLeaderEpochRequestVersion: Short = + + // Visible for testing + private[server] val offsetForLeaderEpochRequestVersion: Short = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV0) 1 else 0 + + // Visible for testing + private[server] val listOffsetRequestVersion: Short = + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 3 + else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 2 + else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) 1 + else 0 + private val fetchMetadataSupported = brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0 private val maxWait = brokerConfig.replicaFetchWaitMaxMs private val minBytes = brokerConfig.replicaFetchMinBytes @@ -242,10 +256,10 @@ class ReplicaFetcherThread(name: String, private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long): Long = { val requestBuilder = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) { val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long)) - ListOffsetRequest.Builder.forReplica(1, replicaId).setTargetTimes(partitions.asJava) + ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion, replicaId).setTargetTimes(partitions.asJava) } else { val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1)) - ListOffsetRequest.Builder.forReplica(0, replicaId).setOffsetData(partitions.asJava) + ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion, replicaId).setOffsetData(partitions.asJava) } val clientResponse = leaderEndpoint.sendRequest(requestBuilder) val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse] diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala index 84004e3f8e66a..7835c9dbafe4e 100644 --- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala @@ -51,8 +51,9 @@ object ReplicationQuotaManagerConfig { } trait ReplicaQuota { + def record(value: Long): Unit def isThrottled(topicPartition: TopicPartition): Boolean - def isQuotaExceeded(): Boolean + def isQuotaExceeded: Boolean } object Constants { @@ -99,7 +100,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, * * @return */ - override def isQuotaExceeded(): Boolean = { + override def isQuotaExceeded: Boolean = { try { sensor().checkQuotas() } catch { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index ac5b7edc3c7f1..fbf77404b026c 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -19,12 +19,13 @@ package kafka.server import kafka.cluster.{BrokerEndPoint, Replica} import kafka.log.LogManager import kafka.cluster.Partition +import kafka.server.QuotaFactory.UnboundedQuota import kafka.server.epoch.LeaderEpochCache import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.requests.EpochEndOffset import org.apache.kafka.common.requests.EpochEndOffset._ @@ -45,6 +46,25 @@ class ReplicaFetcherThreadTest { private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000) + @Test + def shouldSendLatestRequestVersionsByDefault(): Unit = { + val props = TestUtils.createBrokerConfig(1, "localhost:1234") + val config = KafkaConfig.fromProps(props) + val thread = new ReplicaFetcherThread( + name = "bob", + fetcherId = 0, + sourceBroker = brokerEndPoint, + brokerConfig = config, + replicaMgr = null, + metrics = new Metrics(), + time = new SystemTime(), + quota = UnboundedQuota, + leaderEndpointBlockingSend = None) + assertEquals(ApiKeys.FETCH.latestVersion, thread.fetchRequestVersion) + assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, thread.offsetForLeaderEpochRequestVersion) + assertEquals(ApiKeys.LIST_OFFSETS.latestVersion, thread.listOffsetRequestVersion) + } + @Test def shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11(): Unit = { val props = TestUtils.createBrokerConfig(1, "localhost:1234") @@ -108,7 +128,6 @@ class ReplicaFetcherThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all dependencies - val quota = createNiceMock(classOf[ReplicationQuotaManager]) val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) @@ -131,14 +150,15 @@ class ReplicaFetcherThreadTest { //Expectations expect(partition.truncateTo(anyLong(), anyBoolean())).once - replay(leaderEpochs, replicaManager, logManager, quota, replica) + replay(leaderEpochs, replicaManager, logManager, replica) //Define the offsets for the OffsetsForLeaderEpochResponse val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1)).asJava //Create the fetcher thread val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime()) - val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, + new Metrics, new SystemTime, UnboundedQuota, Some(mockNetwork)) thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0)) //Loop 1 diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index c6efca573de7e..66a2c8e5fec11 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -52,8 +52,8 @@ class ReplicaManagerQuotasTest { val followerReplicaId = configs.last.brokerId val quota = mockQuota(1000000) - expect(quota.isQuotaExceeded()).andReturn(false).once() - expect(quota.isQuotaExceeded()).andReturn(true).once() + expect(quota.isQuotaExceeded).andReturn(false).once() + expect(quota.isQuotaExceeded).andReturn(true).once() replay(quota) val fetch = replicaManager.readFromLocalLog( @@ -78,8 +78,8 @@ class ReplicaManagerQuotasTest { val followerReplicaId = configs.last.brokerId val quota = mockQuota(1000000) - expect(quota.isQuotaExceeded()).andReturn(true).once() - expect(quota.isQuotaExceeded()).andReturn(true).once() + expect(quota.isQuotaExceeded).andReturn(true).once() + expect(quota.isQuotaExceeded).andReturn(true).once() replay(quota) val fetch = replicaManager.readFromLocalLog( @@ -103,8 +103,8 @@ class ReplicaManagerQuotasTest { val followerReplicaId = configs.last.brokerId val quota = mockQuota(1000000) - expect(quota.isQuotaExceeded()).andReturn(false).once() - expect(quota.isQuotaExceeded()).andReturn(false).once() + expect(quota.isQuotaExceeded).andReturn(false).once() + expect(quota.isQuotaExceeded).andReturn(false).once() replay(quota) val fetch = replicaManager.readFromLocalLog( @@ -128,8 +128,8 @@ class ReplicaManagerQuotasTest { val followerReplicaId = configs.last.brokerId val quota = mockQuota(1000000) - expect(quota.isQuotaExceeded()).andReturn(false).once() - expect(quota.isQuotaExceeded()).andReturn(true).once() + expect(quota.isQuotaExceeded).andReturn(false).once() + expect(quota.isQuotaExceeded).andReturn(true).once() replay(quota) val fetch = replicaManager.readFromLocalLog( diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala index 54b506d768a6e..b1edd010020d2 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala @@ -50,7 +50,7 @@ class ReplicationQuotaManagerTest { quota.updateQuota(new Quota(100, true)) //Quota should not be broken when we start - assertFalse(quota.isQuotaExceeded()) + assertFalse(quota.isQuotaExceeded) //First window is fixed, so we'll skip it time.sleep(1000) @@ -60,24 +60,24 @@ class ReplicationQuotaManagerTest { quota.record(1) //Then it should not break the quota - assertFalse(quota.isQuotaExceeded()) + assertFalse(quota.isQuotaExceeded) //When we record half the quota (half way through the window), we still should not break quota.record(149) //150B, 1.5s - assertFalse(quota.isQuotaExceeded()) + assertFalse(quota.isQuotaExceeded) //Add a byte to push over quota quota.record(1) //151B, 1.5s //Then it should break the quota assertEquals(151 / 1.5, rate(metrics), 0) //151B, 1.5s - assertTrue(quota.isQuotaExceeded()) + assertTrue(quota.isQuotaExceeded) //When we sleep for the remaining half the window time.sleep(500) //151B, 2s //Then Our rate should have halved (i.e back down below the quota) - assertFalse(quota.isQuotaExceeded()) + assertFalse(quota.isQuotaExceeded) assertEquals(151d / 2, rate(metrics), 0.1) //151B, 2s //When we sleep for another half a window (now half way through second window) @@ -86,14 +86,14 @@ class ReplicationQuotaManagerTest { //Then the rate should be exceeded again assertEquals(250 / 2.5, rate(metrics), 0) //250B, 2.5s - assertFalse(quota.isQuotaExceeded()) + assertFalse(quota.isQuotaExceeded) quota.record(1) - assertTrue(quota.isQuotaExceeded()) + assertTrue(quota.isQuotaExceeded) assertEquals(251 / 2.5, rate(metrics), 0) //Sleep for 2 more window time.sleep(2 * 1000) //so now at 3.5s - assertFalse(quota.isQuotaExceeded()) + assertFalse(quota.isQuotaExceeded) assertEquals(251d / 4.5, rate(metrics), 0) }