diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java index 477690c..169a8df 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java @@ -17,6 +17,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String SRC_INIT_SYNC_DELAY_DISPLAY = "INIT_SYNC delay"; public static final int SRC_INIT_SYNC_DELAY_DEFAULT = 60; + public static final String SRC_INIT_SYNC_ENABLE_CONFIG = "init.sync.enable"; + public static final String SRC_INIT_SYNC_ENABLE_DOC = "Define if INIT_SYNC should be enabled."; + public static final String SRC_INIT_SYNC_ENABLE_DISPLAY = "INIT_SYNC enable"; + public static final boolean SRC_INIT_SYNC_ENABLE_DEFAULT = false; + public static final String AWS_REGION_CONFIG = "aws.region"; public static final String AWS_REGION_DOC = "Define AWS region."; public static final String AWS_REGION_DISPLAY = "Region"; @@ -223,16 +228,25 @@ public static ConfigDef baseConfigDef() { SRC_INIT_SYNC_DELAY_DEFAULT, ConfigDef.Importance.LOW, SRC_INIT_SYNC_DELAY_DOC, - CONNECTOR_GROUP, 2, + CONNECTOR_GROUP, 4, ConfigDef.Width.MEDIUM, SRC_INIT_SYNC_DELAY_DISPLAY) + .define(SRC_INIT_SYNC_ENABLE_CONFIG, + ConfigDef.Type.BOOLEAN, + SRC_INIT_SYNC_ENABLE_DEFAULT, + ConfigDef.Importance.LOW, + SRC_INIT_SYNC_ENABLE_DOC, + CONNECTOR_GROUP, 3, + ConfigDef.Width.MEDIUM, + SRC_INIT_SYNC_ENABLE_DISPLAY) + .define(REDISCOVERY_PERIOD_CONFIG, ConfigDef.Type.LONG, REDISCOVERY_PERIOD_DEFAULT, ConfigDef.Importance.LOW, REDISCOVERY_PERIOD_DOC, - CONNECTOR_GROUP, 4, + CONNECTOR_GROUP, 5, ConfigDef.Width.MEDIUM, REDISCOVERY_PERIOD_DISPLAY) ; @@ -289,6 +303,10 @@ public int getInitSyncDelay() { return (int)get(SRC_INIT_SYNC_DELAY_CONFIG); } + public boolean getInitSyncEnable() { + return getBoolean(SRC_INIT_SYNC_ENABLE_CONFIG); + } + public String getDynamoDBServiceEndpoint() { return getString(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG); } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 9ca3693..22997f6 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -89,6 +89,7 @@ public class DynamoDBSourceTask extends SourceTask { private SourceInfo sourceInfo; private TableDescription tableDesc; private int initSyncDelay; + private boolean initSyncEnabled = false; @SuppressWarnings("unused") //Used by Confluent platform to initialize connector @@ -123,6 +124,7 @@ public void start(Map configProperties) { tableDesc = client.describeTable(config.getTableName()).getTable(); initSyncDelay = config.getInitSyncDelay(); + initSyncEnabled = config.getInitSyncEnable(); LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName()); setStateFromOffset(); @@ -163,7 +165,11 @@ private void setStateFromOffset() { } else { LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName()); sourceInfo = new SourceInfo(tableDesc.getTableName(), clock); - sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table + if (initSyncEnabled) { + sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table + } else{ + sourceInfo.initSyncStatus = InitSyncStatus.FINISHED; + } } } @@ -296,7 +302,7 @@ private List sync() throws Exception { // // NOTE2: KCL worker reads from multiple shards at the same time in a loop. // Which means that there can be messages from various time instances (before and after init sync start instance). - if (isPreInitSyncRecord(arrivalTimestamp)) { + if (initSyncEnabled && isPreInitSyncRecord(arrivalTimestamp)) { LOGGER.debug( "Dropping old record to prevent another INIT_SYNC. ShardId: {} " + "ApproximateArrivalTimestamp: {} CurrentTime: {}", @@ -315,7 +321,7 @@ private List sync() throws Exception { // * connector was down for some time // * connector is lagging // * connector failed to finish init sync in acceptable time frame - if (recordIsInDangerZone(arrivalTimestamp)) { + if (initSyncEnabled && recordIsInDangerZone(arrivalTimestamp)) { sourceInfo.startInitSync(); LOGGER.info(