From d5685450ef0edd974d980fe80f75043783a096c2 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 10 Nov 2020 09:06:09 -0800 Subject: [PATCH 1/3] wip --- .../destination/TestDestination.java | 60 +++++++++++++++++-- .../main/resources/exchange_rate_catalog.json | 3 + .../postgres/PostgresIntegrationTest.java | 1 - 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java index 8a62cccae549..ff0bdbbcb65e 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java @@ -27,10 +27,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.resources.MoreResources; @@ -59,6 +61,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -70,9 +74,13 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class TestDestination { + private static final Logger LOGGER = LoggerFactory.getLogger(TestDestination.class); + private TestDestinationEnv testEnv; private Path jobRoot; @@ -166,6 +174,7 @@ void setUpInternal() throws Exception { final Path workspaceRoot = Files.createTempDirectory(testDir, "test"); jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")); localRoot = Files.createTempDirectory(testDir, "output"); + LOGGER.info("jobRoot: {}", jobRoot); testEnv = new TestDestinationEnv(localRoot); setup(testEnv); @@ -303,7 +312,7 @@ private void runSync(JsonNode config, List messages, AirbyteCata target.notifyEndOfStream(); target.close(); - if (!implementsBasicNormalization()) + if (config.get(WorkerConstants.BASIC_NORMALIZATION_KEY).isNull() || !config.get(WorkerConstants.BASIC_NORMALIZATION_KEY).asBoolean()) return; final NormalizationRunner runner = NormalizationRunnerFactory.create( @@ -325,6 +334,9 @@ private void assertSameMessages(List expected, List ac .map(AirbyteRecordMessage::getData) .collect(Collectors.toList()); + LOGGER.info("expected: {}", expectedJson); + LOGGER.info("actual: {}", actual); + // we want to ignore order in this comparison. assertEquals(expectedJson.size(), actual.size()); assertTrue(expectedJson.containsAll(actual)); @@ -332,15 +344,55 @@ private void assertSameMessages(List expected, List ac } private void assertEquivalentMessages(List expected, List actual) { - final List expectedJson = expected.stream() + final List expectedPruned = expected.stream() .filter(message -> message.getType() == AirbyteMessage.Type.RECORD) .map(AirbyteMessage::getRecord) .map(AirbyteRecordMessage::getData) + .map(Jsons::clone) + .map(this::prune) .collect(Collectors.toList()); + final List actualPruned = actual.stream().map(this::prune).collect(Collectors.toList()); + + LOGGER.info("expectedPruned: {}", expectedPruned); + LOGGER.info("actualPruned: {}", actualPruned); // we want to ignore order in this comparison. - assertEquals(expectedJson.size(), actual.size()); - // todo Message can be slightly different with additional columns? + assertEquals(expectedPruned.size(), actualPruned.size()); + assertTrue(expectedPruned.containsAll(actualPruned)); + assertTrue(actualPruned.containsAll(expectedPruned)); + } + + /** + * Same as {@link #pruneMutate(JsonNode)}, except does a defensive copy and returns a new json node + * object instead of mutating in place. + * + * @param json - json that will be pruned. + * @return pruned json node. + */ + private JsonNode prune(JsonNode json) { + final JsonNode clone = Jsons.clone(json); + pruneMutate(clone); + return clone; + } + + /** + * Prune fields that are added internally by airbyte and are not part of the original data. Used so + * that we can compare data that is persisted by an Airbyte worker to the original data. + * + * @param json - json that will be pruned. + */ + private void pruneMutate(JsonNode json) { + final Set keys = Jsons.object(json, new TypeReference>() {}).keySet(); + for (final String key : keys) { + final JsonNode node = json.get(key); + if (node.isObject() || node.isArray()) { + pruneMutate(node); + } + + if (Sets.newHashSet("emitted_at", "ab_id", "normalized_at").contains(key) || key.matches("^_.*_hashid$") || json.get(key).isNull()) { + ((ObjectNode) json).remove(key); + } + } } private JsonNode getConfigWithBasicNormalization() throws Exception { diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/exchange_rate_catalog.json b/airbyte-integrations/bases/standard-destination-test/src/main/resources/exchange_rate_catalog.json index 5c98e8d2ba42..1b28762d4746 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/resources/exchange_rate_catalog.json +++ b/airbyte-integrations/bases/standard-destination-test/src/main/resources/exchange_rate_catalog.json @@ -12,6 +12,9 @@ }, "NZD": { "type": "number" + }, + "USD": { + "type": "number" } } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java index 02c24adce68e..82dff5fcee99 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java @@ -29,7 +29,6 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.Databases; import io.airbyte.integrations.standardtest.destination.TestDestination; -import io.airbyte.protocol.models.AirbyteMessage; import java.util.List; import java.util.stream.Collectors; import org.jooq.JSONFormat; From 9c46335cf979da08c948f5c2b079cc61330877cb Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 10 Nov 2020 09:25:27 -0800 Subject: [PATCH 2/3] clean up --- .../destination/TestDestination.java | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java index ff0bdbbcb65e..1f87c61b61b9 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java @@ -253,8 +253,9 @@ public void testSync(String messagesFilename, String catalogFilename) throws Exc @ParameterizedTest @ArgumentsSource(DataArgumentsProvider.class) public void testSyncWithNormalization(String messagesFilename, String catalogFilename) throws Exception { - if (!implementsBasicNormalization()) + if (!implementsBasicNormalization()) { return; + } final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class); final List messages = MoreResources.readResource(messagesFilename).lines() @@ -262,7 +263,7 @@ public void testSyncWithNormalization(String messagesFilename, String catalogFil runSync(getConfigWithBasicNormalization(), messages, catalog); assertSameMessages(messages, retrieveRecords(testEnv, catalog.getStreams().get(0).getName())); - assertEquivalentMessages(messages, retrieveNormalizedRecords(testEnv, catalog.getStreams().get(0).getName())); + assertSameMessagesPruneAirbyteInternalFields(messages, retrieveNormalizedRecords(testEnv, catalog.getStreams().get(0).getName())); } /** @@ -312,8 +313,10 @@ private void runSync(JsonNode config, List messages, AirbyteCata target.notifyEndOfStream(); target.close(); - if (config.get(WorkerConstants.BASIC_NORMALIZATION_KEY).isNull() || !config.get(WorkerConstants.BASIC_NORMALIZATION_KEY).asBoolean()) + // skip if basic normalization is not configured to run (either not set or false). + if (!config.hasNonNull(WorkerConstants.BASIC_NORMALIZATION_KEY) || !config.get(WorkerConstants.BASIC_NORMALIZATION_KEY).asBoolean()) { return; + } final NormalizationRunner runner = NormalizationRunnerFactory.create( getImageName(), @@ -334,16 +337,10 @@ private void assertSameMessages(List expected, List ac .map(AirbyteRecordMessage::getData) .collect(Collectors.toList()); - LOGGER.info("expected: {}", expectedJson); - LOGGER.info("actual: {}", actual); - - // we want to ignore order in this comparison. - assertEquals(expectedJson.size(), actual.size()); - assertTrue(expectedJson.containsAll(actual)); - assertTrue(actual.containsAll(expectedJson)); + assertSameData(expectedJson, actual); } - private void assertEquivalentMessages(List expected, List actual) { + private void assertSameMessagesPruneAirbyteInternalFields(List expected, List actual) { final List expectedPruned = expected.stream() .filter(message -> message.getType() == AirbyteMessage.Type.RECORD) .map(AirbyteMessage::getRecord) @@ -353,13 +350,17 @@ private void assertEquivalentMessages(List expected, List actualPruned = actual.stream().map(this::prune).collect(Collectors.toList()); + assertSameData(expectedPruned, actualPruned); + } + + private void assertSameData(List expected, List actual) { + LOGGER.info("expected: {}", expected); + LOGGER.info("actual: {}", actual); - LOGGER.info("expectedPruned: {}", expectedPruned); - LOGGER.info("actualPruned: {}", actualPruned); // we want to ignore order in this comparison. - assertEquals(expectedPruned.size(), actualPruned.size()); - assertTrue(expectedPruned.containsAll(actualPruned)); - assertTrue(actualPruned.containsAll(expectedPruned)); + assertEquals(expected.size(), actual.size()); + assertTrue(expected.containsAll(actual)); + assertTrue(actual.containsAll(expected)); } /** @@ -385,10 +386,18 @@ private void pruneMutate(JsonNode json) { final Set keys = Jsons.object(json, new TypeReference>() {}).keySet(); for (final String key : keys) { final JsonNode node = json.get(key); + // recursively prune all airbyte internal fields. if (node.isObject() || node.isArray()) { pruneMutate(node); } + // prune the following + // - airbyte internal fields + // - fields that match what airbyte generates as hash ids + // - null values -- normalization will often return `: null` but in the original data that key + // likely did not exist in the original message. the most consistent thing to do is always remove + // the null fields (this choice does decrease our ability to check that normalization creates + // columns even if all the values in that column are null) if (Sets.newHashSet("emitted_at", "ab_id", "normalized_at").contains(key) || key.matches("^_.*_hashid$") || json.get(key).isNull()) { ((ObjectNode) json).remove(key); } From c3c3a5957f295e676ff8240bec7e66cbe6ed6937 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 10 Nov 2020 09:26:49 -0800 Subject: [PATCH 3/3] one more comment --- .../standardtest/destination/TestDestination.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java index 1f87c61b61b9..24187f7dc8ae 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java @@ -378,9 +378,10 @@ private JsonNode prune(JsonNode json) { /** * Prune fields that are added internally by airbyte and are not part of the original data. Used so - * that we can compare data that is persisted by an Airbyte worker to the original data. + * that we can compare data that is persisted by an Airbyte worker to the original data. This method + * mutates the provided json in place. * - * @param json - json that will be pruned. + * @param json - json that will be pruned. will be mutated in place! */ private void pruneMutate(JsonNode json) { final Set keys = Jsons.object(json, new TypeReference>() {}).keySet();