@@ -18,6 +18,8 @@

import org.apache.kafka.common.config.ConfigDef.Range;

import java.util.Set;

public class SaslConfigs {

private static final String OAUTHBEARER_NOTE = " Currently applies only to OAUTHBEARER.";
@@ -196,6 +198,20 @@ public class SaslConfigs {
public static final boolean DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE = false;
public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_DOC = "The (optional) setting to enable the OAuth client to URL-encode the client_id and client_secret in the authorization header"
+ " in accordance with RFC6749, see <a href=\"https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1\">here</a> for more details. The default value is set to 'false' for backward compatibility";

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
Review comment (Member): SaslConfigs.java is public, so maybe we can move this set to DynamicConfig?

SaslConfigs.SASL_JAAS_CONFIG,
SaslConfigs.SASL_KERBEROS_SERVICE_NAME,
SaslConfigs.SASL_KERBEROS_KINIT_CMD,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER,
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR,
SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER,
SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS
);

public static void addClientSaslSupport(ConfigDef config) {
config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
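As a note on intended usage: the new Java-side set replaces the Scala-side collections that callers previously consumed. A minimal, hypothetical sketch of a consumer follows; the SaslReconfigCheck helper and its validate method are illustrative and not part of this PR.

```java
import org.apache.kafka.common.config.SaslConfigs;

import java.util.Set;

// Hypothetical helper (not in this PR): reject dynamic updates to SASL
// settings that are not in the reconfigurable allow-list.
public final class SaslReconfigCheck {
    private SaslReconfigCheck() { }

    public static void validate(Set<String> updatedConfigNames) {
        for (String name : updatedConfigNames) {
            if (name.startsWith("sasl.") && !SaslConfigs.RECONFIGURABLE_CONFIGS.contains(name)) {
                throw new IllegalArgumentException("SASL config " + name + " is not dynamically reconfigurable");
            }
        }
    }
}
```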
@@ -153,6 +153,24 @@ public static void addClientSslSupport(ConfigDef config) {
SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
SslConfigs.SSL_KEYSTORE_KEY_CONFIG,
SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);

public static final Set<String> DYNAMIC_LISTENER_CONFIGS = Set.of(
Review comment (Member): ditto

SslConfigs.SSL_PROTOCOL_CONFIG,
SslConfigs.SSL_PROVIDER_CONFIG,
SslConfigs.SSL_CIPHER_SUITES_CONFIG,
SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
SslConfigs.SSL_KEY_PASSWORD_CONFIG,
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG,
SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG);

public static final Set<String> NON_RECONFIGURABLE_CONFIGS = Set.of(
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
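The DYNAMIC_LISTENER_CONFIGS entries above are base names that can also be overridden per listener via the listener.name.&lt;listener&gt;. prefix. A rough sketch of such a check, assuming a helper that is purely illustrative and not part of this PR:

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical helper (not in this PR): a per-listener override such as
// "listener.name.internal.ssl.keystore.location" is treated as dynamic only
// if the un-prefixed name is in the dynamic-listener set shown above.
public final class ListenerOverrideCheck {
    private ListenerOverrideCheck() { }

    public static boolean isDynamicListenerOverride(String listenerName,
                                                    String configName,
                                                    Set<String> dynamicListenerConfigs) {
        String prefix = "listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".";
        return configName.startsWith(prefix)
            && dynamicListenerConfigs.contains(configName.substring(prefix.length()));
    }
}
```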
@@ -27,6 +27,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Set;

import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
@@ -200,4 +201,11 @@ public class BrokerSecurityConfigs {
.define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, INT, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, LOW, SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST, null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, STRING, null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG);
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.{Collections, Properties}
import joptsimple._
import kafka.server.DynamicConfig
import org.apache.kafka.server.config.DynamicConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
11 changes: 1 addition & 10 deletions core/src/main/scala/kafka/log/LogCleaner.scala
@@ -188,7 +188,7 @@ class LogCleaner(initialConfig: CleanerConfig,
* @return A set of configs that is reconfigurable in LogCleaner
*/
override def reconfigurableConfigs: Set[String] = {
LogCleaner.ReconfigurableConfigs
CleanerConfig.RECONFIGURABLE_CONFIGS.asScala ++ Set(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)
}

/**
@@ -500,15 +500,6 @@ class LogCleaner(initialConfig: CleanerConfig,
}

object LogCleaner {
val ReconfigurableConfigs: Set[String] = Set(
CleanerConfig.LOG_CLEANER_THREADS_PROP,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
ServerConfigs.MESSAGE_MAX_BYTES_CONFIG,
CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP
)

def cleanerConfig(config: KafkaConfig): CleanerConfig = {
new CleanerConfig(config.logCleanerThreads,
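CleanerConfig itself is outside this diff. Assuming its new RECONFIGURABLE_CONFIGS simply mirrors the Scala set deleted above, minus message.max.bytes (which LogCleaner re-adds on top, as shown in the reconfigurableConfigs override), it would look roughly like this fragment:

```java
import java.util.Set;

// Sketch only: the real definition lives in CleanerConfig, which is not part
// of this diff, so the exact contents may differ. The property constants are
// the ones the deleted Scala set referenced.
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
        CleanerConfig.LOG_CLEANER_THREADS_PROP,
        CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
        CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
        CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
        CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
        CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP);
```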
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.config.{DynamicConfig, ServerTopicConfigSynonyms}
import org.apache.kafka.storage.internals.log.LogConfig

import scala.collection.mutable.ListBuffer
@@ -250,7 +250,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
.filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG.id)
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else allSynonyms.head.source
val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name)
val readOnly = !DynamicConfig.ALL_DYNAMIC_CONFIGS.contains(name)

val dataType = configResponseType(configEntryType)
val configDocumentation = if (includeDocumentation) brokerDocumentation(name) else null
33 changes: 11 additions & 22 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -17,31 +17,30 @@

package kafka.server

import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.log.{LogCleaner, LogManager}
import kafka.log.LogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.network.EndPoint
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
import org.apache.kafka.common.config._
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.{EndPoint, SocketServerConfigs}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.config._
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.ClientTelemetry
import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}

import java.util
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.{Collections, Properties}
import scala.collection._
import scala.jdk.CollectionConverters._

@@ -82,16 +81,6 @@ object DynamicBrokerConfig {
private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)

val AllDynamicConfigs = DynamicSecurityConfigs ++
LogCleaner.ReconfigurableConfigs ++
Review comment (Member): Could you please move ReconfigurableConfigs to CleanerConfig?

Reply (Contributor Author): @chia7712 PTAL

DynamicLogConfig.ReconfigurableConfigs ++
DynamicThreadPool.ReconfigurableConfigs ++
Set(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG) ++
DynamicListenerConfig.ReconfigurableConfigs ++
SocketServer.ReconfigurableConfigs ++
DynamicProducerStateManagerConfig ++
DynamicRemoteLogConfig.ReconfigurableConfigs

private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
ClusterLevelListenerConfigs)
@@ -152,7 +141,7 @@ object DynamicBrokerConfig {
}

private def nonDynamicConfigs(props: Properties): Set[String] = {
props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
props.asScala.keySet.intersect(DynamicConfig.Broker.NON_DYNAMIC_PROPS.asScala)
}

private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
@@ -169,7 +158,7 @@
}

private[server] def dynamicConfigUpdateModes: util.Map[String, String] = {
AllDynamicConfigs.map { name =>
DynamicConfig.ALL_DYNAMIC_CONFIGS.asScala.map { name =>
val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else "cluster-wide"
name -> mode
}.toMap.asJava
@@ -282,7 +271,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}

private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = {
val nonDynamic = configNames.intersect(DynamicConfig.Broker.nonDynamicProps)
val nonDynamic = configNames.intersect(DynamicConfig.Broker.NON_DYNAMIC_PROPS.asScala)
require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic")
}

111 changes: 0 additions & 111 deletions core/src/main/scala/kafka/server/DynamicConfig.scala

This file was deleted.
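The replacement class, org.apache.kafka.server.config.DynamicConfig, is not shown in this diff either. Judging from the call sites above (DynamicConfig.ALL_DYNAMIC_CONFIGS, DynamicConfig.Broker.NON_DYNAMIC_PROPS) and from the aggregation removed from DynamicBrokerConfig, a plausible but unverified sketch:

```java
package org.apache.kafka.server.config;

import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.network.SocketServerConfigs;

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch, not the real file: the actual class aggregates more
// per-module sets (log, thread pool, remote log, metrics reporters, ...).
public class DynamicConfig {

    public static final Set<String> ALL_DYNAMIC_CONFIGS;

    static {
        Set<String> all = new HashSet<>();
        all.addAll(SslConfigs.RECONFIGURABLE_CONFIGS);
        all.addAll(SaslConfigs.RECONFIGURABLE_CONFIGS);
        all.addAll(BrokerSecurityConfigs.RECONFIGURABLE_CONFIGS);
        all.addAll(SocketServerConfigs.RECONFIGURABLE_CONFIGS);
        ALL_DYNAMIC_CONFIGS = Set.copyOf(all);
    }
}
```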

1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.config.DynamicConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
@@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -177,6 +178,15 @@ public class SocketServerConfigs {

public static final Map<ListenerName, SecurityProtocol> DEFAULT_NAME_TO_SECURITY_PROTO;

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG,
SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
SocketServerConfigs.LISTENERS_CONFIG,
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG);

static {
HashMap<ListenerName, SecurityProtocol> nameToSecurityProtocol = new HashMap<>();
for (SecurityProtocol securityProtocol : SecurityProtocol.values()) {