Skip to content

Commit

Permalink
Fix handling of duplicate normalized field names in sink (#2610)
Browse files Browse the repository at this point in the history
* Add test to confirm various working cases of duplicate normalized field combinations

* Add test to reproduce #2609

* Don't try to expand map values again for duplicate normalized field names

* Merge tests

* Update comment

* Modify input example to show which values are overwritten
  • Loading branch information
akkomar authored Aug 22, 2024
1 parent 0a5ef4a commit bc7bad8
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,11 @@ private void processField(String jsonFieldName, Field field, JsonNode value, Obj

// A record of key and value indicates we need to transformForBqSchema a map to an array.
} else if (isMapType(field)) {
expandMapType(jsonFieldName, (ObjectNode) value, field, parent, additionalProperties);

if (value.isObject()) {
// If the value is not an ObjectNode, we're dealing with a duplicate normalized field name
// whose value has already been expanded to an array. We don't need to process it again.
expandMapType(jsonFieldName, (ObjectNode) value, field, parent, additionalProperties);
}
// A record with a single "list" field and a list value should be expanded appropriately.
} else if (isNestedListType(field, value)) {
expandNestedListType(jsonFieldName, (ArrayNode) value, field, parent, additionalProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,24 +193,21 @@ private static Stream<Map.Entry<Map<String, String>, byte[]>> asPubsubMessage(Ob
Json.asBytes(node.get(FieldName.PAYLOAD))).entrySet().stream();
}

@Test
public void canFormatAsPayload() throws IOException {
private void transformAndTestOutput(List<String> strictSchemaDocTypes, String inputMessagesPath,
String outputMessagesPath) throws IOException {
PubsubMessageToObjectNode transform = PubsubMessageToObjectNode //
.Payload.of(ImmutableList.of("namespace_0/foo"), TestConstant.SCHEMAS_LOCATION,
FileInputStream::new);
.Payload.of(strictSchemaDocTypes, TestConstant.SCHEMAS_LOCATION, FileInputStream::new);

final List<Map.Entry<Map<String, String>, byte[]>> input;
final String inputPath = Resources.getResource("testdata/payload-format-input.ndjson")
.getPath();
final String inputPath = Resources.getResource(inputMessagesPath).getPath();
try (Stream<String> stream = Files.lines(Paths.get(inputPath))) {
input = stream.map(IOFunction.unchecked(Json::readObjectNode))
.flatMap(IOFunction.unchecked(PubsubMessageToObjectNodeSinkTest::asPubsubMessage))
.collect(Collectors.toList());
}

final List<Map<String, Object>> expected;
final String expectedPath = Resources.getResource("testdata/payload-format-expected.ndjson")
.getPath();
final String expectedPath = Resources.getResource(outputMessagesPath).getPath();
try (Stream<String> stream = Files.lines(Paths.get(expectedPath))) {
expected = stream.map(IOFunction.unchecked(Json::readObjectNode)).map(node -> {
// convert additional_properties to a json string if present
Expand All @@ -229,4 +226,26 @@ public void canFormatAsPayload() throws IOException {
Json.asMap(transform.apply(input.get(i).getKey(), input.get(i).getValue())));
}
}

/**
 * Runs the shared transform-and-compare helper over the standard payload-format fixture pair,
 * passing "namespace_0/foo" as the only strictSchemaDocTypes entry.
 */
@Test
public void canFormatAsPayload() throws IOException {
  final List<String> strictSchemaDocTypes = ImmutableList.of("namespace_0/foo");
  final String inputMessagesPath = "testdata/payload-format-input.ndjson";
  final String outputMessagesPath = "testdata/payload-format-expected.ndjson";

  transformAndTestOutput(strictSchemaDocTypes, inputMessagesPath, outputMessagesPath);
}

/**
 * Ensures the transform does not fail when different JSON field names are normalized to the
 * same BQ field name. Added in response to
 * https://github.com/mozilla/gcp-ingestion/issues/2609.
 */
@Test
public void canFormatAsPayloadWIthDuplicateNormalizedFieldNames() throws IOException {
  // Both fixture pairs below are run with null passed as strictSchemaDocTypes.
  final List<String> strictSchemaDocTypes = null;

  // Excessively detailed inputs, used so that fixing the issue above does not silently
  // modify the current behavior of the sink.
  final String fieldNamesInput =
      "testdata/payload-format-duplicate-normalized-field-names-input.ndjson";
  final String fieldNamesExpected =
      "testdata/payload-format-duplicate-normalized-field-names-expected.ndjson";
  transformAndTestOutput(strictSchemaDocTypes, fieldNamesInput, fieldNamesExpected);

  // The failure scenario discovered in
  // https://github.com/mozilla/gcp-ingestion/issues/2609
  final String mapFieldInput =
      "testdata/payload-format-duplicate-normalized-map-field-input.ndjson";
  final String mapFieldExpected =
      "testdata/payload-format-duplicate-normalized-map-field-expected.ndjson";
  transformAndTestOutput(strictSchemaDocTypes, mapFieldInput, mapFieldExpected);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"additional_properties":{},"payload":{"test_int":-2}}
{"additional_properties":{},"payload":{"test_int":-2}}
{"additional_properties":{},"test_repeated_record":[{"field":"value"}],"test_record":{"key":"value"}}
{"additional_properties":{},"test_repeated_record":[{"field":"value"}],"test_record":{"key":"value"}}
{"additional_properties":{"test_repeated_record":[{},{"reject":"value","alsoReject":null},{}]},"test_repeated_record":[{"field":"value"},{},{}]}
{"additional_properties":{},"test_repeated_tuple":[{"f0_":"string","f1_":5,"f2_":true}]}
{"additional_properties":{},"test_repeated_tuple":[{"f0_":"string","f1_":5,"f2_":true}]}
{"additional_properties":{},"test_tuple":{"f0_":"string","f1_":5,"f2_":true}}
{"additional_properties":{},"test_tuple":{"f0_":"string","f1_":5,"f2_":true}}
{"additional_properties":{},"test_map":[{"key":"key","value":"value"}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"attributeMap":{"document_namespace":"namespace_0","document_type":"bar","document_version":"1"},"payload":{"payload":{"test_int":-3, "test.int":-2}}}
{"attributeMap":{"document_namespace":"namespace_0","document_type":"bar","document_version":"1"},"payload":{"payload":{"test.int":-2, "test_int":-3}}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_repeated_record":[{"field":"lost_value"}],"test.repeated_record":[{"field":"value"}],"test_record":{"key":"lost_value"},"test.record":{"key":"value"}}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test.repeated_record":[{"field":"value"}],"test_repeated_record":[{"field":"lost_value"}],"test.record":{"key":"value"},"test_record":{"key":"lost_value"}}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_repeated_record":[{"field":"value"},{"reject":"value","alsoReject":null},{"field":null}]}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_repeated_tuple":[["string",4,true]],"test.repeated_tuple":[["string",5,true]]}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test.repeated_tuple":[["string",5,true]],"test_repeated_tuple":[["string",4,true]]}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_tuple":["string",4,true],"test.tuple":["string",5,true]}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test.tuple":["string",5,true],"test_tuple":["string",4,true]}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_map":{"key":"lost_value"},"test.map":{"key":"value"}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"additional_properties":{},"test_map":[{"key":"key","value":"value"}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test.map":{"key":"value"},"test_map":{"key":"lost_value"}}}

0 comments on commit bc7bad8

Please sign in to comment.