diff --git a/docs/options.md b/docs/options.md index 423a1c7..87f4cf8 100644 --- a/docs/options.md +++ b/docs/options.md @@ -32,6 +32,7 @@ "resource.tagging.service.endpoint": "", "kafka.topic.prefix": "dynamodb-", + "kafka.topic.map": "", "tasks.max": "1", "init.sync.delay.period": 60, @@ -44,7 +45,9 @@ `dynamodb.table.ingestion.tag.key` - only tables marked with this tag key will be ingested. -`kafka.topic.prefix` - all topics created by this connector will have this prefix in their name. Following this pattern `{prefix}-{dynamodb-table-name}` +`kafka.topic.prefix` - all topics created by this connector will have this prefix in their name. Following this pattern `{prefix}{dynamodb-table-name}` + +`kafka.topic.map` - A JSON mapping between dynamodb table name and topic name. The topics will be named like `{prefix}{map[table-name]}`. If the map is not specified, the table name will be used. `tasks.max` - **MUST** always exceed number of tables found for tracking. If max tasks count is lower then found tables count, no tasks will be started! diff --git a/source/build.gradle b/source/build.gradle index d8faae6..8174a95 100644 --- a/source/build.gradle +++ b/source/build.gradle @@ -3,6 +3,7 @@ description = "Kafka Connect Source connector that reads from DynamoDB streams" dependencies { implementation 'com.google.code.gson:gson:2.8.2' implementation 'com.amazonaws:aws-java-sdk-resourcegroupstaggingapi:1.11.551' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2' compile group: 'org.apache.kafka', name: 'connect-api', version: "${rootProject.ext.kafkaConnectApiVersion}" compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.1' diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java index 078d6fb..3b1c6a5 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java @@ -58,10 +58,14 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED"; public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix"; - public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table."; + public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table by default."; public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix"; public static final String DST_TOPIC_PREFIX_DEFAULT = "dynamodb-"; + public static final String DST_TOPIC_NAMESPACE_MAP_CONFIG = "kafka.topic.namespace.map"; + public static final String DST_TOPIC_NAMESPACE_MAP_DOC = "Define Kafka topic namespace map."; + public static final String DST_TOPIC_NAMESPACE_MAP_DISPLAY = "Topic namespace map"; + public static final String DST_TOPIC_NAMESPACE_MAP_DEFAULT = null; public static final String REDISCOVERY_PERIOD_CONFIG = "connect.dynamodb.rediscovery.period"; public static final String REDISCOVERY_PERIOD_DOC = "Time period in milliseconds to rediscover stream enabled DynamoDB tables"; @@ -189,6 +193,15 @@ public static ConfigDef baseConfigDef() { CONNECTOR_GROUP, 1, ConfigDef.Width.MEDIUM, DST_TOPIC_PREFIX_DISPLAY) + + .define(DST_TOPIC_NAMESPACE_MAP_CONFIG, + ConfigDef.Type.STRING, + DST_TOPIC_NAMESPACE_MAP_DEFAULT, + ConfigDef.Importance.HIGH, + DST_TOPIC_NAMESPACE_MAP_DOC, + CONNECTOR_GROUP, 2, + ConfigDef.Width.LONG, + DST_TOPIC_NAMESPACE_MAP_DISPLAY) .define(SRC_INIT_SYNC_DELAY_CONFIG, ConfigDef.Type.INT, @@ -249,6 +262,10 @@ public String getDestinationTopicPrefix() { return getString(DST_TOPIC_PREFIX_CONFIG); } + public String getDestinationTopicNamespaceMap() { + return getString(DST_TOPIC_NAMESPACE_MAP_CONFIG); + } + public long getRediscoveryPeriod() { return getLong(REDISCOVERY_PERIOD_CONFIG); } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 1868cc3..dc39d01 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -134,7 +134,7 @@ public void start(Map configProperties) { tableDesc.getTableName(), tableDesc.getProvisionedThroughput().getReadCapacityUnits()); } - converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix()); + converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix(), config.getDestinationTopicNamespaceMap()); LOGGER.info("Starting background KCL worker thread for table: {}", tableDesc.getTableName()); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index dd9137c..adb1c6f 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -4,6 +4,7 @@ import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.trustpilot.connector.dynamodb.Envelope; @@ -22,6 +23,8 @@ import static java.util.stream.Collectors.toList; +import java.io.IOException; + /** * Takes in KCL event attributes and converts into Kafka Connect Source record. * With dynamic schema for key(based on actual DynamoDB table keys) and fixed schema for value. @@ -44,8 +47,12 @@ public class RecordConverter { private List keys; public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { + this(tableDesc, topicNamePrefix, null); + } + + public RecordConverter(TableDescription tableDesc, String topicNamePrefix, String topicNamespaceMap) { this.tableDesc = tableDesc; - this.topic_name = topicNamePrefix + tableDesc.getTableName(); + this.topic_name = topicNamePrefix + this.getTopicNameSuffix(topicNamespaceMap, tableDesc.getTableName()); valueSchema = SchemaBuilder.struct() .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) @@ -115,6 +122,26 @@ public SourceRecord toSourceRecord( ); } + private String getTopicNameSuffix(String topicNamespaceMap, String tableName) { + if (Strings.isNullOrEmpty(topicNamespaceMap)) { + return tableName; + } + + ObjectMapper objectMapper = new ObjectMapper(); + try { + LinkedHashMap map = objectMapper.readValue(topicNamespaceMap, new TypeReference>() {}); + + if (map.containsKey(tableName)) { + return (String) map.get(tableName); + } + + } catch (IOException e) { + throw new IllegalArgumentException("Invalid topicNamespaceMap: " + topicNamespaceMap); + } + + return tableName; + } + private Schema getKeySchema(List keys) { SchemaBuilder keySchemaBuilder = SchemaBuilder.struct() .name(SchemaNameAdjuster.DEFAULT.adjust(topic_name + ".Key")); diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index 3dd2ab9..0da238d 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -91,6 +91,25 @@ public void correctTopicNameIsConstructed() throws Exception { assertEquals("TestTopicPrefix-TestTable1", record.topic()); } + @Test + public void correctTopicNameIsConstructedWithTopicNamespaceMapExact() throws Exception { + // Arrange + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-", "{\"TestTable1\":\"TestTopic1\"}"); + + // Act + SourceRecord record = converter.toSourceRecord( + getSourceInfo(table), + Envelope.Operation.forCode("r"), + getAttributes(), + Instant.parse("2001-01-02T00:00:00.00Z"), + "testShardID1", + "testSequenceNumberID1" + ); + + // Assert + assertEquals("TestTopicPrefix-TestTopic1", record.topic()); + } + @Test public void sourceInfoIsPutToOffset() throws Exception { // Arrange