From 1a09960e1f90c7ab077c1f1d6a502d162f88e5fb Mon Sep 17 00:00:00 2001
From: Calvin Thomas Nix
Date: Wed, 16 Oct 2024 12:32:14 -0400
Subject: [PATCH 1/4] KAFKA-428: Mark the copy complete in the source offset
 for the last copied document

---
 .../MongoSourceTaskIntegrationTest.java       |  30 +++-
 .../source/StartedMongoSourceTask.java        | 135 ++++++++++--------
 2 files changed, 102 insertions(+), 63 deletions(-)

diff --git a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
index 0f36e4f16..133d2e0d6 100644
--- a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
+++ b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
@@ -169,6 +169,9 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
         new HashMap<String, String>() {
           {
             put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
+            String namespaceRegex =
+                String.format("(%s\\.coll|%s\\.coll)", db1.getName(), db2.getName());
+            put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
           }
         };
     task.start(cfg);
@@ -178,11 +181,22 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
         () -> assertEquals(150, firstPoll.size()),
         () -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll1),
         () -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll2),
+        // make sure all elements, except the last, contain the "copy" key
         () ->
             assertTrue(
                 firstPoll.stream()
                     .map(SourceRecord::sourceOffset)
-                    .allMatch(i -> i.containsKey("copy"))));
+                    .limit(149)
+                    .allMatch(i -> i.containsKey("copy"))),
+        // make sure that the last copied element does not have the "copy" key
+        () ->
+            assertTrue(
+                firstPoll.stream()
+                    .map(SourceRecord::sourceOffset)
+                    .skip(149)
+                    .findFirst()
+                    .filter(i -> !i.containsKey("copy"))
+                    .isPresent()));
 
     assertNull(task.poll());
@@ -566,8 +580,20 @@ void testCopyingExistingWithARestartMidwayThrough() {
     List<SourceRecord> thirdPoll = getNextBatch(task);
     assertSourceRecordValues(createInserts(26, 50), thirdPoll, coll);
+    // Make sure all elements, except the last one, contain the "copy" key
+    assertTrue(
+        thirdPoll.stream()
+            .map(SourceRecord::sourceOffset)
+            .limit(24)
+            .allMatch(i -> i.containsKey("copy")));
+    // Make sure the last copied element does not contain the "copy" key
     assertTrue(
-        thirdPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));
+        thirdPoll.stream()
+            .map(SourceRecord::sourceOffset)
+            .skip(24)
+            .findFirst()
+            .filter(i -> !i.containsKey("copy"))
+            .isPresent());
 
     assertTrue(getNextBatch(task).isEmpty());
     insertMany(rangeClosed(51, 75), coll);
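For readers unfamiliar with the offset layout these assertions probe, here is a minimal, self-contained sketch of the two offset shapes involved. The "_id" and "copy" keys match the connector's ID_FIELD and COPY_KEY constants; the literal values are illustrative only, not real connector output:

    import java.util.HashMap;
    import java.util.Map;

    public final class OffsetShapeSketch {
      public static void main(String[] args) {
        // While the initial copy is running, every record's source offset carries the marker:
        Map<String, String> copying = new HashMap<>();
        copying.put("_id", "{\"_id\": {\"copyingDocument\": true}}"); // illustrative value only
        copying.put("copy", "true");

        // After this patch, the final copied record instead stores a usable resume token and
        // drops the marker, so a restart resumes the change stream rather than re-copying:
        Map<String, String> copyComplete = new HashMap<>();
        copyComplete.put("_id", "{\"_data\": \"8263...\"}"); // illustrative resume token

        System.out.println(copying.containsKey("copy"));      // -> true
        System.out.println(copyComplete.containsKey("copy")); // -> false
      }
    }

The connector-side change that produces the second shape follows.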
diff --git a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
index 5eb4edabe..98305671e 100644
--- a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
+++ b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
@@ -48,6 +48,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -210,68 +211,80 @@ private List<SourceRecord> pollInternal() {
         createValueSchemaAndValueProvider(sourceConfig);
 
     List<SourceRecord> sourceRecords = new ArrayList<>();
-    getNextBatch()
-        .forEach(
-            changeStreamDocument -> {
-              Map<String, String> sourceOffset = new HashMap<>();
-              sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
-              if (isCopying) {
-                sourceOffset.put(COPY_KEY, "true");
-              }
-
-              String topicName = topicMapper.getTopic(changeStreamDocument);
-              if (topicName.isEmpty()) {
-                LOGGER.warn(
-                    "No topic set. Could not publish the message: {}",
-                    changeStreamDocument.toJson());
-              } else {
-
-                Optional<BsonDocument> valueDocument = Optional.empty();
-
-                boolean isTombstoneEvent =
-                    publishFullDocumentOnlyTombstoneOnDelete
-                        && !changeStreamDocument.containsKey(FULL_DOCUMENT);
-                if (publishFullDocumentOnly) {
-                  if (changeStreamDocument.containsKey(FULL_DOCUMENT)
-                      && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
-                    valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
-                  }
-                } else {
-                  valueDocument = Optional.of(changeStreamDocument);
-                }
-
-                if (valueDocument.isPresent() || isTombstoneEvent) {
-                  BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
-                  LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);
-
-                  if (valueDoc instanceof RawBsonDocument) {
-                    int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
-                    statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
-                  }
-
-                  BsonDocument keyDocument;
-                  if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
-                    keyDocument = changeStreamDocument;
-                  } else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
-                      && changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
-                    keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
-                  } else {
-                    keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
-                  }
-
-                  createSourceRecord(
-                          keySchemaAndValueProducer,
-                          isTombstoneEvent
-                              ? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
-                              : valueSchemaAndValueProducer,
-                          sourceOffset,
-                          topicName,
-                          keyDocument,
-                          valueDoc)
-                      .map(sourceRecords::add);
-                }
-              }
-            });
+    Iterator<BsonDocument> batchIterator = getNextBatch().iterator();
+    BsonDocument changeStreamDocument;
+    while (batchIterator.hasNext()) {
+      changeStreamDocument = batchIterator.next();
+      Map<String, String> sourceOffset = new HashMap<>();
+      sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
+      if (isCopying) {
+        sourceOffset.put(COPY_KEY, "true");
+      }
+
+      // If isCopying is true, we set the COPY_KEY flag so that Kafka knows a copy is in
+      // progress. However, for the last document that we copy, we must not set this flag because
+      // the copy has completed; otherwise we would rely on future change stream events to
+      // signify that we are no longer copying. We also need to set the _id field to a valid
+      // resume token, which during copying is held in the cachedResumeToken variable.
+      boolean lastDocument = !batchIterator.hasNext();
+      boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
+      if (isCopying && lastDocument && noMoreDataToCopy) {
+        sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
+        sourceOffset.remove(COPY_KEY);
+      }
+
+      String topicName = topicMapper.getTopic(changeStreamDocument);
+      if (topicName.isEmpty()) {
+        LOGGER.warn(
+            "No topic set. Could not publish the message: {}", changeStreamDocument.toJson());
+      } else {
+
+        Optional<BsonDocument> valueDocument = Optional.empty();
+
+        boolean isTombstoneEvent =
+            publishFullDocumentOnlyTombstoneOnDelete
+                && !changeStreamDocument.containsKey(FULL_DOCUMENT);
+        if (publishFullDocumentOnly) {
+          if (changeStreamDocument.containsKey(FULL_DOCUMENT)
+              && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
+            valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
+          }
+        } else {
+          valueDocument = Optional.of(changeStreamDocument);
+        }
+
+        if (valueDocument.isPresent() || isTombstoneEvent) {
+          BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
+          LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);
+
+          if (valueDoc instanceof RawBsonDocument) {
+            int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
+            statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
+          }
+
+          BsonDocument keyDocument;
+          if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
+            keyDocument = changeStreamDocument;
+          } else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
+              && changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
+            keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
+          } else {
+            keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
+          }
+
+          createSourceRecord(
+                  keySchemaAndValueProducer,
+                  isTombstoneEvent
+                      ? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
+                      : valueSchemaAndValueProducer,
+                  sourceOffset,
+                  topicName,
+                  keyDocument,
+                  valueDoc)
+              .map(sourceRecords::add);
+        }
+      }
+    }
 
     LOGGER.debug("Return batch of {}", sourceRecords.size());
 
     if (sourceRecords.isEmpty()) {
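The key enabler in this hunk is the switch from forEach to an explicit Iterator: calling hasNext() again after next() is a look-ahead test for "is this the last element of the batch", which a forEach lambda cannot express. A minimal standalone sketch of the idiom (nothing here is connector code):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public final class LastElementLookahead {
      public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b", "c");
        Iterator<String> it = batch.iterator();
        while (it.hasNext()) {
          String element = it.next();
          boolean lastElement = !it.hasNext(); // true only for "c"
          System.out.println(element + " last=" + lastElement);
        }
      }
    }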
Could not publish the message: {}", changeStreamDocument.toJson()); + } else { + + Optional valueDocument = Optional.empty(); + + boolean isTombstoneEvent = + publishFullDocumentOnlyTombstoneOnDelete + && !changeStreamDocument.containsKey(FULL_DOCUMENT); + if (publishFullDocumentOnly) { + if (changeStreamDocument.containsKey(FULL_DOCUMENT) + && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) { + valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT)); + } + } else { + valueDocument = Optional.of(changeStreamDocument); + } + + if (valueDocument.isPresent() || isTombstoneEvent) { + BsonDocument valueDoc = valueDocument.orElse(new BsonDocument()); + LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset); + + if (valueDoc instanceof RawBsonDocument) { + int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit(); + statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes); + } + + BsonDocument keyDocument; + if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) { + keyDocument = changeStreamDocument; + } else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG) + && changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) { + keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD); + } else { + keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD)); + } + + createSourceRecord( + keySchemaAndValueProducer, + isTombstoneEvent + ? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER + : valueSchemaAndValueProducer, + sourceOffset, + topicName, + keyDocument, + valueDoc) + .map(sourceRecords::add); + } + } + } LOGGER.debug("Return batch of {}", sourceRecords.size()); if (sourceRecords.isEmpty()) { From 20fad7e837807d133c8123e83873e682ff3fcdb2 Mon Sep 17 00:00:00 2001 From: Calvin Thomas Nix Date: Thu, 17 Oct 2024 12:19:20 -0400 Subject: [PATCH 2/4] refactor per code review comments & add e2e test --- .../MongoSourceTaskIntegrationTest.java | 87 +++++++++++++++++-- .../source/StartedMongoSourceTask.java | 3 +- 2 files changed, 83 insertions(+), 7 deletions(-) diff --git a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java index 133d2e0d6..e106ff8ca 100644 --- a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java +++ b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java @@ -36,6 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -186,14 +187,14 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() { assertTrue( firstPoll.stream() .map(SourceRecord::sourceOffset) - .limit(149) + .limit(150 - 1) // exclude the last record .allMatch(i -> i.containsKey("copy"))), // make sure that the last copied element does not have the "copy" key () -> assertTrue( firstPoll.stream() .map(SourceRecord::sourceOffset) - .skip(149) + .skip(150 - 1) // exclude the last record .findFirst() .filter(i -> !i.containsKey("copy")) .isPresent())); @@ -558,7 +559,7 @@ void 
@@ -558,7 +559,7 @@ void testCopyingExistingWithARestartMidwayThrough() {
           put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
           put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
           put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
-          put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "10000");
+          put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
         }
       };
 
@@ -570,6 +571,17 @@ void testCopyingExistingWithARestartMidwayThrough() {
     assertTrue(
         firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));
 
+    Map<String, ?> lastOffset = firstPoll.get(25 - 1).sourceOffset();
+
+    // mock the context so that on restart we know where the last task left off
+    when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
+    assertNotNull(lastOffset.get("_id")); // the offset must carry a value under "_id"
+    @SuppressWarnings("unchecked")
+    Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
+    when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
+    task.initialize(context);
+
+    // perform a restart
     task.stop();
     task.start(cfg);
 
@@ -584,13 +596,13 @@ void testCopyingExistingWithARestartMidwayThrough() {
     // Make sure all elements, except the last one, contain the "copy" key
     assertTrue(
         thirdPoll.stream()
             .map(SourceRecord::sourceOffset)
-            .limit(24)
+            .limit(25 - 1) // exclude the last record in the batch
             .allMatch(i -> i.containsKey("copy")));
     // Make sure the last copied element does not contain the "copy" key
     assertTrue(
         thirdPoll.stream()
             .map(SourceRecord::sourceOffset)
-            .skip(24)
+            .skip(25 - 1) // exclude the last record in the batch
             .findFirst()
             .filter(i -> !i.containsKey("copy"))
             .isPresent());
@@ -605,6 +617,71 @@ void testCopyingExistingWithARestartMidwayThrough() {
     }
   }
 
+  @Test
+  @DisplayName("Copy existing with a restart after finishing")
+  void testCopyingExistingWithARestartAfterFinishing() {
+    try (AutoCloseableSourceTask task = createSourceTask()) {
+
+      MongoCollection<Document> coll = getCollection();
+
+      HashMap<String, String> cfg =
+          new HashMap<String, String>() {
+            {
+              put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
+              put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
+              put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
+              put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
+              put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
+            }
+          };
+
+      insertMany(rangeClosed(1, 50), coll);
+      task.start(cfg);
+
+      List<SourceRecord> firstPoll = getNextBatch(task);
+      assertSourceRecordValues(createInserts(1, 25), firstPoll, coll);
+      assertTrue(
+          firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));
+
+      List<SourceRecord> secondPoll = getNextBatch(task);
+      assertSourceRecordValues(createInserts(26, 50), secondPoll, coll);
+      // Make sure all elements, except the last one, contain the "copy" key
+      assertTrue(
+          secondPoll.stream()
+              .map(SourceRecord::sourceOffset)
+              .limit(25 - 1) // exclude the last record in the batch
+              .allMatch(i -> i.containsKey("copy")));
+
+      Map<String, ?> lastOffset = secondPoll.get(25 - 1).sourceOffset();
+
+      // Make sure the last copied element does not contain the "copy" key
+      assertFalse(lastOffset.containsKey("copy"));
+
+      // mock the context so that on restart we know where the last task left off
+      when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
+      assertNotNull(lastOffset.get("_id")); // the offset must carry a value under "_id"
+      @SuppressWarnings("unchecked")
+      Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
+      when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
+      task.initialize(context);
+
+      // perform a restart
+      task.stop();
+      task.start(cfg);
+
+      // make sure that a copy doesn't occur again because all data was already copied
+      assertTrue(getNextBatch(task).isEmpty());
+
+      // make sure that we can continue to process data
+      insertMany(rangeClosed(51, 75), coll);
+
+      List<SourceRecord> thirdPoll = getNextBatch(task);
+      assertSourceRecordValues(createInserts(51, 75), thirdPoll, coll);
+      assertFalse(
+          thirdPoll.stream().map(SourceRecord::sourceOffset).anyMatch(i -> i.containsKey("copy")));
+    }
+  }
+
   @Test
   @DisplayName("Ensure source loads data from collection and outputs documents only")
   void testSourceLoadsDataFromCollectionDocumentOnly() {

diff --git a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
index 98305671e..1480f92a1 100644
--- a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
+++ b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
@@ -212,9 +212,8 @@ private List<SourceRecord> pollInternal() {
 
     List<SourceRecord> sourceRecords = new ArrayList<>();
     Iterator<BsonDocument> batchIterator = getNextBatch().iterator();
-    BsonDocument changeStreamDocument;
     while (batchIterator.hasNext()) {
-      changeStreamDocument = batchIterator.next();
+      BsonDocument changeStreamDocument = batchIterator.next();
       Map<String, String> sourceOffset = new HashMap<>();
       sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
       if (isCopying) {
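The mocked OffsetStorageReader is what the restarted task consults during start(), so the whole end-to-end test hinges on what it hands back. A simplified sketch of that decision (not the connector's actual code path; the real startup logic lives elsewhere in MongoSourceTask):

    import java.util.Map;

    final class RestartDecisionSketch {
      // An offset still carrying "copy" means the snapshot was interrupted: re-copy.
      // An offset whose "_id" is a change stream resume token (no "copy") means: just resume.
      static boolean shouldRestartCopy(Map<String, Object> committedOffset) {
        return committedOffset != null && "true".equals(committedOffset.get("copy"));
      }
    }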
From 2d67b01d55dc62e80975cfa6af0e09aefb97099c Mon Sep 17 00:00:00 2001
From: Calvin Thomas Nix
Date: Mon, 21 Oct 2024 09:15:03 -0400
Subject: [PATCH 3/4] add null check for cachedResumeToken

---
 .../mongodb/kafka/connect/source/StartedMongoSourceTask.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
index 1480f92a1..ad1f4e4cb 100644
--- a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
+++ b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
@@ -225,9 +225,11 @@ private List<SourceRecord> pollInternal() {
       // the copy has completed; otherwise we would rely on future change stream events to
       // signify that we are no longer copying. We also need to set the _id field to a valid
       // resume token, which during copying is held in the cachedResumeToken variable.
+      // On MongoDB 3.6, cachedResumeToken initializes to null, so guard against a
+      // NullPointerException here.
       boolean lastDocument = !batchIterator.hasNext();
       boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
-      if (isCopying && lastDocument && noMoreDataToCopy) {
+      if (isCopying && lastDocument && noMoreDataToCopy && cachedResumeToken != null) {
         sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
         sourceOffset.remove(COPY_KEY);
       }
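Background for the guard, as I understand it: MongoDB 3.6 offers no way to obtain a resume token before the first change event arrives (no startAtOperationTime and no post-batch resume token), so cachedResumeToken can legitimately still be null when the copy finishes. The guarded branch can then be read in isolation as the following sketch, where "_id" and "copy" stand in for ID_FIELD and COPY_KEY:

    import java.util.Map;
    import org.bson.BsonDocument;

    final class CopyCompleteOffsetSketch {
      // Only rewrite the offset when a real resume token exists; on MongoDB 3.6 the task falls
      // back to the pre-patch behaviour of waiting for the next change stream event.
      static void maybeMarkCopyComplete(Map<String, String> sourceOffset, BsonDocument cachedResumeToken) {
        if (cachedResumeToken != null) {
          sourceOffset.put("_id", cachedResumeToken.toJson());
          sourceOffset.remove("copy");
        }
      }
    }

Since the copy-complete offset cannot be written on 3.6, the final patch skips the affected tests on that server version.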
From f464753056bc955f860615b7279885417b0382d6 Mon Sep 17 00:00:00 2001
From: Calvin Thomas Nix
Date: Mon, 21 Oct 2024 09:54:18 -0400
Subject: [PATCH 4/4] skip copy_existing tests for v3.6

---
 .../kafka/connect/source/MongoSourceTaskIntegrationTest.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
index e106ff8ca..59526df77 100644
--- a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
+++ b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
@@ -548,6 +548,7 @@ void testSourceCanUseCustomOffsetPartitionNames() {
   @Test
   @DisplayName("Copy existing with a restart midway through")
   void testCopyingExistingWithARestartMidwayThrough() {
+    assumeTrue(isGreaterThanThreeDotSix());
     try (AutoCloseableSourceTask task = createSourceTask()) {
 
       MongoCollection<Document> coll = getCollection();
@@ -620,6 +621,7 @@ void testCopyingExistingWithARestartMidwayThrough() {
   @Test
   @DisplayName("Copy existing with a restart after finishing")
   void testCopyingExistingWithARestartAfterFinishing() {
+    assumeTrue(isGreaterThanThreeDotSix());
    try (AutoCloseableSourceTask task = createSourceTask()) {
 
      MongoCollection<Document> coll = getCollection();
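The assumeTrue(isGreaterThanThreeDotSix()) gate presumably comes from the integration-test base class; its implementation is not shown in this series. A hypothetical stand-in illustrating how such a version gate is typically written (getMongoClient() is an assumed test-fixture accessor, and the version string shown is an example):

    import org.bson.Document;

    // Hypothetical sketch of a server-version gate for JUnit 5 assumptions.
    boolean isGreaterThanThreeDotSix() {
      Document buildInfo =
          getMongoClient().getDatabase("admin").runCommand(new Document("buildInfo", 1));
      String[] parts = buildInfo.getString("version").split("\\."); // e.g. "4.4.29"
      int major = Integer.parseInt(parts[0]);
      int minor = Integer.parseInt(parts[1]);
      return major > 3 || (major == 3 && minor > 6);
    }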