Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,28 @@ class Partition(val topicPartition: TopicPartition,
}
}

private def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
val props = stateStore.fetchTopicConfig()
val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica)
val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition")
0L
// Visible for testing
private[cluster] def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
val fetchLogConfig = () => {
val props = stateStore.fetchTopicConfig()
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
}

logManager.initializingLog(topicPartition)
var maybeLog: Option[Log] = None
try {
val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig(), isNew, isFutureReplica)
val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition")
0L
}
val initialHighWatermark = log.updateHighWatermark(checkpointHighWatermark)
info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark")
maybeLog = Some(log)
log
} finally {
logManager.finishedInitializingLog(topicPartition, maybeLog, fetchLogConfig)
}
val initialHighWatermark = log.updateHighWatermark(checkpointHighWatermark)
info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark")
log
}

def getReplica(replicaId: Int): Option[Replica] = Option(remoteReplicasMap.get(replicaId))
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,15 @@ class Log(@volatile var dir: File,
0
}

def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
def updateConfig(newConfig: LogConfig): Unit = {
val oldConfig = this.config
this.config = newConfig
if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) {
val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
val newRecordVersion = newConfig.messageFormatVersion.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
val newRecordVersion = newConfig.messageFormatVersion.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
if (newRecordVersion.value != oldRecordVersion.value)
initializeLeaderEpochCache()
}
}

private def checkIfMemoryMappedBufferClosed(): Unit = {
Expand Down
55 changes: 52 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class LogManager(logDirs: Seq[File],
@volatile private var _currentDefaultConfig = initialDefaultConfig
@volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir

// This map contains all partitions whose logs are getting loaded and initialized. If log configuration
// of these partitions get updated at the same time, the corresponding entry in this map is set to "true",
// which triggers a config reload after initialization is finished (to get the latest config value).
// See KAFKA-8813 for more detail on the race condition
// Visible for testing
private[log] val partitionsInitializing = new ConcurrentHashMap[TopicPartition, Boolean]().asScala

def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
this._currentDefaultConfig = logConfig
}
Expand Down Expand Up @@ -659,6 +666,48 @@ class LogManager(logDirs: Seq[File],
Option(currentLogs.get(topicPartition))
}

/**
* Method to indicate that logs are getting initialized for the partition passed in as argument.
* This method should always be followed by [[kafka.log.LogManager#finishedInitializingLog]] to indicate that log
* initialization is done.
*/
def initializingLog(topicPartition: TopicPartition): Unit = {
partitionsInitializing(topicPartition) = false
}

/**
* Mark the partition configuration for all partitions that are getting initialized for topic
* as dirty. That will result in reloading of configuration once initialization is done.
*/
def topicConfigUpdated(topic: String): Unit = {
partitionsInitializing.keys.filter(_.topic() == topic).foreach {
topicPartition => partitionsInitializing.replace(topicPartition, false, true)
}
}

/**
* Mark all in progress partitions having dirty configuration if broker configuration is updated.
*/
def brokerConfigUpdated(): Unit = {
partitionsInitializing.keys.foreach {
topicPartition => partitionsInitializing.replace(topicPartition, false, true)
}
}

/**
* Method to indicate that the log initialization for the partition passed in as argument is
* finished. This method should follow a call to [[kafka.log.LogManager#initializingLog]]
*/
def finishedInitializingLog(topicPartition: TopicPartition,
maybeLog: Option[Log],
fetchLogConfig: () => LogConfig): Unit = {
if (partitionsInitializing(topicPartition)) {
maybeLog.foreach(_.updateConfig(fetchLogConfig()))
}

partitionsInitializing -= topicPartition
}

/**
* If the log already exists, just return a copy of the existing log
* Otherwise if isNew=true or if there is no offline log directory, create a log for the given topic and the given partition
Expand Down Expand Up @@ -955,9 +1004,9 @@ class LogManager(logDirs: Seq[File],
def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values

def logsByTopic(topic: String): Seq[Log] = {
(currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, _) =>
topicPartition.topic() == topic
}.map { case (_, log) => log }
(currentLogs.toList ++ futureLogs.toList).collect {
case (topicPartition, log) if topicPartition.topic == topic => log
}
}

/**
Expand Down
19 changes: 13 additions & 6 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,27 @@ trait ConfigHandler {
*/
class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers, kafkaController: KafkaController) extends ConfigHandler with Logging {

def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
// Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig)

val logs = logManager.logsByTopic(topic).toBuffer
private def updateLogConfig(topic: String,
topicConfig: Properties,
configNamesToExclude: Set[String]): Unit = {
logManager.topicConfigUpdated(topic)
val logs = logManager.logsByTopic(topic)
if (logs.nonEmpty) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties()
topicConfig.asScala.foreach { case (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
val logConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
logs.foreach(_.updateConfig(topicConfig.asScala.keySet, logConfig))
logs.foreach(_.updateConfig(logConfig))
}
}

def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
// Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig)

updateLogConfig(topic, topicConfig, configNamesToExclude)

def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager) = {
if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) {
Expand Down
20 changes: 13 additions & 7 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,18 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
// validation, no additional validation is performed.
}

private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit = {
logManager.brokerConfigUpdated()
logManager.allLogs.foreach { log =>
val props = mutable.Map.empty[Any, Any]
props ++= newBrokerDefaults
props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)

val logConfig = LogConfig(props.asJava)
log.updateConfig(logConfig)
}
}

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val currentLogConfig = logManager.currentDefaultConfig
val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
Expand All @@ -626,14 +638,8 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok

logManager.reconfigureDefaultLogConfig(LogConfig(newBrokerDefaults))

logManager.allLogs.foreach { log =>
val props = mutable.Map.empty[Any, Any]
props ++= newBrokerDefaults.asScala
props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
updateLogsConfig(newBrokerDefaults.asScala)

val logConfig = LogConfig(props.asJava)
log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
}
if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable) {
server.kafkaController.enableDefaultUncleanLeaderElection()
}
Expand Down
105 changes: 103 additions & 2 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, ListOffsetRequest}
import org.junit.{After, Before, Test}
import org.junit.Assert._
import org.mockito.Mockito.{doNothing, mock, when}
import org.mockito.Mockito.{doAnswer, doNothing, mock, spy, times, verify, when}
import org.scalatest.Assertions.assertThrows
import org.mockito.ArgumentMatchers
import org.mockito.invocation.InvocationOnMock
Expand Down Expand Up @@ -1545,12 +1545,113 @@ class PartitionTest {
assertEquals(Set(), Metrics.defaultRegistry().allMetrics().asScala.keySet.filter(_.getType == "Partition"))
}

/**
* Test when log is getting initialized, its config remains untouched after initialization is done.
*/
@Test
def testLogConfigNotDirty(): Unit = {
val spyLogManager = spy(logManager)
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
delayedOperations,
metadataCache,
spyLogManager)

partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)

// Validate that initializingLog and finishedInitializingLog was called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
ArgumentMatchers.any(),
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation

// We should get config from ZK only once
verify(stateStore).fetchTopicConfig()
}

/**
* Test when log is getting initialized, its config remains gets reloaded if Topic config gets changed
* before initialization is done.
*/
@Test
def testLogConfigDirtyAsTopicUpdated(): Unit = {
val spyLogManager = spy(logManager)
doAnswer(new Answer[Unit] {
def answer(invocation: InvocationOnMock): Unit = {
logManager.initializingLog(topicPartition)
logManager.topicConfigUpdated(topicPartition.topic())
}
}).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))

val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
delayedOperations,
metadataCache,
spyLogManager)

partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)

// Validate that initializingLog and finishedInitializingLog was called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
ArgumentMatchers.any(),
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation

// We should get config from ZK twice, once before log is created, and second time once
// we find log config is dirty and refresh it.
verify(stateStore, times(2)).fetchTopicConfig()
}

/**
* Test when log is getting initialized, its config remains gets reloaded if Broker config gets changed
* before initialization is done.
*/
@Test
def testLogConfigDirtyAsBrokerUpdated(): Unit = {
val spyLogManager = spy(logManager)
doAnswer(new Answer[Unit] {
def answer(invocation: InvocationOnMock): Unit = {
logManager.initializingLog(topicPartition)
logManager.brokerConfigUpdated()
}
}).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))

val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
delayedOperations,
metadataCache,
spyLogManager)

partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)

// Validate that initializingLog and finishedInitializingLog was called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
ArgumentMatchers.any(),
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation

// We should get config from ZK twice, once before log is created, and second time once
// we find log config is dirty and refresh it.
verify(stateStore, times(2)).fetchTopicConfig()
}

private def seedLogData(log: Log, numRecords: Int, leaderEpoch: Int): Unit = {
for (i <- 0 until numRecords) {
val records = MemoryRecords.withRecords(0L, CompressionType.NONE, leaderEpoch,
new SimpleRecord(s"k$i".getBytes, s"v$i".getBytes))
log.appendAsLeader(records, leaderEpoch)
}
}

}
Loading