Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.policy"/>
Expand Down Expand Up @@ -279,6 +280,7 @@
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="authorizer">
Expand Down Expand Up @@ -362,7 +364,9 @@
<allow pkg="kafka.utils" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.log" />
<allow pkg="org.apache.kafka.server.record" />
<allow pkg="org.apache.kafka.test" />

<subpackage name="remote">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package kafka.server.builders;

import kafka.log.CleanerConfig;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.log.ProducerStateManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.metadata.ConfigRepository;
import kafka.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.log.internals.CleanerConfig;
import org.apache.kafka.server.log.internals.LogConfig;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import scala.collection.JavaConverters;

Expand Down
11 changes: 5 additions & 6 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,23 @@ package kafka.admin
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}

import joptsimple._
import kafka.log.LogConfig
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.kafka.server.log.internals.LogConfig
import org.apache.zookeeper.client.ZKClientConfig

import scala.annotation.nowarn
Expand Down Expand Up @@ -293,8 +292,8 @@ object ConfigCommand extends Logging {
//Create properties, parsing square brackets from values if necessary
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim))
}
if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) {
println(s"WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " +
"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +
"if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0.")
}
Expand Down Expand Up @@ -786,7 +785,7 @@ object ConfigCommand extends Logging {

val nl = System.getProperty("line.separator")
val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
"For entity-type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.Topic + "': " + LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.User + "': " + DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ package kafka.admin
import java.util
import java.util.Optional
import java.util.concurrent.ExecutionException

import kafka.common.AdminCommandFailedException
import kafka.log.LogConfig
import kafka.server.DynamicConfig
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Json, Logging}
import kafka.utils.Implicits._
Expand All @@ -32,6 +30,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, TopicPartitionReplica}
import org.apache.kafka.server.log.internals.LogConfig

import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, mutable}
Expand Down Expand Up @@ -70,9 +69,9 @@ object ReassignPartitionsCommand extends Logging {

// Throttles that are set at the level of an individual topic.
private[admin] val topicLevelLeaderThrottle =
LogConfig.LeaderReplicationThrottledReplicasProp
LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG
private[admin] val topicLevelFollowerThrottle =
LogConfig.FollowerReplicationThrottledReplicasProp
LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG
private[admin] val topicLevelThrottles = Seq(
topicLevelLeaderThrottle,
topicLevelFollowerThrottle
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util
import java.util.{Collections, Properties}
import joptsimple._
import kafka.common.AdminCommandFailedException
import kafka.log.LogConfig
import kafka.utils._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.CreatePartitionsOptions
Expand All @@ -34,6 +33,7 @@ import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExistsException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.LogConfig

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -435,8 +435,8 @@ object TopicCommand extends Logging {
val props = new Properties
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
LogConfig.validate(props)
if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) {
println(s"WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " +
"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +
"if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0.")
}
Expand Down Expand Up @@ -518,7 +518,7 @@ object TopicCommand extends Logging {
private val kafkaConfigsCanAlterTopicConfigsViaBootstrapServer =
" (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option)"
private val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." +
" The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
" The following is a list of valid configurations: " + nl + LogConfig.configNames.asScala.map("\t" + _).mkString(nl) + nl +
"See the Kafka documentation for full details on the topic configs." +
" It is supported only in combination with --create if --bootstrap-server option is used" +
kafkaConfigsCanAlterTopicConfigsViaBootstrapServer + ".")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package kafka.coordinator.group
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.OffsetAndMetadata
import kafka.log.LogConfig
import kafka.server._
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
Expand Down Expand Up @@ -88,9 +88,9 @@ class GroupCoordinator(val brokerId: Int,

def offsetsTopicConfigs: Properties = {
val props = new Properties
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
props.put(LogConfig.CompressionTypeProp, BrokerCompressionType.PRODUCER.name)
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
props.put(TopicConfig.SEGMENT_BYTES_CONFIG, offsetConfig.offsetsTopicSegmentBytes.toString)
props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name)

props
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package kafka.coordinator.transaction

import kafka.log.LogConfig

import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.TimeUnit
Expand All @@ -27,6 +25,7 @@ import kafka.server.{Defaults, FetchLogEnd, ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool, Scheduler}
import kafka.utils.Implicits._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ListTransactionsResponseData
import org.apache.kafka.common.metrics.Metrics
Expand Down Expand Up @@ -402,11 +401,11 @@ class TransactionStateManager(brokerId: Int,
val props = new Properties

// enforce disabled unclean leader election, no compression types, and compact cleanup policy
props.put(LogConfig.UncleanLeaderElectionEnableProp, "false")
props.put(LogConfig.CompressionTypeProp, BrokerCompressionType.UNCOMPRESSED.name)
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
props.put(LogConfig.MinInSyncReplicasProp, config.transactionLogMinInsyncReplicas.toString)
props.put(LogConfig.SegmentBytesProp, config.transactionLogSegmentBytes.toString)
props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.UNCOMPRESSED.name)
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
props.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, config.transactionLogMinInsyncReplicas.toString)
props.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.transactionLogSegmentBytes.toString)

props
}
Expand Down
41 changes: 0 additions & 41 deletions core/src/main/scala/kafka/log/CleanerConfig.scala

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LocalLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.LogFileUtils.offsetFromFileName
import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition}
import org.apache.kafka.server.log.internals.{AbortedTxn, LogConfig, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition}

import scala.jdk.CollectionConverters._
import scala.collection.{Seq, immutable}
Expand Down
23 changes: 11 additions & 12 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.log.internals.{AbortedTxn, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}

import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
Expand Down Expand Up @@ -174,8 +174,7 @@ class LogCleaner(initialConfig: CleanerConfig,
}

override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val newCleanerConfig = LogCleaner.cleanerConfig(newConfig)
val numThreads = newCleanerConfig.numThreads
val numThreads = LogCleaner.cleanerConfig(newConfig).numThreads
val currentThreads = config.numThreads
if (numThreads < 1)
throw new ConfigException(s"Log cleaner threads should be at least 1")
Expand Down Expand Up @@ -332,7 +331,7 @@ class LogCleaner(initialConfig: CleanerConfig,
override def doWork(): Unit = {
val cleaned = tryCleanFilthiestLog()
if (!cleaned)
pause(config.backOffMs, TimeUnit.MILLISECONDS)
pause(config.backoffMs, TimeUnit.MILLISECONDS)

cleanerManager.maintainUncleanablePartitions()
}
Expand Down Expand Up @@ -454,14 +453,14 @@ object LogCleaner {
)

def cleanerConfig(config: KafkaConfig): CleanerConfig = {
CleanerConfig(numThreads = config.logCleanerThreads,
dedupeBufferSize = config.logCleanerDedupeBufferSize,
dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
ioBufferSize = config.logCleanerIoBufferSize,
maxMessageSize = config.messageMaxBytes,
maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
backOffMs = config.logCleanerBackoffMs,
enableCleaner = config.logCleanerEnable)
new CleanerConfig(config.logCleanerThreads,
config.logCleanerDedupeBufferSize,
config.logCleanerDedupeBufferLoadFactor,
config.logCleanerIoBufferSize,
config.messageMaxBytes,
config.logCleanerIoMaxBytesPerSecond,
config.logCleanerBackoffMs,
config.logCleanerEnable)

}
}
Expand Down
Loading