KAFKA-165: Add change.stream.show.expanded.events property #172

Merged
merged 8 commits into from
Jan 17, 2025
@@ -114,6 +114,7 @@ public boolean isReplicaSetOrSharded() {
private static final int FOUR_DOT_TWO_WIRE_VERSION = 8;
public static final int FOUR_DOT_FOUR_WIRE_VERSION = 9;
private static final int SIX_DOT_ZERO_WIRE_VERSION = 17;
private static final int SEVEN_DOT_ZERO_WIRE_VERSION = 21;

public boolean isGreaterThanThreeDotSix() {
return getMaxWireVersion() > THREE_DOT_SIX_WIRE_VERSION;
@@ -135,6 +136,10 @@ public boolean isAtLeastSixDotZero() {
return getMaxWireVersion() >= SIX_DOT_ZERO_WIRE_VERSION;
}

public boolean isAtLeastSevenDotZero() {
Collaborator

[q] thoughts on adding integration tests against different major versions? It looks like we're missing 6 and 7.
This can be done as part of a separate ticket.

Collaborator Author

Yeah, that's a good idea. I added this ticket to the backlog.

return getMaxWireVersion() >= SEVEN_DOT_ZERO_WIRE_VERSION;
}
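
The version gate added above can be sketched in isolation as follows. This is a hypothetical stand-alone class, not the connector's helper: it takes the max wire version as a plain `int` argument instead of reading it from the server, and it reuses only the constant values visible in this hunk (17 for 6.0, 21 for 7.0).

```java
// Hypothetical stand-alone sketch; the real helper reads maxWireVersion from the server.
final class WireVersionSketch {
  // Values copied from the constants shown in the hunk above.
  static final int SIX_DOT_ZERO_WIRE_VERSION = 17;
  static final int SEVEN_DOT_ZERO_WIRE_VERSION = 21;

  /** True when the reported wire version corresponds to MongoDB 6.0 or newer. */
  static boolean isAtLeastSixDotZero(final int maxWireVersion) {
    return maxWireVersion >= SIX_DOT_ZERO_WIRE_VERSION;
  }

  /** True when the reported wire version corresponds to MongoDB 7.0 or newer. */
  static boolean isAtLeastSevenDotZero(final int maxWireVersion) {
    return maxWireVersion >= SEVEN_DOT_ZERO_WIRE_VERSION;
  }
}
```

Using `>=` rather than `==` means the gate keeps holding for later server releases, which is what a test precondition such as `assumeTrue(isAtLeastSevenDotZero())` needs.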

public int getMaxWireVersion() {
Document isMaster =
MONGODB
@@ -46,6 +46,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -1053,6 +1054,144 @@ void testFullDocumentBeforeChange() {
}
}

@Test
@DisplayName("Ensure disambiguatedPaths exist when showExpandedEvents is true")
void testDisambiguatedPathsExistWhenShowExpandedEventsIsTrue() {
assumeTrue(isAtLeastSevenDotZero());
MongoDatabase db = getDatabaseWithPostfix();
try (AutoCloseableSourceTask task = createSourceTask()) {
MongoCollection<Document> coll = db.getCollection("coll");
coll.drop();
db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions());
HashMap<String, String> cfg = new HashMap<>();
cfg.put(
MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG,
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT));
cfg.put(MongoSourceConfig.SHOW_EXPANDED_EVENTS_CONFIG, "true");
task.start(cfg);
int id = 0;
Document expected = new Document("_id", id);
coll.insertOne(expected);
coll.updateOne(Filters.eq(id), Document.parse("{ $set: { foo: 1 } }"));
coll.deleteOne(Filters.eq(id));
List<SourceRecord> records = getNextResults(task);
assertEquals(3, records.size());
Struct update = (Struct) records.get(1).value();
assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType"));
Struct updateDescription = (Struct) update.get("updateDescription");
assertEquals("{}", updateDescription.getString("disambiguatedPaths"));
Collaborator

You might have already mentioned this offline, but why aren't we checking that it contains the expected fields, similar to this example?

Collaborator Author

Ah, this is because I wasn't able to produce any disambiguatedPaths results: trying to update a field whose name contains a `.` just updates the nested structure instead.

Collaborator Author

I'm going to take another look at this to make sure I wasn't missing something obvious 😄

} finally {
db.drop();
}
}
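
To illustrate what the review thread above is about, here is a minimal sketch of how a consumer might interpret `disambiguatedPaths`. It assumes the shape used in this PR's `UpdateTest` data (a dotted key mapped to its list of path components) and uses plain `java.util` collections as stand-ins for BSON types. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; plain collections stand in for BSON documents and arrays.
final class DisambiguatedPathsSketch {
  /**
   * Returns the path components for a key taken from updatedFields or removedFields.
   * Uses the disambiguatedPaths entry when one exists; otherwise assumes that every
   * dot in the key separates two nested field names.
   */
  static List<Object> resolve(
      final String key, final Map<String, List<Object>> disambiguatedPaths) {
    List<Object> components = disambiguatedPaths.get(key);
    if (components != null) {
      return components;
    }
    return new ArrayList<>(Arrays.asList(key.split("\\.")));
  }
}
```

With the entries from `UpdateTest`, `home.town` resolves to the single component `home.town` (a field whose name contains a dot), while a key with no entry, such as `a.b`, falls back to the two components `a` and `b`.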

@Test
@DisplayName("Ensure disambiguatedPaths don't exist when showExpandedEvents is false")
void testDisambiguatedPathsDontExistWhenShowExpandedEventsIsFalse() {
assumeTrue(isAtLeastSevenDotZero());
MongoDatabase db = getDatabaseWithPostfix();
try (AutoCloseableSourceTask task = createSourceTask()) {
MongoCollection<Document> coll = db.getCollection("coll");
coll.drop();
db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions());
HashMap<String, String> cfg = new HashMap<>();
cfg.put(
MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG,
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT));
cfg.put(MongoSourceConfig.SHOW_EXPANDED_EVENTS_CONFIG, "false");
task.start(cfg);
int id = 0;
Document expected = new Document("_id", id);
coll.insertOne(expected);
coll.updateOne(Filters.eq(id), Document.parse("{ $set: { foo: 1 } }"));
coll.deleteOne(Filters.eq(id));
List<SourceRecord> records = getNextResults(task);
assertEquals(3, records.size());
Struct update = (Struct) records.get(1).value();
assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType"));
Struct updateDescription = (Struct) update.get("updateDescription");
assertNull(updateDescription.getString("disambiguatedPaths"));
} finally {
db.drop();
}
}

@Test
@DisplayName("Ensure disambiguatedPaths don't exist by default")
void testDisambiguatedPathsDontExistByDefault() {
assumeTrue(isAtLeastSevenDotZero());
MongoDatabase db = getDatabaseWithPostfix();
try (AutoCloseableSourceTask task = createSourceTask()) {
MongoCollection<Document> coll = db.getCollection("coll");
coll.drop();
db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions());
HashMap<String, String> cfg = new HashMap<>();
cfg.put(
MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG,
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT));
task.start(cfg);
int id = 0;
Document expected = new Document("_id", id);
coll.insertOne(expected);
coll.updateOne(Filters.eq(id), Document.parse("{ $set: { foo: 1 } }"));
coll.deleteOne(Filters.eq(id));
List<SourceRecord> records = getNextResults(task);
assertEquals(3, records.size());
Struct update = (Struct) records.get(1).value();
assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType"));
Struct updateDescription = (Struct) update.get("updateDescription");
assertNull(updateDescription.getString("disambiguatedPaths"));
} finally {
db.drop();
}
}

@Test
@DisplayName("Ensure truncatedArrays works")
void testTruncatedArrays() {
assumeTrue(isAtLeastSixDotZero());
MongoDatabase db = getDatabaseWithPostfix();
try (AutoCloseableSourceTask task = createSourceTask()) {
MongoCollection<Document> coll = db.getCollection("coll");
coll.drop();
db.createCollection(coll.getNamespace().getCollectionName(), new CreateCollectionOptions());
HashMap<String, String> cfg = new HashMap<>();
cfg.put(
MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG,
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT));
task.start(cfg);
int id = 0;
Document expected =
new Document("_id", id)
.append("items", Arrays.asList(2, 30, 5, 10, 11, 100, 200, 250, 300, 5, 600));
coll.insertOne(expected);
coll.updateOne(
Filters.eq(id),
singletonList(Document.parse("{ $set: { items: [2,30,5,10,11,100,200,250,300,5] } }")));
coll.deleteOne(Filters.eq(id));
List<SourceRecord> records = getNextResults(task);
assertEquals(3, records.size());
Struct update = (Struct) records.get(1).value();
assertEquals(OperationType.UPDATE.getValue(), update.getString("operationType"));
Struct updateDescription = (Struct) update.get("updateDescription");

Schema schema =
SchemaBuilder.struct()
.name("truncatedArray")
.field("field", Schema.STRING_SCHEMA)
.field("newSize", Schema.INT32_SCHEMA)
.build();

Struct truncatedArrayStruct = new Struct(schema).put("field", "items").put("newSize", 10);

List<Struct> expectedTruncatedArray = new ArrayList<>();
expectedTruncatedArray.add(truncatedArrayStruct);
assertEquals(expectedTruncatedArray, updateDescription.getArray("truncatedArrays"));
} finally {
db.drop();
}
}
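
The expectation in `testTruncatedArrays` can be read as follows: the update only shortens `items` from 11 to 10 elements, so the change event reports `{field: "items", newSize: 10}` rather than the whole array. A minimal sketch of applying such an entry to a local copy of the array, with a hypothetical helper name and plain lists standing in for BSON arrays:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of applying one truncatedArrays entry to an in-memory list.
final class TruncatedArraysSketch {
  /** Returns a copy of {@code items} cut down to its first {@code newSize} elements. */
  static <T> List<T> truncate(final List<T> items, final int newSize) {
    if (newSize < 0 || newSize > items.size()) {
      throw new IllegalArgumentException("newSize out of range: " + newSize);
    }
    return new ArrayList<>(items.subList(0, newSize));
  }
}
```

Applied to the 11-element list inserted by the test with `newSize` 10, this yields the same 10 elements that the pipeline update sets.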

/**
* We insert a document into a collection before starting the {@link MongoSourceTask}, yet we
* observe the change due to specifying {@link
@@ -37,8 +37,10 @@ final class OperationHelper {
private static final String UPDATE_DESCRIPTION = "updateDescription";
private static final String UPDATED_FIELDS = "updatedFields";
private static final String REMOVED_FIELDS = "removedFields";
private static final String TRUNCATED_ARRAYS = "truncatedArrays";
private static final String DISAMBIGUATED_PATHS = "disambiguatedPaths";
private static final Set<String> UPDATE_DESCRIPTION_FIELDS =
- new HashSet<>(asList(UPDATED_FIELDS, REMOVED_FIELDS));
+ new HashSet<>(asList(UPDATED_FIELDS, REMOVED_FIELDS, TRUNCATED_ARRAYS, DISAMBIGUATED_PATHS));

private static final String SET = "$set";
private static final String UNSET = "$unset";
@@ -125,14 +127,34 @@ static BsonDocument getUpdateDocument(final BsonDocument changeStreamDocument) {
REMOVED_FIELDS, updateDescription.get(REMOVED_FIELDS), updateDescription.toJson()));
}

if (updateDescription.containsKey(TRUNCATED_ARRAYS)
&& !updateDescription.get(TRUNCATED_ARRAYS).isArray()) {
throw new DataException(
format(
"Unexpected %s field type, expected an array but found `%s`: %s",
TRUNCATED_ARRAYS,
updateDescription.get(TRUNCATED_ARRAYS),
updateDescription.toJson()));
}

if (updateDescription.containsKey(DISAMBIGUATED_PATHS)
&& !updateDescription.get(DISAMBIGUATED_PATHS).isDocument()) {
throw new DataException(
format(
"Unexpected %s field type, expected a document but found `%s`: %s",
DISAMBIGUATED_PATHS,
updateDescription.get(DISAMBIGUATED_PATHS),
updateDescription.toJson()));
}

BsonDocument updatedFields = updateDescription.getDocument(UPDATED_FIELDS);
BsonArray removedFields = updateDescription.getArray(REMOVED_FIELDS);
BsonDocument unsetDocument = new BsonDocument();
for (final BsonValue removedField : removedFields) {
if (!removedField.isString()) {
throw new DataException(
format(
- "Unexpected value type in %s, expected an string but found `%s`: %s",
+ "Unexpected value type in %s, expected a string but found `%s`: %s",
REMOVED_FIELDS, removedField, updateDescription.toJson()));
}
unsetDocument.append(removedField.asString().getValue(), EMPTY_STRING);
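
The two new guards in this hunk follow the same pattern: an optional field may be absent, but if it is present it must have the expected type. A minimal stand-alone sketch of that pattern, assuming plain `Map` and `List` as stand-ins for BSON documents and arrays, and `IllegalArgumentException` in place of Kafka Connect's `DataException` (all class and method names here are hypothetical):

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; Map/List stand in for BsonDocument/BsonArray.
final class UpdateDescriptionValidationSketch {
  /** Throws if an optional updateDescription field is present with the wrong type. */
  static void validateOptionalFields(final Map<String, Object> updateDescription) {
    requireType(updateDescription, "truncatedArrays", List.class, "an array");
    requireType(updateDescription, "disambiguatedPaths", Map.class, "a document");
  }

  private static void requireType(
      final Map<String, Object> doc,
      final String field,
      final Class<?> type,
      final String label) {
    // An absent field is allowed; a present field must be an instance of the expected type.
    if (doc.containsKey(field) && !type.isInstance(doc.get(field))) {
      throw new IllegalArgumentException(
          String.format(
              "Unexpected %s field type, expected %s but found `%s`: %s",
              field, label, doc.get(field), doc));
    }
  }
}
```

Passing the expected type's label into the shared helper keeps each error message in step with the check it belongs to.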
@@ -323,6 +323,21 @@ public class MongoSourceConfig extends AbstractConfig {
+ "See https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/ for more details and possible values.";
private static final String FULL_DOCUMENT_DEFAULT = EMPTY_STRING;

public static final String SHOW_EXPANDED_EVENTS_CONFIG = "change.stream.show.expanded.events";
private static final String SHOW_EXPANDED_EVENTS_DISPLAY =
"The `showExpandedEvents` configuration.";
private static final String SHOW_EXPANDED_EVENTS_DOC =
"Determines whether change streams notify for DDL events, such as the createIndexes and dropIndexes events.\n"
+ "New in version 6.0.\n"
+ "See https://www.mongodb.com/docs/manual/reference/change-events/#std-label-change-streams-expanded-events for more "
+ "details on showExpandedEvents.\n"
+ "This setting is required to show updateDescription.disambiguatedPaths in update events, "
+ "helping clarify changes that involve ambiguous fields.\n"
+ "New in version 6.1.\n"
+ "See https://www.mongodb.com/docs/manual/reference/change-events/update/#path-disambiguation for more details on "
+ "disambiguatedPaths.";
private static final boolean SHOW_EXPANDED_EVENTS_DEFAULT = false;

public static final String COLLATION_CONFIG = "collation";
private static final String COLLATION_DISPLAY = "The collation options";
private static final String COLLATION_DOC =
@@ -803,6 +818,10 @@ Optional<FullDocument> getFullDocument() {
}
}

boolean getShowExpandedEvents() {
return getBoolean(SHOW_EXPANDED_EVENTS_CONFIG);
}

StartupConfig getStartupConfig() {
StartupConfig result = startupConfig;
if (result != null) {
@@ -1092,6 +1111,17 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
FULL_DOCUMENT_DISPLAY,
Validators.EnumValidatorAndRecommender.in(FullDocument.values(), FullDocument::getValue));

configDef.define(
SHOW_EXPANDED_EVENTS_CONFIG,
Type.BOOLEAN,
SHOW_EXPANDED_EVENTS_DEFAULT,
Importance.MEDIUM,
SHOW_EXPANDED_EVENTS_DOC,
group,
++orderInGroup,
Width.MEDIUM,
SHOW_EXPANDED_EVENTS_DISPLAY);
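
For orientation, this is roughly how the new property might be set alongside the usual source connector properties. It is a sketch only: the connection URI, database, and collection values are placeholders, and the `showExpandedEvents` helper is a simplified, hypothetical stand-in for the `getBoolean` lookup in `MongoSourceConfig` (it does not reproduce Kafka's config validation).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a source connector property map using the new setting.
final class ShowExpandedEventsConfigSketch {
  static final String SHOW_EXPANDED_EVENTS_CONFIG = "change.stream.show.expanded.events";

  /** Builds an example property map with expanded events switched on. */
  static Map<String, String> exampleProps() {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "com.mongodb.kafka.connect.MongoSourceConnector");
    props.put("connection.uri", "mongodb://localhost:27017"); // placeholder URI
    props.put("database", "exampleDb"); // placeholder name
    props.put("collection", "exampleColl"); // placeholder name
    props.put(SHOW_EXPANDED_EVENTS_CONFIG, "true");
    return props;
  }

  /** Simplified lookup that mirrors the default of false when the property is absent. */
  static boolean showExpandedEvents(final Map<String, String> props) {
    return Boolean.parseBoolean(props.getOrDefault(SHOW_EXPANDED_EVENTS_CONFIG, "false"));
  }
}
```

Because the default is `false`, existing deployments keep their current change stream behavior unless the property is set explicitly.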

configDef.define(
COLLATION_CONFIG,
Type.STRING,
@@ -681,6 +681,7 @@ private static ChangeStreamIterable<Document> getChangeStreamIterable(
if (batchSize > 0) {
changeStream.batchSize(batchSize);
}
changeStream.showExpandedEvents(sourceConfig.getShowExpandedEvents());
sourceConfig.getFullDocumentBeforeChange().ifPresent(changeStream::fullDocumentBeforeChange);
sourceConfig.getFullDocument().ifPresent(changeStream::fullDocument);
sourceConfig.getCollation().ifPresent(changeStream::collation);
@@ -51,8 +51,15 @@ public final class AvroSchemaDefaults {
+ " \"type\": [{\"name\": \"updateDescription\", \"type\": \"record\", \"fields\": ["
+ " {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]},"
+ " {\"name\": \"removedFields\","
- + " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]"
- + " }] }, \"null\"] },"
+ + " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]},"
+ + " {\"name\": \"truncatedArrays\","
+ + " \"type\": [{ \"type\":\"array\", \"items\": {\"type\": \"record\","
+ + " \"name\": \"truncatedArray\", \"fields\": ["
+ + " {\"name\": \"field\", \"type\": \"string\"},"
+ + " {\"name\": \"newSize\", \"type\": \"int\"} ] }"
+ + " }, \"null\" ] },"
+ + " {\"name\": \"disambiguatedPaths\", \"type\": [\"string\", \"null\"]}"
+ + " ]}, \"null\"] },"
+ " { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] },"
+ " { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]},"
+ " { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\","
@@ -52,7 +52,12 @@ class UpdateTest {
+ " updatedFields: {"
+ " email: 'alice@10gen.com'"
+ " },"
- + " removedFields: ['phoneNumber']"
+ + " removedFields: ['phoneNumber'],"
+ + " truncatedArrays: [{ field: 'foo', newSize: 1 }],"
+ + " disambiguatedPaths: {"
+ + " 'home.town': [ 'home.town' ],"
+ + " 'residences.0.0': [ 'residences', 0, '0' ]"
+ + " }"
+ " },"
+ " fullDocument: {"
+ " _id: ObjectId(\"58a4eb4a30c75625e00d2820\"),"
@@ -148,7 +153,7 @@ void testMissingChangeEventData() {
new SinkDocument(
null,
BsonDocument.parse(
- "{documentKey: {}, updateDescription: {updatedFields: 1}}")))),
+ "{documentKey: {}, updateDescription: {updatedFields: 1, removedFields: []}}")))),
Collaborator

[q] what's the reason for changing this test?

Collaborator Author

If you were to adjust updatedFields to a valid value, the operation would still throw, because removedFields is a required field. With this change the test exercises the updatedFields validation itself instead of passing for an unrelated reason.

() ->
assertThrows(
DataException.class,
@@ -157,7 +162,25 @@ void testMissingChangeEventData() {
new SinkDocument(
null,
BsonDocument.parse(
- "{documentKey: {}, updateDescription: {removedFields: 1}}")))),
+ "{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: 1}}")))),
() ->
assertThrows(
DataException.class,
() ->
UPDATE.perform(
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: [], truncatedArrays: 1}}")))),
() ->
assertThrows(
DataException.class,
() ->
UPDATE.perform(
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: [], disambiguatedPaths: 1}}")))),
() ->
assertThrows(
DataException.class,