From d3e6ca76d382a376b14df74a9ae3bcc3e7c762f3 Mon Sep 17 00:00:00 2001 From: Andrey Pleskach Date: Wed, 10 Jun 2020 17:01:16 +0200 Subject: [PATCH] Add new config parameter: topics.to.tables.mapping --- .../aiven/connect/jdbc/sink/JdbcDbWriter.java | 35 ++++++--- .../connect/jdbc/sink/JdbcSinkConfig.java | 60 +++++++++++++-- .../connect/jdbc/sink/JdbcDbWriterTest.java | 31 ++++++++ .../connect/jdbc/sink/JdbcSinkConfigTest.java | 74 +++++++++++++++++++ 4 files changed, 184 insertions(+), 16 deletions(-) create mode 100644 src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java index f60a2638..f7bbf73d 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; @@ -91,21 +92,33 @@ void closeQuietly() { TableId destinationTable(final String topic) { final String tableName = generateTableNameFor(topic); - if (tableName.isEmpty()) { - throw new ConnectException(String.format( - "Destination table name for topic '%s' is empty using the format string '%s'", - topic, - config.tableNameFormat - )); - } return dbDialect.parseTableIdentifier(tableName); } public String generateTableNameFor(final String topic) { - final String tableName = config.tableNameFormat.replace("${topic}", topic); - return config.tableNameNormalize - ? 
NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_") - : tableName; + String tableName = config.tableNameFormat.replace("${topic}", topic); + if (config.tableNameNormalize) { + tableName = NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_"); + } + if (!config.topicsToTablesMapping.isEmpty()) { // mapped table wins; unmapped topics keep the generated name + tableName = config.topicsToTablesMapping.getOrDefault(topic, tableName); + } + if (tableName.isEmpty()) { + final String errorMessage = + String.format( + "Destination table for the topic: '%s' " + + "couldn't be found in the topics to tables mapping: '%s' " + + "and couldn't be generated for the format string '%s'", + topic, + config.topicsToTablesMapping + .entrySet() + .stream() + .map(e -> String.join("->", e.getKey(), e.getValue())) + .collect(Collectors.joining(",")), + config.tableNameFormat); + throw new ConnectException(errorMessage); + } + return tableName; } } diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java index e37256f3..2bc4270a 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java @@ -24,8 +24,10 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TimeZone; +import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -57,7 +59,7 @@ public enum PrimaryKeyMode { ); public static final String TABLE_NAME_FORMAT = "table.name.format"; - private static final String TABLE_NAME_FORMAT_DEFAULT = "${topic}"; + public static final String TABLE_NAME_FORMAT_DEFAULT = "${topic}"; private static final String TABLE_NAME_FORMAT_DOC = "A format string for the destination table name, which may contain '${topic}' as a " + "placeholder for the originating topic name.\n" @@ -73,6 +75,14 @@ public enum PrimaryKeyMode { + 
"remain as is, others (like ``.``) are replaced with ``_``."; private static final String TABLE_NAME_NORMALIZE_DISPLAY = "Table Name Normalize"; + public static final String TOPICS_TO_TABLES_MAPPING = "topics.to.tables.mapping"; + private static final String TOPICS_TO_TABLES_MAPPING_DOC = + "Kafka topics to database tables mapping. " + + "Comma-separated list of topic to table mapping in the format: topic_name:table_name. " + + "If the destination table found in the mapping, " + + "it would override generated one defined in " + TABLE_NAME_FORMAT + "."; + private static final String TOPICS_TO_TABLES_MAPPING_DISPLAY = "Topics To Tables Mapping"; + public static final String MAX_RETRIES = "max.retries"; private static final int MAX_RETRIES_DEFAULT = 10; private static final String MAX_RETRIES_DOC = @@ -224,13 +234,43 @@ public enum PrimaryKeyMode { TABLE_NAME_NORMALIZE, ConfigDef.Type.BOOLEAN, TABLE_NAME_NORMALIZE_DEFAULT, - ConfigDef.Importance.LOW, + ConfigDef.Importance.MEDIUM, TABLE_NAME_NORMALIZE_DOC, DATAMAPPING_GROUP, 2, ConfigDef.Width.LONG, TABLE_NAME_NORMALIZE_DISPLAY ) + .define( + TOPICS_TO_TABLES_MAPPING, + ConfigDef.Type.LIST, + null, + new ConfigDef.Validator() { + @Override + public void ensureValid(final String name, final Object value) { + if (Objects.isNull(value) + || ConfigDef.NO_DEFAULT_VALUE == value + || "".equals(value)) { + return; + } + assert value instanceof List; + try { // an entry without ':' raises AIOOBE, a duplicate topic raises ISE in toMap + final Map<String, String> mapping = topicToTableMapping((List<String>) value); + if (Objects.isNull(mapping) || mapping.isEmpty()) { + throw new ConfigException(name, value, "Invalid topics to tables mapping"); + } + } catch (final ArrayIndexOutOfBoundsException | IllegalStateException e) { + throw new ConfigException(name, value, "Invalid topics to tables mapping"); + } + } + }, + ConfigDef.Importance.MEDIUM, + TOPICS_TO_TABLES_MAPPING_DOC, + DATAMAPPING_GROUP, + 3, + ConfigDef.Width.LONG, + TOPICS_TO_TABLES_MAPPING_DISPLAY + ) .define( PK_MODE, ConfigDef.Type.STRING, @@ -239,7 +279,7 @@ public enum PrimaryKeyMode { 
ConfigDef.Importance.HIGH, PK_MODE_DOC, DATAMAPPING_GROUP, - 3, + 4, ConfigDef.Width.MEDIUM, PK_MODE_DISPLAY ) @@ -250,7 +290,7 @@ public enum PrimaryKeyMode { ConfigDef.Importance.MEDIUM, PK_FIELDS_DOC, DATAMAPPING_GROUP, - 4, + 5, ConfigDef.Width.LONG, PK_FIELDS_DISPLAY ) .define( @@ -260,7 +300,7 @@ public enum PrimaryKeyMode { ConfigDef.Importance.MEDIUM, FIELDS_WHITELIST_DOC, DATAMAPPING_GROUP, - 5, + 6, ConfigDef.Width.LONG, FIELDS_WHITELIST_DISPLAY ); @@ -315,6 +355,7 @@ public enum PrimaryKeyMode { } public final String tableNameFormat; + public final Map<String, String> topicsToTablesMapping; public final boolean tableNameNormalize; public final int batchSize; public final int maxRetries; @@ -331,6 +372,7 @@ public JdbcSinkConfig(final Map props) { super(CONFIG_DEF, props); tableNameFormat = getString(TABLE_NAME_FORMAT).trim(); tableNameNormalize = getBoolean(TABLE_NAME_NORMALIZE); + topicsToTablesMapping = topicToTableMapping(getList(TOPICS_TO_TABLES_MAPPING)); batchSize = getInt(BATCH_SIZE); maxRetries = getInt(MAX_RETRIES); retryBackoffMs = getInt(RETRY_BACKOFF_MS); @@ -344,6 +386,14 @@ public JdbcSinkConfig(final Map props) { timeZone = TimeZone.getTimeZone(ZoneId.of(dbTimeZone)); } + static Map<String, String> topicToTableMapping(final List<String> value) { + return (Objects.nonNull(value)) + ? 
value.stream() + .map(s -> s.split(":")) // each entry is topic_name:table_name + .collect(Collectors.toMap(e -> e[0].trim(), e -> e[1].trim())) // tolerate "topic : table" + : Collections.emptyMap(); + } + private static class EnumValidator implements ConfigDef.Validator { private final List canonicalValues; private final Set validValues; diff --git a/src/test/java/io/aiven/connect/jdbc/sink/JdbcDbWriterTest.java b/src/test/java/io/aiven/connect/jdbc/sink/JdbcDbWriterTest.java index f86cf3b5..22b26192 100644 --- a/src/test/java/io/aiven/connect/jdbc/sink/JdbcDbWriterTest.java +++ b/src/test/java/io/aiven/connect/jdbc/sink/JdbcDbWriterTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import io.aiven.connect.jdbc.dialect.DatabaseDialect; @@ -109,6 +110,36 @@ public void shouldGenerateNormalizedTableNameForTopic() { } + @Test + public void shouldSelectTableFromMapping() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhnost"); + props.put(JdbcSinkConfig.TABLE_NAME_FORMAT, "${topic}"); + props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "some_topic:same_table"); + + final JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(props); + dialect = new SqliteDatabaseDialect(jdbcSinkConfig); + final DbStructure dbStructure = new DbStructure(dialect); + final JdbcDbWriter writer = new JdbcDbWriter(jdbcSinkConfig, dialect, dbStructure); + + final TableId tableId = writer.destinationTable("some_topic"); + assertEquals("same_table", tableId.tableName()); + } + + @Test(expected = ConnectException.class) + public void shouldThrowConnectExceptionForUnknownTopicToTableMapping() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhnost"); + props.put(JdbcSinkConfig.TABLE_NAME_FORMAT, ""); + props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, 
"some_topic:same_table,some_topic2:same_table2"); + + final JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(props); + dialect = new SqliteDatabaseDialect(jdbcSinkConfig); + final DbStructure dbStructure = new DbStructure(dialect); + final JdbcDbWriter writer = new JdbcDbWriter(jdbcSinkConfig, dialect, dbStructure); + writer.generateTableNameFor("another_topic"); + } + @Test public void autoCreateWithAutoEvolve() throws SQLException { final String topic = "books"; diff --git a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java new file mode 100644 index 00000000..0acfc363 --- /dev/null +++ b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2020 Aiven Oy + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.connect.jdbc.sink; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigException; + +import org.junit.Test; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; + +public class JdbcSinkConfigTest { + + @Test + public void shouldReturnEmptyMapForUndefinedMapping() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); + assertTrue(new JdbcSinkConfig(props).topicsToTablesMapping.isEmpty()); + } + + @Test + public void shouldParseTopicToTableMappings() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); + props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "t0:tbl0,t1:tbl1"); + + JdbcSinkConfig config = new JdbcSinkConfig(props); + + assertEquals(2, config.topicsToTablesMapping.size()); + assertEquals("tbl0", config.topicsToTablesMapping.get("t0")); + assertEquals("tbl1", config.topicsToTablesMapping.get("t1")); + + props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "t3:tbl3"); + config = new JdbcSinkConfig(props); + + assertEquals(1, config.topicsToTablesMapping.size()); + assertEquals("tbl3", config.topicsToTablesMapping.get("t3")); + } + + @Test(expected = ConfigException.class) + public void shouldThrowExceptionForWrongMappingFormat() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); // so only the mapping is invalid + props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "asd:asd,asd"); + new JdbcSinkConfig(props); + } + + @Test(expected = ConfigException.class) + public void shouldThrowExceptionForEmptyMappingFormat() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); // so only the mapping is invalid + props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, ",,,,,,asd"); + new JdbcSinkConfig(props); + } + +}