Skip to content

Commit

Permalink
fix: Reduce errors on Postgres source tests (#23727)
Browse files Browse the repository at this point in the history
* Split test for readability and increase waiting time as possible culprit of random failure

* Improve testDataContent() output and test all the types without instead of stopping the test in the first one.

* Format and add documentation
  • Loading branch information
sergio-ropero authored Mar 9, 2023
1 parent 0036de4 commit e72d685
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,7 @@
import io.airbyte.protocol.models.v0.SyncMode;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -94,54 +88,103 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
protected abstract String getNameSpace();

/**
* Test the discover command. TODO (liren): This is a new unit test. Some existing databases may
* fail it, so it is turned off by default. It should be enabled for all databases eventually.
* Test the 'discover' command. TODO (liren): Some existing databases may fail testDataTypes(), so
* it is turned off by default. It should be enabled for all databases eventually.
*/
protected boolean testCatalog() {
return false;
}

/**
* The test checks that connector can fetch prepared data without failure.
* The test checks that the types from the catalog matches the ones discovered from the source. This
* test is disabled by default. To enable it you need to overwrite testCatalog() function.
*/
@Test
@SuppressWarnings("unchecked")
public void testDataTypes() throws Exception {
final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog();
final List<AirbyteMessage> allMessages = runRead(catalog);
final UUID catalogId = runDiscover();
final Map<String, AirbyteStream> streams = getLastPersistedCatalog().getStreams().stream()
.collect(Collectors.toMap(AirbyteStream::getName, s -> s));
final List<AirbyteMessage> recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).toList();
final Map<String, List<String>> expectedValues = new HashMap<>();
testDataHolders.forEach(testDataHolder -> {
if (testCatalog()) {
if (testCatalog()) {
runDiscover();
final Map<String, AirbyteStream> streams = getLastPersistedCatalog().getStreams().stream()
.collect(Collectors.toMap(AirbyteStream::getName, s -> s));

// testDataHolders should be initialized using the `addDataTypeTestData` function
testDataHolders.forEach(testDataHolder -> {
final AirbyteStream airbyteStream = streams.get(testDataHolder.getNameWithTestPrefix());
final Map<String, Object> jsonSchemaTypeMap = (Map<String, Object>) Jsons.deserialize(
airbyteStream.getJsonSchema().get("properties").get(getTestColumnName()).toString(), Map.class);
assertEquals(testDataHolder.getAirbyteType().getJsonSchemaTypeMap(), jsonSchemaTypeMap,
"Expected column type for " + testDataHolder.getNameWithTestPrefix());
});
}
}

/**
* The test checks that connector can fetch prepared data without failure. It uses a prepared
* catalog and read the source using that catalog. Then makes sure that the expected values are the
* ones inserted in the source.
*/
@Test
public void testDataContent() throws Exception {
// Class used to make easier the error reporting
class MissedRecords {

// Stream that is missing any value
public String streamName;
// Type associated to the test
public String dataType;
// Which are the values that has not being gathered from the source
public List<String> missedValues;

public MissedRecords(String streamName, String dataType, List<String> missedValues) {
this.streamName = streamName;
this.dataType = dataType;
this.missedValues = missedValues;
}

}

final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog();
final List<AirbyteMessage> allMessages = runRead(catalog);

final List<AirbyteMessage> recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).toList();
final Map<String, List<String>> expectedValues = new HashMap<>();
final Map<String, String> testTypes = new HashMap<>();
final ArrayList<MissedRecords> missedValues = new ArrayList<>();

// If there is no expected value in the test set we don't include it in the list to be asserted
// (even if the table contains records)
testDataHolders.forEach(testDataHolder -> {
if (!testDataHolder.getExpectedValues().isEmpty()) {
expectedValues.put(testDataHolder.getNameWithTestPrefix(), testDataHolder.getExpectedValues());
testTypes.put(testDataHolder.getNameWithTestPrefix(), testDataHolder.getSourceType());
} else {
LOGGER.warn("Missing expected values for type: " + testDataHolder.getSourceType());
}
});

for (final AirbyteMessage msg : recordMessages) {
final String streamName = msg.getRecord().getStream();
for (final AirbyteMessage message : recordMessages) {
final String streamName = message.getRecord().getStream();
final List<String> expectedValuesForStream = expectedValues.get(streamName);
if (expectedValuesForStream != null) {
final String value = getValueFromJsonNode(msg.getRecord().getData().get(getTestColumnName()));
final String value = getValueFromJsonNode(message.getRecord().getData().get(getTestColumnName()));
assertTrue(expectedValuesForStream.contains(value),
String.format("Returned value '%s' from stream %s is not in the expected list: %s",
value, streamName, expectedValuesForStream));
expectedValuesForStream.remove(value);
}
}

expectedValues.forEach((streamName, values) -> assertTrue(values.isEmpty(),
"The streamer " + streamName + " should return all expected values. Missing values: " + values));
// Gather all the missing values, so we don't stop the test in the first missed one
expectedValues.forEach((streamName, values) -> {
if (!values.isEmpty()) {
missedValues.add(new MissedRecords(streamName, testTypes.get(streamName), values));
}
});

assertTrue(missedValues.isEmpty(),
missedValues.stream().map((entry) -> // stream each entry, map it to string value
"The stream '" + entry.streamName + "' checking type '" + entry.dataType + "' is missing values: " + entry.missedValues)
.collect(Collectors.joining("\n"))); // and join them
}

protected String getValueFromJsonNode(final JsonNode jsonNode) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSource
private static final String SCHEMA_NAME = "test";
private static final String SLOT_NAME_BASE = "debezium_slot";
private static final String PUBLICATION = "publication";
private static final int INITIAL_WAITING_SECONDS = 30;
private static final int INITIAL_WAITING_SECONDS = 15;
private JsonNode stateAfterFirstSync;

@Override
Expand Down

0 comments on commit e72d685

Please sign in to comment.