diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml index 6a24dc76e32d..3c2709d1f890 100644 --- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml +++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc - dockerImageTag: 0.7.8 + dockerImageTag: 0.7.9 dockerRepository: airbyte/destination-redshift documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift githubIssueLabel: destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java index f514c9be9045..fabe3d960ddd 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java @@ -152,7 +152,8 @@ private Field jsonSerialize(final Field field) { } /** - * Redshift ARRAY_CONCAT supports only 2 arrays, recursively build ARRAY_CONCAT for n arrays. + * Redshift ARRAY_CONCAT supports only 2 arrays. Iteratively nest ARRAY_CONCAT to support more than + * 2 * * @param arrays * @return @@ -162,16 +163,14 @@ Field arrayConcatStmt(List> arrays) { return field("ARRAY()"); // Return an empty string if the list is empty } - // Base case: if there's only one element, return it - if (arrays.size() == 1) { - return arrays.get(0); + Field result = arrays.get(0); + String renderedSql = getDslContext().render(result); + for (int i = 1; i < arrays.size(); i++) { + // We lose some nice indentation but thats ok. Queryparts + // are intentionally rendered here to avoid deep stack for function sql rendering. + result = field(getDslContext().renderNamedOrInlinedParams(function("ARRAY_CONCAT", getSuperType(), result, arrays.get(i)))); } - - // Recursive case: construct ARRAY_CONCAT function call - Field lastValue = arrays.get(arrays.size() - 1); - Field recursiveCall = arrayConcatStmt(arrays.subList(0, arrays.size() - 1)); - - return function("ARRAY_CONCAT", getSuperType(), recursiveCall, lastValue); + return result; } Field toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) { diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorTest.java new file mode 100644 index 000000000000..732e23d20fd5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorTest.java @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.redshift.typing_deduping; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; +import io.airbyte.integrations.base.destination.typing_deduping.Array; +import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; +import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import io.airbyte.protocol.models.v0.SyncMode; +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import org.jooq.DSLContext; +import org.jooq.conf.Settings; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class RedshiftSqlGeneratorTest { + + private static final Random RANDOM = new Random(); + + private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) { + + // Override only for tests to print formatted SQL. The actual implementation should use unformatted + // to save bytes. + @Override + protected DSLContext getDslContext() { + return DSL.using(getDialect(), new Settings().withRenderFormatted(true)); + } + + }; + + private StreamId streamId; + + private StreamConfig incrementalDedupStream; + + private StreamConfig incrementalAppendStream; + + @BeforeEach + public void setup() { + streamId = new StreamId("test_schema", "users_final", "test_schema", "users_raw", "test_schema", "users_final"); + final ColumnId id1 = redshiftSqlGenerator.buildColumnId("id1"); + final ColumnId id2 = redshiftSqlGenerator.buildColumnId("id2"); + List primaryKey = List.of(id1, id2); + ColumnId cursor = redshiftSqlGenerator.buildColumnId("updated_at"); + + LinkedHashMap columns = new LinkedHashMap<>(); + columns.put(id1, AirbyteProtocolType.INTEGER); + columns.put(id2, AirbyteProtocolType.INTEGER); + columns.put(cursor, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + columns.put(redshiftSqlGenerator.buildColumnId("struct"), new Struct(new LinkedHashMap<>())); + columns.put(redshiftSqlGenerator.buildColumnId("array"), new Array(AirbyteProtocolType.UNKNOWN)); + columns.put(redshiftSqlGenerator.buildColumnId("string"), AirbyteProtocolType.STRING); + columns.put(redshiftSqlGenerator.buildColumnId("number"), AirbyteProtocolType.NUMBER); + columns.put(redshiftSqlGenerator.buildColumnId("integer"), AirbyteProtocolType.INTEGER); + columns.put(redshiftSqlGenerator.buildColumnId("boolean"), AirbyteProtocolType.BOOLEAN); + columns.put(redshiftSqlGenerator.buildColumnId("timestamp_with_timezone"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + columns.put(redshiftSqlGenerator.buildColumnId("timestamp_without_timezone"), AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE); + columns.put(redshiftSqlGenerator.buildColumnId("time_with_timezone"), AirbyteProtocolType.TIME_WITH_TIMEZONE); + columns.put(redshiftSqlGenerator.buildColumnId("time_without_timezone"), AirbyteProtocolType.TIME_WITHOUT_TIMEZONE); + columns.put(redshiftSqlGenerator.buildColumnId("date"), AirbyteProtocolType.DATE); + columns.put(redshiftSqlGenerator.buildColumnId("unknown"), AirbyteProtocolType.UNKNOWN); + columns.put(redshiftSqlGenerator.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + incrementalDedupStream = new StreamConfig( + streamId, + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND_DEDUP, + primaryKey, + Optional.of(cursor), + columns); + incrementalAppendStream = new StreamConfig( + streamId, + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + primaryKey, + Optional.of(cursor), + columns); + } + + @Test + public void testTypingAndDeduping() throws IOException { + String expectedSql = MoreResources.readResource("typing_deduping_with_cdc.sql"); + String generatedSql = + redshiftSqlGenerator.updateTable(incrementalDedupStream, "unittest", Optional.of(Instant.parse("2023-02-15T18:35:24.00Z")), false); + List expectedSqlLines = Arrays.stream(expectedSql.split("\n")).map(String::trim).toList(); + List generatedSqlLines = Arrays.stream(generatedSql.split("\n")).map(String::trim).toList(); + System.out.println(generatedSql); + assertEquals(expectedSqlLines, generatedSqlLines); + } + + @Test + public void test2000ColumnSql() { + final ColumnId id1 = redshiftSqlGenerator.buildColumnId("id1"); + final ColumnId id2 = redshiftSqlGenerator.buildColumnId("id2"); + List primaryKey = List.of(id1, id2); + ColumnId cursor = redshiftSqlGenerator.buildColumnId("updated_at"); + + LinkedHashMap columns = new LinkedHashMap<>(); + columns.put(id1, AirbyteProtocolType.INTEGER); + columns.put(id2, AirbyteProtocolType.INTEGER); + columns.put(cursor, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + + for (int i = 0; i < 2000; i++) { + final String columnName = RANDOM + .ints('a', 'z' + 1) + .limit(15) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) + .toString(); + columns.put(redshiftSqlGenerator.buildColumnId(columnName), AirbyteProtocolType.STRING); + } + String generatedSql = redshiftSqlGenerator.updateTable(new StreamConfig( + streamId, + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND_DEDUP, + primaryKey, + Optional.of(cursor), + columns), "unittest", Optional.of(Instant.parse("2023-02-15T18:35:24.00Z")), false); + // This should not throw an exception. + assertFalse(generatedSql.isEmpty()); + } + +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/resources/typing_deduping_with_cdc.sql b/airbyte-integrations/connectors/destination-redshift/src/test/resources/typing_deduping_with_cdc.sql new file mode 100644 index 000000000000..1bfc7ea15f31 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test/resources/typing_deduping_with_cdc.sql @@ -0,0 +1,215 @@ +BEGIN; +insert into "test_schema"."users_finalunittest" ( + "id1", + "id2", + "updated_at", + "struct", + "array", + "string", + "number", + "integer", + "boolean", + "timestamp_with_timezone", + "timestamp_without_timezone", + "time_with_timezone", + "time_without_timezone", + "date", + "unknown", + "_ab_cdc_deleted_at", + "_airbyte_raw_id", + "_airbyte_extracted_at", + "_airbyte_meta" +) +with + "intermediate_data" as ( + select + cast("_airbyte_data"."id1" as bigint) as "id1", + cast("_airbyte_data"."id2" as bigint) as "id2", + cast("_airbyte_data"."updated_at" as timestamp with time zone) as "updated_at", + CASE WHEN JSON_TYPEOF("_airbyte_data"."struct") = 'object' THEN cast("_airbyte_data"."struct" as super) END as "struct", + CASE WHEN JSON_TYPEOF("_airbyte_data"."array") = 'array' THEN cast("_airbyte_data"."array" as super) END as "array", + CASE WHEN ( + JSON_TYPEOF("_airbyte_data"."string") <> 'string' + and "_airbyte_data"."string" is not null + ) THEN JSON_SERIALIZE("_airbyte_data"."string") ELSE cast("_airbyte_data"."string" as varchar(65535)) END as "string", + cast("_airbyte_data"."number" as decimal(38, 9)) as "number", + cast("_airbyte_data"."integer" as bigint) as "integer", + cast("_airbyte_data"."boolean" as boolean) as "boolean", + cast("_airbyte_data"."timestamp_with_timezone" as timestamp with time zone) as "timestamp_with_timezone", + cast("_airbyte_data"."timestamp_without_timezone" as timestamp) as "timestamp_without_timezone", + cast("_airbyte_data"."time_with_timezone" as time with time zone) as "time_with_timezone", + cast("_airbyte_data"."time_without_timezone" as time) as "time_without_timezone", + cast("_airbyte_data"."date" as date) as "date", + cast("_airbyte_data"."unknown" as super) as "unknown", + cast("_airbyte_data"."_ab_cdc_deleted_at" as timestamp with time zone) as "_ab_cdc_deleted_at", + "_airbyte_raw_id", + "_airbyte_extracted_at", + OBJECT( + 'errors', + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + ARRAY_CONCAT( + CASE WHEN ( + "_airbyte_data"."id1" is not null + and "id1" is null + ) THEN ARRAY('Problem with `id1`') ELSE ARRAY() END , + CASE WHEN ( + "_airbyte_data"."id2" is not null + and "id2" is null + ) THEN ARRAY('Problem with `id2`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."updated_at" is not null + and "updated_at" is null + ) THEN ARRAY('Problem with `updated_at`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."struct" is not null + and "struct" is null + ) THEN ARRAY('Problem with `struct`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."array" is not null + and "array" is null + ) THEN ARRAY('Problem with `array`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."string" is not null + and "string" is null + ) THEN ARRAY('Problem with `string`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."number" is not null + and "number" is null + ) THEN ARRAY('Problem with `number`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."integer" is not null + and "integer" is null + ) THEN ARRAY('Problem with `integer`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."boolean" is not null + and "boolean" is null + ) THEN ARRAY('Problem with `boolean`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."timestamp_with_timezone" is not null + and "timestamp_with_timezone" is null + ) THEN ARRAY('Problem with `timestamp_with_timezone`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."timestamp_without_timezone" is not null + and "timestamp_without_timezone" is null + ) THEN ARRAY('Problem with `timestamp_without_timezone`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."time_with_timezone" is not null + and "time_with_timezone" is null + ) THEN ARRAY('Problem with `time_with_timezone`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."time_without_timezone" is not null + and "time_without_timezone" is null + ) THEN ARRAY('Problem with `time_without_timezone`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."date" is not null + and "date" is null + ) THEN ARRAY('Problem with `date`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."unknown" is not null + and "unknown" is null + ) THEN ARRAY('Problem with `unknown`') ELSE ARRAY() END + ), + CASE WHEN ( + "_airbyte_data"."_ab_cdc_deleted_at" is not null + and "_ab_cdc_deleted_at" is null + ) THEN ARRAY('Problem with `_ab_cdc_deleted_at`') ELSE ARRAY() END + ) + ) as "_airbyte_meta" + from "test_schema"."users_raw" + where ( + ( + "_airbyte_loaded_at" is null + or ( + "_airbyte_loaded_at" is not null + and JSON_TYPEOF("_airbyte_data"."_ab_cdc_deleted_at") <> 'null' + ) + ) + and "_airbyte_extracted_at" > '2023-02-15T18:35:24Z' + ) + ), + "numbered_rows" as ( + select + *, + row_number() over (partition by + "id1", + "id2" + order by + "updated_at" desc NULLS LAST, + "_airbyte_extracted_at" desc + ) as "row_number" + from "intermediate_data" + ) +select + "id1", + "id2", + "updated_at", + "struct", + "array", + "string", + "number", + "integer", + "boolean", + "timestamp_with_timezone", + "timestamp_without_timezone", + "time_with_timezone", + "time_without_timezone", + "date", + "unknown", + "_ab_cdc_deleted_at", + "_airbyte_raw_id", + "_airbyte_extracted_at", + "_airbyte_meta" +from "numbered_rows" +where row_number = 1; +delete from "test_schema"."users_finalunittest" +where "_airbyte_raw_id" in ( + select "_airbyte_raw_id" + from ( + select + "_airbyte_raw_id", + row_number() over (partition by + "id1", + "id2" + order by + "updated_at" desc NULLS LAST, + "_airbyte_extracted_at" desc + ) as "row_number" + from "test_schema"."users_finalunittest" + ) as "airbyte_ids" + where row_number <> 1 +); +delete from "test_schema"."users_finalunittest" +where "_ab_cdc_deleted_at" is not null; +update "test_schema"."users_raw" +set "_airbyte_loaded_at" = GETDATE() +where ( + "_airbyte_loaded_at" is null + and "_airbyte_extracted_at" > '2023-02-15T18:35:24Z' + ); +COMMIT; \ No newline at end of file diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 39de71c4b311..dc776c6299cb 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -155,7 +155,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c ## Changelog | Version | Date | Pull Request | Subject | -| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.7.9 | 2024-01-03 | [33877](https://github.com/airbytehq/airbyte/pull/33877) | Fix Jooq StackOverflowError | | 0.7.8 | 2023-12-28 | [#33788](https://github.com/airbytehq/airbyte/pull/33788) | Thread-safe fix for file part names (s3 staging files) | | 0.7.7 | 2024-01-04 | [\#33728](https://github.com/airbytehq/airbyte/pull/33728) | Add option to only type and dedupe at the end of the sync | | 0.7.6 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Only run T+D on a stream if it had any records during the sync | @@ -192,7 +193,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | 0.3.55 | 2023-01-26 | [\#20631](https://github.com/airbytehq/airbyte/pull/20631) | Added support for destination checkpointing with staging | | 0.3.54 | 2023-01-18 | [\#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions | | 0.3.53 | 2023-01-03 | [\#17273](https://github.com/airbytehq/airbyte/pull/17273) | Flatten JSON arrays to fix maximum size check for SUPER field | -| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers (⛔ this version has a bug and will not work; use `0.3.56` instead) | +| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers (⛔ this version has a bug and will not work; use `0.3.56` instead) | | 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling | | 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | | 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) | @@ -227,4 +228,4 @@ Each stream will be output into its own raw table in Redshift. Each table will c | 0.3.14 | 2021-10-08 | [\#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table | | 0.3.13 | 2021-09-02 | [\#5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance | | 0.3.12 | 2021-07-21 | [\#3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs | -| 0.3.11 | 2021-07-20 | [\#4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec | +| 0.3.11 | 2021-07-20 | [\#4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec | \ No newline at end of file