From 606e59eb0dae56fd6d842e0754e2113a9542c577 Mon Sep 17 00:00:00 2001 From: Greg Solovyev Date: Thu, 5 Jan 2023 08:26:43 -0800 Subject: [PATCH] Common Jsons: add flag to apply flatten to arrays (#20993) * add flag to apply flatten to arrays * add additional unit test cases for array flattening * add backward compatibility function * bump dest-redshift version and add changelog Co-authored-by: marcosmarxm Co-authored-by: Adam Bloom Co-authored-by: Marcos Marx Co-authored-by: Davin Chia --- .../java/io/airbyte/commons/json/Jsons.java | 27 +++++- .../io/airbyte/commons/json/JsonsTest.java | 97 ++++++++++++++++--- .../destination-redshift/Dockerfile | 2 +- .../operations/RedshiftSqlOperations.java | 2 +- docs/integrations/destinations/redshift.md | 1 + 5 files changed, 112 insertions(+), 17 deletions(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java index e5612a8e205b..0a6b6e876b29 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java @@ -253,17 +253,29 @@ public static int getIntOrZero(final JsonNode json, final List keys) { } /** - * Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object. + * Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object. When + * applyFlattenToArray is true, each element in the array will be one entry in the returned map. + * This behavior is used in the Redshift SUPER type. When it is false, the whole array will be one + * entry. This is used in the JobTracker. */ @SuppressWarnings("PMD.ForLoopCanBeForeach") - public static Map flatten(final JsonNode node) { + public static Map flatten(final JsonNode node, final Boolean applyFlattenToArray) { if (node.isObject()) { final Map output = new HashMap<>(); for (final Iterator> it = node.fields(); it.hasNext();) { final Entry entry = it.next(); final String field = entry.getKey(); final JsonNode value = entry.getValue(); - mergeMaps(output, field, flatten(value)); + mergeMaps(output, field, flatten(value, applyFlattenToArray)); + } + return output; + } else if (node.isArray() && applyFlattenToArray) { + final Map output = new HashMap<>(); + final int arrayLen = node.size(); + for (int i = 0; i < arrayLen; i++) { + final String field = String.format("[%d]", i); + final JsonNode value = node.get(i); + mergeMaps(output, field, flatten(value, applyFlattenToArray)); } return output; } else { @@ -286,6 +298,15 @@ public static Map flatten(final JsonNode node) { } } + /** + * Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object. New usage of + * this function is best to explicitly declare the intended array mode. This version is provided for + * backward compatibility. + */ + public static Map flatten(final JsonNode node) { + return flatten(node, false); + } + /** * Prepend all keys in subMap with prefix, then merge that map into originalMap. *

diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java index d1a844c02a5e..3d1d50e6f21d 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java @@ -21,8 +21,11 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; class JsonsTest { @@ -31,6 +34,11 @@ class JsonsTest { private static final String SERIALIZED_JSON2 = "{\"str\":\"abc\"}"; private static final String ABC = "abc"; private static final String DEF = "def"; + private static final String GHI = "ghi"; + private static final String JKL = "jkl"; + private static final String MNO = "mno"; + private static final String PQR = "pqr"; + private static final String STU = "stu"; private static final String TEST = "test"; private static final String TEST2 = "test2"; private static final String XYZ = "xyz"; @@ -60,7 +68,8 @@ void testSerializeJsonNode() { Jsons.serialize(Jsons.jsonNode(ImmutableMap.of( TEST, ABC, TEST2, DEF)))); - // issue: 5878 add test for binary node serialization, binary data are serialized into base64 + // issue: 5878 add test for binary node serialization, binary data are + // serialized into base64 assertEquals( "{\"test\":\"dGVzdA==\"}", Jsons.serialize(Jsons.jsonNode(ImmutableMap.of( @@ -83,7 +92,8 @@ void testDeserializeToJsonNode() { assertEquals( "[{\"str\":\"abc\"},{\"str\":\"abc\"}]", Jsons.deserialize("[{\"str\":\"abc\"},{\"str\":\"abc\"}]").toString()); - // issue: 5878 add test for binary node deserialization, for now should be base64 string + // issue: 5878 add test for binary node deserialization, for now should be + // base64 string assertEquals( "{\"test\":\"dGVzdA==\"}", Jsons.deserialize("{\"test\":\"dGVzdA==\"}").toString()); @@ -230,26 +240,27 @@ void testToPrettyString() { @Test void testGetOptional() { - final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": {}, \"mno\": \"pqr\", \"stu\": null }"); + final JsonNode json = Jsons + .deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": {}, \"mno\": \"pqr\", \"stu\": null }"); - assertEquals(Optional.of(Jsons.jsonNode("ghi")), Jsons.getOptional(json, "abc", "def")); - assertEquals(Optional.of(Jsons.emptyObject()), Jsons.getOptional(json, "jkl")); - assertEquals(Optional.of(Jsons.jsonNode("pqr")), Jsons.getOptional(json, "mno")); - assertEquals(Optional.of(Jsons.jsonNode(null)), Jsons.getOptional(json, "stu")); + assertEquals(Optional.of(Jsons.jsonNode(GHI)), Jsons.getOptional(json, ABC, DEF)); + assertEquals(Optional.of(Jsons.emptyObject()), Jsons.getOptional(json, JKL)); + assertEquals(Optional.of(Jsons.jsonNode(PQR)), Jsons.getOptional(json, MNO)); + assertEquals(Optional.of(Jsons.jsonNode(null)), Jsons.getOptional(json, STU)); assertEquals(Optional.empty(), Jsons.getOptional(json, XYZ)); assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, XYZ)); assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, DEF, XYZ)); - assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, "jkl", XYZ)); - assertEquals(Optional.empty(), Jsons.getOptional(json, "stu", XYZ)); + assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, JKL, XYZ)); + assertEquals(Optional.empty(), Jsons.getOptional(json, STU, XYZ)); } @Test void testGetStringOrNull() { final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": \"mno\", \"pqr\": 1 }"); - assertEquals("ghi", Jsons.getStringOrNull(json, ABC, DEF)); - assertEquals("mno", Jsons.getStringOrNull(json, "jkl")); - assertEquals("1", Jsons.getStringOrNull(json, "pqr")); + assertEquals(GHI, Jsons.getStringOrNull(json, ABC, DEF)); + assertEquals(MNO, Jsons.getStringOrNull(json, JKL)); + assertEquals("1", Jsons.getStringOrNull(json, PQR)); assertNull(Jsons.getStringOrNull(json, ABC, DEF, XYZ)); assertNull(Jsons.getStringOrNull(json, XYZ)); } @@ -260,6 +271,68 @@ void testGetEstimatedByteSize() { assertEquals(Jsons.toBytes(json).length, Jsons.getEstimatedByteSize(json)); } + @Test + void testFlatten__noArrays() { + final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": true, \"pqr\": 1 }"); + Map expected = Stream.of(new Object[][] { + {"abc.def", GHI}, + {JKL, true}, + {PQR, 1}, + }).collect(Collectors.toMap(data -> (String) data[0], data -> data[1])); + assertEquals(expected, Jsons.flatten(json, false)); + } + + @Test + void testFlatten__withArraysNoApplyFlatten() { + final JsonNode json = Jsons + .deserialize("{ \"abc\": [{ \"def\": \"ghi\" }, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }"); + Map expected = Stream.of(new Object[][] { + {ABC, "[{\"def\":\"ghi\"},{\"fed\":\"ihg\"}]"}, + {JKL, true}, + {PQR, 1}, + }).collect(Collectors.toMap(data -> (String) data[0], data -> data[1])); + assertEquals(expected, Jsons.flatten(json, false)); + } + + @Test + void testFlatten__checkBackwardCompatiblity() { + final JsonNode json = Jsons + .deserialize("{ \"abc\": [{ \"def\": \"ghi\" }, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }"); + Map expected = Stream.of(new Object[][] { + {ABC, "[{\"def\":\"ghi\"},{\"fed\":\"ihg\"}]"}, + {JKL, true}, + {PQR, 1}, + }).collect(Collectors.toMap(data -> (String) data[0], data -> data[1])); + assertEquals(expected, Jsons.flatten(json)); + } + + @Test + void testFlatten__withArraysApplyFlatten() { + final JsonNode json = Jsons + .deserialize("{ \"abc\": [{ \"def\": \"ghi\" }, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }"); + Map expected = Stream.of(new Object[][] { + {"abc.[0].def", "ghi"}, + {"abc.[1].fed", "ihg"}, + {JKL, true}, + {PQR, 1}, + }).collect(Collectors.toMap(data -> (String) data[0], data -> data[1])); + assertEquals(expected, Jsons.flatten(json, true)); + } + + @Test + void testFlatten__withArraysApplyFlattenNested() { + final JsonNode json = Jsons + .deserialize( + "{ \"abc\": [{ \"def\": {\"ghi\": [\"xyz\"] }}, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }"); + Map expected = Stream.of(new Object[][] { + {"abc.[0].def.ghi.[0]", "xyz"}, + {"abc.[1].fed", "ihg"}, + {JKL, true}, + {PQR, 1}, + }).collect(Collectors.toMap(data -> (String) data[0], data -> data[1])); + assertEquals(expected, Jsons.flatten(json, true)); + } + private static class ToClass { @JsonProperty("str") diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index 24a17530da92..5f06bd32cc8e 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.52 +LABEL io.airbyte.version=0.3.53 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java index 8b41e41fefb5..08213cd3cb17 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java @@ -112,7 +112,7 @@ public boolean isValidData(final JsonNode data) { // check VARCHAR limits for VARCHAR fields within the SUPER object, if overall object is valid if (isValid) { - final Map dataMap = Jsons.flatten(data); + final Map dataMap = Jsons.flatten(data, true); for (final Object value : dataMap.values()) { if (value instanceof String stringValue) { final int stringDataSize = stringValue.getBytes(StandardCharsets.UTF_8).length; diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 2ffd20a4bc70..da21eb413b7c 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -141,6 +141,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.3.53 | 2023-01-03 | [\#17273](https://github.com/airbytehq/airbyte/pull/17273) | Fixed handling of arrays in SUPER maximum size check | | 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers | | 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling | | 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |