diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle
index fa4df8ccfd6c..2e1d5815eb52 100644
--- a/airbyte-integrations/connectors/destination-redshift/build.gradle
+++ b/airbyte-integrations/connectors/destination-redshift/build.gradle
@@ -4,7 +4,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.25.0'
+    cdkVersionRequired = '0.28.19'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
     useLocalCdk = false
 }
diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml
index 6a9573fd17d4..4468cfb64322 100644
--- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
-  dockerImageTag: 2.3.2
+  dockerImageTag: 2.4.0
   dockerRepository: airbyte/destination-redshift
   documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
   githubIssueLabel: destination-redshift
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java
index 5f29dffba041..bbaa1f3687da 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java
@@ -95,7 +95,8 @@ private boolean isEphemeralKeysAndPurgingStagingData(final JsonNode config, fina
   public AirbyteConnectionStatus check(final JsonNode config) {
     final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
     final EncryptionConfig encryptionConfig =
-        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
+        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY))
+            : new NoEncryption();
     if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) {
       return new AirbyteConnectionStatus()
           .withStatus(Status.FAILED)
@@ -220,7 +221,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
                                                                      final Consumer<AirbyteMessage> outputRecordCollector)
       throws Exception {
     final EncryptionConfig encryptionConfig =
-        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
+        config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY))
+            : new NoEncryption();
     final JsonNode s3Options = findS3Options(config);
     final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
     final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options);
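Both hunks above only re-wrap an over-long ternary for the formatter; the behavior is unchanged: when the uploading-method section is absent from the config, the connector falls back to `NoEncryption` instead of failing. A minimal sketch of that optional-nested-key pattern with Jackson (the config shape below is illustrative; `EncryptionConfig` itself comes from the CDK):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EncryptionFallbackSketch {

  // Mirrors the connector's ternary: only descend into the uploading-method
  // node when it exists, otherwise use a no-op default.
  static String resolveEncryptionType(final JsonNode config) {
    return config.has("uploading_method")
        ? config.get("uploading_method").path("encryption").path("encryption_type").asText("none")
        : "none";
  }

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    final JsonNode withEncryption = mapper.readTree(
        "{\"uploading_method\":{\"encryption\":{\"encryption_type\":\"aes_cbc_envelope\"}}}");
    System.out.println(resolveEncryptionType(withEncryption));        // aes_cbc_envelope
    System.out.println(resolveEncryptionType(mapper.readTree("{}"))); // none
  }
}
```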
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java
index c2b4da5c97ef..59064ed591ef 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java
@@ -28,6 +28,8 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations {
 
@@ -38,6 +40,8 @@ public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implem
   private final ObjectMapper objectMapper;
   private final byte[] keyEncryptingKey;
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftS3StagingSqlOperations.class);
+
   public RedshiftS3StagingSqlOperations(final NamingConventionTransformer nameTransformer,
                                         final AmazonS3 s3Client,
                                         final S3DestinationConfig s3Config,
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java
index 9933f949b368..bd52ba793cb2 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java
@@ -20,7 +20,7 @@ import com.google.common.collect.Iterables;
 import io.airbyte.cdk.db.jdbc.JdbcDatabase;
 import io.airbyte.cdk.integrations.base.JavaBaseConstants;
-import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage;
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
 import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
 import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils;
 import io.airbyte.commons.json.Jsons;
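Two mechanical changes here: the `PartialAirbyteMessage` import moves because the Kotlin CDK repackaged `async.partial_messages` as `async.model`, and `RedshiftS3StagingSqlOperations` gains a standard SLF4J logger. The declaration follows the usual idiom; a self-contained sketch (class name illustrative):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StagingOperationsLoggingSketch {

  // One static logger per class, named after the class, as in the hunk above.
  private static final Logger LOGGER = LoggerFactory.getLogger(StagingOperationsLoggingSketch.class);

  public void uploadStagingFile(final String objectKey) {
    // SLF4J's {} placeholders defer string construction until the level is enabled.
    LOGGER.info("Uploading staging file to {}", objectKey);
  }
}
```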
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java
index 8ed4e3a17b78..3e4af2021bb8 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java
@@ -46,7 +46,7 @@ public void execute(final Sql sql) throws Exception {
         // see https://github.com/airbytehq/airbyte/issues/33900
         modifiedStatements.add("SET enable_case_sensitive_identifier to TRUE;\n");
         modifiedStatements.addAll(transaction);
-        jdbcDatabase.executeWithinTransaction(modifiedStatements);
+        getJdbcDatabase().executeWithinTransaction(modifiedStatements);
       } catch (final SQLException e) {
         log.error("Sql {}-{} failed", queryId, transactionId, e);
         throw e;
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftRawTableAirbyteMetaMigration.kt b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftRawTableAirbyteMetaMigration.kt
index 6f23e4055416..b459683174ce 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftRawTableAirbyteMetaMigration.kt
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftRawTableAirbyteMetaMigration.kt
@@ -64,7 +64,7 @@ class RedshiftRawTableAirbyteMetaMigration(
             "Executing RawTableAirbyteMetaMigration for ${stream.id.originalNamespace}.${stream.id.originalName} for real"
         )
         destinationHandler.execute(
-            getRawTableMetaColumnAddDdl(stream.id.rawNamespace, stream.id.rawName)
+            getRawTableMetaColumnAddDdl(stream.id.rawNamespace!!, stream.id.rawName!!)
         )
 
         // Update the state. We didn't modify the table in a relevant way, so don't invalidate the
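`jdbcDatabase` became a Kotlin property on the handler's base class, hence the `getJdbcDatabase()` accessor, and the now-nullable `rawNamespace`/`rawName` properties are asserted non-null with Kotlin's `!!`. The surrounding logic is unchanged: every transaction gets a session flag prepended so Redshift preserves identifier casing. A sketch of that prepend pattern (the `executeWithinTransaction` call itself belongs to the CDK's `JdbcDatabase`):

```java
import java.util.ArrayList;
import java.util.List;

public class CaseSensitiveTransactionSketch {

  // Redshift folds identifiers to lowercase unless this session flag is set;
  // see https://github.com/airbytehq/airbyte/issues/33900
  static List<String> withCaseSensitivity(final List<String> transaction) {
    final List<String> modifiedStatements = new ArrayList<>();
    modifiedStatements.add("SET enable_case_sensitive_identifier to TRUE;\n");
    modifiedStatements.addAll(transaction);
    return modifiedStatements;
  }

  public static void main(final String[] args) {
    System.out.println(withCaseSensitivity(List.of("INSERT INTO \"MyTable\" VALUES (1);")));
  }
}
```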
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java
index d5382cd6802f..d009eeba528d 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java
@@ -133,9 +133,9 @@ protected List<Field<?>> extractRawDataFields(final LinkedHashMap
         .map(column -> castedField(
-            field(quotedName(COLUMN_NAME_DATA, column.getKey().originalName())),
+            field(quotedName(COLUMN_NAME_DATA, column.getKey().getOriginalName())),
             column.getValue(),
-            column.getKey().name(),
+            column.getKey().getName(),
             useExpensiveSaferCasting))
         .collect(Collectors.toList());
   }
@@ -170,16 +170,16 @@ Field<?> arrayConcatStmt(final List<Field<?>> arrays) {
   }
 
   Field<?> toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) {
-    final Field<?> field = field(quotedName(COLUMN_NAME_DATA, column.originalName()));
+    final Field<?> field = field(quotedName(COLUMN_NAME_DATA, column.getOriginalName()));
     // Just checks if data is not null but casted data is null. This also accounts for conditional
     // casting result of array and struct.
     // TODO: Timestamp format issues can result in null values when cast, add regex check if destination
     // supports regex functions.
     return field(CASE_STATEMENT_SQL_TEMPLATE,
-        field.isNotNull().and(castedField(field, type, column.name(), true).isNull()),
+        field.isNotNull().and(castedField(field, type, column.getName(), true).isNull()),
         function("ARRAY", getSuperType(),
             function("JSON_PARSE", getSuperType(),
                 val(
-                    "{\"field\": \"" + column.name() + "\", "
+                    "{\"field\": \"" + column.getName() + "\", "
                         + "\"change\": \"" + Change.NULLED.value() + "\", "
                         + "\"reason\": \"" + Reason.DESTINATION_TYPECAST_ERROR + "\"}"))),
         field("ARRAY()"));
@@ -219,12 +219,12 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
     // literally identical to postgres's getRowNumber implementation, changes here probably should
     // be reflected there
     final List<Field<?>> primaryKeyFields =
-        primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList())
+        primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.getName()))).collect(Collectors.toList())
            : new ArrayList<>();
     final List<Field<?>> orderedFields = new ArrayList<>();
     // We can still use Jooq's field to get the quoted name with raw sql templating.
     // jooq's .desc returns SortField instead of Field and NULLS LAST doesn't work with it
-    cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.name())))));
+    cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.getName())))));
     orderedFields.add(field("{0} desc", quotedName(COLUMN_NAME_AB_EXTRACTED_AT)));
     return rowNumber()
         .over()
@@ -235,7 +235,7 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
   @Override
   protected Condition cdcDeletedAtNotNullCondition() {
     return field(name(COLUMN_NAME_AB_LOADED_AT)).isNotNull()
-        .and(function("JSON_TYPEOF", SQLDataType.VARCHAR, field(quotedName(COLUMN_NAME_DATA, cdcDeletedAtColumn.name())))
+        .and(function("JSON_TYPEOF", SQLDataType.VARCHAR, field(quotedName(COLUMN_NAME_DATA, getCdcDeletedAtColumn().getName())))
            .ne("null"));
   }
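All substantive renames in this file (`name()` → `getName()`, `originalName()` → `getOriginalName()`, `cdcDeletedAtColumn` → `getCdcDeletedAtColumn()`) are the Kotlin property-accessor change; the generated SQL is identical. For reference, a self-contained sketch of the jOOQ row-number expression `getRowNumber` builds for deduplication (column name illustrative; assumes jOOQ on the classpath):

```java
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.quotedName;
import static org.jooq.impl.DSL.rowNumber;

import java.util.List;
import org.jooq.Field;

public class RowNumberSketch {

  static Field<Integer> rowNumberOver(final List<Field<?>> primaryKeys, final Field<?> cursor) {
    // Raw SQL templating keeps NULLS LAST; jOOQ's .desc() returns a SortField,
    // which doesn't compose with NULLS LAST the way the generator needs.
    final Field<?> cursorDesc = field("{0} desc NULLS LAST", cursor);
    final Field<?> extractedAtDesc = field("{0} desc", quotedName("_airbyte_extracted_at"));
    return rowNumber()
        .over()
        .partitionBy(primaryKeys)
        .orderBy(cursorDesc, extractedAtDesc);
  }
}
```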
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.java
index 0dd59469c22f..bdb123fb41d8 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.java
@@ -91,10 +91,10 @@ public Pair<JsonNode, AirbyteRecordMessageMeta> transform(final StreamDescriptor
     final String namespace =
         (streamDescriptor.getNamespace() != null && !streamDescriptor.getNamespace().isEmpty()) ? streamDescriptor.getNamespace() : defaultNamespace;
     final StreamConfig streamConfig = parsedCatalog.getStream(namespace, streamDescriptor.getName());
-    final Optional<String> cursorField = streamConfig.cursor().map(ColumnId::originalName);
+    final Optional<String> cursorField = streamConfig.getCursor().map(ColumnId::getOriginalName);
     // convert List to Set for faster lookup
-    final Set<String> primaryKeys = streamConfig.primaryKey().stream().map(ColumnId::originalName).collect(Collectors.toSet());
-    final DestinationSyncMode syncMode = streamConfig.destinationSyncMode();
+    final Set<String> primaryKeys = streamConfig.getPrimaryKey().stream().map(ColumnId::getOriginalName).collect(Collectors.toSet());
+    final DestinationSyncMode syncMode = streamConfig.getDestinationSyncMode();
     final TransformationInfo transformationInfo = transformNodes(jsonNode, DEFAULT_PREDICATE_VARCHAR_GREATER_THAN_64K);
     final int originalBytes = transformationInfo.originalBytes;
     final int transformedBytes = transformationInfo.originalBytes - transformationInfo.removedBytes;
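Besides the accessor renames (`cursor()` → `getCursor()`, and so on), the interesting piece here is `DEFAULT_PREDICATE_VARCHAR_GREATER_THAN_64K`: values that would overflow the 65535-byte VARCHAR limit inside a Redshift SUPER column get nulled and recorded. A hedged sketch of what such a predicate can look like (the constant name is from the diff; this body is an assumption, not the connector's exact code):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.nio.charset.StandardCharsets;
import java.util.function.Predicate;

public class SuperLimitPredicateSketch {

  static final int REDSHIFT_VARCHAR_MAX_BYTES = 65535;

  // Byte length, not char length: multi-byte UTF-8 counts against the limit.
  static final Predicate<JsonNode> VARCHAR_GREATER_THAN_64K =
      node -> node.isTextual()
          && node.asText().getBytes(StandardCharsets.UTF_8).length > REDSHIFT_VARCHAR_MAX_BYTES;

  public static void main(final String[] args) {
    System.out.println(VARCHAR_GREATER_THAN_64K.test(TextNode.valueOf("short")));            // false
    System.out.println(VARCHAR_GREATER_THAN_64K.test(TextNode.valueOf("x".repeat(70_000)))); // true
  }
}
```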
diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java
index e76bfdbbca82..221230a3d49a 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java
@@ -67,9 +67,9 @@ public void testRawTableMetaMigration_append() throws Exception {
             .withSyncMode(SyncMode.FULL_REFRESH)
             .withDestinationSyncMode(DestinationSyncMode.APPEND)
             .withStream(new AirbyteStream()
-                .withNamespace(streamNamespace)
-                .withName(streamName)
-                .withJsonSchema(SCHEMA))));
+                .withNamespace(getStreamNamespace())
+                .withName(getStreamName())
+                .withJsonSchema(getSchema()))));
 
     // First sync without _airbyte_meta
     final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages_before_meta.jsonl");
@@ -92,9 +92,9 @@
             .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
             .withPrimaryKey(List.of(List.of("id1"), List.of("id2")))
             .withStream(new AirbyteStream()
-                .withNamespace(streamNamespace)
-                .withName(streamName)
-                .withJsonSchema(SCHEMA))));
+                .withNamespace(getStreamNamespace())
+                .withName(getStreamName())
+                .withJsonSchema(getSchema()))));
 
     // First sync without _airbyte_meta
     final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages_before_meta.jsonl");
@@ -145,16 +145,16 @@ public void testRawTableLoadWithSuperVarcharLimitation() throws Exception {
             .withSyncMode(SyncMode.FULL_REFRESH)
             .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
             .withStream(new AirbyteStream()
-                .withNamespace(streamNamespace)
-                .withName(streamName)
-                .withJsonSchema(SCHEMA))));
+                .withNamespace(getStreamNamespace())
+                .withName(getStreamName())
+                .withJsonSchema(getSchema()))));
     final AirbyteMessage message1 = Jsons.deserialize(record1, AirbyteMessage.class);
-    message1.getRecord().setNamespace(streamNamespace);
-    message1.getRecord().setStream(streamName);
+    message1.getRecord().setNamespace(getStreamNamespace());
+    message1.getRecord().setStream(getStreamName());
     ((ObjectNode) message1.getRecord().getData()).put("name", largeString1);
     final AirbyteMessage message2 = Jsons.deserialize(record2, AirbyteMessage.class);
-    message2.getRecord().setNamespace(streamNamespace);
-    message2.getRecord().setStream(streamName);
+    message2.getRecord().setNamespace(getStreamNamespace());
+    message2.getRecord().setStream(getStreamName());
     ((ObjectNode) message2.getRecord().getData()).put("name", largeString2);
 
     // message1 should be preserved which is just on limit, message2 should be nulled.
diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java
index 75515b5130b7..7fdf96b1e421 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java
@@ -152,7 +152,7 @@ protected DSLContext getDslContext() {
 
   @Override
   protected DestinationHandler<RedshiftState> getDestinationHandler() {
-    return new RedshiftDestinationHandler(databaseName, database, namespace);
+    return new RedshiftDestinationHandler(databaseName, database, getNamespace());
   }
 
   @Override
@@ -178,9 +178,9 @@ protected Field<?> toJsonValue(final String valueAsString) {
   @Override
   @Test
   public void testCreateTableIncremental() throws Exception {
-    final Sql sql = generator.createTable(incrementalDedupStream, "", false);
-    destinationHandler.execute(sql);
-    List<DestinationInitialStatus<RedshiftState>> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
+    final Sql sql = getGenerator().createTable(getIncrementalDedupStream(), "", false);
+    getDestinationHandler().execute(sql);
+    List<DestinationInitialStatus<RedshiftState>> initialStatuses = getDestinationHandler().gatherInitialState(List.of(getIncrementalDedupStream()));
     assertEquals(1, initialStatuses.size());
     final DestinationInitialStatus<RedshiftState> initialStatus = initialStatuses.getFirst();
     assertTrue(initialStatus.isFinalTablePresent());
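The test changes are the same accessor migration seen throughout: once the CDK base classes are Kotlin, fields like `streamNamespace` or `generator` are properties, and Java subclasses must go through the generated getters. A minimal Java-side illustration of why the call sites change (names illustrative):

```java
public class NamespaceAccessSketch {

  // Stand-in for what a Kotlin `val namespace: String` exposes to Java:
  // the backing field is private, only the generated getter is visible.
  private final String namespace = "test_schema";

  protected String getNamespace() {
    return namespace;
  }

  public String qualify(final String table) {
    // Call sites read getNamespace() instead of the (now inaccessible) field.
    return getNamespace() + "." + table;
  }
}
```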
Each table will c
 
 | Version | Date       | Pull Request                                                | Subject                                                                    |
 |:--------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------|
+| 2.4.0   | 2024-03-21 | [\#36589](https://github.com/airbytehq/airbyte/pull/36589)   | Adapt to Kotlin cdk 0.28.19                                                 |
 | 2.3.2   | 2024-03-21 | [\#36374](https://github.com/airbytehq/airbyte/pull/36374)   | Supress Jooq DataAccessException error message in logs                      |
 | 2.3.1   | 2024-03-18 | [\#36255](https://github.com/airbytehq/airbyte/pull/36255)   | Mark as Certified-GA                                                        |
 | 2.3.0   | 2024-03-18 | [\#36203](https://github.com/airbytehq/airbyte/pull/36203)   | CDK 0.25.0; Record nulling for VARCHAR > 64K & record > 16MB (super limit)  |