Skip to content

Commit

Permalink
Merge pull request #3061 from confluentinc/cherry-pick-field-oneof
Browse files Browse the repository at this point in the history
fix: nested schemas were not considered in matching the correct Json Union schema
  • Loading branch information
rayokota authored Mar 26, 2024
2 parents ffbe805 + af44d2e commit eabeec8
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public class JsonSchemaData {
});
TO_CONNECT_CONVERTERS.put(Schema.Type.STRUCT, (schema, value) -> {
if (schema.name() != null && schema.name().equals(JSON_TYPE_ONE_OF)) {
int numMatchingProperties = -1;
int numMatchingProperties = 0;
Field matchingField = null;
for (Field field : schema.fields()) {
Schema fieldSchema = field.schema();
Expand Down Expand Up @@ -255,7 +255,7 @@ private static boolean isInstanceOfSchemaTypeForSimpleSchema(Schema fieldSchema,

private static int matchStructSchema(Schema fieldSchema, JsonNode value) {
if (fieldSchema.type() != Schema.Type.STRUCT || !value.isObject()) {
return -1;
return 0;
}
Set<String> schemaFields = fieldSchema.fields()
.stream()
Expand All @@ -267,7 +267,15 @@ private static int matchStructSchema(Schema fieldSchema, JsonNode value) {
}
Set<String> intersectSet = new HashSet<>(schemaFields);
intersectSet.retainAll(objectFields);
return intersectSet.size();

int childrenMatchFactor = 0;
for (String intersectedElement: intersectSet) {
Schema childSchema = fieldSchema.field(intersectedElement).schema();
JsonNode childValue = value.get(intersectedElement);
childrenMatchFactor += matchStructSchema(childSchema, childValue);
}

return intersectSet.size() + childrenMatchFactor;
}

// Convert values in Kafka Connect form into their logical types. These logical converters are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,63 @@ public void testToConnectUnionDifferentStruct() {
checkNonObjectConversion(connectSchema, expected, schema, obj);
}

@Test
public void testToConnectUnionSecondNestedSchemas() {
StringSchema stringSchema = StringSchema.builder()
.unprocessedProperties(ImmutableMap.of("connect.index", 0))
.build();
ObjectSchema firstSchemaNested = ObjectSchema.builder()
.addPropertySchema("differentFieldNameA", stringSchema)
.build();
ObjectSchema firstSchema = ObjectSchema.builder()
.addPropertySchema("commonFieldName", firstSchemaNested)
.unprocessedProperties(ImmutableMap.of("connect.index", 0))
.build();
ObjectSchema secondSchemaNested = ObjectSchema.builder()
.addPropertySchema("differentFieldNameB", stringSchema)
.build();
ObjectSchema secondSchema = ObjectSchema.builder()
.addPropertySchema("commonFieldName", secondSchemaNested)
.unprocessedProperties(ImmutableMap.of("connect.index", 1))
.build();
CombinedSchema schema = CombinedSchema.oneOf(ImmutableList.of(firstSchema, secondSchema))
.build();

Schema field0Nested = SchemaBuilder.struct().field("differentFieldNameA", Schema.STRING_SCHEMA).build();
Schema field0 = SchemaBuilder.struct()
.field("commonFieldName", field0Nested)
.optional()
.build();
Schema field1Nested = SchemaBuilder.struct().field("differentFieldNameB", Schema.STRING_SCHEMA).build();
Schema field1 = SchemaBuilder.struct()
.field("commonFieldName", field1Nested)
.optional()
.build();
Schema connectSchema = SchemaBuilder.struct().name(JSON_TYPE_ONE_OF)
.field(JSON_TYPE_ONE_OF + ".field.0", field0)
.field(JSON_TYPE_ONE_OF + ".field.1", field1)
.build();

ObjectNode firstObj = JsonNodeFactory.instance.objectNode()
.set("differentFieldNameA", TextNode.valueOf("sample string A"));
ObjectNode secondObj = JsonNodeFactory.instance.objectNode()
.set("differentFieldNameB", TextNode.valueOf("sample string B"));
Struct firstStruct = new Struct(field0Nested).put("differentFieldNameA", "sample string A");
Struct secondStruct = new Struct(field1Nested).put("differentFieldNameB", "sample string B");

ObjectNode obj = JsonNodeFactory.instance.objectNode().
set("commonFieldName", firstObj);
Struct struct = new Struct(field0).put("commonFieldName", firstStruct);
Struct expected = new Struct(connectSchema).put(JSON_TYPE_ONE_OF + ".field.0", struct);
checkNonObjectConversion(connectSchema, expected, schema, obj);

obj = JsonNodeFactory.instance.objectNode().
set("commonFieldName", secondObj);
struct = new Struct(field1).put("commonFieldName", secondStruct);
expected = new Struct(connectSchema).put(JSON_TYPE_ONE_OF + ".field.1", struct);
checkNonObjectConversion(connectSchema, expected, schema, obj);
}

@Test
public void testToConnectMapOptionalValue() {
testToConnectMapOptional("some value");
Expand Down

0 comments on commit eabeec8

Please sign in to comment.