KAFKA-14485: Move LogCleaner to storage module #19387

Merged: 10 commits, Apr 11, 2025

2 changes: 1 addition & 1 deletion build.gradle
@@ -3737,7 +3737,7 @@ project(':connect:mirror') {
testImplementation project(':core')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':server')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':server-common')


testRuntimeOnly project(':connect:runtime')
1 change: 1 addition & 0 deletions checkstyle/import-control-server-common.xml
@@ -130,6 +130,7 @@
</subpackage>
<subpackage name="config">
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.clients"/>
</subpackage>
</subpackage>

1 change: 1 addition & 0 deletions checkstyle/import-control-storage.xml
@@ -88,6 +88,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.config" />
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.storage.log.metrics"/>
12 changes: 11 additions & 1 deletion config/log4j2.yaml
@@ -133,7 +133,17 @@ Configuration:
AppenderRef:
ref: ControllerAppender
# LogCleaner logger
- name: kafka.log.LogCleaner
- name: org.apache.kafka.storage.internals.log.LogCleaner
level: INFO
additivity: false
AppenderRef:
ref: CleanerAppender
- name: org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread
level: INFO
additivity: false
AppenderRef:
ref: CleanerAppender
- name: org.apache.kafka.storage.internals.log.Cleaner
level: INFO
additivity: false
AppenderRef:
1,307 changes: 0 additions & 1,307 deletions core/src/main/scala/kafka/log/LogCleaner.scala

This file was deleted.

28 changes: 16 additions & 12 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -42,11 +42,12 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
import java.util.{Collections, Optional, OptionalLong, Properties}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{FileLock, Scheduler}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

import java.util
import java.util.stream.Collectors

/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -629,7 +630,7 @@ class LogManager(logDirs: Seq[File],
initialTaskDelayMs)
}
if (cleanerConfig.enableCleaner) {
_cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
_cleaner = new LogCleaner(cleanerConfig, liveLogDirs.asJava, currentLogs, logDirFailureChannel, time)
_cleaner.startup()
}
}
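The cleaner constructed here is now the Java org.apache.kafka.storage.internals.log.LogCleaner, so the Scala call site converts its directory Seq with .asJava and drops the old named/default arguments. Below is a minimal sketch of the same construction written directly against the Java API, inferred only from this call site; the helper and variable names are illustrative, not part of the PR.

import java.io.File;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogCleaner;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.UnifiedLog;

class CleanerStartupSketch {
    static LogCleaner startCleaner(CleanerConfig cleanerConfig,
                                   List<File> liveLogDirs,
                                   ConcurrentHashMap<TopicPartition, UnifiedLog> currentLogs) {
        // Same five positional arguments as the LogManager call site above:
        // cleaner config, live log directories, current logs, failure channel, time source.
        LogCleaner cleaner = new LogCleaner(cleanerConfig,
            liveLogDirs,
            currentLogs,
            new LogDirFailureChannel(liveLogDirs.size()),
            Time.SYSTEM);
        cleaner.startup();
        return cleaner;
    }
}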
@@ -894,7 +895,7 @@
*/
private def resumeCleaning(topicPartition: TopicPartition): Unit = {
if (cleaner != null) {
cleaner.resumeCleaning(Seq(topicPartition))
cleaner.resumeCleaning(util.Set.of(topicPartition))
info(s"Cleaning for partition $topicPartition is resumed")
}
}
@@ -1286,7 +1287,7 @@
if (cleaner != null && !isFuture) {
cleaner.abortCleaning(topicPartition)
if (checkpoint) {
cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
cleaner.updateCheckpoints(removedLog.parentDirFile, Optional.of(topicPartition))
}
}
if (isStray) {
@@ -1344,7 +1345,7 @@

val logsByDirCached = logsByDir
logDirs.foreach { logDir =>
if (cleaner != null) cleaner.updateCheckpoints(logDir)
if (cleaner != null) cleaner.updateCheckpoints(logDir, Optional.empty())
val logsToCheckpoint = logsInDir(logsByDirCached, logDir)
checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
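updateCheckpoints now takes a java.util.Optional in place of the Scala Option named parameter, covering both call shapes used above: dropping one partition from a directory's cleaner checkpoint, or simply rewriting the checkpoints for the directory. A sketch assuming only the signatures visible at these call sites; the helper names are illustrative.

import java.io.File;
import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.storage.internals.log.LogCleaner;

class CleanerCheckpointSketch {
    // Abort in-flight cleaning and drop the partition from the directory's
    // cleaner checkpoint, as LogManager does above when a log is removed.
    static void dropPartition(LogCleaner cleaner, File logDir, TopicPartition tp) {
        cleaner.abortCleaning(tp);
        cleaner.updateCheckpoints(logDir, Optional.of(tp));
    }

    // Rewrite the directory's checkpoints without removing any partition.
    static void rewrite(LogCleaner cleaner, File logDir) {
        cleaner.updateCheckpoints(logDir, Optional.empty());
    }
}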
@@ -1382,19 +1383,22 @@
val startMs = time.milliseconds

// clean current logs.
val deletableLogs = {
val deletableLogs: util.Map[TopicPartition, UnifiedLog] = {
if (cleaner != null) {
// prevent cleaner from working on same partitions when changing cleanup policy
cleaner.pauseCleaningForNonCompactedPartitions()
} else {
currentLogs.asScala.filter {
case (_, log) => !log.config.compact
}
currentLogs.entrySet().stream()
.filter(e => !e.getValue.config.compact)
.collect(Collectors.toMap(
(e: util.Map.Entry[TopicPartition, UnifiedLog]) => e.getKey,
(e: util.Map.Entry[TopicPartition, UnifiedLog]) => e.getValue
))
}
}

try {
deletableLogs.foreach {
deletableLogs.forEach {
case (topicPartition, log) =>
debug(s"Garbage collecting '${log.name}'")
total += log.deleteOldSegments()
Expand All @@ -1408,7 +1412,7 @@ class LogManager(logDirs: Seq[File],
}
} finally {
if (cleaner != null) {
cleaner.resumeCleaning(deletableLogs.map(_._1))
cleaner.resumeCleaning(deletableLogs.keySet())
}
}
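pauseCleaningForNonCompactedPartitions, resumeCleaning, and the fallback filter now all work in terms of java.util collections, which is why the Scala code above switches to entrySet().stream() and keySet(). The same flow as a Java sketch, assuming UnifiedLog.config() and LogConfig.compact keep the shapes the Scala caller relies on; per-log storage-exception handling is omitted.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.storage.internals.log.LogCleaner;
import org.apache.kafka.storage.internals.log.UnifiedLog;

class RetentionSweepSketch {
    static long deleteOldSegments(LogCleaner cleaner,
                                  ConcurrentHashMap<TopicPartition, UnifiedLog> currentLogs) {
        // Pause compaction for non-compacted partitions, or filter them out directly
        // when the cleaner is disabled, mirroring the Scala method above.
        Map<TopicPartition, UnifiedLog> deletable = cleaner != null
            ? cleaner.pauseCleaningForNonCompactedPartitions()
            : currentLogs.entrySet().stream()
                .filter(e -> !e.getValue().config().compact)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        long total = 0;
        try {
            for (UnifiedLog log : deletable.values()) {
                total += log.deleteOldSegments();
            }
        } finally {
            if (cleaner != null) {
                cleaner.resumeCleaning(deletable.keySet()); // java.util.Set, no Scala conversion needed
            }
        }
        return total;
    }
}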

@@ -1548,7 +1552,7 @@ object LogManager {
LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
val defaultLogConfig = new LogConfig(defaultProps)

val cleanerConfig = LogCleaner.cleanerConfig(config)
val cleanerConfig = new CleanerConfig(config)
val transactionLogConfig = new TransactionLogConfig(config)

new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -21,7 +21,7 @@ import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.log.{LogCleaner, LogManager}
import kafka.log.LogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.server.DynamicBrokerConfig._
@@ -46,7 +46,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.ClientTelemetry
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig}

import scala.collection._
import scala.jdk.CollectionConverters._
@@ -89,7 +89,7 @@ object DynamicBrokerConfig {
private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)

val AllDynamicConfigs = DynamicSecurityConfigs ++
LogCleaner.ReconfigurableConfigs ++
LogCleaner.RECONFIGURABLE_CONFIGS.asScala ++
DynamicLogConfig.ReconfigurableConfigs ++
DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala ++
Set(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG) ++
@@ -283,7 +283,7 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
val broker = cluster.brokers.asScala.head._2
val log = broker.logManager.getLog(tp).get
log.roll()
assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0))
assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0, 60000L))
}
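awaitCleaned no longer has a Scala default for the maximum wait, so callers now pass it explicitly (60000 ms in the updated tests). A hedged sketch of the call shape; the helper and its error message are illustrative.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.storage.internals.log.LogCleaner;

class AwaitCleanedSketch {
    // Wait until the cleaner has cleaned the partition up to the given offset;
    // the maximum wait is now an explicit argument rather than a Scala default.
    static void waitForCleaning(LogCleaner cleaner, TopicPartition tp,
                                long upToOffset, long maxWaitMs) {
        if (!cleaner.awaitCleaned(tp, upToOffset, maxWaitMs)) {
            throw new AssertionError("Partition " + tp + " not cleaned up to offset " + upToOffset);
        }
    }
}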

private def withAdmin(f: Admin => Unit): Unit = {
@@ -1041,7 +1041,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
configs.get(topicResource1).get(TopicConfig.RETENTION_MS_CONFIG).value)

val maxMessageBytes2 = configs.get(topicResource2).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG)
assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES.toString, maxMessageBytes2.value)
assertEquals(ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT.toString, maxMessageBytes2.value)
assertEquals(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageBytes2.name)
assertTrue(maxMessageBytes2.isDefault)
assertFalse(maxMessageBytes2.isSensitive)
@@ -3467,7 +3467,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(2, configs.size)

assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString, configs.get(topic1Resource).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, configs.get(topic1Resource).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)
assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(topic1Resource).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)
assertEquals("0.9", configs.get(topic2Resource).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)

// Check invalid use of append/subtract operation types
@@ -4120,12 +4120,12 @@ object PlaintextAdminIntegrationTest {

assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString,
configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE,
assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT,
configs.get(topicResource1).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)

assertEquals("snappy", configs.get(topicResource2).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)

assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)

// Alter configs with validateOnly = true: first and third are invalid, second is valid
alterConfigs.put(topicResource1, util.Arrays.asList(
@@ -4149,11 +4149,11 @@

assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString,
configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE,
assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT,
configs.get(topicResource1).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)

assertEquals("snappy", configs.get(topicResource2).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)

assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
}
}
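The defaults these assertions compare against now come from ServerLogConfigs in server-common instead of the storage module's LogConfig. A small sketch showing where the constants live; the helper itself is hypothetical.

import org.apache.kafka.server.config.ServerLogConfigs;

class LogDefaultsSketch {
    // Same default values as before; only their home moved from
    // org.apache.kafka.storage.internals.log.LogConfig to server-common.
    static String describeDefaults() {
        return "max.message.bytes=" + ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT
            + ", compression.type=" + ServerLogConfigs.COMPRESSION_TYPE_DEFAULT;
    }
}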
@@ -26,7 +26,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
@@ -262,7 +262,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
val valueLengthSize = 3
val overhead = Records.LOG_OVERHEAD + DefaultRecordBatch.RECORD_BATCH_OVERHEAD + DefaultRecord.MAX_RECORD_OVERHEAD +
keyLengthSize + headerLengthSize + valueLengthSize
val valueSize = LogConfig.DEFAULT_MAX_MESSAGE_BYTES - overhead
val valueSize = ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT - overhead

val record0 = new ProducerRecord(topic, new Array[Byte](0), new Array[Byte](valueSize))
assertEquals(record0.value.length, producer.send(record0).get.serializedValueSize)
@@ -30,10 +30,9 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
@@ -584,7 +583,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
assertEquals(100000, segmentBytesConfig.value.toLong)
assertEquals(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, segmentBytesConfig.source)
val compressionConfig = topicConfigs.find(_.name == TopicConfig.COMPRESSION_TYPE_CONFIG).get
assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, compressionConfig.value)
assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, compressionConfig.value)
assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG, compressionConfig.source)

assertFutureThrows(classOf[TopicAuthorizationException], result.numPartitions(topic2))
@@ -36,6 +36,7 @@ import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.common.TransactionVersion.{TV_0, TV_2}
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LogConfig, LogOffsetMetadata, UnifiedLog}
@@ -1134,7 +1135,7 @@ class TransactionStateManagerTest {
val partitionIds = 0 until numPartitions

loadTransactionsForPartitions(partitionIds)
expectLogConfig(partitionIds, LogConfig.DEFAULT_MAX_MESSAGE_BYTES)
expectLogConfig(partitionIds, ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT)

txnMetadata1.txnLastUpdateTimestamp = time.milliseconds() - txnConfig.transactionalIdExpirationMs
txnMetadata1.state = txnState
@@ -16,9 +16,6 @@
*/
package kafka.log

import java.io.File
import java.nio.file.Files
import java.util.{Optional, Properties}
import kafka.utils.TestUtils
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
@@ -28,10 +25,14 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.{AfterEach, Tag}

import java.io.File
import java.nio.file.Files
import java.util
import java.util.{Optional, Properties}
import scala.collection.Seq
import scala.collection.mutable.ListBuffer
import scala.util.Random
@@ -93,7 +94,7 @@ abstract class AbstractLogCleanerIntegrationTest {
cleanerIoBufferSize: Option[Int] = None,
propertyOverrides: Properties = new Properties()): LogCleaner = {

val logMap = new java.util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
val logMap = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
for (partition <- partitions) {
val dir = new File(logDir, s"${partition.topic}-${partition.partition}")
Files.createDirectories(dir.toPath)
@@ -133,10 +134,10 @@ abstract class AbstractLogCleanerIntegrationTest {
backoffMs,
true)
new LogCleaner(cleanerConfig,
logDirs = Array(logDir),
logs = logMap,
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
util.List.of(logDir),
logMap,
new LogDirFailureChannel(1),
time)
}

private var ctr = 0
@@ -169,7 +169,7 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset

// the first block should get cleaned
cleaner.awaitCleaned(new TopicPartition("log", 0), firstBlockCleanableSegmentOffset)
cleaner.awaitCleaned(new TopicPartition("log", 0), firstBlockCleanableSegmentOffset, 60000L)

val read1 = readFromLog(log)
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(new TopicPartition("log", 0))
@@ -181,7 +181,7 @@

time.sleep(maxCompactionLagMs + 1)
// the second block should get cleaned. only zero keys left
cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT1.baseOffset)
cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT1.baseOffset, 60000L)

val read2 = readFromLog(log)

@@ -222,10 +222,10 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
cleaner.startup()
assertEquals(0, cleaner.deadThreadCount)
// we simulate the unexpected error with an interrupt
cleaner.cleaners.foreach(_.interrupt())
cleaner.cleaners.forEach(_.interrupt())
// wait until interruption is propagated to all the threads
TestUtils.waitUntilTrue(
() => cleaner.cleaners.foldLeft(true)((result, thread) => {
() => cleaner.cleaners.asScala.foldLeft(true)((result, thread) => {
thread.isThreadFailed && result
}), "Threads didn't terminate unexpectedly"
)
@@ -84,7 +84,7 @@ class LogCleanerLagIntegrationTest extends AbstractLogCleanerIntegrationTest wit
val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset

// the first block should get cleaned
cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT0.baseOffset)
cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT0.baseOffset, 60000L)

// check the data is the same
val read1 = readFromLog(log)