diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/PubsubMessageToObjectNode.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/PubsubMessageToObjectNode.java index 5c2f1b5c9..a89eb170b 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/PubsubMessageToObjectNode.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/PubsubMessageToObjectNode.java @@ -13,7 +13,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; +import com.google.common.collect.Lists; import com.google.common.collect.Streams; import com.google.common.util.concurrent.UncheckedExecutionException; import com.google.pubsub.v1.PubsubMessage; @@ -265,7 +265,7 @@ private void transformForBqSchema(ObjectNode parent, List bqFields, final Map bqFieldMap = bqFields.stream() .collect(Collectors.toMap(Field::getName, Function.identity())); - for (String jsonFieldName : Sets.newHashSet(parent.fieldNames())) { + for (String jsonFieldName : Lists.newArrayList(parent.fieldNames())) { final JsonNode value = parent.get(jsonFieldName); final String bqFieldName; @@ -316,7 +316,7 @@ private static boolean isNestedListType(Field field, JsonNode value) { private void processField(String jsonFieldName, Field field, JsonNode value, ObjectNode parent, ObjectNode additionalProperties) { - final String name = field.getName(); + final String bqFieldName = field.getName(); // A record of key and value indicates we need to transformForBqSchema a map to an array. if (isMapType(field)) { @@ -332,8 +332,8 @@ private void processField(String jsonFieldName, Field field, JsonNode value, Obj // An array signifies a fixed length tuple which should be given anonymous field names. if (value.isArray()) { - updateParent(parent, name, processTupleField(jsonFieldName, field.getSubFields(), - (ArrayNode) value, additionalProperties)); + updateParent(parent, jsonFieldName, bqFieldName, processTupleField(jsonFieldName, + field.getSubFields(), (ArrayNode) value, additionalProperties)); } else { // Only transform value if it is not null if (value.isObject()) { @@ -344,7 +344,7 @@ private void processField(String jsonFieldName, Field field, JsonNode value, Obj additionalProperties.set(jsonFieldName, props); } } - updateParent(parent, name, value); + updateParent(parent, jsonFieldName, bqFieldName, value); } // Likewise, we need to recursively call transformForBqSchema on repeated record types. @@ -388,20 +388,17 @@ private void processField(String jsonFieldName, Field field, JsonNode value, Obj if (!Streams.stream(repeatedAdditionalProperties).allMatch(EMPTY_OBJECT::equals)) { additionalProperties.set(jsonFieldName, repeatedAdditionalProperties); } - updateParent(parent, name, value); + updateParent(parent, jsonFieldName, bqFieldName, value); // If we've made it here, we have a basic type or a list of basic types. } else { final Optional coerced = coerceToBqType(value, field); - if (coerced.isPresent()) { - updateParent(parent, name, coerced.get()); - } else { - // An empty coerced value means the actual type didn't match expected and we don't define - // a coercion. We put the value to additional_properties instead. - if (additionalProperties != null) { - additionalProperties.set(jsonFieldName, value); - } - parent.remove(name); + // use coerced.orElse(null) to remove the field via updateParent if necessary + updateParent(parent, jsonFieldName, bqFieldName, coerced.orElse(null)); + // If coerced is not present that means the actual type didn't match expected and we don't + // define a coercion. We put the value to additional_properties instead. + if (!coerced.isPresent() && additionalProperties != null) { + additionalProperties.set(jsonFieldName, value); } } } @@ -456,10 +453,12 @@ private void expandMapType(String jsonFieldName, ObjectNode value, Field field, final ArrayNode unmapped = Json.createArrayNode(); value.fields().forEachRemaining(e -> { - ObjectNode kv = Json.createObjectNode().put(FieldName.KEY, e.getKey()); + ObjectNode kv = Json.createObjectNode(); valueFieldOption .ifPresent(valueField -> processField(e.getKey(), valueField, e.getValue(), kv, props)); - unmapped.add(kv); + // add key after processField so it can't be dropped due to e.getKey() matching + // FieldName.KEY when e.getValue() is null or empty or can't be coerced + unmapped.add(kv.put(FieldName.KEY, e.getKey())); }); if (!Json.isNullOrEmpty(props)) { additionalProperties.set(jsonFieldName, props); @@ -567,11 +566,12 @@ private Optional coerceSingleValueToBqType(JsonNode o, Field field) { } } - private static void updateParent(ObjectNode parent, String name, JsonNode value) { + private static void updateParent(ObjectNode parent, String jsonFieldName, String bqFieldName, + JsonNode value) { if (Json.isNullOrEmpty(value)) { - parent.remove(name); + parent.remove(jsonFieldName); } else { - parent.set(name, value); + parent.set(bqFieldName, value); } } diff --git a/ingestion-sink/src/test/resources/testdata/payload-format-expected.ndjson b/ingestion-sink/src/test/resources/testdata/payload-format-expected.ndjson index 19f2c241b..1b04105c9 100644 --- a/ingestion-sink/src/test/resources/testdata/payload-format-expected.ndjson +++ b/ingestion-sink/src/test/resources/testdata/payload-format-expected.ndjson @@ -23,3 +23,5 @@ {"additional_properties":{},"test_nested_list":[{"list":["a","b","c"]}],"test_nested_csv":[{"list":"a,b,c"}],"test_list":["a","b","c"]} {"additional_properties":{},"test_nested_list":[{}]} {"additional_properties":{}} +{"additional_properties":{"test_int64_":"invalid"},"test_int64":8} +{"additional_properties":{},"test_map":[{"key":"key"},{"key":"value"},{"key":"other","value":"other"}]} diff --git a/ingestion-sink/src/test/resources/testdata/payload-format-input.ndjson b/ingestion-sink/src/test/resources/testdata/payload-format-input.ndjson index ed506c7ee..2f4757c73 100644 --- a/ingestion-sink/src/test/resources/testdata/payload-format-input.ndjson +++ b/ingestion-sink/src/test/resources/testdata/payload-format-input.ndjson @@ -23,3 +23,5 @@ {"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_nested_list":[["a","b","c"]],"test_nested_csv":[{"list":"a,b,c"}],"test_list":["a","b","c"]}} {"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_list":null,"test_nested_list":[null],"test_record":null}} {"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_record":{"key":null}}} +{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_int64 ":null,"test_int64":7,"test_int64_":"invalid","test_int64+":8}} +{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_map":{"key":null,"value":null,"other":"other"}}}