diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 10a0c0e..1a33b9f 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -177,7 +177,11 @@ private void setStateFromOffset() { } else { LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName()); sourceInfo = new SourceInfo(tableDesc.getTableName(), clock); - sourceInfo.startInitSync(); + if (initSyncSkip) { + sourceInfo.skipInitSync(); + } else { + sourceInfo.startInitSync(); + } } } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index ea01058..3c2efbe 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -875,6 +875,25 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } + @Test + public void initSyncIsSkippedWithNoOffsetOnStart() throws InterruptedException { + configs.put("init.sync.skip", "true"); + // Arrange + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(null) + .buildTask(); + + // Act + task.start(configs); + + // Assert + SourceInfo sourceInfo = task.getSourceInfo(); + assertEquals(tableName, sourceInfo.tableName); + assertEquals(InitSyncStatus.SKIPPED, sourceInfo.initSyncStatus); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncStart); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncEnd); + } + @Test public void sourceInfoOfSkippedInitSyncIsLoadedFromOffsetOnStart() throws InterruptedException { configs.put("init.sync.skip", "true");