diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 0685d1d49b76a..741fd332fa31c 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1415,4 +1415,21 @@ public static Map initializeMap(Collection keys, Supplier val return res; } + /** + * Get an array containing all of the {@link Object#toString names} of a given enumerable type. + * @param enumClass the enum class; may not be null + * @return an array with the names of every value for the enum class; never null, but may be empty + * if there are no values defined for the enum + */ + public static String[] enumOptions(Class> enumClass) { + Objects.requireNonNull(enumClass); + if (!enumClass.isEnum()) { + throw new IllegalArgumentException("Class " + enumClass + " is not an enumerable type"); + } + + return Stream.of(enumClass.getEnumConstants()) + .map(Object::toString) + .toArray(String[]::new); + } + } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java index c1b83eb648828..7adbd8f92dfd8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -31,6 +31,8 @@ import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ExecutionException; public final class ConnectUtils { @@ -72,6 +74,73 @@ static String lookupKafkaClusterId(Admin adminClient) { } } + /** + * Ensure that the {@link Map properties} contain an expected value for the given key, inserting the + * expected value into the properties if necessary. + * + *

If there is a pre-existing value for the key in the properties, log a warning to the user + * that this value will be ignored, and the expected value will be used instead. + * + * @param props the configuration properties provided by the user; may not be null + * @param key the name of the property to check on; may not be null + * @param expectedValue the expected value for the property; may not be null + * @param justification the reason the property cannot be overridden. + * Will follow the phrase "The value... for the... property will be ignored as it cannot be overridden ". + * For example, one might supply the message "in connectors with the DLQ feature enabled" for this parameter. + * May be null (in which case, no justification is given to the user in the logged warning message) + * @param caseSensitive whether the value should match case-insensitively + */ + public static void ensureProperty( + Map props, + String key, + String expectedValue, + String justification, + boolean caseSensitive + ) { + ensurePropertyAndGetWarning(props, key, expectedValue, justification, caseSensitive).ifPresent(log::warn); + } + + // Visible for testing + /** + * Ensure that a given key has an expected value in the properties, inserting the expected value into the + * properties if necessary. If a user-supplied value is overridden, return a warning message that can + * be logged to the user notifying them of this fact. + * + * @return an {@link Optional} containing a warning that should be logged to the user if a value they + * supplied in the properties is being overridden, or {@link Optional#empty()} if no such override has + * taken place + */ + static Optional ensurePropertyAndGetWarning( + Map props, + String key, + String expectedValue, + String justification, + boolean caseSensitive) { + if (!props.containsKey(key)) { + // Insert the expected value + props.put(key, expectedValue); + // But don't issue a warning to the user + return Optional.empty(); + } + + String value = Objects.toString(props.get(key)); + boolean matchesExpectedValue = caseSensitive ? expectedValue.equals(value) : expectedValue.equalsIgnoreCase(value); + if (matchesExpectedValue) { + return Optional.empty(); + } + + // Insert the expected value + props.put(key, expectedValue); + + justification = justification != null ? " " + justification : ""; + // And issue a warning to the user + return Optional.of(String.format( + "The value '%s' for the '%s' property will be ignored as it cannot be overridden%s. " + + "The value '%s' will be used instead.", + value, key, justification, expectedValue + )); + } + public static void addMetricsContextProperties(Map prop, WorkerConfig config, String clusterId) { //add all properties predefined with "metrics.context." prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false)); @@ -90,4 +159,5 @@ public static boolean isSinkConnector(Connector connector) { public static boolean isSourceConnector(Connector connector) { return SourceConnector.class.isAssignableFrom(connector.getClass()); } + } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java index 62a01b706a832..d1330277e8e45 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java @@ -26,9 +26,11 @@ import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -101,4 +103,73 @@ public void testAddMetricsContextPropertiesStandalone() { assertEquals("cluster-1", prop.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID)); } + + @Test + public void testNoOverrideWarning() { + Map props = new HashMap<>(); + assertEquals( + Optional.empty(), + ConnectUtils.ensurePropertyAndGetWarning(props, "key", "value", "because i say so", true) + ); + assertEquals("value", props.get("key")); + + props.clear(); + assertEquals( + Optional.empty(), + ConnectUtils.ensurePropertyAndGetWarning(props, "key", "value", "because i say so", false) + ); + assertEquals("value", props.get("key")); + + props.clear(); + props.put("key", "value"); + assertEquals( + Optional.empty(), + ConnectUtils.ensurePropertyAndGetWarning(props, "key", "value", "because i say so", true) + ); + assertEquals("value", props.get("key")); + + props.clear(); + props.put("key", "VALUE"); + assertEquals( + Optional.empty(), + ConnectUtils.ensurePropertyAndGetWarning(props, "key", "value", "because i say so", false) + ); + assertEquals("VALUE", props.get("key")); + } + + @Test + public void testOverrideWarning() { + Map props = new HashMap<>(); + props.put("\u1984", "little brother"); + String expectedWarning = "The value 'little brother' for the '\u1984' property will be ignored as it cannot be overridden " + + "thanks to newly-introduced federal legislation. " + + "The value 'big brother' will be used instead."; + assertEquals( + Optional.of(expectedWarning), + ConnectUtils.ensurePropertyAndGetWarning( + props, + "\u1984", + "big brother", + "thanks to newly-introduced federal legislation", + false) + ); + assertEquals(Collections.singletonMap("\u1984", "big brother"), props); + + props.clear(); + props.put("\u1984", "BIG BROTHER"); + expectedWarning = "The value 'BIG BROTHER' for the '\u1984' property will be ignored as it cannot be overridden " + + "thanks to newly-introduced federal legislation. " + + "The value 'big brother' will be used instead."; + assertEquals( + Optional.of(expectedWarning), + ConnectUtils.ensurePropertyAndGetWarning( + props, + "\u1984", + "big brother", + "thanks to newly-introduced federal legislation", + true) + ); + assertEquals(Collections.singletonMap("\u1984", "big brother"), props); + } + }