Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ object ConfigCommand extends Logging {
encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
throw new IllegalArgumentException("Password encoder secret not specified"))
new PasswordEncoder(new Password(encoderSecret),
PasswordEncoder.encrypting(new Password(encoderSecret),
None,
encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm),
encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength),
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig: KafkaConfig = null
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
} else {
Some(PasswordEncoder.noop())
}

private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
Expand Down Expand Up @@ -340,7 +344,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging

private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
secret.map { secret =>
new PasswordEncoder(secret,
PasswordEncoder.encrypting(secret,
kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
kafkaConfig.passwordEncoderCipherAlgorithm,
kafkaConfig.passwordEncoderKeyLength,
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ trait KafkaBroker extends KafkaMetricsGroup {
def shutdown(): Unit
def brokerTopicStats: BrokerTopicStats
def credentialProvider: CredentialProvider
def clientToControllerChannelManager: BrokerToControllerChannelManager

// For backwards compatibility, we need to keep older metrics tied
// to their original name when this class was named `KafkaServer`
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami

// Cache the current config to avoid acquiring read lock to access from dynamicConfig
@volatile private var currentConfig = this
val processRoles: Set[ProcessRole] = parseProcessRoles()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, what's the reason to move this?

Copy link
Member Author

@mumrah mumrah Jul 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the line below, we are creating the DynamicBrokerConfig which gets a partially initialized KafkaConfig. In this PR, we're now reading the processRoles to determine which encoder to create. Since the KafkaConfig isn't fully initialized, this was null when DynamicBrokerConfig was constructed.

Moving parseProcessRoles up here seemed simpler than refactoring a bunch of this config code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the explanation. sounds good

private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))

private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
Expand Down Expand Up @@ -1612,7 +1613,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
val processRoles: Set[ProcessRole] = parseProcessRoles()
val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
val brokerSessionTimeoutMs: Int = getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
Expand Down
45 changes: 36 additions & 9 deletions core/src/main/scala/kafka/utils/PasswordEncoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,33 @@ object PasswordEncoder {
val IterationsProp = "iterations"
val EncyrptedPasswordProp = "encryptedPassword"
val PasswordLengthProp = "passwordLength"

def encrypting(secret: Password,
keyFactoryAlgorithm: Option[String],
cipherAlgorithm: String,
keyLength: Int,
iterations: Int): EncryptingPasswordEncoder = {
new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations)
}

def noop(): NoOpPasswordEncoder = {
new NoOpPasswordEncoder()
}
}

trait PasswordEncoder {
def encode(password: Password): String
def decode(encodedPassword: String): Password

private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
}

/**
* A password encoder that does not modify the given password. This is used in KRaft mode only.
*/
class NoOpPasswordEncoder extends PasswordEncoder {
override def encode(password: Password): String = password.value()
override def decode(encodedPassword: String): Password = new Password(encodedPassword)
}

/**
Expand All @@ -55,16 +82,18 @@ object PasswordEncoder {
* The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
*
*/
class PasswordEncoder(secret: Password,
keyFactoryAlgorithm: Option[String],
cipherAlgorithm: String,
keyLength: Int,
iterations: Int) extends Logging {
class EncryptingPasswordEncoder(
secret: Password,
keyFactoryAlgorithm: Option[String],
cipherAlgorithm: String,
keyLength: Int,
iterations: Int
) extends PasswordEncoder with Logging {

private val secureRandom = new SecureRandom
private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)

def encode(password: Password): String = {
override def encode(password: Password): String = {
val salt = new Array[Byte](256)
secureRandom.nextBytes(salt)
val cipher = Cipher.getInstance(cipherAlgorithm)
Expand All @@ -84,7 +113,7 @@ class PasswordEncoder(secret: Password,
encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
}

def decode(encodedPassword: String): Password = {
override def decode(encodedPassword: String): Password = {
val params = CoreUtils.parseCsvMap(encodedPassword)
val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
val cipherAlg = params(CipherAlgorithmProp)
Expand Down Expand Up @@ -131,8 +160,6 @@ class PasswordEncoder(secret: Password,

private def base64Encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)

private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)

private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
val aesPattern = "AES/(.*)/.*".r
cipherAlgorithm match {
Expand Down
Loading