From 5f85c8e0cb70e23a7dd1b16ff4664cf3d14263df Mon Sep 17 00:00:00 2001 From: premkamal23 Date: Wed, 3 Aug 2022 21:39:31 +0530 Subject: [PATCH 1/3] "[KAFKA-14115] Redacting sensitive config values for KRaft" --- .../ConfigurationControlManager.java | 2 +- .../kafka/metadata/KafkaConfigSchema.java | 13 ++++++++++++ .../kafka/metadata/KafkaConfigSchemaTest.java | 20 ++++++++++++++++++- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 4b8561a4d9089..e9e43891caba2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -381,7 +381,7 @@ public void replay(ConfigRecord record) { if (configs.isEmpty()) { configData.remove(configResource); } - log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); + log.info("{}: set configuration {} to {}", configResource, record.name(), configSchema.getLoggableValue(record)); } // VisibleForTesting diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java index d15d6623e38b8..0c33edd47d662 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.Locale; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -141,9 +142,21 @@ public boolean isSensitive(ConfigResource.Type type, String key) { if (configDef == null) return true; ConfigDef.ConfigKey configKey = configDef.configKeys().get(key); if (configKey == null) return true; + if (key.toUpperCase(Locale.ROOT).contains(ConfigDef.Type.PASSWORD.toString())) return true; return configKey.type.isSensitive(); } + /** + * Sensitive configs such as Password must be hidden from logging. + * */ + public String getLoggableValue(ConfigRecord record) { + if (record == null) return null; + if (isSensitive(record)) { + return Password.HIDDEN; + } + return record.value(); + } + /** * Get the default value of the configuration key, or null if no default is specified. */ diff --git a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java index 36089d0f9a4f2..68db647f2a0ca 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java @@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -36,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNull; @Timeout(value = 40) @@ -48,7 +51,8 @@ public class KafkaConfigSchemaTest { define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz doc"). define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "quux doc"). define("quuux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux doc"). - define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc")); + define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc"). + define("ssl.password", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "password doc")); CONFIGS.put(TOPIC, new ConfigDef(). define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc doc"). define("def", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "def doc"). @@ -162,4 +166,18 @@ ConfigEntry.ConfigSource.DEFAULT_CONFIG, true, false, emptyList(), dynamicNodeConfigs, dynamicTopicConfigs)); } + + @Test + public void testGetLoggableValue() { + ConfigRecord record1 = new ConfigRecord(). + setResourceType(BROKER.id()).setResourceName("0"). + setName("foo.bar").setValue("bar"); + assertEquals(SCHEMA.getLoggableValue(record1), record1.value()); + ConfigRecord record2 = null; + assertNull(SCHEMA.getLoggableValue(record2)); + ConfigRecord record3 = new ConfigRecord(). + setResourceType(BROKER.id()).setResourceName("0"). + setName("ssl.password").setValue("bar"); + assertEquals(SCHEMA.getLoggableValue(record3), Password.HIDDEN); + } } From a2e6d9691f2dcb64ffbc825d02a348d9549e4fc3 Mon Sep 17 00:00:00 2001 From: premkamal23 Date: Wed, 3 Aug 2022 22:19:05 +0530 Subject: [PATCH 2/3] Revert ""[KAFKA-14115] Redacting sensitive config values for KRaft"" This reverts commit 5f85c8e0 --- .../ConfigurationControlManager.java | 2 +- .../kafka/metadata/KafkaConfigSchema.java | 13 ------------ .../kafka/metadata/KafkaConfigSchemaTest.java | 20 +------------------ 3 files changed, 2 insertions(+), 33 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index e9e43891caba2..4b8561a4d9089 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -381,7 +381,7 @@ public void replay(ConfigRecord record) { if (configs.isEmpty()) { configData.remove(configResource); } - log.info("{}: set configuration {} to {}", configResource, record.name(), configSchema.getLoggableValue(record)); + log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); } // VisibleForTesting diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java index 0c33edd47d662..d15d6623e38b8 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.function.Function; -import java.util.Locale; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -142,21 +141,9 @@ public boolean isSensitive(ConfigResource.Type type, String key) { if (configDef == null) return true; ConfigDef.ConfigKey configKey = configDef.configKeys().get(key); if (configKey == null) return true; - if (key.toUpperCase(Locale.ROOT).contains(ConfigDef.Type.PASSWORD.toString())) return true; return configKey.type.isSensitive(); } - /** - * Sensitive configs such as Password must be hidden from logging. - * */ - public String getLoggableValue(ConfigRecord record) { - if (record == null) return null; - if (isSensitive(record)) { - return Password.HIDDEN; - } - return record.value(); - } - /** * Get the default value of the configuration key, or null if no default is specified. */ diff --git a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java index 68db647f2a0ca..36089d0f9a4f2 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java @@ -20,8 +20,6 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -38,7 +36,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertNull; @Timeout(value = 40) @@ -51,8 +48,7 @@ public class KafkaConfigSchemaTest { define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz doc"). define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "quux doc"). define("quuux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux doc"). - define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc"). - define("ssl.password", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "password doc")); + define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc")); CONFIGS.put(TOPIC, new ConfigDef(). define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc doc"). define("def", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "def doc"). @@ -166,18 +162,4 @@ ConfigEntry.ConfigSource.DEFAULT_CONFIG, true, false, emptyList(), dynamicNodeConfigs, dynamicTopicConfigs)); } - - @Test - public void testGetLoggableValue() { - ConfigRecord record1 = new ConfigRecord(). - setResourceType(BROKER.id()).setResourceName("0"). - setName("foo.bar").setValue("bar"); - assertEquals(SCHEMA.getLoggableValue(record1), record1.value()); - ConfigRecord record2 = null; - assertNull(SCHEMA.getLoggableValue(record2)); - ConfigRecord record3 = new ConfigRecord(). - setResourceType(BROKER.id()).setResourceName("0"). - setName("ssl.password").setValue("bar"); - assertEquals(SCHEMA.getLoggableValue(record3), Password.HIDDEN); - } } From 1039d2d94a90a811435598034dd07e4607ebc39a Mon Sep 17 00:00:00 2001 From: premkamal23 Date: Wed, 3 Aug 2022 22:23:29 +0530 Subject: [PATCH 3/3] KAFKA-14115 Redact sensitive config values for logging --- .../ConfigurationControlManager.java | 2 +- .../kafka/metadata/KafkaConfigSchema.java | 13 ++++++++++++ .../kafka/metadata/KafkaConfigSchemaTest.java | 20 ++++++++++++++++++- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 4b8561a4d9089..e9e43891caba2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -381,7 +381,7 @@ public void replay(ConfigRecord record) { if (configs.isEmpty()) { configData.remove(configResource); } - log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); + log.info("{}: set configuration {} to {}", configResource, record.name(), configSchema.getLoggableValue(record)); } // VisibleForTesting diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java index d15d6623e38b8..0c33edd47d662 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.Locale; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -141,9 +142,21 @@ public boolean isSensitive(ConfigResource.Type type, String key) { if (configDef == null) return true; ConfigDef.ConfigKey configKey = configDef.configKeys().get(key); if (configKey == null) return true; + if (key.toUpperCase(Locale.ROOT).contains(ConfigDef.Type.PASSWORD.toString())) return true; return configKey.type.isSensitive(); } + /** + * Sensitive configs such as Password must be hidden from logging. + * */ + public String getLoggableValue(ConfigRecord record) { + if (record == null) return null; + if (isSensitive(record)) { + return Password.HIDDEN; + } + return record.value(); + } + /** * Get the default value of the configuration key, or null if no default is specified. */ diff --git a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java index 36089d0f9a4f2..68db647f2a0ca 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java @@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -36,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNull; @Timeout(value = 40) @@ -48,7 +51,8 @@ public class KafkaConfigSchemaTest { define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz doc"). define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "quux doc"). define("quuux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux doc"). - define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc")); + define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc"). + define("ssl.password", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "password doc")); CONFIGS.put(TOPIC, new ConfigDef(). define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc doc"). define("def", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "def doc"). @@ -162,4 +166,18 @@ ConfigEntry.ConfigSource.DEFAULT_CONFIG, true, false, emptyList(), dynamicNodeConfigs, dynamicTopicConfigs)); } + + @Test + public void testGetLoggableValue() { + ConfigRecord record1 = new ConfigRecord(). + setResourceType(BROKER.id()).setResourceName("0"). + setName("foo.bar").setValue("bar"); + assertEquals(SCHEMA.getLoggableValue(record1), record1.value()); + ConfigRecord record2 = null; + assertNull(SCHEMA.getLoggableValue(record2)); + ConfigRecord record3 = new ConfigRecord(). + setResourceType(BROKER.id()).setResourceName("0"). + setName("ssl.password").setValue("bar"); + assertEquals(SCHEMA.getLoggableValue(record3), Password.HIDDEN); + } }