Skip to content

Commit

Permalink
Fix destination integration tests (airbytehq#24431)
Browse files Browse the repository at this point in the history
* MS SQL does not support schema change in incremental model
* make schema change test optional
* fix compilation errors in postgres-strict-encrypt destination
* deparallelize integration tests for destination postgres
* deparalellize MS SQL integration tests
* remove broken SSH tunnel test from destination-postgres-strict-encrypt
  • Loading branch information
grishick authored Mar 24, 2023
1 parent eabbea4 commit 5f1eb87
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
normalizationRepository: airbyte/normalization-clickhouse
normalizationTag: 0.4.0
normalizationIntegrationType: clickhouse
supportsDbt: true
supportsDbt: false
- name: Cloudflare R2
destinationDefinitionId: 0fb07be9-7c3b-4336-850d-5efc006152ee
dockerRepository: airbyte/destination-r2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ private String getImageNameWithoutTag() {
return getImageName().contains(":") ? getImageName().split(":")[0] : getImageName();
}

private Optional<StandardDestinationDefinition> getOptionalDestinationDefinitionFromProvider(final String imageNameWithoutTag) {
protected Optional<StandardDestinationDefinition> getOptionalDestinationDefinitionFromProvider(
final String imageNameWithoutTag) {
final LocalDefinitionsProvider provider = new LocalDefinitionsProvider();
return provider.getDestinationDefinitions().stream()
.filter(definition -> imageNameWithoutTag.equalsIgnoreCase(definition.getDockerRepository()))
Expand Down Expand Up @@ -630,7 +631,7 @@ public void testIncrementalSync() throws Exception {
@Test
public void testIncrementalSyncWithNormalizationDropOneColumn()
throws Exception {
if (!normalizationFromDefinition()) {
if (!normalizationFromDefinition() || !supportIncrementalSchemaChanges()) {
return;
}

Expand Down Expand Up @@ -1634,6 +1635,10 @@ protected boolean supportObjectDataTypeTest() {
return false;
}

protected boolean supportIncrementalSchemaChanges() {
return false;
}

/**
* NaN and Infinity test are not supported by default. Please override this method to specify
* NaN/Infinity types support example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected boolean supportIncrementalSchemaChanges() { return true; }

@Override
protected Optional<NamingConventionTransformer> getNameTransformer() {
return Optional.of(NAME_TRANSFORMER);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# currently limit the number of parallel threads until further investigation into the issues \
# where integration tests run into race conditions
numberThreads=1
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# currently limit the number of parallel threads until further investigation into the issues \
# where integration tests run into race conditions
numberThreads=1
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# currently limit the number of parallel threads until further investigation into the issues \
# where integration tests run into race conditions
numberThreads=1
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import static io.airbyte.db.PostgresUtils.getCertificate;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -21,10 +20,10 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.workers.exception.WorkerException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
Expand All @@ -40,7 +39,7 @@ public class PostgresDestinationStrictEncryptAcceptanceTest extends DestinationA

protected static final String PASSWORD = "Passw0rd";
protected static PostgresUtils.Certificate certs;

private static final String NORMALIZATION_VERSION = "dev"; //this is hacky. This test should extend or encapsulate PostgresDestinationAcceptanceTest
@Override
protected String getImageName() {
return "airbyte/destination-postgres-strict-encrypt:dev";
Expand Down Expand Up @@ -154,7 +153,7 @@ protected void tearDown(final TestDestinationEnv testEnv) {
}

@Test
void testStrictSSLUnsecuredNoTunnel() throws WorkerException {
void testStrictSSLUnsecuredNoTunnel() throws Exception {
final JsonNode config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, db.getHost())
.put(JdbcUtils.USERNAME_KEY, db.getUsername())
Expand All @@ -176,7 +175,7 @@ void testStrictSSLUnsecuredNoTunnel() throws WorkerException {
}

@Test
void testStrictSSLSecuredNoTunnel() throws WorkerException {
void testStrictSSLSecuredNoTunnel() throws Exception {
final JsonNode config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, db.getHost())
.put(JdbcUtils.USERNAME_KEY, db.getUsername())
Expand All @@ -196,25 +195,30 @@ void testStrictSSLSecuredNoTunnel() throws WorkerException {
assertEquals(Status.SUCCEEDED, actual.getStatus());
}

@Test
void testStrictSSLUnsecuredWithTunnel() throws WorkerException {
final JsonNode config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, db.getHost())
.put(JdbcUtils.USERNAME_KEY, db.getUsername())
.put(JdbcUtils.PASSWORD_KEY, db.getPassword())
.put(JdbcUtils.SCHEMA_KEY, "public")
.put(JdbcUtils.PORT_KEY, db.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, db.getDatabaseName())
.put(JdbcUtils.SSL_MODE_KEY, ImmutableMap.builder()
.put("mode", "require")
.build())
.put("tunnel_method", ImmutableMap.builder()
.put("tunnel_method", "SSH_KEY_AUTH")
.build())
.build());
final var actual = runCheck(config);
// DefaultCheckConnectionWorker is swallowing the NullPointerException
assertNull(actual);
@Override
protected boolean normalizationFromDefinition() {
return true;
}

@Override
protected boolean dbtFromDefinition() {
return true;
}

@Override
protected String getNormalizationIntegrationType() {
return getOptionalDestinationDefinitionFromProvider("airbyte/destination-postgres")
.filter(standardDestinationDefinition -> Objects.nonNull(standardDestinationDefinition.getNormalizationConfig()))
.map(standardDestinationDefinition -> standardDestinationDefinition.getNormalizationConfig().getNormalizationIntegrationType())
.orElse(null);
}

@Override
protected String getNormalizationImageName() {
return getOptionalDestinationDefinitionFromProvider("airbyte/destination-postgres")
.filter(standardDestinationDefinition -> Objects.nonNull(standardDestinationDefinition.getNormalizationConfig()))
.map(standardDestinationDefinition -> standardDestinationDefinition.getNormalizationConfig().getNormalizationRepository() + ":"
+ NORMALIZATION_VERSION)
.orElse(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# currently limit the number of parallel threads until further investigation into the issues \
# where integration tests run into race conditions
numberThreads=1
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected boolean supportIncrementalSchemaChanges() { return true; }

@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected boolean supportIncrementalSchemaChanges() { return true; }

@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
final String streamName,
Expand Down

0 comments on commit 5f1eb87

Please sign in to comment.