Skip to content

Commit

Permalink
Merge pull request #38 from aiven/topics_to_tables_mapping
Browse files Browse the repository at this point in the history
Issue #36
  • Loading branch information
Aleksandr Beloglazov authored Jul 8, 2020
2 parents 01c85ae + d3e6ca7 commit 2bd80b1
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 16 deletions.
35 changes: 24 additions & 11 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
Expand Down Expand Up @@ -91,21 +92,33 @@ void closeQuietly() {

TableId destinationTable(final String topic) {
final String tableName = generateTableNameFor(topic);
if (tableName.isEmpty()) {
throw new ConnectException(String.format(
"Destination table name for topic '%s' is empty using the format string '%s'",
topic,
config.tableNameFormat
));
}
return dbDialect.parseTableIdentifier(tableName);
}

public String generateTableNameFor(final String topic) {
final String tableName = config.tableNameFormat.replace("${topic}", topic);
return config.tableNameNormalize
? NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_")
: tableName;
String tableName = config.tableNameFormat.replace("${topic}", topic);
if (config.tableNameNormalize) {
tableName = NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_");
}
if (!config.topicsToTablesMapping.isEmpty()) {
tableName = config.topicsToTablesMapping.getOrDefault(topic, "");
}
if (tableName.isEmpty()) {
final String errorMessage =
String.format(
"Destination table for the topic: '%s' "
+ "couldn't be found in the topics to tables mapping: '%s' "
+ "and couldn't be generated for the format string '%s'",
topic,
config.topicsToTablesMapping
.entrySet()
.stream()
.map(e -> String.join("->", e.getKey(), e.getValue()))
.collect(Collectors.joining(",")),
config.tableNameFormat);
throw new ConnectException(errorMessage);
}
return tableName;
}

}
60 changes: 55 additions & 5 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -57,7 +59,7 @@ public enum PrimaryKeyMode {
);

public static final String TABLE_NAME_FORMAT = "table.name.format";
private static final String TABLE_NAME_FORMAT_DEFAULT = "${topic}";
public static final String TABLE_NAME_FORMAT_DEFAULT = "${topic}";
private static final String TABLE_NAME_FORMAT_DOC =
"A format string for the destination table name, which may contain '${topic}' as a "
+ "placeholder for the originating topic name.\n"
Expand All @@ -73,6 +75,14 @@ public enum PrimaryKeyMode {
+ "remain as is, others (like ``.``) are replaced with ``_``.";
private static final String TABLE_NAME_NORMALIZE_DISPLAY = "Table Name Normalize";

public static final String TOPICS_TO_TABLES_MAPPING = "topics.to.tables.mapping";
private static final String TOPICS_TO_TABLES_MAPPING_DOC =
"Kafka topics to database tables mapping. "
+ "Comma-separated list of topic to table mapping in the format: topic_name:table_name. "
+ "If the destination table found in the mapping, "
+ "it would override generated one defined in " + TABLE_NAME_FORMAT + ".";
private static final String TOPICS_TO_TABLES_MAPPING_DISPLAY = "Topics To Tables Mapping";

public static final String MAX_RETRIES = "max.retries";
private static final int MAX_RETRIES_DEFAULT = 10;
private static final String MAX_RETRIES_DOC =
Expand Down Expand Up @@ -224,13 +234,43 @@ public enum PrimaryKeyMode {
TABLE_NAME_NORMALIZE,
ConfigDef.Type.BOOLEAN,
TABLE_NAME_NORMALIZE_DEFAULT,
ConfigDef.Importance.LOW,
ConfigDef.Importance.MEDIUM,
TABLE_NAME_NORMALIZE_DOC,
DATAMAPPING_GROUP,
2,
ConfigDef.Width.LONG,
TABLE_NAME_NORMALIZE_DISPLAY
)
.define(
TOPICS_TO_TABLES_MAPPING,
ConfigDef.Type.LIST,
null,
new ConfigDef.Validator() {
@Override
public void ensureValid(final String name, final Object value) {
if (Objects.isNull(value)
|| ConfigDef.NO_DEFAULT_VALUE == value
|| "".equals(value)) {
return;
}
assert value instanceof List;
try {
final Map<String, String> mapping = topicToTableMapping((List<String>) value);
if (Objects.isNull(mapping) || mapping.isEmpty()) {
throw new ConfigException(name, value, "Invalid topics to tables mapping");
}
} catch (final ArrayIndexOutOfBoundsException e) {
throw new ConfigException(name, value, "Invalid topics to tables mapping");
}
}
},
ConfigDef.Importance.MEDIUM,
TOPICS_TO_TABLES_MAPPING_DOC,
DATAMAPPING_GROUP,
3,
ConfigDef.Width.LONG,
TOPICS_TO_TABLES_MAPPING_DISPLAY
)
.define(
PK_MODE,
ConfigDef.Type.STRING,
Expand All @@ -239,7 +279,7 @@ public enum PrimaryKeyMode {
ConfigDef.Importance.HIGH,
PK_MODE_DOC,
DATAMAPPING_GROUP,
3,
4,
ConfigDef.Width.MEDIUM,
PK_MODE_DISPLAY
)
Expand All @@ -250,7 +290,7 @@ public enum PrimaryKeyMode {
ConfigDef.Importance.MEDIUM,
PK_FIELDS_DOC,
DATAMAPPING_GROUP,
4,
5,
ConfigDef.Width.LONG, PK_FIELDS_DISPLAY
)
.define(
Expand All @@ -260,7 +300,7 @@ public enum PrimaryKeyMode {
ConfigDef.Importance.MEDIUM,
FIELDS_WHITELIST_DOC,
DATAMAPPING_GROUP,
5,
6,
ConfigDef.Width.LONG,
FIELDS_WHITELIST_DISPLAY
);
Expand Down Expand Up @@ -315,6 +355,7 @@ public enum PrimaryKeyMode {
}

public final String tableNameFormat;
public final Map<String, String> topicsToTablesMapping;
public final boolean tableNameNormalize;
public final int batchSize;
public final int maxRetries;
Expand All @@ -331,6 +372,7 @@ public JdbcSinkConfig(final Map<?, ?> props) {
super(CONFIG_DEF, props);
tableNameFormat = getString(TABLE_NAME_FORMAT).trim();
tableNameNormalize = getBoolean(TABLE_NAME_NORMALIZE);
topicsToTablesMapping = topicToTableMapping(getList(TOPICS_TO_TABLES_MAPPING));
batchSize = getInt(BATCH_SIZE);
maxRetries = getInt(MAX_RETRIES);
retryBackoffMs = getInt(RETRY_BACKOFF_MS);
Expand All @@ -344,6 +386,14 @@ public JdbcSinkConfig(final Map<?, ?> props) {
timeZone = TimeZone.getTimeZone(ZoneId.of(dbTimeZone));
}

static Map<String, String> topicToTableMapping(final List<String> value) {
return (Objects.nonNull(value))
? value.stream()
.map(s -> s.split(":"))
.collect(Collectors.toMap(e -> e[0], e -> e[1]))
: Collections.emptyMap();
}

private static class EnumValidator implements ConfigDef.Validator {
private final List<String> canonicalValues;
private final Set<String> validValues;
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/io/aiven/connect/jdbc/sink/JdbcDbWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.connect.jdbc.dialect.DatabaseDialect;
Expand Down Expand Up @@ -109,6 +110,36 @@ public void shouldGenerateNormalizedTableNameForTopic() {

}

@Test
public void shouldSelectTableFromMapping() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhnost");
props.put(JdbcSinkConfig.TABLE_NAME_FORMAT, "${topic}");
props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "some_topic:same_table");

final JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(props);
dialect = new SqliteDatabaseDialect(jdbcSinkConfig);
final DbStructure dbStructure = new DbStructure(dialect);
final JdbcDbWriter writer = new JdbcDbWriter(jdbcSinkConfig, dialect, dbStructure);

final TableId tableId = writer.destinationTable("some_topic");
assertEquals("same_table", tableId.tableName());
}

@Test(expected = ConnectException.class)
public void shouldThrowConnectExceptionForUnknownTopicToTableMapping() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhnost");
props.put(JdbcSinkConfig.TABLE_NAME_FORMAT, "");
props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "some_topic:same_table,some_topic2:same_table2");

final JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(props);
dialect = new SqliteDatabaseDialect(jdbcSinkConfig);
final DbStructure dbStructure = new DbStructure(dialect);
final JdbcDbWriter writer = new JdbcDbWriter(jdbcSinkConfig, dialect, dbStructure);
writer.generateTableNameFor("another_topic");
}

@Test
public void autoCreateWithAutoEvolve() throws SQLException {
final String topic = "books";
Expand Down
74 changes: 74 additions & 0 deletions src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2020 Aiven Oy
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc.sink;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;

import org.junit.Test;

import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertTrue;

public class JdbcSinkConfigTest {

@Test
public void shouldReturnEmptyMapForUndefinedMapping() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
assertTrue(new JdbcSinkConfig(props).topicsToTablesMapping.isEmpty());
}

@Test
public void shouldParseTopicToTableMappings() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "t0:tbl0,t1:tbl1");

JdbcSinkConfig config = new JdbcSinkConfig(props);

assertEquals(config.topicsToTablesMapping.size(), 2);
assertEquals(config.topicsToTablesMapping.get("t0"), "tbl0");
assertEquals(config.topicsToTablesMapping.get("t1"), "tbl1");

props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "t3:tbl3");
config = new JdbcSinkConfig(props);

assertEquals(config.topicsToTablesMapping.size(), 1);
assertEquals(config.topicsToTablesMapping.get("t3"), "tbl3");
}

@Test(expected = ConfigException.class)
public void shouldThrowExceptionForWrongMappingFormat() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, "asd:asd,asd");

new JdbcSinkConfig(props);
}

@Test(expected = ConfigException.class)
public void shouldThrowExceptionForEmptyMappingFormat() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.TOPICS_TO_TABLES_MAPPING, ",,,,,,asd");

new JdbcSinkConfig(props);
}

}

0 comments on commit 2bd80b1

Please sign in to comment.