diff --git a/src/integrationTest/java/com/mongodb/kafka/connect/mongodb/MongoKafkaTestCase.java b/src/integrationTest/java/com/mongodb/kafka/connect/mongodb/MongoKafkaTestCase.java index 96f120800..3a51fb8bf 100644 --- a/src/integrationTest/java/com/mongodb/kafka/connect/mongodb/MongoKafkaTestCase.java +++ b/src/integrationTest/java/com/mongodb/kafka/connect/mongodb/MongoKafkaTestCase.java @@ -114,6 +114,7 @@ public boolean isReplicaSetOrSharded() { private static final int FOUR_DOT_TWO_WIRE_VERSION = 8; public static final int FOUR_DOT_FOUR_WIRE_VERSION = 9; private static final int SIX_DOT_ZERO_WIRE_VERSION = 17; + private static final int SEVEN_DOT_ZERO_WIRE_VERSION = 21; public boolean isGreaterThanThreeDotSix() { return getMaxWireVersion() > THREE_DOT_SIX_WIRE_VERSION; @@ -135,6 +136,10 @@ public boolean isAtLeastSixDotZero() { return getMaxWireVersion() >= SIX_DOT_ZERO_WIRE_VERSION; } + public boolean isAtLeastSevenDotZero() { + return getMaxWireVersion() >= SEVEN_DOT_ZERO_WIRE_VERSION; + } + public int getMaxWireVersion() { Document isMaster = MONGODB diff --git a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java index 59526df77..496900ba8 100644 --- a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java +++ b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java @@ -46,6 +46,7 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -1053,6 +1054,144 @@ void testFullDocumentBeforeChange() { } } + @Test + @DisplayName("Ensure disambiguatedPaths exist when showExpandedEvents is true") + void testDisambiguatedPathsExistWhenShowExpandedEventsIsTrue() { + assumeTrue(isAtLeastSevenDotZero()); + MongoDatabase db = getDatabaseWithPostfix(); + try (AutoCloseableSourceTask task = createSourceTask()) { + MongoCollection coll = db.getCollection("coll"); + coll.drop(); + db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions()); + HashMap cfg = new HashMap<>(); + cfg.put( + MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, + OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT)); + cfg.put(MongoSourceConfig.SHOW_EXPANDED_EVENTS_CONFIG, "true"); + task.start(cfg); + int id = 0; + Document expected = new Document("_id", id); + coll.insertOne(expected); + coll.updateOne(Filters.eq(id), Document.parse("{ $set: { foo: 1 } }")); + coll.deleteOne(Filters.eq(id)); + List records = getNextResults(task); + assertEquals(3, records.size()); + Struct update = (Struct) records.get(1).value(); + assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType")); + Struct updateDescription = (Struct) update.get("updateDescription"); + assertEquals("{}", updateDescription.getString("disambiguatedPaths")); + } finally { + db.drop(); + } + } + + @Test + @DisplayName("Ensure disambiguatedPaths don't exist when showExpandedEvents is false") + void testDisambiguatedPathsDontExistWhenShowExpandedEventsIsTrue() { + assumeTrue(isAtLeastSevenDotZero()); + MongoDatabase db = getDatabaseWithPostfix(); + try (AutoCloseableSourceTask task = createSourceTask()) { + MongoCollection coll = db.getCollection("coll"); + coll.drop(); + db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions()); + HashMap cfg = new HashMap<>(); + cfg.put( + MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, + OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT)); + cfg.put(MongoSourceConfig.SHOW_EXPANDED_EVENTS_CONFIG, "false"); + task.start(cfg); + int id = 0; + Document expected = new Document("_id", id); + coll.insertOne(expected); + coll.updateOne(Filters.eq(id), Document.parse("{ $set: { foo: 1 } }")); + coll.deleteOne(Filters.eq(id)); + List records = getNextResults(task); + assertEquals(3, records.size()); + Struct update = (Struct) records.get(1).value(); + assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType")); + Struct updateDescription = (Struct) update.get("updateDescription"); + assertNull(updateDescription.getString("disambiguatedPaths")); + } finally { + db.drop(); + } + } + + @Test + @DisplayName("Ensure disambiguatedPaths don't exist by default") + void testDisambiguatedPathsDontExistByDefault() { + assumeTrue(isAtLeastSevenDotZero()); + MongoDatabase db = getDatabaseWithPostfix(); + try (AutoCloseableSourceTask task = createSourceTask()) { + MongoCollection coll = db.getCollection("coll"); + coll.drop(); + db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions()); + HashMap cfg = new HashMap<>(); + cfg.put( + MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, + OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT)); + task.start(cfg); + int id = 0; + Document expected = new Document("_id", id); + coll.insertOne(expected); + coll.updateOne(Filters.eq(id), Document.parse("{ $set: { foo: 1 } }")); + coll.deleteOne(Filters.eq(id)); + List records = getNextResults(task); + assertEquals(3, records.size()); + Struct update = (Struct) records.get(1).value(); + assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType")); + Struct updateDescription = (Struct) update.get("updateDescription"); + assertNull(updateDescription.getString("disambiguatedPaths")); + } finally { + db.drop(); + } + } + + @Test + @DisplayName("Ensure truncatedArrays works") + void testTruncatedArrays() { + assumeTrue(isAtLeastSixDotZero()); + MongoDatabase db = getDatabaseWithPostfix(); + try (AutoCloseableSourceTask task = createSourceTask()) { + MongoCollection coll = db.getCollection("coll"); + coll.drop(); + db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions()); + HashMap cfg = new HashMap<>(); + cfg.put( + MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, + OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT)); + task.start(cfg); + int id = 0; + Document expected = + new Document("_id", id) + .append("items", Arrays.asList(2, 30, 5, 10, 11, 100, 200, 250, 300, 5, 600)); + coll.insertOne(expected); + coll.updateOne( + Filters.eq(id), + singletonList(Document.parse("{ $set: { items: [2,30,5,10,11,100,200,250,300,5] } }"))); + coll.deleteOne(Filters.eq(id)); + List records = getNextResults(task); + assertEquals(3, records.size()); + Struct update = (Struct) records.get(1).value(); + assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType")); + Struct updateDescription = (Struct) update.get("updateDescription"); + + Schema schema = + SchemaBuilder.struct() + .name("truncatedArray") + .field("field", Schema.STRING_SCHEMA) + .field("newSize", Schema.INT32_SCHEMA) + .build(); + + Struct truncatedArrayStruct = new Struct(schema).put("field", "items").put("newSize", 10); + + List expectedTruncatedArray = new ArrayList<>(); + expectedTruncatedArray.add(truncatedArrayStruct); + assertEquals(expectedTruncatedArray, updateDescription.getArray("truncatedArrays")); + } finally { + db.drop(); + } + } + /** * We insert a document into a collection before starting the {@link MongoSourceTask}, yet we * observe the change due to specifying {@link diff --git a/src/main/java/com/mongodb/kafka/connect/sink/cdc/mongodb/operations/OperationHelper.java b/src/main/java/com/mongodb/kafka/connect/sink/cdc/mongodb/operations/OperationHelper.java index 5e6928286..018404729 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/cdc/mongodb/operations/OperationHelper.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/cdc/mongodb/operations/OperationHelper.java @@ -37,8 +37,10 @@ final class OperationHelper { private static final String UPDATE_DESCRIPTION = "updateDescription"; private static final String UPDATED_FIELDS = "updatedFields"; private static final String REMOVED_FIELDS = "removedFields"; + private static final String TRUNCATED_ARRAYS = "truncatedArrays"; + private static final String DISAMBIGUATED_PATHS = "disambiguatedPaths"; private static final Set UPDATE_DESCRIPTION_FIELDS = - new HashSet<>(asList(UPDATED_FIELDS, REMOVED_FIELDS)); + new HashSet<>(asList(UPDATED_FIELDS, REMOVED_FIELDS, TRUNCATED_ARRAYS, DISAMBIGUATED_PATHS)); private static final String SET = "$set"; private static final String UNSET = "$unset"; @@ -125,6 +127,26 @@ static BsonDocument getUpdateDocument(final BsonDocument changeStreamDocument) { REMOVED_FIELDS, updateDescription.get(REMOVED_FIELDS), updateDescription.toJson())); } + if (updateDescription.containsKey(TRUNCATED_ARRAYS) + && !updateDescription.get(TRUNCATED_ARRAYS).isArray()) { + throw new DataException( + format( + "Unexpected %s field type, expected an array but found `%s`: %s", + TRUNCATED_ARRAYS, + updateDescription.get(TRUNCATED_ARRAYS), + updateDescription.toJson())); + } + + if (updateDescription.containsKey(DISAMBIGUATED_PATHS) + && !updateDescription.get(DISAMBIGUATED_PATHS).isDocument()) { + throw new DataException( + format( + "Unexpected %s field type, expected an array but found `%s`: %s", + DISAMBIGUATED_PATHS, + updateDescription.get(DISAMBIGUATED_PATHS), + updateDescription.toJson())); + } + BsonDocument updatedFields = updateDescription.getDocument(UPDATED_FIELDS); BsonArray removedFields = updateDescription.getArray(REMOVED_FIELDS); BsonDocument unsetDocument = new BsonDocument(); @@ -132,7 +154,7 @@ static BsonDocument getUpdateDocument(final BsonDocument changeStreamDocument) { if (!removedField.isString()) { throw new DataException( format( - "Unexpected value type in %s, expected an string but found `%s`: %s", + "Unexpected value type in %s, expected a string but found `%s`: %s", REMOVED_FIELDS, removedField, updateDescription.toJson())); } unsetDocument.append(removedField.asString().getValue(), EMPTY_STRING); diff --git a/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java b/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java index 4a4aeb9c3..310ba06fd 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java +++ b/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java @@ -323,6 +323,21 @@ public class MongoSourceConfig extends AbstractConfig { + "See https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/ for more details and possible values."; private static final String FULL_DOCUMENT_DEFAULT = EMPTY_STRING; + public static final String SHOW_EXPANDED_EVENTS_CONFIG = "change.stream.show.expanded.events"; + private static final String SHOW_EXPANDED_EVENTS_DISPLAY = + "The `showExpandedEvents` configuration."; + private static final String SHOW_EXPANDED_EVENTS_DOC = + "Determines if change streams notifies for DDL events, like the createIndexes and dropIndexes events.\n" + + "New in version 6.0.\n" + + "See https://www.mongodb.com/docs/manual/reference/change-events/#std-label-change-streams-expanded-events for more " + + "details on showExpandedEvents.\n" + + "This setting is required to show updateDescription.disambiguatedPaths in update events, " + + "helping clarify changes that involve ambiguous fields.\n" + + "New in version 6.1.\n" + + "See https://www.mongodb.com/docs/manual/reference/change-events/update/#path-disambiguation for more details on " + + "disambiguatedPaths."; + private static final boolean SHOW_EXPANDED_EVENTS_DEFAULT = false; + public static final String COLLATION_CONFIG = "collation"; private static final String COLLATION_DISPLAY = "The collation options"; private static final String COLLATION_DOC = @@ -803,6 +818,10 @@ Optional getFullDocument() { } } + boolean getShowExpandedEvents() { + return getBoolean(SHOW_EXPANDED_EVENTS_CONFIG); + } + StartupConfig getStartupConfig() { StartupConfig result = startupConfig; if (result != null) { @@ -1092,6 +1111,17 @@ public Map validateAll(final Map props) { FULL_DOCUMENT_DISPLAY, Validators.EnumValidatorAndRecommender.in(FullDocument.values(), FullDocument::getValue)); + configDef.define( + SHOW_EXPANDED_EVENTS_CONFIG, + Type.BOOLEAN, + SHOW_EXPANDED_EVENTS_DEFAULT, + Importance.MEDIUM, + SHOW_EXPANDED_EVENTS_DOC, + group, + ++orderInGroup, + Width.MEDIUM, + SHOW_EXPANDED_EVENTS_DISPLAY); + configDef.define( COLLATION_CONFIG, Type.STRING, diff --git a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java index e2efddec0..ece2db42d 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java +++ b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java @@ -681,6 +681,7 @@ private static ChangeStreamIterable getChangeStreamIterable( if (batchSize > 0) { changeStream.batchSize(batchSize); } + changeStream.showExpandedEvents(sourceConfig.getShowExpandedEvents()); sourceConfig.getFullDocumentBeforeChange().ifPresent(changeStream::fullDocumentBeforeChange); sourceConfig.getFullDocument().ifPresent(changeStream::fullDocument); sourceConfig.getCollation().ifPresent(changeStream::collation); diff --git a/src/main/java/com/mongodb/kafka/connect/source/schema/AvroSchemaDefaults.java b/src/main/java/com/mongodb/kafka/connect/source/schema/AvroSchemaDefaults.java index 1ffa947c7..61aa27d80 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/schema/AvroSchemaDefaults.java +++ b/src/main/java/com/mongodb/kafka/connect/source/schema/AvroSchemaDefaults.java @@ -51,8 +51,15 @@ public final class AvroSchemaDefaults { + " \"type\": [{\"name\": \"updateDescription\", \"type\": \"record\", \"fields\": [" + " {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]}," + " {\"name\": \"removedFields\"," - + " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]" - + " }] }, \"null\"] }," + + " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]}," + + " {\"name\": \"truncatedArrays\"," + + " \"type\": [{ \"type\":\"array\", \"items\": {\"type\": \"record\"," + + " \"name\": \"truncatedArray\", \"fields\": [" + + " {\"name\": \"field\", \"type\": \"string\"}," + + " {\"name\": \"newSize\", \"type\": \"int\"} ] }" + + " }, \"null\" ] }," + + " {\"name\": \"disambiguatedPaths\", \"type\": [\"string\", \"null\"]}" + + " ]}, \"null\"] }," + " { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] }," + " { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]}," + " { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\"," diff --git a/src/test/java/com/mongodb/kafka/connect/sink/cdc/mongodb/operations/UpdateTest.java b/src/test/java/com/mongodb/kafka/connect/sink/cdc/mongodb/operations/UpdateTest.java index b63fa5525..f0814914a 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/cdc/mongodb/operations/UpdateTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/cdc/mongodb/operations/UpdateTest.java @@ -52,7 +52,12 @@ class UpdateTest { + " updatedFields: {" + " email: 'alice@10gen.com'" + " }," - + " removedFields: ['phoneNumber']" + + " removedFields: ['phoneNumber']," + + " truncatedArrays: [{ field: 'foo', newSize: 1 }]," + + " disambiguatedPaths: {" + + " 'home.town': [ 'home.town' ]," + + " 'residences.0.0': [ 'residences', 0, '0' ]" + + " }" + " }," + " fullDocument: {" + " _id: ObjectId(\"58a4eb4a30c75625e00d2820\")," @@ -148,7 +153,7 @@ void testMissingChangeEventData() { new SinkDocument( null, BsonDocument.parse( - "{documentKey: {}, updateDescription: {updatedFields: 1}}")))), + "{documentKey: {}, updateDescription: {updatedFields: 1, removedFields: []}}")))), () -> assertThrows( DataException.class, @@ -157,7 +162,25 @@ void testMissingChangeEventData() { new SinkDocument( null, BsonDocument.parse( - "{documentKey: {}, updateDescription: {removedFields: 1}}")))), + "{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: 1}}")))), + () -> + assertThrows( + DataException.class, + () -> + UPDATE.perform( + new SinkDocument( + null, + BsonDocument.parse( + "{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: [], truncatedArrays: 1}}")))), + () -> + assertThrows( + DataException.class, + () -> + UPDATE.perform( + new SinkDocument( + null, + BsonDocument.parse( + "{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: [], disambiguatedPaths: 1}}")))), () -> assertThrows( DataException.class, diff --git a/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java b/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java index 4c1d5b01a..c14b388e1 100644 --- a/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java +++ b/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java @@ -31,7 +31,9 @@ import java.math.BigDecimal; import java.util.Base64; +import java.util.Collections; import java.util.Date; +import java.util.List; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -342,6 +344,8 @@ static Struct generateExpectedValue(final boolean simplified) { { put("updatedFields", getUpdatedField(simplified)); put("removedFields", singletonList("legacyUUID")); + put("truncatedArrays", getTruncatedArrays()); + put("disambiguatedPaths", getDisambiguatedPaths(simplified)); } }); put("clusterTime", "{\"$timestamp\": {\"t\": 123456789, \"i\": 42}}"); @@ -378,6 +382,26 @@ static String getUpdatedField(final boolean simplified) { : "{\"myString\": \"some foo bla text\", \"myInt\": {\"$numberInt\": \"42\"}}"; } + static List getTruncatedArrays() { + Schema truncatedArraySchema = + SchemaBuilder.struct() + .name("truncatedArray") + .field("field", Schema.STRING_SCHEMA) + .field("newSize", Schema.INT32_SCHEMA) + .build(); + + Struct truncatedArrayStruct = + new Struct(truncatedArraySchema).put("field", "foo").put("newSize", 1); + + return Collections.singletonList(truncatedArrayStruct); + } + + static String getDisambiguatedPaths(final boolean simplified) { + return simplified + ? "{\"home.town\": [\"home.town\"], \"residences.0.0\": [\"residences\", 0, \"0\"]}" + : "{\"home.town\": [\"home.town\"], \"residences.0.0\": [\"residences\", {\"$numberInt\": \"0\"}, \"0\"]}"; + } + static String getLsidId(final boolean simplified) { return getLsidId(simplified, false); } @@ -409,7 +433,9 @@ static String generateJson(final boolean simplified) { + " \"documentKey\": %s," + " \"updateDescription\":" + " {\"updatedFields\": %s," - + " \"removedFields\": [\"legacyUUID\"]}," + + " \"removedFields\": [\"legacyUUID\"]," + + " \"truncatedArrays\": [{\"field\": \"foo\", \"newSize\": 1}]," + + " \"disambiguatedPaths\": %s}," + " \"clusterTime\": {\"$timestamp\": {\"t\": 123456789, \"i\": 42}}," + " \"txnNumber\": 987654321," + " \"lsid\": {\"id\": %s, \"uid\": %s}" @@ -418,6 +444,7 @@ static String generateJson(final boolean simplified) { getFullDocument(simplified), getDocumentKey(simplified), getUpdatedField(simplified), + getDisambiguatedPaths(simplified), getLsidId(simplified, true), getLsidUid(simplified, true)); }