Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-165: Add change.stream.show.expanded.events property #172

Merged
merged 8 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ final class OperationHelper {
private static final String UPDATE_DESCRIPTION = "updateDescription";
private static final String UPDATED_FIELDS = "updatedFields";
private static final String REMOVED_FIELDS = "removedFields";
private static final String TRUNCATED_ARRAYS = "truncatedArrays";
private static final String DISAMBIGUATED_PATHS = "disambiguatedPaths";
private static final Set<String> UPDATE_DESCRIPTION_FIELDS =
new HashSet<>(asList(UPDATED_FIELDS, REMOVED_FIELDS));
new HashSet<>(asList(UPDATED_FIELDS, REMOVED_FIELDS, TRUNCATED_ARRAYS, DISAMBIGUATED_PATHS));

private static final String SET = "$set";
private static final String UNSET = "$unset";
Expand Down Expand Up @@ -125,14 +127,34 @@ static BsonDocument getUpdateDocument(final BsonDocument changeStreamDocument) {
REMOVED_FIELDS, updateDescription.get(REMOVED_FIELDS), updateDescription.toJson()));
}

if (updateDescription.containsKey(TRUNCATED_ARRAYS)
&& !updateDescription.get(TRUNCATED_ARRAYS).isArray()) {
throw new DataException(
format(
"Unexpected %s field type, expected an array but found `%s`: %s",
TRUNCATED_ARRAYS,
updateDescription.get(TRUNCATED_ARRAYS),
updateDescription.toJson()));
}

// When present, disambiguatedPaths must be a BSON document. The message names the
// type this guard actually checks for (a document, not an array) so that a failure
// points the user at the real expectation.
if (updateDescription.containsKey(DISAMBIGUATED_PATHS)
    && !updateDescription.get(DISAMBIGUATED_PATHS).isDocument()) {
  throw new DataException(
      format(
          "Unexpected %s field type, expected a document but found `%s`: %s",
          DISAMBIGUATED_PATHS,
          updateDescription.get(DISAMBIGUATED_PATHS),
          updateDescription.toJson()));
}

BsonDocument updatedFields = updateDescription.getDocument(UPDATED_FIELDS);
BsonArray removedFields = updateDescription.getArray(REMOVED_FIELDS);
BsonDocument unsetDocument = new BsonDocument();
for (final BsonValue removedField : removedFields) {
if (!removedField.isString()) {
throw new DataException(
format(
"Unexpected value type in %s, expected an string but found `%s`: %s",
"Unexpected value type in %s, expected a string but found `%s`: %s",
REMOVED_FIELDS, removedField, updateDescription.toJson()));
}
unsetDocument.append(removedField.asString().getValue(), EMPTY_STRING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,21 @@ public class MongoSourceConfig extends AbstractConfig {
+ "See https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/ for more details and possible values.";
private static final String FULL_DOCUMENT_DEFAULT = EMPTY_STRING;

public static final String SHOW_EXPANDED_EVENTS_CONFIG = "change.stream.show.expanded.events";
private static final String SHOW_EXPANDED_EVENTS_DISPLAY =
"The `showExpandedEvents` configuration.";
private static final String SHOW_EXPANDED_EVENTS_DOC =
"Determines if change streams notifies for DDL events, like the createIndexes and dropIndexes events.\n"
+ "New in version 6.0.\n"
+ "See https://www.mongodb.com/docs/manual/reference/change-events/#std-label-change-streams-expanded-events for more "
+ "details on showExpandedEvents.\n"
+ "This setting is also required in order to show disambiguatedPaths within the updateDescription of an update event, "
Calvinnix marked this conversation as resolved.
Show resolved Hide resolved
+ "this field is used to help provide clarification when a change involves ambiguous fields.\n"
+ "New in version 6.1.\n"
+ "See https://www.mongodb.com/docs/manual/reference/change-events/update/#path-disambiguation for more details on "
+ "disambiguatedPaths.";
private static final boolean SHOW_EXPANDED_EVENTS_DEFAULT = false;

public static final String COLLATION_CONFIG = "collation";
private static final String COLLATION_DISPLAY = "The collation options";
private static final String COLLATION_DOC =
Expand Down Expand Up @@ -803,6 +818,10 @@ Optional<FullDocument> getFullDocument() {
}
}

/**
 * Reads the {@code SHOW_EXPANDED_EVENTS_CONFIG} setting from this configuration.
 *
 * @return the boolean value stored under {@code SHOW_EXPANDED_EVENTS_CONFIG}
 */
boolean getShowExpandedEvents() {
  final boolean showExpandedEvents = getBoolean(SHOW_EXPANDED_EVENTS_CONFIG);
  return showExpandedEvents;
}

StartupConfig getStartupConfig() {
StartupConfig result = startupConfig;
if (result != null) {
Expand Down Expand Up @@ -1092,6 +1111,17 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
FULL_DOCUMENT_DISPLAY,
Validators.EnumValidatorAndRecommender.in(FullDocument.values(), FullDocument::getValue));

configDef.define(
SHOW_EXPANDED_EVENTS_CONFIG,
Type.BOOLEAN,
SHOW_EXPANDED_EVENTS_DEFAULT,
Importance.MEDIUM,
SHOW_EXPANDED_EVENTS_DOC,
group,
++orderInGroup,
Width.MEDIUM,
SHOW_EXPANDED_EVENTS_DISPLAY);

configDef.define(
COLLATION_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ private static ChangeStreamIterable<Document> getChangeStreamIterable(
if (batchSize > 0) {
changeStream.batchSize(batchSize);
}
changeStream.showExpandedEvents(sourceConfig.getShowExpandedEvents());
arahmanan marked this conversation as resolved.
Show resolved Hide resolved
sourceConfig.getFullDocumentBeforeChange().ifPresent(changeStream::fullDocumentBeforeChange);
sourceConfig.getFullDocument().ifPresent(changeStream::fullDocument);
sourceConfig.getCollation().ifPresent(changeStream::collation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ public final class AvroSchemaDefaults {
+ " \"type\": [{\"name\": \"updateDescription\", \"type\": \"record\", \"fields\": ["
+ " {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]},"
+ " {\"name\": \"removedFields\","
+ " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]"
+ " }] }, \"null\"] },"
+ " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]},"
+ " {\"name\": \"truncatedArrays\", \"type\": [{ \"type\":\"array\","
+ " \"items\": \"string\" }, \"null\"]},"
+ " {\"name\": \"disambiguatedPaths\", \"type\": [\"string\", \"null\"]}"
arahmanan marked this conversation as resolved.
Show resolved Hide resolved
+ " ]}, \"null\"] },"
+ " { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] },"
+ " { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]},"
+ " { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\","
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ class UpdateTest {
+ " updatedFields: {"
+ " email: 'alice@10gen.com'"
+ " },"
+ " removedFields: ['phoneNumber']"
+ " removedFields: ['phoneNumber'],"
+ " truncatedArrays: [{ field: 'foo', newSize: 1 }],"
+ " disambiguatedPaths: {"
+ " 'home.town': [ 'home.town' ],"
+ " 'residences.0.0': [ 'residences', 0, '0' ]"
+ " }"
+ " },"
+ " fullDocument: {"
+ " _id: ObjectId(\"58a4eb4a30c75625e00d2820\"),"
Expand Down Expand Up @@ -148,7 +153,7 @@ void testMissingChangeEventData() {
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {updatedFields: 1}}")))),
"{documentKey: {}, updateDescription: {updatedFields: 1, removedFields: []}}")))),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[q] what's the reason for changing this test?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you were to adjust updatedFields to a valid value, the test would still fail because removedFields is a required field. This more closely matches the expected behavior of validating updatedFields instead of failing due to another reason.

() ->
assertThrows(
DataException.class,
Expand All @@ -157,7 +162,25 @@ void testMissingChangeEventData() {
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {removedFields: 1}}")))),
"{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: 1}}")))),
() ->
assertThrows(
DataException.class,
() ->
UPDATE.perform(
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: [], truncatedArrays: 1}}")))),
() ->
assertThrows(
DataException.class,
() ->
UPDATE.perform(
new SinkDocument(
null,
BsonDocument.parse(
"{documentKey: {}, updateDescription: {updatedFields: {}, removedFields: [], disambiguatedPaths: 1}}")))),
() ->
assertThrows(
DataException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ static Struct generateExpectedValue(final boolean simplified) {
{
put("updatedFields", getUpdatedField(simplified));
put("removedFields", singletonList("legacyUUID"));
put("truncatedArrays", singletonList(getTruncatedArrays(simplified)));
put("disambiguatedPaths", "{\"home.town\": \"New York City\"}");
}
});
put("clusterTime", "{\"$timestamp\": {\"t\": 123456789, \"i\": 42}}");
Expand Down Expand Up @@ -378,6 +380,12 @@ static String getUpdatedField(final boolean simplified) {
: "{\"myString\": \"some foo bla text\", \"myInt\": {\"$numberInt\": \"42\"}}";
}

/**
 * Returns the JSON text for a single {@code truncatedArrays} entry used by the test fixtures.
 *
 * @param simplified when {@code true}, {@code newSize} is rendered as a plain number; otherwise
 *     it is rendered in the {@code $numberInt} wrapper form
 * @return the JSON string for the entry
 */
static String getTruncatedArrays(final boolean simplified) {
  if (simplified) {
    return "{\"field\": \"foo\", \"newSize\": 1}";
  }
  return "{\"field\": \"foo\", \"newSize\": {\"$numberInt\": \"1\"}}";
}

/**
 * Convenience overload that delegates to {@code getLsidId(simplified, false)}.
 *
 * @param simplified forwarded unchanged to the two-argument overload
 * @return whatever the two-argument overload returns for {@code (simplified, false)}
 */
static String getLsidId(final boolean simplified) {
  final String lsidId = getLsidId(simplified, false);
  return lsidId;
}
Expand Down Expand Up @@ -409,7 +417,9 @@ static String generateJson(final boolean simplified) {
+ " \"documentKey\": %s,"
+ " \"updateDescription\":"
+ " {\"updatedFields\": %s,"
+ " \"removedFields\": [\"legacyUUID\"]},"
+ " \"removedFields\": [\"legacyUUID\"],"
+ " \"truncatedArrays\": [%s],"
+ " \"disambiguatedPaths\": {\"home.town\": \"New York City\"}},"
Calvinnix marked this conversation as resolved.
Show resolved Hide resolved
+ " \"clusterTime\": {\"$timestamp\": {\"t\": 123456789, \"i\": 42}},"
+ " \"txnNumber\": 987654321,"
+ " \"lsid\": {\"id\": %s, \"uid\": %s}"
Expand All @@ -418,6 +428,7 @@ static String generateJson(final boolean simplified) {
getFullDocument(simplified),
getDocumentKey(simplified),
getUpdatedField(simplified),
getTruncatedArrays(simplified),
getLsidId(simplified, true),
getLsidUid(simplified, true));
}
Expand Down