Skip to content

Commit

Permalink
KAFKA-165: Add change.stream.show.expanded.events property (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
Calvinnix authored Jan 17, 2025
1 parent 2855ca9 commit c623ec0
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public boolean isReplicaSetOrSharded() {
private static final int FOUR_DOT_TWO_WIRE_VERSION = 8;
public static final int FOUR_DOT_FOUR_WIRE_VERSION = 9;
private static final int SIX_DOT_ZERO_WIRE_VERSION = 17;
private static final int SEVEN_DOT_ZERO_WIRE_VERSION = 21;

public boolean isGreaterThanThreeDotSix() {
return getMaxWireVersion() > THREE_DOT_SIX_WIRE_VERSION;
Expand All @@ -135,6 +136,10 @@ public boolean isAtLeastSixDotZero() {
return getMaxWireVersion() >= SIX_DOT_ZERO_WIRE_VERSION;
}

public boolean isAtLeastSevenDotZero() {
return getMaxWireVersion() >= SEVEN_DOT_ZERO_WIRE_VERSION;
}

public int getMaxWireVersion() {
Document isMaster =
MONGODB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -1053,6 +1054,144 @@ void testFullDocumentBeforeChange() {
}
}

@Test
@DisplayName("Ensure disambiguatedPaths exist when showExpandedEvents is true")
void testDisambiguatedPathsExistWhenShowExpandedEventsIsTrue() {
assumeTrue(isAtLeastSevenDotZero());
MongoDatabase db = getDatabaseWithPostfix();
try (AutoCloseableSourceTask task = createSourceTask()) {
MongoCollection<Document> coll = db.getCollection("coll");
coll.drop();
db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions());
HashMap<String, String> cfg = new HashMap<>();
cfg.put(
MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG,
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT));
cfg.put(MongoSourceConfig.SHOW_EXPANDED_EVENTS_CONFIG, "true");
task.start(cfg);
int id = 0;
Document expected = new Document("_id", id);
coll.insertOne(expected);
coll.updateOne(Filters.eq(id), Document.parse("{ $set: { foo: 1 } }"));
coll.deleteOne(Filters.eq(id));
List<SourceRecord> records = getNextResults(task);
assertEquals(3, records.size());
Struct update = (Struct) records.get(1).value();
assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType"));
Struct updateDescription = (Struct) update.get("updateDescription");
assertEquals("{}", updateDescription.getString("disambiguatedPaths"));
} finally {
db.drop();
}
}

@Test
@DisplayName("Ensure disambiguatedPaths don't exist when showExpandedEvents is false")
void testDisambiguatedPathsDontExistWhenShowExpandedEventsIsTrue() {
assumeTrue(isAtLeastSevenDotZero());
MongoDatabase db = getDatabaseWithPostfix();
try (AutoCloseableSourceTask task = createSourceTask()) {
MongoCollection<Document> coll = db.getCollection("coll");
coll.drop();
db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions());
HashMap<String, String> cfg = new HashMap<>();
cfg.put(
MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG,
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT));
cfg.put(MongoSourceConfig.SHOW_EXPANDED_EVENTS_CONFIG, "false");
task.start(cfg);
int id = 0;
Document expected = new Document("_id", id);
coll.insertOne(expected);
coll.updateOne(Filters.eq(id), Document.parse("{ $set: { foo: 1 } }"));
coll.deleteOne(Filters.eq(id));
List<SourceRecord> records = getNextResults(task);
assertEquals(3, records.size());
Struct update = (Struct) records.get(1).value();
assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType"));
Struct updateDescription = (Struct) update.get("updateDescription");
assertNull(updateDescription.getString("disambiguatedPaths"));
} finally {
db.drop();
}
}

@Test
@DisplayName("Ensure disambiguatedPaths don't exist by default")
void testDisambiguatedPathsDontExistByDefault() {
assumeTrue(isAtLeastSevenDotZero());
MongoDatabase db = getDatabaseWithPostfix();
try (AutoCloseableSourceTask task = createSourceTask()) {
MongoCollection<Document> coll = db.getCollection("coll");
coll.drop();
db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions());
HashMap<String, String> cfg = new HashMap<>();
cfg.put(
MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG,
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT));
task.start(cfg);
int id = 0;
Document expected = new Document("_id", id);
coll.insertOne(expected);
coll.updateOne(Filters.eq(id), Document.parse("{ $set: { foo: 1 } }"));
coll.deleteOne(Filters.eq(id));
List<SourceRecord> records = getNextResults(task);
assertEquals(3, records.size());
Struct update = (Struct) records.get(1).value();
assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType"));
Struct updateDescription = (Struct) update.get("updateDescription");
assertNull(updateDescription.getString("disambiguatedPaths"));
} finally {
db.drop();
}
}

@Test
@DisplayName("Ensure truncatedArrays works")
void testTruncatedArrays() {
assumeTrue(isAtLeastSixDotZero());
MongoDatabase db = getDatabaseWithPostfix();
try (AutoCloseableSourceTask task = createSourceTask()) {
MongoCollection<Document> coll = db.getCollection("coll");
coll.drop();
db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions());
HashMap<String, String> cfg = new HashMap<>();
cfg.put(
MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG,
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT));
task.start(cfg);
int id = 0;
Document expected =
new Document("_id", id)
.append("items", Arrays.asList(2, 30, 5, 10, 11, 100, 200, 250, 300, 5, 600));
coll.insertOne(expected);
coll.updateOne(
Filters.eq(id),
singletonList(Document.parse("{ $set: { items: [2,30,5,10,11,100,200,250,300,5] } }")));
coll.deleteOne(Filters.eq(id));
List<SourceRecord> records = getNextResults(task);
assertEquals(3, records.size());
Struct update = (Struct) records.get(1).value();
assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType"));
Struct updateDescription = (Struct) update.get("updateDescription");

Schema schema =
SchemaBuilder.struct()
.name("truncatedArray")
.field("field", Schema.STRING_SCHEMA)
.field("newSize", Schema.INT32_SCHEMA)
.build();

Struct truncatedArrayStruct = new Struct(schema).put("field", "items").put("newSize", 10);

List<Struct> expectedTruncatedArray = new ArrayList<>();
expectedTruncatedArray.add(truncatedArrayStruct);
assertEquals(expectedTruncatedArray, updateDescription.getArray("truncatedArrays"));
} finally {
db.drop();
}
}

/**
* We insert a document into a collection before starting the {@link MongoSourceTask}, yet we
* observe the change due to specifying {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ final class OperationHelper {
private static final String UPDATE_DESCRIPTION = "updateDescription";
private static final String UPDATED_FIELDS = "updatedFields";
private static final String REMOVED_FIELDS = "removedFields";
private static final String TRUNCATED_ARRAYS = "truncatedArrays";
private static final String DISAMBIGUATED_PATHS = "disambiguatedPaths";
private static final Set<String> UPDATE_DESCRIPTION_FIELDS =
new HashSet<>(asList(UPDATED_FIELDS, REMOVED_FIELDS));
new HashSet<>(asList(UPDATED_FIELDS, REMOVED_FIELDS, TRUNCATED_ARRAYS, DISAMBIGUATED_PATHS));

private static final String SET = "$set";
private static final String UNSET = "$unset";
Expand Down Expand Up @@ -125,14 +127,34 @@ static BsonDocument getUpdateDocument(final BsonDocument changeStreamDocument) {
REMOVED_FIELDS, updateDescription.get(REMOVED_FIELDS), updateDescription.toJson()));
}

if (updateDescription.containsKey(TRUNCATED_ARRAYS)
&& !updateDescription.get(TRUNCATED_ARRAYS).isArray()) {
throw new DataException(
format(
"Unexpected %s field type, expected an array but found `%s`: %s",
TRUNCATED_ARRAYS,
updateDescription.get(TRUNCATED_ARRAYS),
updateDescription.toJson()));
}

if (updateDescription.containsKey(DISAMBIGUATED_PATHS)
&& !updateDescription.get(DISAMBIGUATED_PATHS).isDocument()) {
throw new DataException(
format(
"Unexpected %s field type, expected an array but found `%s`: %s",
DISAMBIGUATED_PATHS,
updateDescription.get(DISAMBIGUATED_PATHS),
updateDescription.toJson()));
}

BsonDocument updatedFields = updateDescription.getDocument(UPDATED_FIELDS);
BsonArray removedFields = updateDescription.getArray(REMOVED_FIELDS);
BsonDocument unsetDocument = new BsonDocument();
for (final BsonValue removedField : removedFields) {
if (!removedField.isString()) {
throw new DataException(
format(
"Unexpected value type in %s, expected an string but found `%s`: %s",
"Unexpected value type in %s, expected a string but found `%s`: %s",
REMOVED_FIELDS, removedField, updateDescription.toJson()));
}
unsetDocument.append(removedField.asString().getValue(), EMPTY_STRING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,21 @@ public class MongoSourceConfig extends AbstractConfig {
+ "See https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/ for more details and possible values.";
private static final String FULL_DOCUMENT_DEFAULT = EMPTY_STRING;

public static final String SHOW_EXPANDED_EVENTS_CONFIG = "change.stream.show.expanded.events";
private static final String SHOW_EXPANDED_EVENTS_DISPLAY =
"The `showExpandedEvents` configuration.";
private static final String SHOW_EXPANDED_EVENTS_DOC =
"Determines if change streams notifies for DDL events, like the createIndexes and dropIndexes events.\n"
+ "New in version 6.0.\n"
+ "See https://www.mongodb.com/docs/manual/reference/change-events/#std-label-change-streams-expanded-events for more "
+ "details on showExpandedEvents.\n"
+ "This setting is required to show updateDescription.disambiguatedPaths in update events, "
+ "helping clarify changes that involve ambiguous fields.\n"
+ "New in version 6.1.\n"
+ "See https://www.mongodb.com/docs/manual/reference/change-events/update/#path-disambiguation for more details on "
+ "disambiguatedPaths.";
private static final boolean SHOW_EXPANDED_EVENTS_DEFAULT = false;

public static final String COLLATION_CONFIG = "collation";
private static final String COLLATION_DISPLAY = "The collation options";
private static final String COLLATION_DOC =
Expand Down Expand Up @@ -803,6 +818,10 @@ Optional<FullDocument> getFullDocument() {
}
}

boolean getShowExpandedEvents() {
return getBoolean(SHOW_EXPANDED_EVENTS_CONFIG);
}

StartupConfig getStartupConfig() {
StartupConfig result = startupConfig;
if (result != null) {
Expand Down Expand Up @@ -1092,6 +1111,17 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
FULL_DOCUMENT_DISPLAY,
Validators.EnumValidatorAndRecommender.in(FullDocument.values(), FullDocument::getValue));

configDef.define(
SHOW_EXPANDED_EVENTS_CONFIG,
Type.BOOLEAN,
SHOW_EXPANDED_EVENTS_DEFAULT,
Importance.MEDIUM,
SHOW_EXPANDED_EVENTS_DOC,
group,
++orderInGroup,
Width.MEDIUM,
SHOW_EXPANDED_EVENTS_DISPLAY);

configDef.define(
COLLATION_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ private static ChangeStreamIterable<Document> getChangeStreamIterable(
if (batchSize > 0) {
changeStream.batchSize(batchSize);
}
changeStream.showExpandedEvents(sourceConfig.getShowExpandedEvents());
sourceConfig.getFullDocumentBeforeChange().ifPresent(changeStream::fullDocumentBeforeChange);
sourceConfig.getFullDocument().ifPresent(changeStream::fullDocument);
sourceConfig.getCollation().ifPresent(changeStream::collation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,15 @@ public final class AvroSchemaDefaults {
+ " \"type\": [{\"name\": \"updateDescription\", \"type\": \"record\", \"fields\": ["
+ " {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]},"
+ " {\"name\": \"removedFields\","
+ " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]"
+ " }] }, \"null\"] },"
+ " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]},"
+ " {\"name\": \"truncatedArrays\","
+ " \"type\": [{ \"type\":\"array\", \"items\": {\"type\": \"record\","
+ " \"name\": \"truncatedArray\", \"fields\": ["
+ " {\"name\": \"field\", \"type\": \"string\"},"
+ " {\"name\": \"newSize\", \"type\": \"int\"} ] }"
+ " }, \"null\" ] },"
+ " {\"name\": \"disambiguatedPaths\", \"type\": [\"string\", \"null\"]}"
+ " ]}, \"null\"] },"
+ " { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] },"
+ " { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]},"
+ " { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\","
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ class UpdateTest {
+ " updatedFields: {"
+ " email: 'alice@10gen.com'"
+ " },"
+ " removedFields: ['phoneNumber']"
+ " removedFields: ['phoneNumber'],"
+ " truncatedArrays: [{ field: 'foo', newSize: 1 }],"
+ " disambiguatedPaths: {"
+ " 'home.town': [ 'home.town' ],"
+ " 'residences.0.0': [ 'residences', 0, '0' ]"
+ " }"
+ " },"
+ " fullDocument: {"
+ " _id: ObjectId(\"58a4eb4a30c75625e00d2820\"),"
Expand Down Expand Up @@ -148,7 +153,7 @@ void testMissingChangeEventData() {
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {updatedFields: 1}}")))),
"{documentKey: {}, updateDescription: {updatedFields: 1, removedFields: []}}")))),
() ->
assertThrows(
DataException.class,
Expand All @@ -157,7 +162,25 @@ void testMissingChangeEventData() {
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {removedFields: 1}}")))),
"{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: 1}}")))),
() ->
assertThrows(
DataException.class,
() ->
UPDATE.perform(
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: [], truncatedArrays: 1}}")))),
() ->
assertThrows(
DataException.class,
() ->
UPDATE.perform(
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: [], disambiguatedPaths: 1}}")))),
() ->
assertThrows(
DataException.class,
Expand Down
Loading

0 comments on commit c623ec0

Please sign in to comment.