Skip to content

Commit

Permalink
Add new config parameter: topics.to.tables.mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
willyborankin committed Jun 16, 2020
1 parent 64ede60 commit db4d0cf
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 14 deletions.
35 changes: 24 additions & 11 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
Expand Down Expand Up @@ -91,21 +92,33 @@ void closeQuietly() {

TableId destinationTable(final String topic) {
final String tableName = generateTableNameFor(topic);
if (tableName.isEmpty()) {
throw new ConnectException(String.format(
"Destination table name for topic '%s' is empty using the format string '%s'",
topic,
config.tableNameFormat
));
}
return dbDialect.parseTableIdentifier(tableName);
}

public String generateTableNameFor(final String topic) {
final String tableName = config.tableNameFormat.replace("${topic}", topic);
return config.tableNameNormalize
? NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_")
: tableName;
String tableName = config.tableNameFormat.replace("${topic}", topic);
if (config.tableNameNormalize) {
tableName = NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_");
}
if (!config.topicsToTablesMapping.isEmpty()) {
tableName = config.topicsToTablesMapping.getOrDefault(topic, "");
}
if (tableName.isEmpty()) {
final String errorMessage =
String.format(
"Destination table for the topic: '%s' "
+ "couldn't be found in the topics to tables mapping: '%s' "
+ "and couldn't be generated for the format string '%s'",
topic,
config.topicsToTablesMapping
.entrySet()
.stream()
.map(e -> String.join("->", e.getKey(), e.getValue()))
.collect(Collectors.joining(",")),
config.tableNameFormat);
throw new ConnectException(errorMessage);
}
return tableName;
}

}
57 changes: 54 additions & 3 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -57,7 +59,9 @@ public enum PrimaryKeyMode {
);

public static final String TABLE_NAME_FORMAT = "table.name.format";

public static final String TABLE_NAME_FORMAT_DEFAULT = "${topic}";

private static final String TABLE_NAME_FORMAT_DOC =
"A format string for the destination table name, which may contain '${topic}' as a "
+ "placeholder for the originating topic name.\n"
Expand All @@ -74,6 +78,14 @@ public enum PrimaryKeyMode {
+ "By default is set to ``false``.";
private static final String TABLE_NAME_NORMALIZE_DISPLAY = "Table Name Normalize";

public static final String TOPICS_TO_TABLES_MAPPING = "topics.to.tables.mapping";
private static final String TOPICS_TO_TABLES_MAPPING_DOC =
"Kafka topics to database tables mapping. "
+ "Comma-separated list of topic to table mapping in the format: topic_name:table_name. "
+ "If the destination table found in the mapping, "
+ "it would override generated one defined in " + TABLE_NAME_FORMAT + ".";
private static final String TOPICS_TO_TABLES_MAPPING_DISPLAY = "Topics To Tables Mapping";

public static final String MAX_RETRIES = "max.retries";
private static final int MAX_RETRIES_DEFAULT = 10;
private static final String MAX_RETRIES_DOC =
Expand Down Expand Up @@ -232,6 +244,35 @@ public enum PrimaryKeyMode {
ConfigDef.Width.LONG,
TABLE_NAME_NORMALIZE_DISPLAY
)
.define(
TOPICS_TO_TABLES_MAPPING,
ConfigDef.Type.STRING,
null,
new ConfigDef.Validator() {
@Override
public void ensureValid(final String name, final Object value) {
if (Objects.isNull(value)
|| ConfigDef.NO_DEFAULT_VALUE == value
|| "".equals(value)) {
return;
}
assert value instanceof String;
try {
final Map<String, String> mapping = topicToTableMapping((String) value);
if (Objects.isNull(mapping) || mapping.isEmpty()) {
throw new ConfigException(name, value, "Invalid topics to tables mapping");
}
} catch (final ArrayIndexOutOfBoundsException e) {
throw new ConfigException(name, value, "Invalid topics to tables mapping");
}
}
},
ConfigDef.Importance.MEDIUM,
TOPICS_TO_TABLES_MAPPING_DOC,
DATAMAPPING_GROUP,
3,
ConfigDef.Width.LONG,
TOPICS_TO_TABLES_MAPPING_DISPLAY)
.define(
PK_MODE,
ConfigDef.Type.STRING,
Expand All @@ -240,7 +281,7 @@ public enum PrimaryKeyMode {
ConfigDef.Importance.HIGH,
PK_MODE_DOC,
DATAMAPPING_GROUP,
3,
4,
ConfigDef.Width.MEDIUM,
PK_MODE_DISPLAY
)
Expand All @@ -251,7 +292,7 @@ public enum PrimaryKeyMode {
ConfigDef.Importance.MEDIUM,
PK_FIELDS_DOC,
DATAMAPPING_GROUP,
4,
5,
ConfigDef.Width.LONG, PK_FIELDS_DISPLAY
)
.define(
Expand All @@ -261,7 +302,7 @@ public enum PrimaryKeyMode {
ConfigDef.Importance.MEDIUM,
FIELDS_WHITELIST_DOC,
DATAMAPPING_GROUP,
5,
6,
ConfigDef.Width.LONG,
FIELDS_WHITELIST_DISPLAY
);
Expand Down Expand Up @@ -316,6 +357,7 @@ public enum PrimaryKeyMode {
}

public final String tableNameFormat;
public final Map<String, String> topicsToTablesMapping;
public final boolean tableNameNormalize;
public final int batchSize;
public final int maxRetries;
Expand All @@ -332,6 +374,7 @@ public JdbcSinkConfig(final Map<?, ?> props) {
super(CONFIG_DEF, props);
tableNameFormat = getString(TABLE_NAME_FORMAT).trim();
tableNameNormalize = getBoolean(TABLE_NAME_NORMALIZE);
topicsToTablesMapping = topicToTableMapping(getString(TOPICS_TO_TABLES_MAPPING));
batchSize = getInt(BATCH_SIZE);
maxRetries = getInt(MAX_RETRIES);
retryBackoffMs = getInt(RETRY_BACKOFF_MS);
Expand All @@ -345,6 +388,14 @@ public JdbcSinkConfig(final Map<?, ?> props) {
timeZone = TimeZone.getTimeZone(ZoneId.of(dbTimeZone));
}

static Map<String, String> topicToTableMapping(final String value) {
return (Objects.nonNull(value))
? Arrays.stream(value.split(","))
.map(s -> s.split(":"))
.collect(Collectors.toMap(e -> e[0], e -> e[1]))
: Collections.emptyMap();
}

private static class EnumValidator implements ConfigDef.Validator {
private final List<String> canonicalValues;
private final Set<String> validValues;
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/io/aiven/connect/jdbc/sink/JdbcDbWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.connect.jdbc.dialect.DatabaseDialect;
Expand Down Expand Up @@ -109,6 +110,36 @@ public void shouldGenerateNormalizedTableNameForTopic() {

}

@Test
public void shouldSelectTableFromMapping() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhnost");
props.put(JdbcSinkConfig.TABLE_NAME_FORMAT, "${topic}");
props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "some_topic:same_table");

final JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(props);
dialect = new SqliteDatabaseDialect(jdbcSinkConfig);
final DbStructure dbStructure = new DbStructure(dialect);
final JdbcDbWriter writer = new JdbcDbWriter(jdbcSinkConfig, dialect, dbStructure);

final TableId tableId = writer.destinationTable("some_topic");
assertEquals("same_table", tableId.tableName());
}

@Test(expected = ConnectException.class)
public void shouldThrowConnectExceptionForUnknownTopicToTableMapping() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhnost");
props.put(JdbcSinkConfig.TABLE_NAME_FORMAT, "");
props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "some_topic:same_table,some_topic2:same_table2");

final JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(props);
dialect = new SqliteDatabaseDialect(jdbcSinkConfig);
final DbStructure dbStructure = new DbStructure(dialect);
final JdbcDbWriter writer = new JdbcDbWriter(jdbcSinkConfig, dialect, dbStructure);
writer.generateTableNameFor("another_topic");
}

@Test
public void autoCreateWithAutoEvolve() throws SQLException {
final String topic = "books";
Expand Down
54 changes: 54 additions & 0 deletions src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2020 Aiven Oy
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc.sink;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;

import org.junit.Test;

import static junit.framework.TestCase.assertTrue;

public class JdbcSinkConfigTest {

@Test
public void shouldReturnEmptyMapForUndefinedMapping() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
assertTrue(new JdbcSinkConfig(props).topicsToTablesMapping.isEmpty());
}

@Test(expected = ConfigException.class)
public void shouldThrowExceptionForWrongMappingFormat() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "asd:asd,asd");

new JdbcSinkConfig(props);
}

@Test(expected = ConfigException.class)
public void shouldThrowExceptionForEmptyMappingFormat() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, ",,,,,,asd");

new JdbcSinkConfig(props);
}

}

0 comments on commit db4d0cf

Please sign in to comment.