diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 4b8561a4d9089..e9e43891caba2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -381,7 +381,7 @@ public void replay(ConfigRecord record) { if (configs.isEmpty()) { configData.remove(configResource); } - log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); + log.info("{}: set configuration {} to {}", configResource, record.name(), configSchema.getLoggableValue(record)); } // VisibleForTesting diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java index d15d6623e38b8..0c33edd47d662 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.Locale; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -141,9 +142,21 @@ public boolean isSensitive(ConfigResource.Type type, String key) { if (configDef == null) return true; ConfigDef.ConfigKey configKey = configDef.configKeys().get(key); if (configKey == null) return true; + if (key.toUpperCase(Locale.ROOT).contains(ConfigDef.Type.PASSWORD.toString())) return true; return configKey.type.isSensitive(); } + /** + * Sensitive configs such as Password must be hidden from logging. + * */ + public String getLoggableValue(ConfigRecord record) { + if (record == null) return null; + if (isSensitive(record)) { + return Password.HIDDEN; + } + return record.value(); + } + /** * Get the default value of the configuration key, or null if no default is specified. */ diff --git a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java index 36089d0f9a4f2..68db647f2a0ca 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java @@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -36,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNull; @Timeout(value = 40) @@ -48,7 +51,8 @@ public class KafkaConfigSchemaTest { define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz doc"). define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "quux doc"). define("quuux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux doc"). - define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc")); + define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc"). + define("ssl.password", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "password doc")); CONFIGS.put(TOPIC, new ConfigDef(). define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc doc"). define("def", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "def doc"). @@ -162,4 +166,18 @@ ConfigEntry.ConfigSource.DEFAULT_CONFIG, true, false, emptyList(), dynamicNodeConfigs, dynamicTopicConfigs)); } + + @Test + public void testGetLoggableValue() { + ConfigRecord record1 = new ConfigRecord(). + setResourceType(BROKER.id()).setResourceName("0"). + setName("foo.bar").setValue("bar"); + assertEquals(SCHEMA.getLoggableValue(record1), record1.value()); + ConfigRecord record2 = null; + assertNull(SCHEMA.getLoggableValue(record2)); + ConfigRecord record3 = new ConfigRecord(). + setResourceType(BROKER.id()).setResourceName("0"). + setName("ssl.password").setValue("bar"); + assertEquals(SCHEMA.getLoggableValue(record3), Password.HIDDEN); + } }