Destination Redshift: Adapting to Kotlin CDK (#36589)
gisripa authored Apr 2, 2024
1 parent 53bb59f commit ca96b04
Showing 12 changed files with 42 additions and 35 deletions.

@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
- cdkVersionRequired = '0.25.0'
+ cdkVersionRequired = '0.28.19'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}

@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
- dockerImageTag: 2.3.2
+ dockerImageTag: 2.4.0
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift

@@ -95,7 +95,8 @@ private boolean isEphemeralKeysAndPurgingStagingData(final JsonNode config, fina
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
final EncryptionConfig encryptionConfig =
- config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
+ config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY))
+     : new NoEncryption();
if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
@@ -220,7 +221,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final EncryptionConfig encryptionConfig =
- config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
+ config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY))
+     : new NoEncryption();
final JsonNode s3Options = findS3Options(config);
final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options);
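Note for readers: the two hunks above only re-wrap an over-long ternary, the logic is unchanged. The underlying pattern is "parse a nested config block if present, otherwise fall back to a default". A minimal, self-contained sketch of that pattern; the keys and values here are illustrative, not the connector's real spec:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OptionalConfigBlockSketch {
  public static void main(String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    final JsonNode config = mapper.readTree(
        "{\"uploading_method\":{\"encryption\":{\"encryption_type\":\"aes_cbc_envelope\"}}}");

    // Same shape as the connector code: read the nested node when the block exists,
    // otherwise fall back to a "no encryption" default.
    final JsonNode encryption = config.has("uploading_method")
        ? config.get("uploading_method").get("encryption")
        : null;
    System.out.println(encryption == null ? "defaulting to NoEncryption" : encryption.toString());
  }
}
```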

@@ -28,6 +28,8 @@
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;

public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations {

@@ -38,6 +40,8 @@ public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implem
private final ObjectMapper objectMapper;
private final byte[] keyEncryptingKey;

+ private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftS3StagingSqlOperations.class);

public RedshiftS3StagingSqlOperations(final NamingConventionTransformer nameTransformer,
final AmazonS3 s3Client,
final S3DestinationConfig s3Config,
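The only addition in this file is an SLF4J logger on `RedshiftS3StagingSqlOperations`. For reference, the declaration follows the usual pattern of a `private static final Logger` keyed to the declaring class; a standalone sketch, with an illustrative class name and message:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerSketch {
  // Same declaration style as the field added to RedshiftS3StagingSqlOperations.
  private static final Logger LOGGER = LoggerFactory.getLogger(LoggerSketch.class);

  public static void main(String[] args) {
    // Parameterized messages avoid string concatenation when the log level is disabled.
    LOGGER.info("Uploading staged file {} to schema {}", "part-0001.csv.gz", "airbyte_internal");
  }
}
```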

@@ -20,7 +20,7 @@
import com.google.common.collect.Iterables;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
- import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage;
+ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils;
import io.airbyte.commons.json.Jsons;

@@ -46,7 +46,7 @@ public void execute(final Sql sql) throws Exception {
// see https://github.com/airbytehq/airbyte/issues/33900
modifiedStatements.add("SET enable_case_sensitive_identifier to TRUE;\n");
modifiedStatements.addAll(transaction);
- jdbcDatabase.executeWithinTransaction(modifiedStatements);
+ getJdbcDatabase().executeWithinTransaction(modifiedStatements);
} catch (final SQLException e) {
log.error("Sql {}-{} failed", queryId, transactionId, e);
throw e;
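This one-line change is the recurring theme of the CDK bump: base-class members that Java code used to reach directly (protected fields, or record-style accessors such as `name()` and `originalName()`) are now Kotlin properties, so Java call sites must go through the generated getters (`getJdbcDatabase()`, `getName()`, `getOriginalName()`, `getCursor()`, and so on in the hunks below). A self-contained sketch of why the call shape changes; the types here are stand-ins, not the real CDK classes:

```java
public class AccessorRenameSketch {

  // Roughly how the old Java CDK exposed column ids: a record, whose components
  // generate name()-style accessors.
  record LegacyColumnId(String name, String originalName) {}

  // Roughly how a Kotlin property surfaces to Java: it becomes a getX() getter.
  // In Kotlin this would simply be `class ColumnId(val name: String, val originalName: String)`.
  static final class KotlinStyleColumnId {
    private final String name;
    private final String originalName;

    KotlinStyleColumnId(final String name, final String originalName) {
      this.name = name;
      this.originalName = originalName;
    }

    public String getName() { return name; }
    public String getOriginalName() { return originalName; }
  }

  public static void main(String[] args) {
    System.out.println(new LegacyColumnId("id1", "ID1").name());           // old call shape
    System.out.println(new KotlinStyleColumnId("id1", "ID1").getName());   // new call shape
  }
}
```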

@@ -64,7 +64,7 @@ class RedshiftRawTableAirbyteMetaMigration(
"Executing RawTableAirbyteMetaMigration for ${stream.id.originalNamespace}.${stream.id.originalName} for real"
)
destinationHandler.execute(
- getRawTableMetaColumnAddDdl(stream.id.rawNamespace, stream.id.rawName)
+ getRawTableMetaColumnAddDdl(stream.id.rawNamespace!!, stream.id.rawName!!)
)

// Update the state. We didn't modify the table in a relevant way, so don't invalidate the
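The only change in this Kotlin migration is the pair of `!!` non-null assertions: `StreamId.rawNamespace` and `StreamId.rawName` are evidently nullable in the Kotlin CDK, and `getRawTableMetaColumnAddDdl` wants non-null values, so the call site asserts. The Java-side analogue of `!!` is an explicit `Objects.requireNonNull`, sketched below with stand-in types and an illustrative DDL string (the real statement comes from `getRawTableMetaColumnAddDdl`):

```java
import java.util.Objects;

public class NonNullAssertionSketch {

  // Stand-in for the CDK's StreamId; only the two fields used above are modelled.
  record StreamIdStub(String rawNamespace, String rawName) {}

  // Illustrative only; the migration builds its real DDL via getRawTableMetaColumnAddDdl.
  static String rawTableMetaColumnAddDdl(final String rawNamespace, final String rawName) {
    return "ALTER TABLE \"" + rawNamespace + "\".\"" + rawName + "\" ADD COLUMN _airbyte_meta SUPER;";
  }

  public static void main(String[] args) {
    final StreamIdStub id = new StreamIdStub("airbyte_internal", "public_raw__stream_users");
    // Kotlin's `id.rawNamespace!!` behaves like requireNonNull: fail fast with an exception
    // instead of letting a null propagate into the generated SQL.
    System.out.println(rawTableMetaColumnAddDdl(
        Objects.requireNonNull(id.rawNamespace()),
        Objects.requireNonNull(id.rawName())));
  }
}
```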

@@ -133,9 +133,9 @@ protected List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, Airb
.entrySet()
.stream()
.map(column -> castedField(
- field(quotedName(COLUMN_NAME_DATA, column.getKey().originalName())),
+ field(quotedName(COLUMN_NAME_DATA, column.getKey().getOriginalName())),
column.getValue(),
- column.getKey().name(),
+ column.getKey().getName(),
useExpensiveSaferCasting))
.collect(Collectors.toList());
}
@@ -170,16 +170,16 @@ Field<?> arrayConcatStmt(final List<Field<?>> arrays) {
}

Field<?> toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) {
- final Field<?> field = field(quotedName(COLUMN_NAME_DATA, column.originalName()));
+ final Field<?> field = field(quotedName(COLUMN_NAME_DATA, column.getOriginalName()));
// Just checks if data is not null but casted data is null. This also accounts for conditional
// casting result of array and struct.
// TODO: Timestamp format issues can result in null values when cast, add regex check if destination
// supports regex functions.
return field(CASE_STATEMENT_SQL_TEMPLATE,
- field.isNotNull().and(castedField(field, type, column.name(), true).isNull()),
+ field.isNotNull().and(castedField(field, type, column.getName(), true).isNull()),
function("ARRAY", getSuperType(),
function("JSON_PARSE", getSuperType(), val(
"{\"field\": \"" + column.name() + "\", "
"{\"field\": \"" + column.getName() + "\", "
+ "\"change\": \"" + Change.NULLED.value() + "\", "
+ "\"reason\": \"" + Reason.DESTINATION_TYPECAST_ERROR + "\"}"))),
field("ARRAY()"));
@@ -219,12 +219,12 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
// literally identical to postgres's getRowNumber implementation, changes here probably should
// be reflected there
final List<Field<?>> primaryKeyFields =
- primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList())
+ primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.getName()))).collect(Collectors.toList())
: new ArrayList<>();
final List<Field<?>> orderedFields = new ArrayList<>();
// We can still use Jooq's field to get the quoted name with raw sql templating.
// jooq's .desc returns SortField<?> instead of Field<?> and NULLS LAST doesn't work with it
cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.name())))));
cursor.ifPresent(columnId -> orderedFields.add(field("{0} desc NULLS LAST", field(quotedName(columnId.getName())))));
orderedFields.add(field("{0} desc", quotedName(COLUMN_NAME_AB_EXTRACTED_AT)));
return rowNumber()
.over()
@@ -235,7 +235,7 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
@Override
protected Condition cdcDeletedAtNotNullCondition() {
return field(name(COLUMN_NAME_AB_LOADED_AT)).isNotNull()
.and(function("JSON_TYPEOF", SQLDataType.VARCHAR, field(quotedName(COLUMN_NAME_DATA, cdcDeletedAtColumn.name())))
.and(function("JSON_TYPEOF", SQLDataType.VARCHAR, field(quotedName(COLUMN_NAME_DATA, getCdcDeletedAtColumn().getName())))
.ne("null"));
}

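Aside from the accessor renames, the substance of `toCastingErrorCaseStmt` is unchanged: when the raw value is non-null but the cast comes back null, the CASE expression appends a change entry to the `_airbyte_meta` changes array. A small sketch of the JSON object that string template assembles, for a hypothetical column `age`; the enum spellings `NULLED` and `DESTINATION_TYPECAST_ERROR` are assumed to serialize as written in the template:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class CastErrorChangeSketch {
  public static void main(String[] args) {
    final ObjectMapper mapper = new ObjectMapper();
    // Mirrors the string built in toCastingErrorCaseStmt for a hypothetical column "age".
    final ObjectNode change = mapper.createObjectNode()
        .put("field", "age")
        .put("change", "NULLED")                      // Change.NULLED.value(), assumed spelling
        .put("reason", "DESTINATION_TYPECAST_ERROR"); // Reason.DESTINATION_TYPECAST_ERROR
    System.out.println(change.toPrettyString());
    // In SQL this object is wrapped as ARRAY(JSON_PARSE(...)) so it can be concatenated into
    // the record's _airbyte_meta changes; an empty ARRAY() is produced when the cast succeeds.
  }
}
```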

@@ -91,10 +91,10 @@ public Pair<JsonNode, AirbyteRecordMessageMeta> transform(final StreamDescriptor
final String namespace =
(streamDescriptor.getNamespace() != null && !streamDescriptor.getNamespace().isEmpty()) ? streamDescriptor.getNamespace() : defaultNamespace;
final StreamConfig streamConfig = parsedCatalog.getStream(namespace, streamDescriptor.getName());
- final Optional<String> cursorField = streamConfig.cursor().map(ColumnId::originalName);
+ final Optional<String> cursorField = streamConfig.getCursor().map(ColumnId::getOriginalName);
// convert List<ColumnId> to Set<ColumnId> for faster lookup
- final Set<String> primaryKeys = streamConfig.primaryKey().stream().map(ColumnId::originalName).collect(Collectors.toSet());
- final DestinationSyncMode syncMode = streamConfig.destinationSyncMode();
+ final Set<String> primaryKeys = streamConfig.getPrimaryKey().stream().map(ColumnId::getOriginalName).collect(Collectors.toSet());
+ final DestinationSyncMode syncMode = streamConfig.getDestinationSyncMode();
final TransformationInfo transformationInfo = transformNodes(jsonNode, DEFAULT_PREDICATE_VARCHAR_GREATER_THAN_64K);
final int originalBytes = transformationInfo.originalBytes;
final int transformedBytes = transformationInfo.originalBytes - transformationInfo.removedBytes;

@@ -67,9 +67,9 @@ public void testRawTableMetaMigration_append() throws Exception {
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(new AirbyteStream()
- .withNamespace(streamNamespace)
- .withName(streamName)
- .withJsonSchema(SCHEMA))));
+ .withNamespace(getStreamNamespace())
+ .withName(getStreamName())
+ .withJsonSchema(getSchema()))));

// First sync without _airbyte_meta
final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages_before_meta.jsonl");
@@ -92,9 +92,9 @@ public void testRawTableMetaMigration_incrementalDedupe() throws Exception {
.withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
.withPrimaryKey(List.of(List.of("id1"), List.of("id2")))
.withStream(new AirbyteStream()
- .withNamespace(streamNamespace)
- .withName(streamName)
- .withJsonSchema(SCHEMA))));
+ .withNamespace(getStreamNamespace())
+ .withName(getStreamName())
+ .withJsonSchema(getSchema()))));

// First sync without _airbyte_meta
final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages_before_meta.jsonl");
@@ -145,16 +145,16 @@ public void testRawTableLoadWithSuperVarcharLimitation() throws Exception {
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(new AirbyteStream()
- .withNamespace(streamNamespace)
- .withName(streamName)
- .withJsonSchema(SCHEMA))));
+ .withNamespace(getStreamNamespace())
+ .withName(getStreamName())
+ .withJsonSchema(getSchema()))));
final AirbyteMessage message1 = Jsons.deserialize(record1, AirbyteMessage.class);
- message1.getRecord().setNamespace(streamNamespace);
- message1.getRecord().setStream(streamName);
+ message1.getRecord().setNamespace(getStreamNamespace());
+ message1.getRecord().setStream(getStreamName());
((ObjectNode) message1.getRecord().getData()).put("name", largeString1);
final AirbyteMessage message2 = Jsons.deserialize(record2, AirbyteMessage.class);
- message2.getRecord().setNamespace(streamNamespace);
- message2.getRecord().setStream(streamName);
+ message2.getRecord().setNamespace(getStreamNamespace());
+ message2.getRecord().setStream(getStreamName());
((ObjectNode) message2.getRecord().getData()).put("name", largeString2);

// message1 should be preserved which is just on limit, message2 should be nulled.

@@ -152,7 +152,7 @@ protected DSLContext getDslContext() {

@Override
protected DestinationHandler<RedshiftState> getDestinationHandler() {
- return new RedshiftDestinationHandler(databaseName, database, namespace);
+ return new RedshiftDestinationHandler(databaseName, database, getNamespace());
}

@Override
Expand All @@ -178,9 +178,9 @@ protected Field<?> toJsonValue(final String valueAsString) {
@Override
@Test
public void testCreateTableIncremental() throws Exception {
- final Sql sql = generator.createTable(incrementalDedupStream, "", false);
- destinationHandler.execute(sql);
- List<DestinationInitialStatus<RedshiftState>> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
+ final Sql sql = getGenerator().createTable(getIncrementalDedupStream(), "", false);
+ getDestinationHandler().execute(sql);
+ List<DestinationInitialStatus<RedshiftState>> initialStatuses = getDestinationHandler().gatherInitialState(List.of(getIncrementalDedupStream()));
assertEquals(1, initialStatuses.size());
final DestinationInitialStatus<RedshiftState> initialStatus = initialStatuses.getFirst();
assertTrue(initialStatus.isFinalTablePresent());

1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
@@ -236,6 +236,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+ | 2.4.0 | 2024-03-21 | [\#36589](https://github.com/airbytehq/airbyte/pull/36589) | Adapt to Kotlin cdk 0.28.19 |
| 2.3.2 | 2024-03-21 | [\#36374](https://github.com/airbytehq/airbyte/pull/36374) | Supress Jooq DataAccessException error message in logs |
| 2.3.1 | 2024-03-18 | [\#36255](https://github.com/airbytehq/airbyte/pull/36255) | Mark as Certified-GA |
| 2.3.0 | 2024-03-18 | [\#36203](https://github.com/airbytehq/airbyte/pull/36203) | CDK 0.25.0; Record nulling for VARCHAR > 64K & record > 16MB (super limit) |
