diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java index 7cf5d67899e61..08a32f2c89c69 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP; import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; @@ -38,6 +40,8 @@ public class SourceConnectorConfig extends ConnectorConfig { + private static final Logger log = LoggerFactory.getLogger(SourceConnectorConfig.class); + protected static final String TOPIC_CREATION_GROUP = "Topic Creation"; public static final String TOPIC_CREATION_PREFIX = "topic.creation."; @@ -98,6 +102,13 @@ public static ConfigDef enrich(ConfigDef baseConfigDef, Map prop topicCreationGroups.addAll((List) aliases); } + //Remove "topic.creation.groups" config if its present and the value is "default" + if (topicCreationGroups.contains(DEFAULT_TOPIC_CREATION_GROUP)) { + log.warn("'{}' topic creation group always exists and does not need to be listed explicitly", + DEFAULT_TOPIC_CREATION_GROUP); + topicCreationGroups.removeAll(Collections.singleton(DEFAULT_TOPIC_CREATION_GROUP)); + } + ConfigDef newDef = new ConfigDef(baseConfigDef); String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + "."; short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java index 1972b62e81113..251bb72fbe2d9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java @@ -33,6 +33,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfigTest.MOCK_PLUGINS; +import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP; import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; @@ -47,6 +48,8 @@ public class SourceConnectorConfigTest { private static final String FOO_CONNECTOR = "foo-source"; + private static final String TOPIC_CREATION_GROUP_1 = "group1"; + private static final String TOPIC_CREATION_GROUP_2 = "group2"; private static final short DEFAULT_REPLICATION_FACTOR = -1; private static final int DEFAULT_PARTITIONS = -1; @@ -64,6 +67,16 @@ public Map defaultConnectorPropsWithTopicCreation() { return props; } + @Test + public void shouldNotFailWithExplicitlySpecifiedDefaultTopicCreationGroup() { + Map props = defaultConnectorProps(); + props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", DEFAULT_TOPIC_CREATION_GROUP, + TOPIC_CREATION_GROUP_1, TOPIC_CREATION_GROUP_2)); + props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, "1"); + props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, "1"); + SourceConnectorConfig config = new SourceConnectorConfig(MOCK_PLUGINS, props, true); + } + @Test public void noTopicCreation() { Map props = defaultConnectorProps();