KAFKA-428: Mark the copy complete in the source offset for the last copied document #168

Merged: 4 commits, Oct 21, 2024
First changed file:

@@ -36,6 +36,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -169,6 +170,9 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
new HashMap<String, String>() {
{
put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
String namespaceRegex =
String.format("(%s\\.coll|%s\\.coll)", db1.getName(), db2.getName());
put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
Comment (Collaborator Author): [driveby] without this filter this test tries to copy all data
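To illustrate the effect of that filter (the database names below are placeholders, not the names the test actually generates), the String.format call above produces a pattern that matches only the two collections the assertions read:

```java
// Illustrative only: with database names "db1" and "db2", the computed filter
// is "(db1\.coll|db2\.coll)", so the copy-existing phase scans just those two
// namespaces instead of every collection visible to the connector.
String namespaceRegex = String.format("(%s\\.coll|%s\\.coll)", "db1", "db2");
System.out.println(namespaceRegex); // prints (db1\.coll|db2\.coll)
```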

}
};
task.start(cfg);
@@ -178,11 +182,22 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
() -> assertEquals(150, firstPoll.size()),
() -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll1),
() -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll2),
// make sure all elements, except the last, contains the "copy" key
() ->
assertTrue(
firstPoll.stream()
.map(SourceRecord::sourceOffset)
.allMatch(i -> i.containsKey("copy"))));
.limit(150 - 1) // exclude the last record
.allMatch(i -> i.containsKey("copy"))),
// make sure that the last copied element does not have the "copy" key
() ->
assertTrue(
firstPoll.stream()
.map(SourceRecord::sourceOffset)
.skip(150 - 1) // exclude the last record
.findFirst()
.filter(i -> !i.containsKey("copy"))
.isPresent()));

assertNull(task.poll());

@@ -533,6 +548,7 @@ void testSourceCanUseCustomOffsetPartitionNames() {
@Test
@DisplayName("Copy existing with a restart midway through")
void testCopyingExistingWithARestartMidwayThrough() {
assumeTrue(isGreaterThanThreeDotSix());
try (AutoCloseableSourceTask task = createSourceTask()) {

MongoCollection<Document> coll = getCollection();
Expand All @@ -544,7 +560,7 @@ void testCopyingExistingWithARestartMidwayThrough() {
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "10000");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
Comment (Collaborator Author): [driveby] because this test expects an empty batch we should reduce the poll await time even further, otherwise we're guaranteed to wait for at least 5x10000ms. This just speeds the test up.
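To put rough numbers on that (the factor of five comes from the comment above; the helper below is only a sketch of how a getNextBatch-style test helper behaves, not the project's actual code):

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

class PollTimingSketch {
  // When the task has no data, each poll() blocks for roughly poll.await.time.ms
  // before returning, so a helper that retries N times spends about
  // N * pollAwaitTimeMs before it can report an empty batch.
  static List<SourceRecord> getNextBatchSketch(SourceTask task, int retries)
      throws InterruptedException {
    for (int i = 0; i < retries; i++) {
      List<SourceRecord> records = task.poll();
      if (records != null && !records.isEmpty()) {
        return records;
      }
    }
    return Collections.emptyList();
  }
  // With 5 retries: at least 5 * 10000 ms = 50 s at the old setting,
  // versus about 5 * 1000 ms = 5 s after this change.
}
```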

}
};

@@ -556,6 +572,17 @@ void testCopyingExistingWithARestartMidwayThrough() {
assertTrue(
firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));

Map<String, ?> lastOffset = firstPoll.get(25 - 1).sourceOffset();

// mock the context so that on restart we know where the last task left off
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
@SuppressWarnings("unchecked")
Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
Comment (Collaborator Author) on lines +579 to +581: A lot of this code is just to make the (Map<String, Object>) lastOffset cast safer. Since this is just a test, we could probably skip the null check.
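As an aside, one way to avoid the unchecked cast and the @SuppressWarnings entirely (a sketch of an alternative, not what this PR does) is to copy the offset into a fresh map, since HashMap's copy constructor accepts a Map<String, ?>:

```java
// Copying sidesteps the (Map<String, Object>) cast; the mocked reader then
// returns this copy on restart exactly as the cast version would.
Map<String, Object> mockedOffset = new HashMap<>(lastOffset);
when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
```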

when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
Comment (Collaborator Author): [driveby] I included mocking logic in this test because otherwise the code will always re-copy the data, not because it sees the expected copy offset but because the context is null.

task.initialize(context);

// perform a restart
task.stop();
task.start(cfg);

@@ -566,8 +593,20 @@

List<SourceRecord> thirdPoll = getNextBatch(task);
assertSourceRecordValues(createInserts(26, 50), thirdPoll, coll);
// Make sure all elements, except the last one, contains the "copy" key
assertTrue(
thirdPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));
thirdPoll.stream()
.map(SourceRecord::sourceOffset)
.limit(25 - 1) // exclude the last record in the batch
.allMatch(i -> i.containsKey("copy")));
// Make sure the last copied element does not contain the "copy" key
assertTrue(
thirdPoll.stream()
.map(SourceRecord::sourceOffset)
.skip(25 - 1) // exclude the last record in the batch
.findFirst()
.filter(i -> !i.containsKey("copy"))
.isPresent());

Calvinnix marked this conversation as resolved.
assertTrue(getNextBatch(task).isEmpty());
insertMany(rangeClosed(51, 75), coll);
@@ -579,6 +618,72 @@
}
}

@Test
Comment (Collaborator), quoting an earlier comment from the author and replying:

> So the integration tests are failing for version 3.6, which I'm assuming has something to do with changestream/resumetoken support. I'm wondering if I should just remove this test (and potentially replace it with newer versions of mongodb).
> EDIT: I added some logging in this patch and found that for version 3.6 the cachedResumeToken is initially stored as null. It might be worth including a null check in the if condition for this logic to avoid these scenarios.

Moving this thread here. I agree with adding a null check. I don't expect customers to be using v3.6 in production, but it's still worth avoiding the null pointer exception. Out of curiosity, did you find any valuable details from the logs you've added?

Comment (Collaborator Author, @Calvinnix, Oct 21, 2024): Yeah, the logs showed that cachedResumeToken was being initialized as null. (Search for "CALVIN:" if you want to review the logs.)

Comment (Collaborator Author): Ok, I just added the null check. I'll let the tests run, but I do expect the test to still fail for version 3.6 because it doesn't have a resume token to set and resume from.

@DisplayName("Copy existing with a restart after finishing")
void testCopyingExistingWithARestartAfterFinishing() {
assumeTrue(isGreaterThanThreeDotSix());
try (AutoCloseableSourceTask task = createSourceTask()) {

MongoCollection<Document> coll = getCollection();

HashMap<String, String> cfg =
new HashMap<String, String>() {
{
put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
}
};

insertMany(rangeClosed(1, 50), coll);
task.start(cfg);

List<SourceRecord> firstPoll = getNextBatch(task);
assertSourceRecordValues(createInserts(1, 25), firstPoll, coll);
assertTrue(
firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));

List<SourceRecord> secondPoll = getNextBatch(task);
assertSourceRecordValues(createInserts(26, 50), secondPoll, coll);
// Make sure all elements, except the last one, contains the "copy" key
assertTrue(
secondPoll.stream()
.map(SourceRecord::sourceOffset)
.limit(25 - 1) // exclude the last record in the batch
.allMatch(i -> i.containsKey("copy")));

Map<String, ?> lastOffset = secondPoll.get(25 - 1).sourceOffset();

// Make sure the last copied element does not contain the "copy" key
assertFalse(lastOffset.containsKey("copy"));

// mock the context so that on restart we know where the last task left off
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
Comment (Collaborator Author): Same comment as the one above for mocking offset; we could remove this line if we want.

@SuppressWarnings("unchecked")
Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
task.initialize(context);

// perform a restart
task.stop();
task.start(cfg);

// make sure that a copy doesn't occur again because all data was already copied
assertTrue(getNextBatch(task).isEmpty());

// make sure that we can continue to process data
insertMany(rangeClosed(51, 75), coll);

List<SourceRecord> thirdPoll = getNextBatch(task);
assertSourceRecordValues(createInserts(51, 75), thirdPoll, coll);
assertFalse(
thirdPoll.stream().map(SourceRecord::sourceOffset).anyMatch(i -> i.containsKey("copy")));
}
}

@Test
@DisplayName("Ensure source loads data from collection and outputs documents only")
void testSourceLoadsDataFromCollectionDocumentOnly() {
Second changed file:

@@ -48,6 +48,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -210,68 +211,81 @@ private List<SourceRecord> pollInternal() {
createValueSchemaAndValueProvider(sourceConfig);

List<SourceRecord> sourceRecords = new ArrayList<>();
getNextBatch()
.forEach(
changeStreamDocument -> {
Map<String, String> sourceOffset = new HashMap<>();
sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
if (isCopying) {
sourceOffset.put(COPY_KEY, "true");
}
Iterator<BsonDocument> batchIterator = getNextBatch().iterator();
while (batchIterator.hasNext()) {
BsonDocument changeStreamDocument = batchIterator.next();
Map<String, String> sourceOffset = new HashMap<>();
sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
if (isCopying) {
sourceOffset.put(COPY_KEY, "true");
}

String topicName = topicMapper.getTopic(changeStreamDocument);
if (topicName.isEmpty()) {
LOGGER.warn(
"No topic set. Could not publish the message: {}",
changeStreamDocument.toJson());
} else {

Optional<BsonDocument> valueDocument = Optional.empty();

boolean isTombstoneEvent =
publishFullDocumentOnlyTombstoneOnDelete
&& !changeStreamDocument.containsKey(FULL_DOCUMENT);
if (publishFullDocumentOnly) {
if (changeStreamDocument.containsKey(FULL_DOCUMENT)
&& changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
}
} else {
valueDocument = Optional.of(changeStreamDocument);
}

if (valueDocument.isPresent() || isTombstoneEvent) {
BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);

if (valueDoc instanceof RawBsonDocument) {
int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
}

BsonDocument keyDocument;
if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
keyDocument = changeStreamDocument;
} else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
&& changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
} else {
keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
}

createSourceRecord(
keySchemaAndValueProducer,
isTombstoneEvent
? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
: valueSchemaAndValueProducer,
sourceOffset,
topicName,
keyDocument,
valueDoc)
.map(sourceRecords::add);
}
}
});
// if isCopying is true, we want to set the COPY_KEY flag so that kafka has context that a
// copy is in progress. However, for the last document that we are copying, we should not set
// this flag because the copy has completed, otherwise we are relying on future change stream
// events to signify that we are no longer copying. We also need to set the _id field to be a
// valid resume token, which during copying exists in the cachedResumeToken variable.
// In version 3.6 of mongodb the cachedResumeToken initializes to null so we need to avoid
// this null pointer exception.
boolean lastDocument = !batchIterator.hasNext();
boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
if (isCopying && lastDocument && noMoreDataToCopy && cachedResumeToken != null) {
sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
Comment (Collaborator): Great find! Should we add a test that confirms we're setting the expected resume token?

Comment (Collaborator Author): I think we got coverage of this with the recent e2e tests I added around restarting after copying. Let me know if you disagree.

Comment (Collaborator): Instead of cachedResumeToken, it might be safer to create and use a different variable here that contains the same value as cachedResumeToken but isn't modified anywhere else. cachedResumeToken gets set to null (here) within getResumeToken, which ends up being called here if cursor == null. The cursor could be set to null if the cluster runs into a temporary connection issue.

Comment (Collaborator Author): Yeah, that's a good catch, I'll create a new token called resumeTokenAfterCopy.

Comment (Collaborator Author): Wait, I think we're actually good here; I'm happy to create a new variable for extra safety, though, if you're still concerned. Because we are assigning this in the copied batch, the cursor == null code won't be hit until after we use and store the cachedResumeToken, i.e. the getNextBatch function exits early here.

Comment (Collaborator): Good point. I'm fine with keeping it as is as well.

sourceOffset.remove(COPY_KEY);
}

String topicName = topicMapper.getTopic(changeStreamDocument);
if (topicName.isEmpty()) {
LOGGER.warn(
"No topic set. Could not publish the message: {}", changeStreamDocument.toJson());
} else {

Optional<BsonDocument> valueDocument = Optional.empty();

boolean isTombstoneEvent =
publishFullDocumentOnlyTombstoneOnDelete
&& !changeStreamDocument.containsKey(FULL_DOCUMENT);
if (publishFullDocumentOnly) {
if (changeStreamDocument.containsKey(FULL_DOCUMENT)
&& changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
}
} else {
valueDocument = Optional.of(changeStreamDocument);
}

if (valueDocument.isPresent() || isTombstoneEvent) {
BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);

if (valueDoc instanceof RawBsonDocument) {
int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
}

BsonDocument keyDocument;
if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
keyDocument = changeStreamDocument;
} else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
&& changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
} else {
keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
}

createSourceRecord(
keySchemaAndValueProducer,
isTombstoneEvent
? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
: valueSchemaAndValueProducer,
sourceOffset,
topicName,
keyDocument,
valueDoc)
.map(sourceRecords::add);
}
}
}
LOGGER.debug("Return batch of {}", sourceRecords.size());

if (sourceRecords.isEmpty()) {