diff --git a/build.gradle b/build.gradle
index b535dd39a6933..26fafd5b3842e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1071,6 +1071,7 @@ project(':metadata') {
dependencies {
implementation project(':clients')
+ implementation project(':raft')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
implementation libs.metrics
@@ -1268,7 +1269,6 @@ project(':raft') {
dependencies {
implementation project(':clients')
- implementation project(':metadata')
implementation libs.slf4jApi
implementation libs.jacksonDatabind
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e7e2e4dca414f..51a72ebb6ab1e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -220,6 +220,7 @@
+
@@ -231,6 +232,8 @@
+
+
@@ -239,6 +242,8 @@
+
+
@@ -282,7 +287,6 @@
-
@@ -406,7 +410,6 @@
-
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index ee06a95a78ab4..7a25f2e5ee81b 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
-import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde}
+import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, RecordSerde}
import scala.jdk.CollectionConverters._
@@ -101,6 +101,10 @@ trait RaftManager[T] {
epoch: Int,
records: Seq[T]
): Option[Long]
+
+ def leaderAndEpoch: LeaderAndEpoch
+
+ def client: RaftClient[T]
}
class KafkaRaftManager[T](
@@ -126,10 +130,10 @@ class KafkaRaftManager[T](
private val dataDir = createDataDir()
private val metadataLog = buildMetadataLog()
private val netChannel = buildNetworkChannel()
- private val raftClient = buildRaftClient()
- private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
+ val client: KafkaRaftClient[T] = buildRaftClient()
+ private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
- def kafkaRaftClient: KafkaRaftClient[T] = raftClient
+ def kafkaRaftClient: KafkaRaftClient[T] = client
def startup(): Unit = {
// Update the voter endpoints (if valid) with what's in RaftConfig
@@ -152,7 +156,7 @@ class KafkaRaftManager[T](
def shutdown(): Unit = {
raftIoThread.shutdown()
- raftClient.close()
+ client.close()
scheduler.shutdown()
netChannel.close()
metadataLog.close()
@@ -161,7 +165,7 @@ class KafkaRaftManager[T](
override def register(
listener: RaftClient.Listener[T]
): Unit = {
- raftClient.register(listener)
+ client.register(listener)
}
override def scheduleAtomicAppend(
@@ -184,9 +188,9 @@ class KafkaRaftManager[T](
isAtomic: Boolean
): Option[Long] = {
val offset = if (isAtomic) {
- raftClient.scheduleAtomicAppend(epoch, records.asJava)
+ client.scheduleAtomicAppend(epoch, records.asJava)
} else {
- raftClient.scheduleAppend(epoch, records.asJava)
+ client.scheduleAppend(epoch, records.asJava)
}
Option(offset).map(Long.unbox)
@@ -203,7 +207,7 @@ class KafkaRaftManager[T](
createdTimeMs
)
- raftClient.handle(inboundRequest)
+ client.handle(inboundRequest)
inboundRequest.completion.thenApply { response =>
response.data
@@ -308,4 +312,7 @@ class KafkaRaftManager[T](
)
}
+ override def leaderAndEpoch: LeaderAndEpoch = {
+ client.leaderAndEpoch
+ }
}
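
Reviewer note: the new leaderAndEpoch and client accessors expose the Raft layer directly to callers that previously went through MetaLogManager. A minimal Java sketch of how the exposed LeaderAndEpoch is read; the LeaderInfo helper is illustrative only, not part of this patch:

    import java.util.OptionalInt;
    import org.apache.kafka.raft.LeaderAndEpoch;
    import org.apache.kafka.raft.RaftClient;

    // Illustrative helper: reads the current leadership view from a RaftClient.
    final class LeaderInfo {
        static String describe(RaftClient<?> client) {
            LeaderAndEpoch current = client.leaderAndEpoch();
            OptionalInt leaderId = current.leaderId;  // empty when no leader is known
            return leaderId.isPresent()
                ? "leader=" + leaderId.getAsInt() + " epoch=" + current.epoch
                : "no leader, epoch=" + current.epoch;
        }
    }
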
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 49edbfe295068..00fd390f91efd 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -18,9 +18,9 @@
package kafka.server
import java.util
-import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.net.InetAddress
import kafka.cluster.Broker.ServerInfo
@@ -29,6 +29,7 @@ import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinato
import kafka.log.LogManager
import kafka.metrics.KafkaYammerMetrics
import kafka.network.SocketServer
+import kafka.raft.RaftManager
import kafka.security.CredentialProvider
import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
import kafka.utils.{CoreUtils, KafkaScheduler}
@@ -42,8 +43,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
-import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.metalog.MetaLogManager
+import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerState, VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
@@ -55,16 +55,16 @@ import scala.jdk.CollectionConverters._
* A Kafka broker that runs in KRaft (Kafka Raft) mode.
*/
class BrokerServer(
- val config: KafkaConfig,
- val metaProps: MetaProperties,
- val metaLogManager: MetaLogManager,
- val time: Time,
- val metrics: Metrics,
- val threadNamePrefix: Option[String],
- val initialOfflineDirs: Seq[String],
- val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
- val supportedFeatures: util.Map[String, VersionRange]
- ) extends KafkaBroker {
+ val config: KafkaConfig,
+ val metaProps: MetaProperties,
+ val raftManager: RaftManager[ApiMessageAndVersion],
+ val time: Time,
+ val metrics: Metrics,
+ val threadNamePrefix: Option[String],
+ val initialOfflineDirs: Seq[String],
+ val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+ val supportedFeatures: util.Map[String, VersionRange]
+) extends KafkaBroker {
import kafka.server.Server._
@@ -181,7 +181,7 @@ class BrokerServer(
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
- val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
+ val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
clientToControllerChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider,
@@ -284,7 +284,7 @@ class BrokerServer(
metaProps.clusterId, networkListeners, supportedFeatures)
// Register a listener with the Raft layer to receive metadata event notifications
- metaLogManager.register(brokerMetadataListener)
+ raftManager.register(brokerMetadataListener)
val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
var interBrokerListener: Endpoint = null
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 5834a17942286..d3ae104fd563f 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicReference
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.raft.RaftManager
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.Node
@@ -31,9 +32,10 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.metalog.MetaLogManager
+import org.apache.kafka.metadata.ApiMessageAndVersion
import scala.collection.Seq
+import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait ControllerNodeProvider {
@@ -77,15 +79,14 @@ class MetadataCacheControllerNodeProvider(
}
object RaftControllerNodeProvider {
- def apply(metaLogManager: MetaLogManager,
+ def apply(raftManager: RaftManager[ApiMessageAndVersion],
config: KafkaConfig,
controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
-
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val controllerSaslMechanism = config.saslMechanismControllerProtocol
new RaftControllerNodeProvider(
- metaLogManager,
+ raftManager,
controllerQuorumVoterNodes,
controllerListenerName,
controllerSecurityProtocol,
@@ -98,7 +99,7 @@ object RaftControllerNodeProvider {
* Finds the controller node by checking the metadata log manager.
* This provider is used when we are using a Raft-based metadata quorum.
*/
-class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
+class RaftControllerNodeProvider(val raftManager: RaftManager[ApiMessageAndVersion],
controllerQuorumVoterNodes: Seq[Node],
val listenerName: ListenerName,
val securityProtocol: SecurityProtocol,
@@ -107,14 +108,7 @@ class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
override def get(): Option[Node] = {
- val leader = metaLogManager.leader()
- if (leader == null) {
- None
- } else if (leader.nodeId() < 0) {
- None
- } else {
- idToNode.get(leader.nodeId())
- }
+ raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode)
}
}
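
Reviewer note: the Scala one-liner `raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode)` works because leaderId is now an OptionalInt rather than a possibly-negative sentinel node id, which is why the null/negative checks disappear. A rough Java equivalent of the new get() logic (names are illustrative):

    import java.util.Map;
    import java.util.Optional;
    import java.util.OptionalInt;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.raft.RaftClient;

    final class ControllerLookup {
        // Returns the controller node, if a leader is currently known among the quorum voters.
        static Optional<Node> currentController(RaftClient<?> client, Map<Integer, Node> idToNode) {
            OptionalInt leaderId = client.leaderAndEpoch().leaderId;
            return leaderId.isPresent()
                ? Optional.ofNullable(idToNode.get(leaderId.getAsInt()))
                : Optional.empty();
        }
    }
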
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 6a2844af8a25d..f3fe22bf8d4ab 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -17,9 +17,9 @@
package kafka.server
-import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util
import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{CompletableFuture, TimeUnit}
import kafka.cluster.Broker.ServerInfo
import kafka.log.LogConfig
@@ -38,7 +38,6 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics}
import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange}
-import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
@@ -49,15 +48,14 @@ import scala.jdk.CollectionConverters._
* A Kafka controller that runs in KRaft (Kafka Raft) mode.
*/
class ControllerServer(
- val metaProperties: MetaProperties,
- val config: KafkaConfig,
- val metaLogManager: MetaLogManager,
- val raftManager: RaftManager[ApiMessageAndVersion],
- val time: Time,
- val metrics: Metrics,
- val threadNamePrefix: Option[String],
- val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
- ) extends Logging with KafkaMetricsGroup {
+ val metaProperties: MetaProperties,
+ val config: KafkaConfig,
+ val raftManager: RaftManager[ApiMessageAndVersion],
+ val time: Time,
+ val metrics: Metrics,
+ val threadNamePrefix: Option[String],
+ val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
+) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._
val lock = new ReentrantLock()
@@ -148,7 +146,7 @@ class ControllerServer(
setTime(time).
setThreadNamePrefix(threadNamePrefixAsString).
setConfigDefs(configDefs).
- setLogManager(metaLogManager).
+ setRaftClient(raftManager.client).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
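
Reviewer note: with the shim gone, the controller is wired to the Raft layer through the builder's new setRaftClient call. A hedged sketch of the minimal wiring; only setters visible in this diff are used, and other setters (metrics, snapshot plumbing, placement policy) are omitted:

    import org.apache.kafka.controller.QuorumController;
    import org.apache.kafka.metadata.ApiMessageAndVersion;
    import org.apache.kafka.raft.RaftClient;

    final class ControllerWiring {
        static QuorumController build(int nodeId, RaftClient<ApiMessageAndVersion> raftClient) throws Exception {
            return new QuorumController.Builder(nodeId)
                .setRaftClient(raftClient)              // was: setLogManager(metaLogManager)
                .setDefaultReplicationFactor((short) 3)
                .setDefaultNumPartitions(1)
                .build();
        }
    }
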
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index e1fc81f09ab8e..8087c37cd672b 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -27,9 +27,8 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
-import org.apache.kafka.metadata.ApiMessageAndVersion
+import org.apache.kafka.metadata.{ApiMessageAndVersion, MetadataRecordSerde}
import org.apache.kafka.raft.RaftConfig
-import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde}
import scala.collection.Seq
@@ -56,7 +55,7 @@ class KafkaRaftServer(
private val metrics = Server.initializeMetrics(
config,
time,
- metaProps.clusterId.toString
+ metaProps.clusterId
)
private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
@@ -74,13 +73,11 @@ class KafkaRaftServer(
controllerQuorumVotersFuture
)
- private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)
-
private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
Some(new BrokerServer(
config,
metaProps,
- metaLogShim,
+ raftManager,
time,
metrics,
threadNamePrefix,
@@ -96,7 +93,6 @@ class KafkaRaftServer(
Some(new ControllerServer(
metaProps,
config,
- metaLogShim,
raftManager,
time,
metrics,
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 8d07f8ea9fc0f..053f28d904416 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -16,8 +16,8 @@
*/
package kafka.server.metadata
-import java.util
import java.util.concurrent.TimeUnit
+
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.metrics.KafkaMetricsGroup
@@ -27,9 +27,11 @@ import org.apache.kafka.common.metadata.MetadataRecordType._
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.metalog.{MetaLogLeader, MetaLogListener}
+import org.apache.kafka.metadata.ApiMessageAndVersion
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.raft.{BatchReader, LeaderAndEpoch, RaftClient}
+import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
object BrokerMetadataListener{
@@ -37,16 +39,17 @@ object BrokerMetadataListener{
val MetadataBatchSizes = "MetadataBatchSizes"
}
-class BrokerMetadataListener(brokerId: Int,
- time: Time,
- metadataCache: RaftMetadataCache,
- configRepository: CachedConfigRepository,
- groupCoordinator: GroupCoordinator,
- replicaManager: RaftReplicaManager,
- txnCoordinator: TransactionCoordinator,
- threadNamePrefix: Option[String],
- clientQuotaManager: ClientQuotaMetadataManager
- ) extends MetaLogListener with KafkaMetricsGroup {
+class BrokerMetadataListener(
+ brokerId: Int,
+ time: Time,
+ metadataCache: RaftMetadataCache,
+ configRepository: CachedConfigRepository,
+ groupCoordinator: GroupCoordinator,
+ replicaManager: RaftReplicaManager,
+ txnCoordinator: TransactionCoordinator,
+ threadNamePrefix: Option[String],
+ clientQuotaManager: ClientQuotaMetadataManager
+) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
private val logContext = new LogContext(s"[BrokerMetadataListener id=${brokerId}] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
logIdent = logContext.logPrefix()
@@ -73,21 +76,33 @@ class BrokerMetadataListener(brokerId: Int,
/**
* Handle new metadata records.
*/
- override def handleCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
- eventQueue.append(new HandleCommitsEvent(lastOffset, records))
+ override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = {
+ eventQueue.append(new HandleCommitsEvent(reader))
}
- // Visible for testing. It's useful to execute events synchronously
- private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
- new HandleCommitsEvent(lastOffset, records).run()
+ // Visible for testing. It's useful to execute events synchronously in order
+ // to make tests deterministic
+ private[metadata] def execCommits(batch: BatchReader.Batch[ApiMessageAndVersion]): Unit = {
+ new HandleCommitsEvent(BatchReader.singleton(batch)).run()
}
- class HandleCommitsEvent(lastOffset: Long,
- records: util.List[ApiMessage])
- extends EventQueue.FailureLoggingEvent(log) {
+ class HandleCommitsEvent(
+ reader: BatchReader[ApiMessageAndVersion]
+ ) extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
+ try {
+ apply(reader.next())
+ } finally {
+ reader.close()
+ }
+ }
+
+ private def apply(batch: BatchReader.Batch[ApiMessageAndVersion]): Unit = {
+ val records = batch.records
+ val lastOffset = batch.lastOffset
+
if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: handling ${records.size()} record(s).")
+ debug(s"Metadata batch $lastOffset: handling ${records.size()} record(s).")
}
val imageBuilder =
MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
@@ -100,37 +115,37 @@ class BrokerMetadataListener(brokerId: Int,
trace("Metadata batch %d: processing [%d/%d]: %s.".format(lastOffset, index + 1,
records.size(), record.toString))
}
- handleMessage(imageBuilder, record, lastOffset)
+ handleMessage(imageBuilder, record.message, lastOffset)
} catch {
- case e: Exception => error(s"Unable to handle record ${index} in batch " +
- s"ending at offset ${lastOffset}", e)
+ case e: Exception => error(s"Unable to handle record $index in batch " +
+ s"ending at offset $lastOffset", e)
}
index = index + 1
}
if (imageBuilder.hasChanges) {
val newImage = imageBuilder.build()
if (isTraceEnabled) {
- trace(s"Metadata batch ${lastOffset}: creating new metadata image ${newImage}")
+ trace(s"Metadata batch $lastOffset: creating new metadata image ${newImage}")
} else if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: creating new metadata image")
+ debug(s"Metadata batch $lastOffset: creating new metadata image")
}
metadataCache.image(newImage)
} else if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: no new metadata image required.")
+ debug(s"Metadata batch $lastOffset: no new metadata image required.")
}
if (imageBuilder.hasPartitionChanges) {
if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: applying partition changes")
+ debug(s"Metadata batch $lastOffset: applying partition changes")
}
replicaManager.handleMetadataRecords(imageBuilder, lastOffset,
RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
} else if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: no partition changes found.")
+ debug(s"Metadata batch $lastOffset: no partition changes found.")
}
_highestMetadataOffset = lastOffset
val endNs = time.nanoseconds()
val deltaUs = TimeUnit.MICROSECONDS.convert(endNs - startNs, TimeUnit.NANOSECONDS)
- debug(s"Metadata batch ${lastOffset}: advanced highest metadata offset in ${deltaUs} " +
+ debug(s"Metadata batch $lastOffset: advanced highest metadata offset in ${deltaUs} " +
"microseconds.")
batchProcessingTimeHist.update(deltaUs)
}
@@ -234,22 +249,20 @@ class BrokerMetadataListener(brokerId: Int,
clientQuotaManager.handleQuotaRecord(record)
}
- class HandleNewLeaderEvent(leader: MetaLogLeader)
+ class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val imageBuilder =
MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
- if (leader.nodeId() < 0) {
- imageBuilder.controllerId(None)
- } else {
- imageBuilder.controllerId(Some(leader.nodeId()))
- }
+ imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala)
metadataCache.image(imageBuilder.build())
}
}
- override def handleNewLeader(leader: MetaLogLeader): Unit = {
- eventQueue.append(new HandleNewLeaderEvent(leader))
+ override def handleLeaderChange(leader: LeaderAndEpoch): Unit = {
+ if (leader.isLeader(brokerId)) {
+ eventQueue.append(new HandleNewLeaderEvent(leader))
+ }
}
class ShutdownEvent() extends EventQueue.FailureLoggingEvent(log) {
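
Reviewer note: the listener now follows the RaftClient.Listener contract: handleCommit hands over a BatchReader that the listener must drain and close, and a single handleLeaderChange callback replaces handleNewLeader/handleRenounce. A minimal Java sketch of that contract; the processBatch/onLeaderChange hooks are placeholders, not Kafka APIs:

    import java.util.List;
    import java.util.OptionalInt;
    import org.apache.kafka.metadata.ApiMessageAndVersion;
    import org.apache.kafka.raft.BatchReader;
    import org.apache.kafka.raft.LeaderAndEpoch;
    import org.apache.kafka.raft.RaftClient;

    abstract class MetadataListenerSketch implements RaftClient.Listener<ApiMessageAndVersion> {
        @Override
        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
            try {
                while (reader.hasNext()) {
                    BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
                    processBatch(batch.lastOffset(), batch.records());
                }
            } finally {
                reader.close();  // the reader must always be released, even on failure
            }
        }

        @Override
        public void handleLeaderChange(LeaderAndEpoch newLeader) {
            // One callback now covers claiming, resigning, and observing other leaders.
            onLeaderChange(newLeader.leaderId, newLeader.epoch);
        }

        abstract void processBatch(long lastOffset, List<ApiMessageAndVersion> records);

        abstract void onLeaderChange(OptionalInt leaderId, int epoch);
    }
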
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 75796ad13f57c..c495dbca5fc79 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT
import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.raft.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.MetadataRecordSerde
import scala.jdk.CollectionConverters._
import scala.collection.mutable
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index c0c1c65df349c..432d52bcf9aa9 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -19,6 +19,7 @@ package kafka.tools
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
+
import joptsimple.OptionException
import kafka.network.SocketServer
import kafka.raft.{KafkaRaftManager, RaftManager}
@@ -36,7 +37,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.BatchReader.Batch
-import org.apache.kafka.raft.{BatchReader, RaftClient, RaftConfig, RecordSerde}
+import org.apache.kafka.raft.{BatchReader, LeaderAndEpoch, RaftClient, RaftConfig, RecordSerde}
import scala.jdk.CollectionConverters._
@@ -163,12 +164,12 @@ class TestRaftServer(
raftManager.register(this)
- override def handleClaim(epoch: Int): Unit = {
- eventQueue.offer(HandleClaim(epoch))
- }
-
- override def handleResign(epoch: Int): Unit = {
- eventQueue.offer(HandleResign)
+ override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = {
+ if (newLeaderAndEpoch.isLeader(config.nodeId)) {
+ eventQueue.offer(HandleClaim(newLeaderAndEpoch.epoch))
+ } else if (claimedEpoch.isDefined) {
+ eventQueue.offer(HandleResign)
+ }
}
override def handleCommit(reader: BatchReader[Array[Byte]]): Unit = {
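
Reviewer note: the same pattern appears here as in BrokerMetadataListener: handleClaim/handleResign collapse into handleLeaderChange, so each listener keeps its own claimed-epoch state. A small Java sketch of that bookkeeping; claimedEpoch is illustrative state mirroring the Scala above:

    import java.util.OptionalInt;
    import org.apache.kafka.raft.LeaderAndEpoch;

    final class ClaimTracker {
        private final int localNodeId;
        private OptionalInt claimedEpoch = OptionalInt.empty();

        ClaimTracker(int localNodeId) {
            this.localNodeId = localNodeId;
        }

        synchronized void onLeaderChange(LeaderAndEpoch newLeader) {
            if (newLeader.isLeader(localNodeId)) {
                claimedEpoch = OptionalInt.of(newLeader.epoch);  // became leader: claim the epoch
            } else if (claimedEpoch.isPresent()) {
                claimedEpoch = OptionalInt.empty();              // previously claimed: resign
            }
            // Otherwise leadership changed among other nodes; nothing to do locally.
        }
    }
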
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 77e6a54fff8b9..3fe3608d9027e 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -37,10 +37,8 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.ApiMessageAndVersion;
-import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.RaftConfig;
-import org.apache.kafka.raft.metadata.MetaLogRaftShim;
-import org.apache.kafka.raft.metadata.MetadataRecordSerde;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,11 +173,9 @@ public KafkaClusterTestKit build() throws Exception {
KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
- MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
ControllerServer controller = new ControllerServer(
nodes.controllerProperties(node.id()),
config,
- metaLogShim,
raftManager,
Time.SYSTEM,
new Metrics(),
@@ -228,11 +224,10 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
- MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
BrokerServer broker = new BrokerServer(
config,
nodes.brokerProperties(node.id()),
- metaLogShim,
+ raftManager,
Time.SYSTEM,
new Metrics(),
Option.apply(threadNamePrefix),
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 545fe48afa544..b3558c51ccd17 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -25,9 +25,10 @@ import kafka.server.RaftReplicaManager
import kafka.utils.Implicits._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
-import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.metadata.ApiMessageAndVersion
+import org.apache.kafka.raft.BatchReader
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers._
@@ -39,6 +40,7 @@ import scala.jdk.CollectionConverters._
class BrokerMetadataListenerTest {
+ private val leaderEpoch = 5
private val brokerId = 1
private val time = new MockTime()
private val configRepository = new CachedConfigRepository
@@ -82,11 +84,10 @@ class BrokerMetadataListenerTest {
): Unit = {
val deleteRecord = new RemoveTopicRecord()
.setTopicId(topicId)
- lastMetadataOffset += 1
- listener.execCommits(lastOffset = lastMetadataOffset, List[ApiMessage](
- deleteRecord,
- ).asJava)
+ applyBatch(List[ApiMessageAndVersion](
+ new ApiMessageAndVersion(deleteRecord, 0.toShort),
+ ))
assertFalse(metadataCache.contains(topic))
assertEquals(new Properties, configRepository.topicConfig(topic))
@@ -108,6 +109,18 @@ class BrokerMetadataListenerTest {
assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet)
}
+ private def applyBatch(
+ records: List[ApiMessageAndVersion]
+ ): Unit = {
+ val baseOffset = lastMetadataOffset + 1
+ lastMetadataOffset += records.size
+ listener.execCommits(new BatchReader.Batch(
+ baseOffset,
+ leaderEpoch,
+ records.asJava
+ ))
+ }
+
private def createAndAssert(
topicId: Uuid,
topic: String,
@@ -115,11 +128,10 @@ class BrokerMetadataListenerTest {
numPartitions: Int,
numBrokers: Int
): Set[TopicPartition] = {
- val records = new java.util.ArrayList[ApiMessage]
- records.add(new TopicRecord()
+ val records = mutable.ListBuffer.empty[ApiMessageAndVersion]
+ records += new ApiMessageAndVersion(new TopicRecord()
.setName(topic)
- .setTopicId(topicId)
- )
+ .setTopicId(topicId), 0)
val localTopicPartitions = mutable.Set.empty[TopicPartition]
(0 until numPartitions).map { partitionId =>
@@ -134,28 +146,25 @@ class BrokerMetadataListenerTest {
localTopicPartitions.add(new TopicPartition(topic, partitionId))
}
- records.add(new PartitionRecord()
+ records += new ApiMessageAndVersion(new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(partitionId)
.setLeader(preferredLeaderId)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
.setReplicas(replicas)
- .setIsr(replicas)
- )
+ .setIsr(replicas), 0)
}
topicConfig.forKeyValue { (key, value) =>
- records.add(new ConfigRecord()
+ records += new ApiMessageAndVersion(new ConfigRecord()
.setResourceName(topic)
.setResourceType(ConfigResource.Type.TOPIC.id())
.setName(key)
- .setValue(value)
- )
+ .setValue(value), 0)
}
- lastMetadataOffset += records.size()
- listener.execCommits(lastOffset = lastMetadataOffset, records)
+ applyBatch(records.toList)
assertTrue(metadataCache.contains(topic))
assertEquals(Some(numPartitions), metadataCache.numPartitions(topic))
assertEquals(topicConfig, configRepository.topicConfig(topic).asScala)
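
Reviewer note: the applyBatch helper above shows the new test idiom, i.e. build a BatchReader.Batch and wrap it with BatchReader.singleton. Equivalent Java, with arbitrary record and offset values; the TestBatches class is illustrative only:

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.common.metadata.TopicRecord;
    import org.apache.kafka.metadata.ApiMessageAndVersion;
    import org.apache.kafka.raft.BatchReader;

    final class TestBatches {
        // Builds a reader containing a single batch with one TopicRecord.
        static BatchReader<ApiMessageAndVersion> singleBatch(long baseOffset, int leaderEpoch) {
            List<ApiMessageAndVersion> records = Collections.singletonList(
                new ApiMessageAndVersion(new TopicRecord().setName("foo"), (short) 0));
            return BatchReader.singleton(new BatchReader.Batch<>(baseOffset, leaderEpoch, records));
        }
    }
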
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 256262a99b4e0..36283360801e1 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -31,8 +31,7 @@ import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRe
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.metadata.ApiMessageAndVersion
-import org.apache.kafka.raft.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.{ApiMessageAndVersion, MetadataRecordSerde}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 86faa5ede8e6c..16cf5a2376475 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -17,21 +17,6 @@
package org.apache.kafka.controller;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalLong;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigDef;
@@ -79,15 +64,32 @@
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
-import org.apache.kafka.metalog.MetaLogLeader;
-import org.apache.kafka.metalog.MetaLogListener;
-import org.apache.kafka.metalog.MetaLogManager;
-import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -118,7 +120,7 @@ static public class Builder {
private String threadNamePrefix = null;
private LogContext logContext = null;
private Map<ConfigResource.Type, ConfigDef> configDefs = Collections.emptyMap();
- private MetaLogManager logManager = null;
+ private RaftClient<ApiMessageAndVersion> raftClient = null;
private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
private short defaultReplicationFactor = 3;
private int defaultNumPartitions = 1;
public Builder setConfigDefs(Map<ConfigResource.Type, ConfigDef> configDefs) {
return this;
}
- public Builder setLogManager(MetaLogManager logManager) {
- this.logManager = logManager;
+ public Builder setRaftClient(RaftClient<ApiMessageAndVersion> logManager) {
+ this.raftClient = logManager;
return this;
}
@@ -200,7 +202,7 @@ public Builder setMetrics(ControllerMetrics controllerMetrics) {
@SuppressWarnings("unchecked")
public QuorumController build() throws Exception {
- if (logManager == null) {
+ if (raftClient == null) {
throw new RuntimeException("You must set a metadata log manager.");
}
if (threadNamePrefix == null) {
@@ -223,7 +225,7 @@ public QuorumController build() throws Exception {
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
return new QuorumController(logContext, nodeId, queue, time, configDefs,
- logManager, supportedFeatures, defaultReplicationFactor,
+ raftClient, supportedFeatures, defaultReplicationFactor,
defaultNumPartitions, replicaPlacementPolicy, snapshotWriterBuilder,
snapshotReader, sessionTimeoutNs, controllerMetrics);
} catch (Exception e) {
@@ -239,12 +241,12 @@ public QuorumController build() throws Exception {
"The active controller appears to be node ";
private NotControllerException newNotControllerException() {
- int latestController = logManager.leader().nodeId();
- if (latestController < 0) {
- return new NotControllerException("No controller appears to be active.");
- } else {
+ OptionalInt latestController = raftClient.leaderAndEpoch().leaderId;
+ if (latestController.isPresent()) {
return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
- latestController);
+ latestController.getAsInt());
+ } else {
+ return new NotControllerException("No controller appears to be active.");
}
}
@@ -529,7 +531,7 @@ CompletableFuture<T> future() {
public void run() throws Exception {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
- long controllerEpoch = curClaimEpoch;
+ int controllerEpoch = curClaimEpoch;
if (controllerEpoch == -1) {
throw newNotControllerException();
}
@@ -562,9 +564,9 @@ public void run() throws Exception {
// out asynchronously.
final long offset;
if (result.isAtomic()) {
- offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records());
+ offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
} else {
- offset = logManager.scheduleWrite(controllerEpoch, result.records());
+ offset = raftClient.scheduleAppend(controllerEpoch, result.records());
}
op.processBatchEndOffset(offset);
writeOffset = offset;
@@ -617,50 +619,64 @@ private <T> CompletableFuture<T> appendWriteEvent(String name,
return event.future();
}
- class QuorumMetaLogListener implements MetaLogListener {
+ class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> {
+
@Override
- public void handleCommits(long offset, List<ApiMessage> messages) {
- appendControlEvent("handleCommits[" + offset + "]", () -> {
- if (curClaimEpoch == -1) {
- // If the controller is a standby, replay the records that were
- // created by the active controller.
- if (log.isDebugEnabled()) {
- if (log.isTraceEnabled()) {
- log.trace("Replaying commits from the active node up to " +
- "offset {}: {}.", offset, messages.stream().
- map(m -> m.toString()).collect(Collectors.joining(", ")));
+ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+ appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> {
+ try {
+ boolean isActiveController = curClaimEpoch != -1;
+ while (reader.hasNext()) {
+ BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+ long offset = batch.lastOffset();
+ List<ApiMessageAndVersion> messages = batch.records();
+
+ if (isActiveController) {
+ // If the controller is active, the records were already replayed,
+ // so we don't need to do it here.
+ log.debug("Completing purgatory items up to offset {}.", offset);
+
+ // Complete any events in the purgatory that were waiting for this offset.
+ purgatory.completeUpTo(offset);
+
+ // Delete all the in-memory snapshots that we no longer need.
+ // If we are writing a new snapshot, then we need to keep that around;
+ // otherwise, we should delete up to the current committed offset.
+ snapshotRegistry.deleteSnapshotsUpTo(
+ Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
+
} else {
- log.debug("Replaying commits from the active node up to " +
- "offset {}.", offset);
+ // If the controller is a standby, replay the records that were
+ // created by the active controller.
+ if (log.isDebugEnabled()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Replaying commits from the active node up to " +
+ "offset {}: {}.", offset, messages.stream()
+ .map(ApiMessageAndVersion::toString)
+ .collect(Collectors.joining(", ")));
+ } else {
+ log.debug("Replaying commits from the active node up to " +
+ "offset {}.", offset);
+ }
+ }
+ for (ApiMessageAndVersion messageAndVersion : messages) {
+ replay(messageAndVersion.message(), -1, offset);
+ }
}
+ lastCommittedOffset = offset;
}
- for (ApiMessage message : messages) {
- replay(message, -1, offset);
- }
- } else {
- // If the controller is active, the records were already replayed,
- // so we don't need to do it here.
- log.debug("Completing purgatory items up to offset {}.", offset);
-
- // Complete any events in the purgatory that were waiting for this offset.
- purgatory.completeUpTo(offset);
-
- // Delete all the in-memory snapshots that we no longer need.
- // If we are writing a new snapshot, then we need to keep that around;
- // otherwise, we should delete up to the current committed offset.
- snapshotRegistry.deleteSnapshotsUpTo(
- Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
+ } finally {
+ reader.close();
}
- lastCommittedOffset = offset;
});
}
@Override
- public void handleNewLeader(MetaLogLeader newLeader) {
- if (newLeader.nodeId() == nodeId) {
- final long newEpoch = newLeader.epoch();
+ public void handleLeaderChange(LeaderAndEpoch newLeader) {
+ if (newLeader.isLeader(nodeId)) {
+ final int newEpoch = newLeader.epoch;
appendControlEvent("handleClaim[" + newEpoch + "]", () -> {
- long curEpoch = curClaimEpoch;
+ int curEpoch = curClaimEpoch;
if (curEpoch != -1) {
throw new RuntimeException("Tried to claim controller epoch " +
newEpoch + ", but we never renounced controller epoch " +
@@ -672,19 +688,14 @@ public void handleNewLeader(MetaLogLeader newLeader) {
writeOffset = lastCommittedOffset;
clusterControl.activate();
});
- }
- }
-
- @Override
- public void handleRenounce(long oldEpoch) {
- appendControlEvent("handleRenounce[" + oldEpoch + "]", () -> {
- if (curClaimEpoch == oldEpoch) {
+ } else if (curClaimEpoch != -1) {
+ appendControlEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
- "log event. Reverting to last committed offset {}.", curClaimEpoch,
- lastCommittedOffset);
+ "log event. Reverting to last committed offset {}.", curClaimEpoch,
+ lastCommittedOffset);
renounce();
- }
- });
+ });
+ }
}
@Override
@@ -872,7 +883,7 @@ private void replay(ApiMessage message, long snapshotEpoch, long offset) {
/**
* The interface that we use to mutate the Raft log.
*/
- private final MetaLogManager logManager;
+ private final RaftClient<ApiMessageAndVersion> raftClient;
/**
* The interface that receives callbacks from the Raft log. These callbacks are
@@ -885,7 +896,7 @@ private void replay(ApiMessage message, long snapshotEpoch, long offset) {
* Otherwise, this is -1. This variable must be modified only from the controller
* thread, but it can be read from other threads.
*/
- private volatile long curClaimEpoch;
+ private volatile int curClaimEpoch;
/**
* The last offset we have committed, or -1 if we have not committed any offsets.
@@ -902,7 +913,7 @@ private QuorumController(LogContext logContext,
KafkaEventQueue queue,
Time time,
Map<ConfigResource.Type, ConfigDef> configDefs,
- MetaLogManager logManager,
+ RaftClient<ApiMessageAndVersion> raftClient,
Map<String, VersionRange> supportedFeatures,
short defaultReplicationFactor,
int defaultNumPartitions,
@@ -910,7 +921,7 @@ private QuorumController(LogContext logContext,
Function snapshotWriterBuilder,
SnapshotReader snapshotReader,
long sessionTimeoutNs,
- ControllerMetrics controllerMetrics) throws Exception {
+ ControllerMetrics controllerMetrics) {
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
@@ -929,9 +940,9 @@ private QuorumController(LogContext logContext,
this.replicationControl = new ReplicationControlManager(snapshotRegistry,
logContext, defaultReplicationFactor, defaultNumPartitions,
configurationControl, clusterControl);
- this.logManager = logManager;
+ this.raftClient = raftClient;
this.metaLogListener = new QuorumMetaLogListener();
- this.curClaimEpoch = -1L;
+ this.curClaimEpoch = -1;
this.lastCommittedOffset = snapshotReader.epoch();
this.writeOffset = -1L;
@@ -943,7 +954,7 @@ private QuorumController(LogContext logContext,
}
}
snapshotRegistry.createSnapshot(lastCommittedOffset);
- this.logManager.register(metaLogListener);
+ this.raftClient.register(metaLogListener);
}
@Override
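
Reviewer note: on the write path, the controller now schedules records directly on the RaftClient; scheduleAppend/scheduleAtomicAppend return the expected last offset of the batch, and the nullable Option(...) wrapping in KafkaRaftManager above suggests the return can be null when the append cannot be scheduled. A small sketch of that path; the helper name is illustrative:

    import java.util.List;
    import org.apache.kafka.metadata.ApiMessageAndVersion;
    import org.apache.kafka.raft.RaftClient;

    final class ControllerWriteSketch {
        // Schedules the generated records and returns the expected last offset of the
        // batch; a null return means the append could not be scheduled.
        static Long schedule(RaftClient<ApiMessageAndVersion> raftClient,
                             int controllerEpoch,
                             List<ApiMessageAndVersion> records,
                             boolean atomic) {
            return atomic
                ? raftClient.scheduleAtomicAppend(controllerEpoch, records)
                : raftClient.scheduleAppend(controllerEpoch, records);
        }
    }
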
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
similarity index 96%
rename from raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java
rename to metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
index c740497fb317e..72def41a3f3a9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft.metadata;
+package org.apache.kafka.metadata;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.metadata.MetadataRecordType;
@@ -23,7 +23,6 @@
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.utils.ByteUtils;
-import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.raft.RecordSerde;
public class MetadataRecordSerde implements RecordSerde<ApiMessageAndVersion> {
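
Reviewer note: MetadataRecordSerde moves to the metadata module unchanged and still implements the raft RecordSerde interface. A hedged round-trip sketch; the recordSize/write/read signatures are assumed from the RecordSerde interface and the serialization helpers imported by MetadataRecordSerdeTest, so treat this as approximate:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.metadata.TopicRecord;
    import org.apache.kafka.common.protocol.ByteBufferAccessor;
    import org.apache.kafka.common.protocol.ObjectSerializationCache;
    import org.apache.kafka.metadata.ApiMessageAndVersion;
    import org.apache.kafka.metadata.MetadataRecordSerde;

    final class SerdeRoundTrip {
        static ApiMessageAndVersion roundTrip(ApiMessageAndVersion in) {
            MetadataRecordSerde serde = new MetadataRecordSerde();
            ObjectSerializationCache cache = new ObjectSerializationCache();
            int size = serde.recordSize(in, cache);                 // assumed signature
            ByteBuffer buffer = ByteBuffer.allocate(size);
            serde.write(in, cache, new ByteBufferAccessor(buffer)); // assumed signature
            buffer.flip();
            return serde.read(new ByteBufferAccessor(buffer), size);
        }
    }
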
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java
deleted file mode 100644
index 2bf4f7c718bd5..0000000000000
--- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metalog;
-
-import java.util.Objects;
-
-/**
- * The current leader of the MetaLog.
- */
-public class MetaLogLeader {
- private final int nodeId;
- private final long epoch;
-
- public MetaLogLeader(int nodeId, long epoch) {
- this.nodeId = nodeId;
- this.epoch = epoch;
- }
-
- public int nodeId() {
- return nodeId;
- }
-
- public long epoch() {
- return epoch;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof MetaLogLeader)) return false;
- MetaLogLeader other = (MetaLogLeader) o;
- return other.nodeId == nodeId && other.epoch == epoch;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(nodeId, epoch);
- }
-
- @Override
- public String toString() {
- return "MetaLogLeader(nodeId=" + nodeId + ", epoch=" + epoch + ")";
- }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java
deleted file mode 100644
index 93744202dc90f..0000000000000
--- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metalog;
-
-import org.apache.kafka.common.protocol.ApiMessage;
-
-import java.util.List;
-
-/**
- * Listeners receive notifications from the MetaLogManager.
- */
-public interface MetaLogListener {
- /**
- * Called when the MetaLogManager commits some messages.
- *
- * @param lastOffset The last offset found in all the given messages.
- * @param messages The messages.
- */
- void handleCommits(long lastOffset, List<ApiMessage> messages);
-
- /**
- * Called when a new leader is elected.
- *
- * @param leader The new leader id and epoch.
- */
- default void handleNewLeader(MetaLogLeader leader) {}
-
- /**
- * Called when the MetaLogManager has renounced the leadership.
- *
- * @param epoch The controller epoch that has ended.
- */
- default void handleRenounce(long epoch) {}
-
- /**
- * Called when the MetaLogManager has finished shutting down, and wants to tell its
- * listener that it is safe to shut down as well.
- */
- default void beginShutdown() {}
-}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
deleted file mode 100644
index 9126245ef3855..0000000000000
--- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metalog;
-
-import org.apache.kafka.metadata.ApiMessageAndVersion;
-
-import java.util.List;
-
-/**
- * The MetaLogManager handles storing metadata and electing leaders.
- */
-public interface MetaLogManager {
-
- /**
- * Start this meta log manager.
- * The manager must be ready to accept incoming calls after this function returns.
- * It is an error to initialize a MetaLogManager more than once.
- */
- void initialize() throws Exception;
-
- /**
- * Register the listener. The manager must be initialized already.
- * The listener must be ready to accept incoming calls immediately.
- *
- * @param listener The listener to register.
- */
- void register(MetaLogListener listener) throws Exception;
-
- /**
- * Schedule a write to the log.
- *
- * The write will be scheduled to happen at some time in the future. There is no
- * error return or exception thrown if the write fails. Instead, the listener may
- * regard the write as successful if and only if the MetaLogManager reaches the given
- * offset before renouncing its leadership. The listener should determine this by
- * monitoring the committed offsets.
- *
- * @param epoch the controller epoch
- * @param batch the batch of messages to write
- *
- * @return the offset of the last message in the batch
- * @throws IllegalArgumentException if buffer allocatio failed and the client should backoff
- */
- long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);
-
- /**
- * Schedule a atomic write to the log.
- *
- * The write will be scheduled to happen at some time in the future. All of the messages in batch
- * will be appended atomically in one batch. The listener may regard the write as successful
- * if and only if the MetaLogManager reaches the given offset before renouncing its leadership.
- * The listener should determine this by monitoring the committed offsets.
- *
- * @param epoch the controller epoch
- * @param batch the batch of messages to write
- *
- * @return the offset of the last message in the batch
- * @throws IllegalArgumentException if buffer allocatio failed and the client should backoff
- */
- long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch);
-
- /**
- * Renounce the leadership.
- *
- * @param epoch The epoch. If this does not match the current epoch, this
- * call will be ignored.
- */
- void renounce(long epoch);
-
- /**
- * Returns the current leader. The active node may change immediately after this
- * function is called, of course.
- */
- MetaLogLeader leader();
-
- /**
- * Returns the node id.
- */
- int nodeId();
-
-}
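
Reviewer note: for anyone tracking the removal, the deleted MetaLogManager/MetaLogListener surface maps onto the raft API roughly as follows (mapping inferred from the call-site changes in this diff):

    scheduleWrite(epoch, batch)        -> RaftClient.scheduleAppend(epoch, records)
    scheduleAtomicWrite(epoch, batch)  -> RaftClient.scheduleAtomicAppend(epoch, records)
    register(MetaLogListener)          -> RaftClient.register(RaftClient.Listener)
    leader()                           -> RaftClient.leaderAndEpoch()
    renounce(epoch)                    -> RaftClient.resign(epoch)
    handleCommits(lastOffset, msgs)    -> Listener.handleCommit(BatchReader)
    handleNewLeader / handleRenounce   -> Listener.handleLeaderChange(LeaderAndEpoch)
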
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 99270422fcf2c..db3acfba2a72f 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -41,7 +41,7 @@ public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
try {
for (int i = 0; i < numControllers; i++) {
QuorumController.Builder builder = new QuorumController.Builder(i);
- builder.setLogManager(logEnv.logManagers().get(i));
+ builder.setRaftClient(logEnv.logManagers().get(i));
builderConsumer.accept(builder);
this.controllers.add(builder.build());
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
similarity index 96%
rename from raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java
rename to metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
index 2071814ed9532..756c32d0e4823 100644
--- a/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft.metadata;
+package org.apache.kafka.metadata;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.SerializationException;
@@ -22,7 +22,6 @@
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.utils.ByteUtils;
-import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index e58848eda4ed2..15551169acf54 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -17,12 +17,16 @@
package org.apache.kafka.metalog;
-import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +37,7 @@
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.OptionalInt;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -43,15 +48,15 @@
/**
* The LocalLogManager is a test implementation that relies on the contents of memory.
*/
-public final class LocalLogManager implements MetaLogManager, AutoCloseable {
+public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, AutoCloseable {
interface LocalBatch {
int size();
}
static class LeaderChangeBatch implements LocalBatch {
- private final MetaLogLeader newLeader;
+ private final LeaderAndEpoch newLeader;
- LeaderChangeBatch(MetaLogLeader newLeader) {
+ LeaderChangeBatch(LeaderAndEpoch newLeader) {
this.newLeader = newLeader;
}
@@ -80,9 +85,11 @@ public String toString() {
}
static class LocalRecordBatch implements LocalBatch {
- private final List<ApiMessage> records;
+ private final long leaderEpoch;
+ private final List<ApiMessageAndVersion> records;
- LocalRecordBatch(List<ApiMessage> records) {
+ LocalRecordBatch(long leaderEpoch, List<ApiMessageAndVersion> records) {
+ this.leaderEpoch = leaderEpoch;
this.records = records;
}
@@ -126,7 +133,7 @@ public static class SharedLogData {
/**
* The current leader.
*/
- private MetaLogLeader leader = new MetaLogLeader(-1, -1);
+ private LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
/**
* The start offset of the last batch that was created, or -1 if no batches have
@@ -135,7 +142,7 @@ public static class SharedLogData {
private long prevOffset = -1;
synchronized void registerLogManager(LocalLogManager logManager) {
- if (logManagers.put(logManager.nodeId(), logManager) != null) {
+ if (logManagers.put(logManager.nodeId, logManager) != null) {
throw new RuntimeException("Can't have multiple LocalLogManagers " +
"with id " + logManager.nodeId());
}
@@ -143,21 +150,21 @@ synchronized void registerLogManager(LocalLogManager logManager) {
}
synchronized void unregisterLogManager(LocalLogManager logManager) {
- if (!logManagers.remove(logManager.nodeId(), logManager)) {
+ if (!logManagers.remove(logManager.nodeId, logManager)) {
throw new RuntimeException("Log manager " + logManager.nodeId() +
" was not found.");
}
}
synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
- if (epoch != leader.epoch()) {
+ if (epoch != leader.epoch) {
log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
- "match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
+ "match the current leader epoch of {}.", nodeId, epoch, leader.epoch);
return Long.MAX_VALUE;
}
- if (nodeId != leader.nodeId()) {
+ if (!leader.isLeader(nodeId)) {
log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
- "match the current leader id of {}.", nodeId, epoch, leader.nodeId());
+ "match the current leader id of {}.", nodeId, epoch, leader.leaderId);
return Long.MAX_VALUE;
}
log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
@@ -181,7 +188,7 @@ synchronized long append(LocalBatch batch) {
}
synchronized void electLeaderIfNeeded() {
- if (leader.nodeId() != -1 || logManagers.isEmpty()) {
+ if (leader.leaderId.isPresent() || logManagers.isEmpty()) {
return;
}
int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
@@ -190,7 +197,7 @@ synchronized void electLeaderIfNeeded() {
for (int i = 0; i <= nextLeaderIndex; i++) {
nextLeaderNode = iter.next();
}
- MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
+ LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(nextLeaderNode), leader.epoch + 1);
log.info("Elected new leader: {}.", newLeader);
append(new LeaderChangeBatch(newLeader));
}
@@ -206,9 +213,9 @@ synchronized Entry nextBatch(long offset) {
private static class MetaLogListenerData {
private long offset = -1;
- private final MetaLogListener listener;
+ private final RaftClient.Listener<ApiMessageAndVersion> listener;
- MetaLogListenerData(MetaLogListener listener) {
+ MetaLogListenerData(RaftClient.Listener<ApiMessageAndVersion> listener) {
this.listener = listener;
}
}
@@ -218,7 +225,7 @@ private static class MetaLogListenerData {
/**
* The node ID of this local log manager. Each log manager must have a unique ID.
*/
- private final int nodeId;
+ public final int nodeId;
/**
* A reference to the in-memory state that unites all the log managers in use.
@@ -254,7 +261,7 @@ private static class MetaLogListenerData {
/**
* The current leader, as seen by this log manager.
*/
- private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);
+ private volatile LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
public LocalLogManager(LogContext logContext,
int nodeId,
@@ -291,15 +298,19 @@ private void scheduleLogCheck() {
LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
log.trace("Node {}: handling LeaderChange to {}.",
nodeId, batch.newLeader);
- listenerData.listener.handleNewLeader(batch.newLeader);
- if (batch.newLeader.epoch() > leader.epoch()) {
+ listenerData.listener.handleLeaderChange(batch.newLeader);
+ if (batch.newLeader.epoch > leader.epoch) {
leader = batch.newLeader;
}
} else if (entry.getValue() instanceof LocalRecordBatch) {
LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
log.trace("Node {}: handling LocalRecordBatch with offset {}.",
nodeId, entryOffset);
- listenerData.listener.handleCommits(entryOffset, batch.records);
+ listenerData.listener.handleCommit(BatchReader.singleton(new BatchReader.Batch<>(
+ entryOffset - batch.records.size() + 1,
+ Math.toIntExact(batch.leaderEpoch),
+ batch.records
+ )));
}
numEntriesFound++;
listenerData.offset = entryOffset;
@@ -317,7 +328,7 @@ public void beginShutdown() {
try {
if (initialized && !shutdown) {
log.debug("Node {}: beginning shutdown.", nodeId);
- renounce(leader.epoch());
+ resign(leader.epoch);
for (MetaLogListenerData listenerData : listeners) {
listenerData.listener.beginShutdown();
}
@@ -331,14 +342,32 @@ public void beginShutdown() {
}
@Override
- public void close() throws InterruptedException {
+ public void close() {
log.debug("Node {}: closing.", nodeId);
beginShutdown();
- eventQueue.close();
+
+ try {
+ eventQueue.close();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> shutdown(int timeoutMs) {
+ CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+ try {
+ close();
+ shutdownFuture.complete(null);
+ } catch (Throwable t) {
+ shutdownFuture.completeExceptionally(t);
+ }
+ return shutdownFuture;
}
@Override
- public void initialize() throws Exception {
+ public void initialize() {
eventQueue.append(() -> {
log.debug("initialized local log manager for node " + nodeId);
initialized = true;
@@ -346,7 +375,7 @@ public void initialize() throws Exception {
}
@Override
- public void register(MetaLogListener listener) throws Exception {
+ public void register(RaftClient.Listener<ApiMessageAndVersion> listener) {
CompletableFuture<Void> future = new CompletableFuture<>();
eventQueue.append(() -> {
if (shutdown) {
@@ -366,47 +395,54 @@ public void register(MetaLogListener listener) throws Exception {
"LocalLogManager was not initialized."));
}
});
- future.get();
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
- public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
- return scheduleAtomicWrite(epoch, batch);
+ public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
+ return scheduleAtomicAppend(epoch, batch);
}
@Override
- public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+ public Long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> batch) {
return shared.tryAppend(
nodeId,
- leader.epoch(),
- new LocalRecordBatch(
- batch
- .stream()
- .map(ApiMessageAndVersion::message)
- .collect(Collectors.toList())
- )
+ leader.epoch,
+ new LocalRecordBatch(leader.epoch, batch)
);
}
@Override
- public void renounce(long epoch) {
- MetaLogLeader curLeader = leader;
- MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
- shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+ public void resign(int epoch) {
+ LeaderAndEpoch curLeader = leader;
+ LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch + 1);
+ shared.tryAppend(nodeId, curLeader.epoch, new LeaderChangeBatch(nextLeader));
+ }
+
+ @Override
+ public SnapshotWriter<ApiMessageAndVersion> createSnapshot(OffsetAndEpoch snapshotId) {
+ throw new UnsupportedOperationException();
}
@Override
- public MetaLogLeader leader() {
+ public LeaderAndEpoch leaderAndEpoch() {
return leader;
}
@Override
- public int nodeId() {
- return nodeId;
+ public OptionalInt nodeId() {
+ return OptionalInt.of(nodeId);
}
- public List<MetaLogListener> listeners() {
- final CompletableFuture<List<MetaLogListener>> future = new CompletableFuture<>();
+ public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
+ final CompletableFuture<List<RaftClient.Listener<ApiMessageAndVersion>>> future = new CompletableFuture<>();
eventQueue.append(() -> {
future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
});
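(Editor's sketch, not part of the patch: the leader bookkeeping above replaces the old MetaLogLeader(-1, -1) sentinel with an absent OptionalInt and epoch 0, and electLeaderIfNeeded then bumps the epoch, so the first election lands on epoch 1, which the updated test below asserts. A rough illustration with illustrative values only:)

// No leader yet: absent leaderId, epoch 0.
LeaderAndEpoch none = new LeaderAndEpoch(OptionalInt.empty(), 0);
// After the first election of node 0, the epoch is bumped to 1.
LeaderAndEpoch first = new LeaderAndEpoch(OptionalInt.of(0), none.epoch + 1);
// isLeader(nodeId) replaces comparisons against the old -1 sentinel.
boolean node0Leads = first.isLeader(0);   // true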
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
index ac578fb635807..b10c747576e5e 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
@@ -19,14 +19,14 @@
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
+import java.util.OptionalInt;
import java.util.stream.Collectors;
import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
@@ -37,7 +37,6 @@
@Timeout(value = 40)
public class LocalLogManagerTest {
- private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);
/**
* Test creating a LocalLogManager and closing it.
@@ -58,7 +57,7 @@ public void testCreateAndClose() throws Exception {
public void testClaimsLeadership() throws Exception {
try (LocalLogManagerTestEnv env =
LocalLogManagerTestEnv.createWithMockListeners(1)) {
- assertEquals(new MetaLogLeader(0, 0), env.waitForLeader());
+ assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), env.waitForLeader());
env.close();
assertEquals(null, env.firstError.get());
}
@@ -71,20 +70,24 @@ public void testClaimsLeadership() throws Exception {
public void testPassLeadership() throws Exception {
try (LocalLogManagerTestEnv env =
LocalLogManagerTestEnv.createWithMockListeners(3)) {
- MetaLogLeader first = env.waitForLeader();
- MetaLogLeader cur = first;
+ LeaderAndEpoch first = env.waitForLeader();
+ LeaderAndEpoch cur = first;
do {
- env.logManagers().get(cur.nodeId()).renounce(cur.epoch());
- MetaLogLeader next = env.waitForLeader();
- while (next.epoch() == cur.epoch()) {
+ int currentLeaderId = cur.leaderId.orElseThrow(() ->
+ new AssertionError("Current leader is undefined")
+ );
+ env.logManagers().get(currentLeaderId).resign(cur.epoch);
+
+ LeaderAndEpoch next = env.waitForLeader();
+ while (next.epoch == cur.epoch) {
Thread.sleep(1);
next = env.waitForLeader();
}
- long expectedNextEpoch = cur.epoch() + 2;
- assertEquals(expectedNextEpoch, next.epoch(), "Expected next epoch to be " + expectedNextEpoch +
+ long expectedNextEpoch = cur.epoch + 2;
+ assertEquals(expectedNextEpoch, next.epoch, "Expected next epoch to be " + expectedNextEpoch +
", but found " + next);
cur = next;
- } while (cur.nodeId() == first.nodeId());
+ } while (cur.leaderId.equals(first.leaderId));
env.close();
assertEquals(null, env.firstError.get());
}
@@ -121,14 +124,18 @@ private static void waitForLastCommittedOffset(long targetOffset,
public void testCommits() throws Exception {
try (LocalLogManagerTestEnv env =
LocalLogManagerTestEnv.createWithMockListeners(3)) {
- MetaLogLeader leaderInfo = env.waitForLeader();
- LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId());
- long epoch = activeLogManager.leader().epoch();
+ LeaderAndEpoch leaderInfo = env.waitForLeader();
+ int leaderId = leaderInfo.leaderId.orElseThrow(() ->
+ new AssertionError("Current leader is undefined")
+ );
+
+ LocalLogManager activeLogManager = env.logManagers().get(leaderId);
+ int epoch = activeLogManager.leaderAndEpoch().epoch;
List<ApiMessageAndVersion> messages = Arrays.asList(
new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0));
- assertEquals(3, activeLogManager.scheduleWrite(epoch, messages));
+ assertEquals(3, activeLogManager.scheduleAppend(epoch, messages));
for (LocalLogManager logManager : env.logManagers()) {
waitForLastCommittedOffset(3, logManager);
}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index 52aeea052bdde..94e5116c118a9 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
+import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +60,7 @@ public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) th
LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers);
try {
for (LocalLogManager logManager : testEnv.logManagers) {
- logManager.register(new MockMetaLogManagerListener());
+ logManager.register(new MockMetaLogManagerListener(logManager.nodeId));
}
} catch (Exception e) {
testEnv.close();
@@ -100,16 +101,16 @@ File dir() {
return dir;
}
- MetaLogLeader waitForLeader() throws InterruptedException {
- AtomicReference<MetaLogLeader> value = new AtomicReference<>(null);
+ LeaderAndEpoch waitForLeader() throws InterruptedException {
+ AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
- MetaLogLeader result = null;
+ LeaderAndEpoch result = null;
for (LocalLogManager logManager : logManagers) {
- MetaLogLeader leader = logManager.leader();
- if (leader.nodeId() == logManager.nodeId()) {
+ LeaderAndEpoch leader = logManager.leaderAndEpoch();
+ if (leader.isLeader(logManager.nodeId)) {
if (result != null) {
- throw new RuntimeException("node " + leader.nodeId() +
- " thinks it's the leader, but so does " + result.nodeId());
+ throw new RuntimeException("node " + logManager.nodeId +
+ " thinks it's the leader, but so does " + result.leaderId);
}
result = leader;
}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
index fe61ec070285b..5e61417bf0be9 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
@@ -18,46 +18,65 @@
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
import java.util.ArrayList;
import java.util.List;
+import java.util.OptionalInt;
-public class MockMetaLogManagerListener implements MetaLogListener {
+public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessageAndVersion> {
public static final String COMMIT = "COMMIT";
public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
public static final String NEW_LEADER = "NEW_LEADER";
public static final String RENOUNCE = "RENOUNCE";
public static final String SHUTDOWN = "SHUTDOWN";
+ private final int nodeId;
private final List<String> serializedEvents = new ArrayList<>();
+ private LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0);
- @Override
- public synchronized void handleCommits(long lastCommittedOffset, List<ApiMessage> messages) {
- for (ApiMessage message : messages) {
- StringBuilder bld = new StringBuilder();
- bld.append(COMMIT).append(" ").append(message.toString());
- serializedEvents.add(bld.toString());
- }
- StringBuilder bld = new StringBuilder();
- bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
- serializedEvents.add(bld.toString());
+ public MockMetaLogManagerListener(int nodeId) {
+ this.nodeId = nodeId;
}
@Override
- public void handleNewLeader(MetaLogLeader leader) {
- StringBuilder bld = new StringBuilder();
- bld.append(NEW_LEADER).append(" ").
- append(leader.nodeId()).append(" ").append(leader.epoch());
- synchronized (this) {
- serializedEvents.add(bld.toString());
+ public synchronized void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+ try {
+ while (reader.hasNext()) {
+ BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+ long lastCommittedOffset = batch.lastOffset();
+
+ for (ApiMessageAndVersion messageAndVersion : batch.records()) {
+ ApiMessage message = messageAndVersion.message();
+ StringBuilder bld = new StringBuilder();
+ bld.append(COMMIT).append(" ").append(message.toString());
+ serializedEvents.add(bld.toString());
+ }
+ StringBuilder bld = new StringBuilder();
+ bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
+ serializedEvents.add(bld.toString());
+ }
+ } finally {
+ reader.close();
}
}
@Override
- public void handleRenounce(long epoch) {
- StringBuilder bld = new StringBuilder();
- bld.append(RENOUNCE).append(" ").append(epoch);
- synchronized (this) {
+ public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch) {
+ LeaderAndEpoch oldLeaderAndEpoch = this.leaderAndEpoch;
+ this.leaderAndEpoch = newLeaderAndEpoch;
+
+ if (newLeaderAndEpoch.isLeader(nodeId)) {
+ StringBuilder bld = new StringBuilder();
+ bld.append(NEW_LEADER).append(" ").
+ append(nodeId).append(" ").append(newLeaderAndEpoch.epoch);
+ serializedEvents.add(bld.toString());
+ } else if (oldLeaderAndEpoch.isLeader(nodeId)) {
+ StringBuilder bld = new StringBuilder();
+ bld.append(RENOUNCE).append(" ").append(newLeaderAndEpoch.epoch);
serializedEvents.add(bld.toString());
}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/BatchReader.java b/raft/src/main/java/org/apache/kafka/raft/BatchReader.java
index e5f9e38612a0d..a302d5ae62130 100644
--- a/raft/src/main/java/org/apache/kafka/raft/BatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/BatchReader.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.raft.internals.MemoryBatchReader;
+
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -112,4 +115,9 @@ public int hashCode() {
}
}
+ static <T> BatchReader<T> singleton(Batch<T> batch) {
+ return new MemoryBatchReader<>(Collections.singletonList(batch), reader -> {
+ });
+ }
+
}
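(Editor's sketch, not part of the patch: the new singleton helper wraps one pre-built batch in an in-memory reader, which is how LocalLogManager above and SnapshotFileReader below hand commits to a listener. The record type, offsets, and the listener variable are illustrative:)

// One batch of three String records at base offset 10, leader epoch 2.
List<String> records = Arrays.asList("a", "b", "c");
BatchReader<String> reader = BatchReader.singleton(new BatchReader.Batch<>(10L, 2, records));
// Delivered like any other commit; the listener is expected to close the reader when done.
listener.handleCommit(reader);   // listener: some RaftClient.Listener<String>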
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index a3dbbdd3a66fd..95a76b6cd49e9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -40,8 +40,6 @@
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
@@ -53,10 +51,12 @@
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -340,17 +340,15 @@ private void maybeFireHandleCommit(long baseOffset, int epoch, List records)
}
}
- private void maybeFireHandleClaim(LeaderState state) {
- int leaderEpoch = state.epoch();
- long epochStartOffset = state.epochStartOffset();
+ private void maybeFireLeaderChange(LeaderState state) {
for (ListenerContext listenerContext : listenerContexts) {
- listenerContext.maybeFireHandleClaim(leaderEpoch, epochStartOffset);
+ listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch(), state.epochStartOffset());
}
}
- private void fireHandleResign(int epoch) {
+ private void maybeFireLeaderChange() {
for (ListenerContext listenerContext : listenerContexts) {
- listenerContext.fireHandleResign(epoch);
+ listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch());
}
}
@@ -387,6 +385,11 @@ public LeaderAndEpoch leaderAndEpoch() {
return quorum.leaderAndEpoch();
}
+ @Override
+ public OptionalInt nodeId() {
+ return quorum.localId();
+ }
+
private OffsetAndEpoch endOffset() {
return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch());
}
@@ -395,9 +398,7 @@ private void resetConnections() {
requestManager.resetAll();
}
- private void onBecomeLeader(long currentTimeMs) {
- LeaderState state = quorum.leaderStateOrThrow();
-
+ private void onBecomeLeader(LeaderState state, long currentTimeMs) {
log.initializeLeaderEpoch(quorum.epoch());
// The high watermark can only be advanced once we have written a record
@@ -461,7 +462,10 @@ private boolean maybeTransitionToLeader(CandidateState state, long currentTimeMs
if (state.isVoteGranted()) {
long endOffset = log.endOffset().offset;
quorum.transitionToLeader(endOffset);
- onBecomeLeader(currentTimeMs);
+
+ LeaderState leaderState = quorum.leaderStateOrThrow();
+ maybeFireLeaderChange(leaderState);
+ onBecomeLeader(leaderState, currentTimeMs);
return true;
} else {
return false;
@@ -477,10 +481,6 @@ private void onBecomeCandidate(long currentTimeMs) throws IOException {
}
private void maybeResignLeadership() {
- if (quorum.isLeader()) {
- fireHandleResign(quorum.epoch());
- }
-
if (accumulator != null) {
accumulator.close();
accumulator = null;
@@ -490,24 +490,28 @@ private void maybeResignLeadership() {
private void transitionToCandidate(long currentTimeMs) throws IOException {
maybeResignLeadership();
quorum.transitionToCandidate();
+ maybeFireLeaderChange();
onBecomeCandidate(currentTimeMs);
}
private void transitionToUnattached(int epoch) throws IOException {
maybeResignLeadership();
quorum.transitionToUnattached(epoch);
+ maybeFireLeaderChange();
resetConnections();
}
private void transitionToResigned(List<Integer> preferredSuccessors) {
fetchPurgatory.completeAllExceptionally(Errors.BROKER_NOT_AVAILABLE.exception("The broker is shutting down"));
quorum.transitionToResigned(preferredSuccessors);
+ maybeFireLeaderChange();
resetConnections();
}
private void transitionToVoted(int candidateId, int epoch) throws IOException {
maybeResignLeadership();
quorum.transitionToVoted(epoch, candidateId);
+ maybeFireLeaderChange();
resetConnections();
}
@@ -533,6 +537,7 @@ private void transitionToFollower(
) throws IOException {
maybeResignLeadership();
quorum.transitionToFollower(epoch, leaderId);
+ maybeFireLeaderChange();
onBecomeFollower(currentTimeMs);
}
@@ -1926,7 +1931,7 @@ private long pollResigned(long currentTimeMs) throws IOException {
private long pollLeader(long currentTimeMs) {
LeaderState state = quorum.leaderStateOrThrow();
- maybeFireHandleClaim(state);
+ maybeFireLeaderChange(state);
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
@@ -2256,6 +2261,11 @@ public CompletableFuture shutdown(int timeoutMs) {
return shutdownComplete;
}
+ @Override
+ public void resign(int epoch) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public SnapshotWriter<T> createSnapshot(OffsetAndEpoch snapshotId) throws IOException {
return new SnapshotWriter<>(
@@ -2324,7 +2334,7 @@ public void complete() {
private final class ListenerContext implements CloseListener<BatchReader<T>> {
private final RaftClient.Listener<T> listener;
// This field is used only by the Raft IO thread
- private int claimedEpoch = 0;
+ private LeaderAndEpoch lastFiredLeaderChange = new LeaderAndEpoch(OptionalInt.empty(), 0);
// These fields are visible to both the Raft IO thread and the listener
// and are protected through synchronization on this `ListenerContext` instance
@@ -2408,19 +2418,32 @@ private void fireHandleCommit(BatchReader reader) {
listener.handleCommit(reader);
}
- void maybeFireHandleClaim(int epoch, long epochStartOffset) {
- // We can fire `handleClaim` as soon as the listener has caught
- // up to the start of the leader epoch. This guarantees that the
- // state machine has seen the full committed state before it becomes
- // leader and begins writing to the log.
- if (epoch > claimedEpoch && nextOffset() >= epochStartOffset) {
- claimedEpoch = epoch;
- listener.handleClaim(epoch);
+ void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+ if (shouldFireLeaderChange(leaderAndEpoch)) {
+ lastFiredLeaderChange = leaderAndEpoch;
+ listener.handleLeaderChange(leaderAndEpoch);
}
}
- void fireHandleResign(int epoch) {
- listener.handleResign(epoch);
+ private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+ if (leaderAndEpoch.equals(lastFiredLeaderChange)) {
+ return false;
+ } else if (leaderAndEpoch.epoch > lastFiredLeaderChange.epoch) {
+ return true;
+ } else {
+ return leaderAndEpoch.leaderId.isPresent() && !lastFiredLeaderChange.leaderId.isPresent();
+ }
+ }
+
+ void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStartOffset) {
+ // If this node is becoming the leader, then we can fire `handleClaim` as soon
+ // as the listener has caught up to the start of the leader epoch. This guarantees
+ // that the state machine has seen the full committed state before it becomes
+ // leader and begins writing to the log.
+ if (shouldFireLeaderChange(leaderAndEpoch) && nextOffset() >= epochStartOffset) {
+ lastFiredLeaderChange = leaderAndEpoch;
+ listener.handleLeaderChange(leaderAndEpoch);
+ }
}
public synchronized void onClose(BatchReader reader) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
index 47bd404ae2798..0f803a890ebed 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
this.epoch = epoch;
}
+ public boolean isLeader(int nodeId) {
+ return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -41,4 +45,12 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(leaderId, epoch);
}
+
+ @Override
+ public String toString() {
+ return "LeaderAndEpoch(" +
+ "leaderId=" + leaderId +
+ ", epoch=" + epoch +
+ ')';
+ }
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 86f8c187d8f35..8187a081dc5e2 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -222,6 +222,10 @@ public int localIdOrThrow() {
return localId.orElseThrow(() -> new IllegalStateException("Required local id is not present"));
}
+ public OptionalInt localId() {
+ return localId;
+ }
+
public int epoch() {
return state.epoch();
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 74488b450ede1..9d847c7a535e2 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
public interface RaftClient<T> extends Closeable {
@@ -45,24 +46,14 @@ interface Listener {
void handleCommit(BatchReader<T> reader);
/**
- * Invoked after this node has become a leader. This is only called after
- * all commits up to the start of the leader's epoch have been sent to
- * {@link #handleCommit(BatchReader)}.
+ * Called on any change to leadership. This includes both when a leader is elected and
+ * when a leader steps down or fails.
*
- * After becoming a leader, the client is eligible to write to the log
- * using {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}.
- *
- * @param epoch the claimed leader epoch
+ * @param leader the current leader and epoch
*/
- default void handleClaim(int epoch) {}
+ default void handleLeaderChange(LeaderAndEpoch leader) {}
- /**
- * Invoked after a leader has stepped down. This callback may or may not
- * fire before the next leader has been elected.
- *
- * @param epoch the epoch that the leader is resigning from
- */
- default void handleResign(int epoch) {}
+ default void beginShutdown() {}
}
/**
@@ -86,6 +77,14 @@ default void handleResign(int epoch) {}
*/
LeaderAndEpoch leaderAndEpoch();
+ /**
+ * Get local nodeId if one is defined. This may be absent when the client is used
+ * as an anonymous observer, as in the case of the metadata shell.
+ *
+ * @return optional node id
+ */
+ OptionalInt nodeId();
+
/**
* Append a list of records to the log. The write will be scheduled for some time
* in the future. There is no guarantee that appended records will be written to
@@ -147,6 +146,16 @@ default void handleResign(int epoch) {}
*/
CompletableFuture<Void> shutdown(int timeoutMs);
+ /**
+ * Resign the leadership. The leader will give up its leadership in the current epoch,
+ * and a new election will be held. Note that nothing prevents this leader from getting
+ * reelected.
+ *
+ * @param epoch the epoch to resign from. If this does not match the current epoch, this
+ * call will be ignored.
+ */
+ void resign(int epoch);
+
/**
* Create a writable snapshot file for a given offset and epoch.
*
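(Editor's sketch, not part of the patch: a listener migrated to the consolidated callback. The removed handleClaim/handleResign hooks collapse into a single handleLeaderChange that compares the new leader against the local node id; the class name and nodeId field are assumptions of this sketch, and ReplicatedCounter below follows the same pattern:)

class ExampleListener implements RaftClient.Listener<String> {
    private final int nodeId;
    private volatile boolean isLeader = false;

    ExampleListener(int nodeId) {
        this.nodeId = nodeId;
    }

    @Override
    public void handleCommit(BatchReader<String> reader) {
        try {
            while (reader.hasNext()) {
                reader.next();   // apply committed records to local state here
            }
        } finally {
            reader.close();
        }
    }

    @Override
    public void handleLeaderChange(LeaderAndEpoch leader) {
        // Replaces the old handleClaim(epoch) / handleResign(epoch) pair.
        isLeader = leader.isLeader(nodeId);
    }
}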
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index 3db4d736a53f7..fe498d1060adc 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -88,18 +88,17 @@ public synchronized void handleCommit(BatchReader reader) {
}
@Override
- public synchronized void handleClaim(int epoch) {
- log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}",
- committed, epoch);
- this.uncommitted = committed;
- this.claimedEpoch = Optional.of(epoch);
- }
-
- @Override
- public synchronized void handleResign(int epoch) {
- log.debug("Counter uncommitted value reset after resigning leadership");
- this.uncommitted = -1;
- this.claimedEpoch = Optional.empty();
+ public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
+ if (newLeader.isLeader(nodeId)) {
+ log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}",
+ committed, newLeader);
+ this.uncommitted = committed;
+ this.claimedEpoch = Optional.of(newLeader.epoch);
+ } else {
+ log.debug("Counter uncommitted value reset after resigning leadership");
+ this.uncommitted = -1;
+ this.claimedEpoch = Optional.empty();
+ }
}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
deleted file mode 100644
index 1ca63f1b9c3cd..0000000000000
--- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.raft.metadata;
-
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.metadata.ApiMessageAndVersion;
-import org.apache.kafka.metalog.MetaLogLeader;
-import org.apache.kafka.metalog.MetaLogListener;
-import org.apache.kafka.metalog.MetaLogManager;
-import org.apache.kafka.raft.BatchReader;
-import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.raft.RaftClient;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`.
- * Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager`
- * directly.
- */
-public class MetaLogRaftShim implements MetaLogManager {
- private final RaftClient<ApiMessageAndVersion> client;
- private final int nodeId;
-
- public MetaLogRaftShim(RaftClient<ApiMessageAndVersion> client, int nodeId) {
- this.client = client;
- this.nodeId = nodeId;
- }
-
- @Override
- public void initialize() {
- // NO-OP - The RaftClient is initialized externally
- }
-
- @Override
- public void register(MetaLogListener listener) {
- client.register(new ListenerShim(listener));
- }
-
- @Override
- public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
- return write(epoch, batch, true);
- }
-
- @Override
- public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
- return write(epoch, batch, false);
- }
-
- private long write(long epoch, List<ApiMessageAndVersion> batch, boolean isAtomic) {
- final Long result;
- if (isAtomic) {
- result = client.scheduleAtomicAppend((int) epoch, batch);
- } else {
- result = client.scheduleAppend((int) epoch, batch);
- }
-
- if (result == null) {
- throw new IllegalArgumentException(
- String.format(
- "Unable to alloate a buffer for the schedule write operation: epoch %s, batch %s)",
- epoch,
- batch
- )
- );
- } else {
- return result;
- }
- }
-
- @Override
- public void renounce(long epoch) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public MetaLogLeader leader() {
- LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch();
- return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), leaderAndEpoch.epoch);
- }
-
- @Override
- public int nodeId() {
- return nodeId;
- }
-
- private class ListenerShim implements RaftClient.Listener<ApiMessageAndVersion> {
- private final MetaLogListener listener;
-
- private ListenerShim(MetaLogListener listener) {
- this.listener = listener;
- }
-
- @Override
- public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
- try {
- // TODO: The `BatchReader` might need to read from disk if this is
- // not a leader. We want to move this IO to the state machine so that
- // it does not block Raft replication
- while (reader.hasNext()) {
- BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
- List<ApiMessage> records = batch.records().stream()
- .map(ApiMessageAndVersion::message)
- .collect(Collectors.toList());
- listener.handleCommits(batch.lastOffset(), records);
- }
- } finally {
- reader.close();
- }
- }
-
- @Override
- public void handleClaim(int epoch) {
- listener.handleNewLeader(new MetaLogLeader(nodeId, epoch));
- }
-
- @Override
- public void handleResign(int epoch) {
- listener.handleRenounce(epoch);
- }
-
- @Override
- public String toString() {
- return "ListenerShim(" +
- "listener=" + listener +
- ')';
- }
- }
-
-}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 55d4e16dc86e7..8cf7f4874c186 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2296,7 +2296,7 @@ public void testLateRegisteredListenerCatchesUp() throws Exception {
assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
// Register a second listener and allow it to catch up to the high watermark
- RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
+ RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.client.poll();
assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset());
@@ -2388,7 +2388,7 @@ public void testHandleCommitCallbackFiresInVotedState() throws Exception {
assertEquals(OptionalLong.of(10L), context.client.highWatermark());
// Register another listener and verify that it catches up while we remain 'voted'
- RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
+ RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.client.poll();
context.assertVotedCandidate(candidateEpoch, otherNodeId);
@@ -2437,7 +2437,7 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception {
context.assertVotedCandidate(candidateEpoch, localId);
// Register another listener and verify that it catches up
- RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
+ RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.client.poll();
context.assertVotedCandidate(candidateEpoch, localId);
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 024095af91de5..567625470cd66 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -203,7 +203,7 @@ public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel(voters);
LogContext logContext = new LogContext();
- MockListener listener = new MockListener();
+ MockListener listener = new MockListener(localId);
Map<Integer, RaftConfig.AddressSpec> voterAddressMap = voters.stream()
.collect(Collectors.toMap(id -> id, RaftClientTestContext::mockAddress));
RaftConfig raftConfig = new RaftConfig(voterAddressMap, requestTimeoutMs, RETRY_BACKOFF_MS, electionTimeoutMs,
@@ -1036,6 +1036,11 @@ static class MockListener implements RaftClient.Listener {
private final List<BatchReader.Batch<String>> commits = new ArrayList<>();
private final Map<Integer, Long> claimedEpochStartOffsets = new HashMap<>();
private OptionalInt currentClaimedEpoch = OptionalInt.empty();
+ private final OptionalInt localId;
+
+ MockListener(OptionalInt localId) {
+ this.localId = localId;
+ }
int numCommittedBatches() {
return commits.size();
List<String> commitWithLastOffset(long lastOffset) {
}
@Override
- public void handleClaim(int epoch) {
+ public void handleLeaderChange(LeaderAndEpoch leader) {
// We record the next expected offset as the claimed epoch's start
// offset. This is useful to verify that the `handleClaim` callback
// was not received early.
- long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
- lastCommitOffset().getAsLong() + 1 : 0L;
- this.currentClaimedEpoch = OptionalInt.of(epoch);
- this.claimedEpochStartOffsets.put(epoch, claimedEpochStartOffset);
- }
-
- @Override
- public void handleResign(int epoch) {
- this.currentClaimedEpoch = OptionalInt.empty();
+ if (localId.isPresent() && leader.isLeader(localId.getAsInt())) {
+ long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
+ lastCommitOffset().getAsLong() + 1 : 0L;
+ this.currentClaimedEpoch = OptionalInt.of(leader.epoch);
+ this.claimedEpochStartOffsets.put(leader.epoch, claimedEpochStartOffset);
+ } else {
+ this.currentClaimedEpoch = OptionalInt.empty();
+ }
}
@Override
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index 739e0278d5cda..781d0e5d8c0ac 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -37,18 +37,16 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
-import org.apache.kafka.metalog.MetaLogLeader;
-import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.apache.kafka.shell.MetadataNode.FileNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -77,13 +75,15 @@ public void setWorkingDirectory(String workingDirectory) {
}
}
- class LogListener implements MetaLogListener, RaftClient.Listener<ApiMessageAndVersion> {
+ class LogListener implements RaftClient.Listener<ApiMessageAndVersion> {
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
try {
- // TODO: handle lastOffset
while (reader.hasNext()) {
BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+ log.debug("handleCommits " + batch.records() + " at offset " + batch.lastOffset());
+ DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+ dir.create("offset").setContents(String.valueOf(batch.lastOffset()));
for (ApiMessageAndVersion messageAndVersion : batch.records()) {
handleMessage(messageAndVersion.message());
}
@@ -94,19 +94,7 @@ public void handleCommit(BatchReader reader) {
}
@Override
- public void handleCommits(long lastOffset, List<ApiMessage> messages) {
- appendEvent("handleCommits", () -> {
- log.debug("handleCommits " + messages + " at offset " + lastOffset);
- DirectoryNode dir = data.root.mkdirs("metadataQuorum");
- dir.create("offset").setContents(String.valueOf(lastOffset));
- for (ApiMessage message : messages) {
- handleMessage(message);
- }
- }, null);
- }
-
- @Override
- public void handleNewLeader(MetaLogLeader leader) {
+ public void handleLeaderChange(LeaderAndEpoch leader) {
appendEvent("handleNewLeader", () -> {
log.debug("handleNewLeader " + leader);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
@@ -114,18 +102,6 @@ public void handleNewLeader(MetaLogLeader leader) {
}, null);
}
- @Override
- public void handleClaim(int epoch) {
- // This shouldn't happen because we should never be the leader.
- log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")");
- }
-
- @Override
- public void handleRenounce(long epoch) {
- // This shouldn't happen because we should never be the leader.
- log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")");
- }
-
@Override
public void beginShutdown() {
log.debug("MetaLogListener sent beginShutdown");
diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
index 907b4db467ae1..e7d6d1d3e1fac 100644
--- a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
+++ b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
@@ -18,7 +18,6 @@
package org.apache.kafka.shell;
import org.apache.kafka.common.message.LeaderChangeMessage;
-import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
@@ -27,11 +26,12 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
-import org.apache.kafka.metalog.MetaLogLeader;
-import org.apache.kafka.metalog.MetaLogListener;
-import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
@@ -49,14 +50,14 @@ public final class SnapshotFileReader implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(SnapshotFileReader.class);
private final String snapshotPath;
- private final MetaLogListener listener;
+ private final RaftClient.Listener<ApiMessageAndVersion> listener;
private final KafkaEventQueue queue;
private final CompletableFuture<Void> caughtUpFuture;
private FileRecords fileRecords;
private Iterator<FileChannelRecordBatch> batchIterator;
private final MetadataRecordSerde serde = new MetadataRecordSerde();
- public SnapshotFileReader(String snapshotPath, MetaLogListener listener) {
+ public SnapshotFileReader(String snapshotPath, RaftClient.Listener<ApiMessageAndVersion> listener) {
this.snapshotPath = snapshotPath;
this.listener = listener;
this.queue = new KafkaEventQueue(Time.SYSTEM,
@@ -101,7 +102,7 @@ private void handleNextBatch() {
private void scheduleHandleNextBatch() {
queue.append(new EventQueue.Event() {
@Override
- public void run() throws Exception {
+ public void run() {
handleNextBatch();
}
@@ -123,8 +124,10 @@ private void handleControlBatch(FileChannelRecordBatch batch) {
case LEADER_CHANGE:
LeaderChangeMessage message = new LeaderChangeMessage();
message.read(new ByteBufferAccessor(record.value()), (short) 0);
- listener.handleNewLeader(new MetaLogLeader(message.leaderId(),
- batch.partitionLeaderEpoch()));
+ listener.handleLeaderChange(new LeaderAndEpoch(
+ OptionalInt.of(message.leaderId()),
+ batch.partitionLeaderEpoch()
+ ));
break;
default:
log.error("Ignoring control record with type {} at offset {}",
@@ -137,18 +140,21 @@ private void handleControlBatch(FileChannelRecordBatch batch) {
}
private void handleMetadataBatch(FileChannelRecordBatch batch) {
- List<ApiMessage> messages = new ArrayList<>();
- for (Iterator<Record> iter = batch.iterator(); iter.hasNext(); ) {
- Record record = iter.next();
+ List<ApiMessageAndVersion> messages = new ArrayList<>();
+ for (Record record : batch) {
ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
try {
ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize());
- messages.add(messageAndVersion.message());
+ messages.add(messageAndVersion);
} catch (Throwable e) {
log.error("unable to read metadata record at offset {}", record.offset(), e);
}
}
- listener.handleCommits(batch.lastOffset(), messages);
+ listener.handleCommit(BatchReader.singleton(new BatchReader.Batch<>(
+ batch.baseOffset(),
+ batch.partitionLeaderEpoch(),
+ messages
+ )));
}
public void beginShutdown(String reason) {