diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 47539a8cc70f..20a57cbb399c 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator | | 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes | | 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method | | 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index f6cee2374148..aa31273d9042 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.13.0 +version=0.13.1 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java index 7ce3a8a7a01c..1ffd5f0c93ae 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java @@ -71,6 +71,9 @@ protected Optional checkForKnownConfigExceptions(final Exc public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException { try { database.execute(createTableQuery(database, schemaName, tableName)); + for (final String postCreateSql : postCreateTableQueries(schemaName, tableName)) { + database.execute(postCreateSql); + } } catch (final SQLException e) { throw checkForKnownConfigExceptions(e).orElseThrow(() -> e); } @@ -85,6 +88,15 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN } } + /** + * Some subclasses may want to execute additional SQL statements after creating the raw table. For + * example, Postgres does not support index definitions within a CREATE TABLE statement, so we need + * to run CREATE INDEX statements after creating the table. + */ + protected List postCreateTableQueries(final String schemaName, final String tableName) { + return List.of(); + } + protected String createTableQueryV1(final String schemaName, final String tableName) { return String.format( """ diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle index 4f56294fe85e..8a0ce598b05f 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle @@ -4,7 +4,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.13.0' + cdkVersionRequired = '0.13.1' features = [ 'db-sources', // required for tests 'db-destinations' diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/gradle.properties b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/gradle.properties index 4dbe8b8729df..23da4989675e 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/gradle.properties +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/gradle.properties @@ -1 +1,3 @@ -testExecutionConcurrency=-1 +# our testcontainer has issues with too much concurrency. +# 4 threads seems to be the sweet spot. +testExecutionConcurrency=4 diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml index d4f379fdbe5c..573dbcf99fd9 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml @@ -1,16 +1,10 @@ data: - registries: - cloud: - dockerImageTag: 0.4.0 - enabled: false # strict encrypt connectors are deployed to Cloud by their non strict encrypt sibling. - oss: - dockerImageTag: 0.4.0 - enabled: false # strict encrypt connectors are not used on OSS. connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.5.4 + dockerImageTag: 0.5.5 dockerRepository: airbyte/destination-postgres-strict-encrypt + documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres icon: postgresql.svg license: ELv2 @@ -19,8 +13,14 @@ data: normalizationIntegrationType: postgres normalizationRepository: airbyte/normalization normalizationTag: 0.4.1 + registries: + cloud: + dockerImageTag: 0.4.0 + enabled: false + oss: + dockerImageTag: 0.4.0 + enabled: false releaseStage: alpha - documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres supportsDbt: true tags: - language:java diff --git a/airbyte-integrations/connectors/destination-postgres/build.gradle b/airbyte-integrations/connectors/destination-postgres/build.gradle index e5c75a6632c4..da4dcd05868d 100644 --- a/airbyte-integrations/connectors/destination-postgres/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres/build.gradle @@ -4,7 +4,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.13.0' + cdkVersionRequired = '0.13.1' features = [ 'db-sources', // required for tests 'db-destinations', diff --git a/airbyte-integrations/connectors/destination-postgres/gradle.properties b/airbyte-integrations/connectors/destination-postgres/gradle.properties index 4dbe8b8729df..23da4989675e 100644 --- a/airbyte-integrations/connectors/destination-postgres/gradle.properties +++ b/airbyte-integrations/connectors/destination-postgres/gradle.properties @@ -1 +1,3 @@ -testExecutionConcurrency=-1 +# our testcontainer has issues with too much concurrency. +# 4 threads seems to be the sweet spot. +testExecutionConcurrency=4 diff --git a/airbyte-integrations/connectors/destination-postgres/metadata.yaml b/airbyte-integrations/connectors/destination-postgres/metadata.yaml index 23020e463a0b..b0983ff2e291 100644 --- a/airbyte-integrations/connectors/destination-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.5.4 + dockerImageTag: 0.5.5 dockerRepository: airbyte/destination-postgres documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java index 43236bf65d1d..01e4904b5684 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresSqlOperations.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.postgres; import io.airbyte.cdk.db.jdbc.JdbcDatabase; +import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import java.io.BufferedReader; @@ -14,6 +15,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.sql.SQLException; +import java.util.Collections; import java.util.List; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; @@ -24,6 +26,23 @@ public PostgresSqlOperations() { super(new PostgresDataAdapter()); } + @Override + protected List postCreateTableQueries(final String schemaName, final String tableName) { + if (TypingAndDedupingFlag.isDestinationV2()) { + return List.of( + // the raw_id index _could_ be unique (since raw_id is a UUID) + // but there's no reason to do that (because it's a UUID :P ) + // and it would just slow down inserts. + // also, intentionally don't specify the type of index (btree, hash, etc). Just use the default. + "CREATE INDEX IF NOT EXISTS " + tableName + "_raw_id" + " ON " + schemaName + "." + tableName + "(_airbyte_raw_id)", + "CREATE INDEX IF NOT EXISTS " + tableName + "_extracted_at" + " ON " + schemaName + "." + tableName + "(_airbyte_extracted_at)", + "CREATE INDEX IF NOT EXISTS " + tableName + "_loaded_at" + " ON " + schemaName + "." + tableName + + "(_airbyte_loaded_at, _airbyte_extracted_at)"); + } else { + return Collections.emptyList(); + } + } + @Override protected void insertRecordsInternalV2(final JdbcDatabase database, final List records, diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java index 2d6469192ed1..342ab040fc79 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java @@ -7,6 +7,7 @@ import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA; import static java.util.Collections.emptyList; import static org.jooq.impl.DSL.array; @@ -28,6 +29,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; +import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.Struct; import io.airbyte.protocol.models.v0.DestinationSyncMode; @@ -36,10 +38,13 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.jooq.Condition; import org.jooq.DataType; import org.jooq.Field; +import org.jooq.Name; import org.jooq.SQLDialect; import org.jooq.impl.DefaultDataType; import org.jooq.impl.SQLDataType; @@ -79,6 +84,41 @@ protected SQLDialect getDialect() { return SQLDialect.POSTGRES; } + @Override + public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) { + final List statements = new ArrayList<>(); + final Name finalTableName = name(stream.id().finalNamespace(), stream.id().finalName() + suffix); + + statements.add(super.createTable(stream, suffix, force)); + + if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { + // An index for our ROW_NUMBER() PARTITION BY pk ORDER BY cursor, extracted_at function + final List pkNames = stream.primaryKey().stream() + .map(pk -> quotedName(pk.name())) + .toList(); + statements.add(Sql.of(getDslContext().createIndex().on( + finalTableName, + Stream.of( + pkNames.stream(), + // if cursor is present, then a stream containing its name + // but if no cursor, then empty stream + stream.cursor().stream().map(cursor -> quotedName(cursor.name())), + Stream.of(name(COLUMN_NAME_AB_EXTRACTED_AT))).flatMap(Function.identity()).toList()) + .getSQL())); + } + statements.add(Sql.of(getDslContext().createIndex().on( + finalTableName, + name(COLUMN_NAME_AB_EXTRACTED_AT)) + .getSQL())); + + statements.add(Sql.of(getDslContext().createIndex().on( + finalTableName, + name(COLUMN_NAME_AB_RAW_ID)) + .getSQL())); + + return Sql.concat(statements); + } + @Override protected List createIndexSql(final StreamConfig stream, final String suffix) { if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP && !stream.primaryKey().isEmpty()) { diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index 33f09966a63d..aa75211ebf3e 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -4,7 +4,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.12.1' + cdkVersionRequired = '0.13.0' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml index 93f27e28ecd8..b32972ffd65e 100644 --- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml +++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc - dockerImageTag: 0.7.15 + dockerImageTag: 0.8.0 dockerRepository: airbyte/destination-redshift documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift githubIssueLabel: destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index e94118e279ee..82af7555922b 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -149,7 +149,7 @@ protected Map getDefaultConnectionProperties(final JsonNode conf // TODO: Pull common code from RedshiftInsertDestination and RedshiftStagingS3Destination into a // base class. // The following properties can be overriden through jdbcUrlParameters in the config. - Map connectionOptions = new HashMap<>(); + final Map connectionOptions = new HashMap<>(); // Redshift properties // https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configuration-options.html#jdbc20-connecttimeout-option // connectTimeout is different from Hikari pool's connectionTimout, driver defaults to 10seconds so @@ -177,7 +177,7 @@ protected JdbcSqlGenerator getSqlGenerator() { } @Override - protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) { + protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) { return new RedshiftDestinationHandler(databaseName, database); } @@ -247,7 +247,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN parsedCatalog = catalogParser.parseCatalog(catalog); final JdbcV1V2Migrator migrator = new JdbcV1V2Migrator(getNamingResolver(), database, databaseName); final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator(); - boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); + final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); final int defaultThreadCount = 8; if (disableTypeDedupe) { typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java index bf208f6bcdbf..4c110d7a20cd 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java @@ -105,17 +105,17 @@ protected SQLDialect getDialect() { */ @Override - protected Field castedField(final Field field, final AirbyteType type, final String alias) { + protected Field castedField(final Field field, final AirbyteType type, final String alias, final boolean useExpensiveSaferCasting) { if (type instanceof final AirbyteProtocolType airbyteProtocolType) { switch (airbyteProtocolType) { case STRING -> { return field(CASE_STATEMENT_SQL_TEMPLATE, jsonTypeOf(field).ne("string").and(field.isNotNull()), jsonSerialize(field), - castedField(field, airbyteProtocolType)).as(quotedName(alias)); + castedField(field, airbyteProtocolType, useExpensiveSaferCasting)).as(quotedName(alias)); } default -> { - return castedField(field, airbyteProtocolType).as(quotedName(alias)); + return castedField(field, airbyteProtocolType, useExpensiveSaferCasting).as(quotedName(alias)); } } @@ -129,7 +129,7 @@ protected Field castedField(final Field field, final AirbyteType type, fin jsonTypeOf(field).eq("array"), cast(field, getArrayType())).as(quotedName(alias)); // No nested Unions supported so this will definitely not result in infinite recursion. - case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias); + case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias, useExpensiveSaferCasting); default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + type); }; } @@ -139,7 +139,11 @@ protected List> extractRawDataFields(final LinkedHashMap castedField(field(quotedName(COLUMN_NAME_DATA, column.getKey().originalName())), column.getValue(), column.getKey().name())) + .map(column -> castedField( + field(quotedName(COLUMN_NAME_DATA, column.getKey().originalName())), + column.getValue(), + column.getKey().name(), + useExpensiveSaferCasting)) .collect(Collectors.toList()); } @@ -180,7 +184,7 @@ Field toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) { // TODO: Timestamp format issues can result in null values when cast, add regex check if destination // supports regex functions. return field(CASE_STATEMENT_SQL_TEMPLATE, - field.isNotNull().and(castedField(field, type, column.name()).isNull()), + field.isNotNull().and(castedField(field, type, column.name(), true).isNull()), function("ARRAY", getSuperType(), val(COLUMN_ERROR_MESSAGE_FORMAT.formatted(column.name()))), field("ARRAY()")); } @@ -198,6 +202,7 @@ protected Field buildAirbyteMetaColumn(final LinkedHashMap intendedColumns = stream.columns().entrySet().stream() .collect(LinkedHashMap::new, (map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()), @@ -227,6 +232,8 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, fina */ @Override protected Field getRowNumber(final List primaryKeys, final Optional cursor) { + // literally identical to postgres's getRowNumber implementation, changes here probably should + // be reflected there final List> primaryKeyFields = primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList()) : new ArrayList<>(); diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java index 72a3bf1d1906..c5928ab534d6 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java @@ -5,38 +5,18 @@ package io.airbyte.integrations.destination.redshift.typing_deduping; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.cdk.db.factory.DataSourceFactory; -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest; +import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; +import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; -import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination; import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGeneratorIntegrationTest.RedshiftSourceOperations; -import java.nio.file.Path; -import java.util.List; import javax.sql.DataSource; import org.jooq.DSLContext; import org.jooq.conf.Settings; import org.jooq.impl.DSL; -/** - * This class is basically the same as - * {@link io.airbyte.integrations.destination.snowflake.typing_deduping.AbstractSnowflakeTypingDedupingTest}. - * But (a) it uses jooq to construct the sql statements, and (b) it doesn't need to upcase anything. - * At some point we might (?) want to do a refactor to combine them. At the very least, this class - * is probably useful for other JDBC destination implementations. - */ -public abstract class AbstractRedshiftTypingDedupingTest extends BaseTypingDedupingTest { - - private JdbcDatabase database; - private DataSource dataSource; - - protected abstract String getConfigPath(); +public abstract class AbstractRedshiftTypingDedupingTest extends JdbcTypingDedupingTest { @Override protected String getImageName() { @@ -44,45 +24,13 @@ protected String getImageName() { } @Override - protected JsonNode generateConfig() { - final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of(getConfigPath()))); - ((ObjectNode) config).put("schema", "typing_deduping_default_schema" + getUniqueSuffix()); - final RedshiftInsertDestination insertDestination = new RedshiftInsertDestination(); - dataSource = insertDestination.getDataSource(config); - database = insertDestination.getDatabase(dataSource, new RedshiftSourceOperations()); - return config; - } - - @Override - protected List dumpRawTableRecords(String streamNamespace, final String streamName) throws Exception { - if (streamNamespace == null) { - streamNamespace = getDefaultSchema(); - } - final String tableName = StreamId.concatenateRawTableName(streamNamespace, streamName); - final String schema = getRawSchema(); - return database.queryJsons(DSL.selectFrom(DSL.name(schema, tableName)).getSQL()); - } - - @Override - protected List dumpFinalTableRecords(String streamNamespace, final String streamName) throws Exception { - if (streamNamespace == null) { - streamNamespace = getDefaultSchema(); - } - return database.queryJsons(DSL.selectFrom(DSL.name(streamNamespace, streamName)).getSQL()); - } - - @Override - protected void teardownStreamAndNamespace(String streamNamespace, final String streamName) throws Exception { - if (streamNamespace == null) { - streamNamespace = getDefaultSchema(); - } - database.execute(DSL.dropTableIfExists(DSL.name(getRawSchema(), StreamId.concatenateRawTableName(streamNamespace, streamName))).getSQL()); - database.execute(DSL.dropSchemaIfExists(DSL.name(streamNamespace)).cascade().getSQL()); + protected DataSource getDataSource(final JsonNode config) { + return new RedshiftInsertDestination().getDataSource(config); } @Override - protected void globalTeardown() throws Exception { - DataSourceFactory.close(dataSource); + protected JdbcCompatibleSourceOperations getSourceOperations() { + return new RedshiftSourceOperations(); } @Override @@ -99,15 +47,4 @@ protected DSLContext getDslContext() { }; } - /** - * Subclasses using a config with a nonstandard raw table schema should override this method. - */ - protected String getRawSchema() { - return JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; - } - - private String getDefaultSchema() { - return getConfig().get("schema").asText(); - } - } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftS3StagingRawSchemaOverrideDisableTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftS3StagingRawSchemaOverrideDisableTypingDedupingTest.java index 6bec5adb9adc..972cde0a1a58 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftS3StagingRawSchemaOverrideDisableTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftS3StagingRawSchemaOverrideDisableTypingDedupingTest.java @@ -4,14 +4,18 @@ package io.airbyte.integrations.destination.redshift.typing_deduping; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.nio.file.Path; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; public class RedshiftS3StagingRawSchemaOverrideDisableTypingDedupingTest extends AbstractRedshiftTypingDedupingTest { @Override - protected String getConfigPath() { - return "secrets/1s1t_config_staging_raw_schema_override.json"; + protected ObjectNode getBaseConfig() { + return (ObjectNode) Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_config_staging_raw_schema_override.json"))); } @Override diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftS3StagingTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftS3StagingTypingDedupingTest.java index c1c0d0194106..c38182ffa54a 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftS3StagingTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftS3StagingTypingDedupingTest.java @@ -4,11 +4,16 @@ package io.airbyte.integrations.destination.redshift.typing_deduping; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.nio.file.Path; + public class RedshiftS3StagingTypingDedupingTest extends AbstractRedshiftTypingDedupingTest { @Override - protected String getConfigPath() { - return "secrets/1s1t_config_staging.json"; + protected ObjectNode getBaseConfig() { + return (ObjectNode) Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_config_staging.json"))); } } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java index 9402264ae03b..4ac040794f26 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java @@ -169,6 +169,7 @@ protected SQLDialect getSqlDialect() { return SQLDialect.POSTGRES; } + @Override protected Field toJsonValue(final String valueAsString) { return DSL.function("JSON_PARSE", String.class, DSL.val(escapeStringLiteral(valueAsString))); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java index 89a54f8eb1e2..b7c78a4cec8e 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java @@ -4,14 +4,18 @@ package io.airbyte.integrations.destination.redshift.typing_deduping; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.nio.file.Path; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; public class RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest extends AbstractRedshiftTypingDedupingTest { @Override - protected String getConfigPath() { - return "secrets/1s1t_config_raw_schema_override.json"; + protected ObjectNode getBaseConfig() { + return (ObjectNode) Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_config_raw_schema_override.json"))); } @Override diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java index 17ffb70547a6..d99d597e4510 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java @@ -4,11 +4,16 @@ package io.airbyte.integrations.destination.redshift.typing_deduping; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.nio.file.Path; + public class RedshiftStandardInsertsTypingDedupingTest extends AbstractRedshiftTypingDedupingTest { @Override - protected String getConfigPath() { - return "secrets/1s1t_config.json"; + protected ObjectNode getBaseConfig() { + return (ObjectNode) Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_config.json"))); } } diff --git a/docs/integrations/destinations/postgres.md b/docs/integrations/destinations/postgres.md index 454b399d0609..b8fffc91ab93 100644 --- a/docs/integrations/destinations/postgres.md +++ b/docs/integrations/destinations/postgres.md @@ -170,6 +170,7 @@ Now that you have set up the Postgres destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------| +| 0.5.5 | 2024-01-18 | [34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.1; Add indexes in raw table for query optimization | | 0.5.4 | 2024-01-11 | [34177](https://github.com/airbytehq/airbyte/pull/34177) | Add code for DV2 beta (no user-visible changes) | | 0.5.3 | 2024-01-10 | [34135](https://github.com/airbytehq/airbyte/pull/34135) | Use published CDK missed in previous release | | 0.5.2 | 2024-01-08 | [33875](https://github.com/airbytehq/airbyte/pull/33875) | Update CDK to get Tunnel heartbeats feature | @@ -191,4 +192,4 @@ Now that you have set up the Postgres destination connector, check out the follo | 0.3.13 | 2021-12-01 | [\#8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key | | 0.3.12 | 2021-11-08 | [\#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count | | 0.3.11 | 2021-09-07 | [\#5743](https://github.com/airbytehq/airbyte/pull/5743) | Add SSH Tunnel support | -| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing | +| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing | \ No newline at end of file diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 18f338d83861..7a551b8ba696 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -229,6 +229,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.8.0 | 2024-01-18 | [34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.0 | | 0.7.15 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Update check method with svv_table_info permission check, fix bug where s3 staging files were not being deleted. | | 0.7.14 | 2024-01-08 | [\#34014](https://github.com/airbytehq/airbyte/pull/34014) | Update order of options in spec | | 0.7.13 | 2024-01-05 | [\#33948](https://github.com/airbytehq/airbyte/pull/33948) | Fix NPE when prepare tables fail; Add case sensitive session for super; Bastion heartbeats added | @@ -307,4 +308,4 @@ Each stream will be output into its own raw table in Redshift. Each table will c | 0.3.14 | 2021-10-08 | [\#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table | | 0.3.13 | 2021-09-02 | [\#5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance | | 0.3.12 | 2021-07-21 | [\#3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs | -| 0.3.11 | 2021-07-20 | [\#4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec | +| 0.3.11 | 2021-07-20 | [\#4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec | \ No newline at end of file