Skip to content

Commit

Permalink
[mongodb][hotfix] Fix drop or rename record cause documentKey being e…
Browse files Browse the repository at this point in the history
…mpty. (apache#2210)
  • Loading branch information
Jiabao-Sun authored and zhangchaoming.zcm committed Jan 3, 2025
1 parent 1a93bb0 commit bddc0ba
Showing 1 changed file with 39 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.OperationType;
import com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
Expand Down Expand Up @@ -57,6 +58,7 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.NAMESPACE_DATABASE_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.NAMESPACE_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OPERATION_TYPE_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.SNAPSHOT_KEY_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.SOURCE_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.TIMESTAMP_KEY_FIELD;
Expand Down Expand Up @@ -134,25 +136,38 @@ public void execute(Context context) throws Exception {
nextUpdate = time.milliseconds() + sourceConfig.getPollAwaitTimeMillis();
} else {
BsonDocument changeStreamDocument = next.get();
MongoNamespace namespace = getMongoNamespace(changeStreamDocument);

BsonDocument resumeToken = changeStreamDocument.getDocument(ID_FIELD);
BsonDocument valueDocument =
normalizeChangeStreamDocument(changeStreamDocument);

LOG.trace("Adding {} to {}", valueDocument, namespace.getFullName());

changeRecord =
createSourceRecord(
createPartitionMap(
sourceConfig.getScheme(),
sourceConfig.getHosts(),
namespace.getDatabaseName(),
namespace.getCollectionName()),
createSourceOffsetMap(resumeToken, false),
namespace.getFullName(),
changeStreamDocument.getDocument(ID_FIELD),
valueDocument);
OperationType operationType = getOperationType(changeStreamDocument);

switch (operationType) {
case INSERT:
case UPDATE:
case REPLACE:
case DELETE:
MongoNamespace namespace = getMongoNamespace(changeStreamDocument);

BsonDocument resumeToken = changeStreamDocument.getDocument(ID_FIELD);
BsonDocument valueDocument =
normalizeChangeStreamDocument(changeStreamDocument);

LOG.trace("Adding {} to {}", valueDocument, namespace.getFullName());

changeRecord =
createSourceRecord(
createPartitionMap(
sourceConfig.getScheme(),
sourceConfig.getHosts(),
namespace.getDatabaseName(),
namespace.getCollectionName()),
createSourceOffsetMap(resumeToken, false),
namespace.getFullName(),
changeStreamDocument.getDocument(ID_FIELD),
valueDocument);
break;
default:
// Ignore drop、drop_database、rename and other record to prevent
// documentKey from being empty.
LOG.info("Ignored {} record: {}", operationType, changeStreamDocument);
}
}

if (changeRecord != null) {
Expand Down Expand Up @@ -347,6 +362,11 @@ private MongoNamespace getMongoNamespace(BsonDocument changeStreamDocument) {
ns.getString(NAMESPACE_COLLECTION_FIELD).getValue());
}

private OperationType getOperationType(BsonDocument changeStreamDocument) {
return OperationType.fromString(
changeStreamDocument.getString(OPERATION_TYPE_FIELD).getValue());
}

private boolean isBoundedRead() {
return !NO_STOPPING_OFFSET.equals(streamSplit.getEndingOffset());
}
Expand Down

0 comments on commit bddc0ba

Please sign in to comment.