Skip to content

Commit

Permalink
add new parameter 'table.name.normalize'
Browse files Browse the repository at this point in the history
  • Loading branch information
willyborankin committed Jun 16, 2020
1 parent 87d4774 commit 64ede60
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 31 deletions.
6 changes: 4 additions & 2 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class JdbcDbWriter {

private static final Logger log = LoggerFactory.getLogger(JdbcDbWriter.class);

private static final Pattern NORMALIZE_TABLE_NAME_FOR_TOPIC = Pattern.compile("(?<!^)[^a-zA-Z0-9_-]");
private static final Pattern NORMALIZE_TABLE_NAME_FOR_TOPIC = Pattern.compile("[^a-zA-Z0-9_]");

private final JdbcSinkConfig config;

Expand Down Expand Up @@ -103,7 +103,9 @@ TableId destinationTable(final String topic) {

public String generateTableNameFor(final String topic) {
final String tableName = config.tableNameFormat.replace("${topic}", topic);
return NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_");
return config.tableNameNormalize
? NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_")
: tableName;
}

}
33 changes: 26 additions & 7 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,23 @@ public enum PrimaryKeyMode {
);

public static final String TABLE_NAME_FORMAT = "table.name.format";

public static final String TABLE_NAME_FORMAT_DEFAULT = "${topic}";

private static final String TABLE_NAME_FORMAT_DOC =
"A format string for the destination table name, which may contain '${topic}' as a "
+ "placeholder for the originating topic name.\n"
+ "For example, ``kafka_${topic}`` for the topic 'orders' will map to the table name "
+ "'kafka_orders'. The alphanumeric characters (``a-z A-Z 0-9``) and ``_`` "
+ "will remain as is, others (like ``.``) will be replaced by ``_``.";
+ "'kafka_orders'.";
private static final String TABLE_NAME_FORMAT_DISPLAY = "Table Name Format";

public static final String TABLE_NAME_NORMALIZE = "table.name.normalize";
public static final boolean TABLE_NAME_NORMALIZE_DEFAULT = false;
private static final String TABLE_NAME_NORMALIZE_DOC =
"If set to ``true`` the alphanumeric characters (``a-z A-Z 0-9``) and ``_`` "
+ "in the destination table name for the particular topic will remain as is, "
+ "others (like ``.``) will be replaced by ``_``. "
+ "By default is set to ``false``.";
private static final String TABLE_NAME_NORMALIZE_DISPLAY = "Table Name Normalize";

public static final String MAX_RETRIES = "max.retries";
private static final int MAX_RETRIES_DEFAULT = 10;
private static final String MAX_RETRIES_DOC =
Expand Down Expand Up @@ -215,6 +221,17 @@ public enum PrimaryKeyMode {
ConfigDef.Width.LONG,
TABLE_NAME_FORMAT_DISPLAY
)
.define(
TABLE_NAME_NORMALIZE,
ConfigDef.Type.BOOLEAN,
TABLE_NAME_NORMALIZE_DEFAULT,
ConfigDef.Importance.MEDIUM,
TABLE_NAME_NORMALIZE_DOC,
DATAMAPPING_GROUP,
2,
ConfigDef.Width.LONG,
TABLE_NAME_NORMALIZE_DISPLAY
)
.define(
PK_MODE,
ConfigDef.Type.STRING,
Expand All @@ -223,7 +240,7 @@ public enum PrimaryKeyMode {
ConfigDef.Importance.HIGH,
PK_MODE_DOC,
DATAMAPPING_GROUP,
2,
3,
ConfigDef.Width.MEDIUM,
PK_MODE_DISPLAY
)
Expand All @@ -234,7 +251,7 @@ public enum PrimaryKeyMode {
ConfigDef.Importance.MEDIUM,
PK_FIELDS_DOC,
DATAMAPPING_GROUP,
3,
4,
ConfigDef.Width.LONG, PK_FIELDS_DISPLAY
)
.define(
Expand All @@ -244,7 +261,7 @@ public enum PrimaryKeyMode {
ConfigDef.Importance.MEDIUM,
FIELDS_WHITELIST_DOC,
DATAMAPPING_GROUP,
4,
5,
ConfigDef.Width.LONG,
FIELDS_WHITELIST_DISPLAY
);
Expand Down Expand Up @@ -299,6 +316,7 @@ public enum PrimaryKeyMode {
}

public final String tableNameFormat;
public final boolean tableNameNormalize;
public final int batchSize;
public final int maxRetries;
public final int retryBackoffMs;
Expand All @@ -313,6 +331,7 @@ public enum PrimaryKeyMode {
public JdbcSinkConfig(final Map<?, ?> props) {
super(CONFIG_DEF, props);
tableNameFormat = getString(TABLE_NAME_FORMAT).trim();
tableNameNormalize = getBoolean(TABLE_NAME_NORMALIZE);
batchSize = getInt(BATCH_SIZE);
maxRetries = getInt(MAX_RETRIES);
retryBackoffMs = getInt(RETRY_BACKOFF_MS);
Expand Down
28 changes: 6 additions & 22 deletions src/test/java/io/aiven/connect/jdbc/sink/JdbcDbWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,23 @@ private JdbcDbWriter newWriter(final Map<String, String> props) {

@Test
public void shouldGenerateNormalizedTableNameForTopic() {

final Map<String, String> props = new HashMap<>();
final Map<String, Object> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.TABLE_NAME_FORMAT, "kafka_topic_${topic}");
props.put(JdbcSinkConfig.TABLE_NAME_NORMALIZE, true);
final JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(props);

dialect = new SqliteDatabaseDialect(jdbcSinkConfig);
final DbStructure dbStructure = new DbStructure(dialect);
final JdbcDbWriter jdbcDbWriter = new JdbcDbWriter(jdbcSinkConfig, dialect, dbStructure);

assertEquals("kafka_topic___some_topic",
jdbcDbWriter.generateTableNameFor("--some_topic"));

assertEquals("kafka_topic_some_topic",
jdbcDbWriter.generateTableNameFor("some_topic"));

assertEquals("kafka_topic_some-topic",
assertEquals("kafka_topic_some_topic",
jdbcDbWriter.generateTableNameFor("some-topic"));

assertEquals("kafka_topic_this_is_topic_with_dots",
Expand All @@ -106,25 +109,6 @@ public void shouldGenerateNormalizedTableNameForTopic() {

}


@Test
public void shouldGetNormalizedTableName() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhnost");
props.put(JdbcSinkConfig.TABLE_NAME_FORMAT, "${topic}");

final JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(props);
dialect = new SqliteDatabaseDialect(jdbcSinkConfig);
final DbStructure dbStructure = new DbStructure(dialect);
final JdbcDbWriter writer = new JdbcDbWriter(jdbcSinkConfig, dialect, dbStructure);

TableId tableId = writer.destinationTable("this.is.my.topic");
assertEquals("this_is_my_topic", tableId.tableName());

tableId = writer.destinationTable("the_topic");
assertEquals("the_topic", tableId.tableName());
}

@Test
public void autoCreateWithAutoEvolve() throws SQLException {
final String topic = "books";
Expand Down

0 comments on commit 64ede60

Please sign in to comment.