From e72d6859e4422b9dba9283c76b81ca4250c76fbd Mon Sep 17 00:00:00 2001 From: Sergio Ropero <42538006+sergio-ropero@users.noreply.github.com> Date: Thu, 9 Mar 2023 14:31:25 +0100 Subject: [PATCH] fix: Reduce errors on Postgres source tests (#23727) * Split test for readability and increase waiting time as possible culprit of random failure * Improve testDataContent() output and test all the types instead of stopping the test at the first one. * Format and add documentation --- .../AbstractSourceDatabaseTypeTest.java | 91 ++++++++++++++----- .../CdcWalLogsPostgresSourceDatatypeTest.java | 2 +- 2 files changed, 68 insertions(+), 25 deletions(-) diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java index 8c4e9748f000..89e3b0d8a555 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java @@ -24,13 +24,7 @@ import io.airbyte.protocol.models.v0.SyncMode; import java.io.IOException; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -94,45 +88,85 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc protected abstract String getNameSpace(); /** - * Test the discover command. TODO (liren): This is a new unit test. 
Some existing databases may - * fail it, so it is turned off by default. It should be enabled for all databases eventually. + * Test the 'discover' command. TODO (liren): Some existing databases may fail testDataTypes(), so + * it is turned off by default. It should be enabled for all databases eventually. */ protected boolean testCatalog() { return false; } /** - * The test checks that connector can fetch prepared data without failure. + * The test checks that the types from the catalog match the ones discovered from the source. This + * test is disabled by default. To enable it you need to override the testCatalog() function. */ @Test @SuppressWarnings("unchecked") public void testDataTypes() throws Exception { - final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog(); - final List allMessages = runRead(catalog); - final UUID catalogId = runDiscover(); - final Map streams = getLastPersistedCatalog().getStreams().stream() - .collect(Collectors.toMap(AirbyteStream::getName, s -> s)); - final List recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).toList(); - final Map> expectedValues = new HashMap<>(); - testDataHolders.forEach(testDataHolder -> { - if (testCatalog()) { + if (testCatalog()) { + runDiscover(); + final Map streams = getLastPersistedCatalog().getStreams().stream() + .collect(Collectors.toMap(AirbyteStream::getName, s -> s)); + + // testDataHolders should be initialized using the `addDataTypeTestData` function + testDataHolders.forEach(testDataHolder -> { final AirbyteStream airbyteStream = streams.get(testDataHolder.getNameWithTestPrefix()); final Map jsonSchemaTypeMap = (Map) Jsons.deserialize( airbyteStream.getJsonSchema().get("properties").get(getTestColumnName()).toString(), Map.class); assertEquals(testDataHolder.getAirbyteType().getJsonSchemaTypeMap(), jsonSchemaTypeMap, "Expected column type for " + testDataHolder.getNameWithTestPrefix()); + }); + } + } + + /** + * The test checks that connector can fetch 
prepared data without failure. It uses a prepared + * catalog and reads the source using that catalog. Then it makes sure that the expected values are the + * ones inserted in the source. + */ + @Test + public void testDataContent() throws Exception { + // Class used to make error reporting easier + class MissedRecords { + + // Stream that is missing any value + public String streamName; + // Type associated to the test + public String dataType; + // Values that have not been gathered from the source + public List missedValues; + + public MissedRecords(String streamName, String dataType, List missedValues) { + this.streamName = streamName; + this.dataType = dataType; + this.missedValues = missedValues; } + } + + final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog(); + final List allMessages = runRead(catalog); + + final List recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).toList(); + final Map> expectedValues = new HashMap<>(); + final Map testTypes = new HashMap<>(); + final ArrayList missedValues = new ArrayList<>(); + + // If there is no expected value in the test set we don't include it in the list to be asserted + // (even if the table contains records) + testDataHolders.forEach(testDataHolder -> { if (!testDataHolder.getExpectedValues().isEmpty()) { expectedValues.put(testDataHolder.getNameWithTestPrefix(), testDataHolder.getExpectedValues()); + testTypes.put(testDataHolder.getNameWithTestPrefix(), testDataHolder.getSourceType()); + } else { + LOGGER.warn("Missing expected values for type: " + testDataHolder.getSourceType()); } }); - for (final AirbyteMessage msg : recordMessages) { - final String streamName = msg.getRecord().getStream(); + for (final AirbyteMessage message : recordMessages) { + final String streamName = message.getRecord().getStream(); final List expectedValuesForStream = expectedValues.get(streamName); if (expectedValuesForStream != null) { - final String value = 
getValueFromJsonNode(msg.getRecord().getData().get(getTestColumnName())); + final String value = getValueFromJsonNode(message.getRecord().getData().get(getTestColumnName())); assertTrue(expectedValuesForStream.contains(value), String.format("Returned value '%s' from stream %s is not in the expected list: %s", value, streamName, expectedValuesForStream)); @@ -140,8 +174,17 @@ public void testDataTypes() throws Exception { } } - expectedValues.forEach((streamName, values) -> assertTrue(values.isEmpty(), - "The streamer " + streamName + " should return all expected values. Missing values: " + values)); + // Gather all the missing values, so we don't stop the test in the first missed one + expectedValues.forEach((streamName, values) -> { + if (!values.isEmpty()) { + missedValues.add(new MissedRecords(streamName, testTypes.get(streamName), values)); + } + }); + + assertTrue(missedValues.isEmpty(), + missedValues.stream().map((entry) -> // stream each entry, map it to string value + "The stream '" + entry.streamName + "' checking type '" + entry.dataType + "' is missing values: " + entry.missedValues) + .collect(Collectors.joining("\n"))); // and join them } protected String getValueFromJsonNode(final JsonNode jsonNode) throws IOException { diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java index 91a5fec065de..572a4ae31a54 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java +++ 
b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java @@ -30,7 +30,7 @@ public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSource private static final String SCHEMA_NAME = "test"; private static final String SLOT_NAME_BASE = "debezium_slot"; private static final String PUBLICATION = "publication"; - private static final int INITIAL_WAITING_SECONDS = 15; + private static final int INITIAL_WAITING_SECONDS = 30; private JsonNode stateAfterFirstSync; @Override