diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java index 8a62cccae549..24187f7dc8ae 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java @@ -27,10 +27,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.resources.MoreResources; @@ -59,6 +61,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -70,9 +74,13 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class TestDestination { + private static final Logger LOGGER = LoggerFactory.getLogger(TestDestination.class); + private TestDestinationEnv testEnv; private Path jobRoot; @@ -166,6 +174,7 @@ void setUpInternal() throws Exception { final Path workspaceRoot = Files.createTempDirectory(testDir, "test"); jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")); localRoot = Files.createTempDirectory(testDir, "output"); + LOGGER.info("jobRoot: {}", jobRoot); testEnv = new TestDestinationEnv(localRoot); setup(testEnv); @@ -244,8 +253,9 @@ public void testSync(String messagesFilename, String catalogFilename) throws Exc @ParameterizedTest @ArgumentsSource(DataArgumentsProvider.class) public void testSyncWithNormalization(String messagesFilename, String catalogFilename) throws Exception { - if (!implementsBasicNormalization()) + if (!implementsBasicNormalization()) { return; + } final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class); final List messages = MoreResources.readResource(messagesFilename).lines() @@ -253,7 +263,7 @@ public void testSyncWithNormalization(String messagesFilename, String catalogFil runSync(getConfigWithBasicNormalization(), messages, catalog); assertSameMessages(messages, retrieveRecords(testEnv, catalog.getStreams().get(0).getName())); - assertEquivalentMessages(messages, retrieveNormalizedRecords(testEnv, catalog.getStreams().get(0).getName())); + assertSameMessagesPruneAirbyteInternalFields(messages, retrieveNormalizedRecords(testEnv, catalog.getStreams().get(0).getName())); } /** @@ -303,8 +313,10 @@ private void runSync(JsonNode config, List messages, AirbyteCata target.notifyEndOfStream(); target.close(); - if (!implementsBasicNormalization()) + // skip if basic normalization is not configured to run (either not set or false). + if (!config.hasNonNull(WorkerConstants.BASIC_NORMALIZATION_KEY) || !config.get(WorkerConstants.BASIC_NORMALIZATION_KEY).asBoolean()) { return; + } final NormalizationRunner runner = NormalizationRunnerFactory.create( getImageName(), @@ -325,22 +337,72 @@ private void assertSameMessages(List expected, List ac .map(AirbyteRecordMessage::getData) .collect(Collectors.toList()); - // we want to ignore order in this comparison. - assertEquals(expectedJson.size(), actual.size()); - assertTrue(expectedJson.containsAll(actual)); - assertTrue(actual.containsAll(expectedJson)); + assertSameData(expectedJson, actual); } - private void assertEquivalentMessages(List expected, List actual) { - final List expectedJson = expected.stream() + private void assertSameMessagesPruneAirbyteInternalFields(List expected, List actual) { + final List expectedPruned = expected.stream() .filter(message -> message.getType() == AirbyteMessage.Type.RECORD) .map(AirbyteMessage::getRecord) .map(AirbyteRecordMessage::getData) + .map(Jsons::clone) + .map(this::prune) .collect(Collectors.toList()); + final List actualPruned = actual.stream().map(this::prune).collect(Collectors.toList()); + assertSameData(expectedPruned, actualPruned); + } + + private void assertSameData(List expected, List actual) { + LOGGER.info("expected: {}", expected); + LOGGER.info("actual: {}", actual); + // we want to ignore order in this comparison. - assertEquals(expectedJson.size(), actual.size()); - // todo Message can be slightly different with additional columns? + assertEquals(expected.size(), actual.size()); + assertTrue(expected.containsAll(actual)); + assertTrue(actual.containsAll(expected)); + } + + /** + * Same as {@link #pruneMutate(JsonNode)}, except does a defensive copy and returns a new json node + * object instead of mutating in place. + * + * @param json - json that will be pruned. + * @return pruned json node. + */ + private JsonNode prune(JsonNode json) { + final JsonNode clone = Jsons.clone(json); + pruneMutate(clone); + return clone; + } + + /** + * Prune fields that are added internally by airbyte and are not part of the original data. Used so + * that we can compare data that is persisted by an Airbyte worker to the original data. This method + * mutates the provided json in place. + * + * @param json - json that will be pruned. will be mutated in place! + */ + private void pruneMutate(JsonNode json) { + final Set keys = Jsons.object(json, new TypeReference>() {}).keySet(); + for (final String key : keys) { + final JsonNode node = json.get(key); + // recursively prune all airbyte internal fields. + if (node.isObject() || node.isArray()) { + pruneMutate(node); + } + + // prune the following + // - airbyte internal fields + // - fields that match what airbyte generates as hash ids + // - null values -- normalization will often return `: null` but in the original data that key + // likely did not exist in the original message. the most consistent thing to do is always remove + // the null fields (this choice does decrease our ability to check that normalization creates + // columns even if all the values in that column are null) + if (Sets.newHashSet("emitted_at", "ab_id", "normalized_at").contains(key) || key.matches("^_.*_hashid$") || json.get(key).isNull()) { + ((ObjectNode) json).remove(key); + } + } } private JsonNode getConfigWithBasicNormalization() throws Exception { diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/exchange_rate_catalog.json b/airbyte-integrations/bases/standard-destination-test/src/main/resources/exchange_rate_catalog.json index 5c98e8d2ba42..1b28762d4746 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/resources/exchange_rate_catalog.json +++ b/airbyte-integrations/bases/standard-destination-test/src/main/resources/exchange_rate_catalog.json @@ -12,6 +12,9 @@ }, "NZD": { "type": "number" + }, + "USD": { + "type": "number" } } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java index 02c24adce68e..82dff5fcee99 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java @@ -29,7 +29,6 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.Databases; import io.airbyte.integrations.standardtest.destination.TestDestination; -import io.airbyte.protocol.models.AirbyteMessage; import java.util.List; import java.util.stream.Collectors; import org.jooq.JSONFormat;