Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1415,4 +1415,21 @@ public static <K, V> Map<K, V> initializeMap(Collection<K> keys, Supplier<V> val
return res;
}

/**
* Get an array containing all of the {@link Object#toString names} of a given enumerable type.
* @param enumClass the enum class; may not be null
* @return an array with the names of every value for the enum class; never null, but may be empty
* if there are no values defined for the enum
*/
public static String[] enumOptions(Class<? extends Enum<?>> enumClass) {
Objects.requireNonNull(enumClass);
if (!enumClass.isEnum()) {
throw new IllegalArgumentException("Class " + enumClass + " is not an enumerable type");
}

return Stream.of(enumClass.getEnumConstants())
.map(Object::toString)
.toArray(String[]::new);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

public final class ConnectUtils {
Expand Down Expand Up @@ -72,6 +74,73 @@ static String lookupKafkaClusterId(Admin adminClient) {
}
}

/**
* Ensure that the {@link Map properties} contain an expected value for the given key, inserting the
* expected value into the properties if necessary.
*
* <p>If there is a pre-existing value for the key in the properties, log a warning to the user
* that this value will be ignored, and the expected value will be used instead.
*
* @param props the configuration properties provided by the user; may not be null
* @param key the name of the property to check on; may not be null
* @param expectedValue the expected value for the property; may not be null
* @param justification the reason the property cannot be overridden.
* Will follow the phrase "The value... for the... property will be ignored as it cannot be overridden ".
* For example, one might supply the message "in connectors with the DLQ feature enabled" for this parameter.
* May be null (in which case, no justification is given to the user in the logged warning message)
* @param caseSensitive whether the value should match case-insensitively
*/
public static void ensureProperty(
Map<String, ? super String> props,
String key,
String expectedValue,
String justification,
boolean caseSensitive
) {
ensurePropertyAndGetWarning(props, key, expectedValue, justification, caseSensitive).ifPresent(log::warn);
}

// Visible for testing
/**
* Ensure that a given key has an expected value in the properties, inserting the expected value into the
* properties if necessary. If a user-supplied value is overridden, return a warning message that can
* be logged to the user notifying them of this fact.
*
* @return an {@link Optional} containing a warning that should be logged to the user if a value they
* supplied in the properties is being overridden, or {@link Optional#empty()} if no such override has
* taken place
*/
static Optional<String> ensurePropertyAndGetWarning(
Map<String, ? super String> props,
String key,
String expectedValue,
String justification,
boolean caseSensitive) {
if (!props.containsKey(key)) {
// Insert the expected value
props.put(key, expectedValue);
// But don't issue a warning to the user
return Optional.empty();
}

String value = Objects.toString(props.get(key));
boolean matchesExpectedValue = caseSensitive ? expectedValue.equals(value) : expectedValue.equalsIgnoreCase(value);
if (matchesExpectedValue) {
return Optional.empty();
}

// Insert the expected value
props.put(key, expectedValue);

justification = justification != null ? " " + justification : "";
// And issue a warning to the user
return Optional.of(String.format(
"The value '%s' for the '%s' property will be ignored as it cannot be overridden%s. "
+ "The value '%s' will be used instead.",
value, key, justification, expectedValue
));
}

public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
//add all properties predefined with "metrics.context."
prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
Expand All @@ -90,4 +159,5 @@ public static boolean isSinkConnector(Connector connector) {
public static boolean isSourceConnector(Connector connector) {
return SourceConnector.class.isAssignableFrom(connector.getClass());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -101,4 +103,73 @@ public void testAddMetricsContextPropertiesStandalone() {
assertEquals("cluster-1", prop.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));

}

@Test
public void testNoOverrideWarning() {
Map<String, ? super String> props = new HashMap<>();
assertEquals(
Optional.empty(),
ConnectUtils.ensurePropertyAndGetWarning(props, "key", "value", "because i say so", true)
);
assertEquals("value", props.get("key"));

props.clear();
assertEquals(
Optional.empty(),
ConnectUtils.ensurePropertyAndGetWarning(props, "key", "value", "because i say so", false)
);
assertEquals("value", props.get("key"));

props.clear();
props.put("key", "value");
assertEquals(
Optional.empty(),
ConnectUtils.ensurePropertyAndGetWarning(props, "key", "value", "because i say so", true)
);
assertEquals("value", props.get("key"));

props.clear();
props.put("key", "VALUE");
assertEquals(
Optional.empty(),
ConnectUtils.ensurePropertyAndGetWarning(props, "key", "value", "because i say so", false)
);
assertEquals("VALUE", props.get("key"));
}

@Test
public void testOverrideWarning() {
Map<String, ? super String> props = new HashMap<>();
props.put("\u1984", "little brother");
String expectedWarning = "The value 'little brother' for the '\u1984' property will be ignored as it cannot be overridden "
+ "thanks to newly-introduced federal legislation. "
+ "The value 'big brother' will be used instead.";
assertEquals(
Optional.of(expectedWarning),
ConnectUtils.ensurePropertyAndGetWarning(
props,
"\u1984",
"big brother",
"thanks to newly-introduced federal legislation",
false)
);
assertEquals(Collections.singletonMap("\u1984", "big brother"), props);

props.clear();
props.put("\u1984", "BIG BROTHER");
expectedWarning = "The value 'BIG BROTHER' for the '\u1984' property will be ignored as it cannot be overridden "
+ "thanks to newly-introduced federal legislation. "
+ "The value 'big brother' will be used instead.";
assertEquals(
Optional.of(expectedWarning),
ConnectUtils.ensurePropertyAndGetWarning(
props,
"\u1984",
"big brother",
"thanks to newly-introduced federal legislation",
true)
);
assertEquals(Collections.singletonMap("\u1984", "big brother"), props);
}

}