From 13e32a6e871d89abed67303426a8340b6163d56e Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Tue, 23 Jan 2024 16:02:00 -0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Destination=20Redshift:=20GA=20Dest?= =?UTF-8?q?inations=20V2=20(#34077)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Gireesh Sreepathi Co-authored-by: Gireesh Sreepathi --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../cdk/integrations/base/Destination.java | 7 +++ .../integrations/base/DestinationConfig.java | 16 +++++- .../integrations/base/IntegrationRunner.java | 4 +- .../base/TypingAndDedupingFlag.java | 7 +-- .../adaptive/AdaptiveDestinationRunner.java | 14 ----- .../src/main/resources/version.properties | 2 +- .../base/DestinationConfigTest.java | 9 +++- .../destination-redshift/build.gradle | 2 +- .../destination-redshift/metadata.yaml | 13 +++-- .../redshift/RedshiftDestination.java | 5 ++ .../RedshiftStagingS3Destination.java | 20 -------- .../src/main/resources/spec.json | 21 +++----- ...dshiftInsertDestinationAcceptanceTest.java | 2 + ...tagingInsertDestinationAcceptanceTest.java | 2 + ...dshiftInsertDestinationAcceptanceTest.java | 2 + ...shiftStagingDestinationAcceptanceTest.java | 2 + docs/integrations/destinations/bigquery.md | 8 +-- .../destinations/redshift-migrations.md | 14 +++++ docs/integrations/destinations/redshift.md | 51 +++++++++++-------- docs/integrations/destinations/snowflake.md | 4 +- .../upgrading_to_destinations_v2.md | 2 +- 22 files changed, 117 insertions(+), 91 deletions(-) create mode 100644 docs/integrations/destinations/redshift-migrations.md diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 3939f9081a4c..66c459fa2ce3 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.13.3 | 2024-01-23 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Denote if destinations fully support Destinations V2 | | 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector | | 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator | | 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/Destination.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/Destination.java index 20a0c2464bb7..acd958a0323b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/Destination.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/Destination.java @@ -162,4 +162,11 @@ public void setType(final AirbyteMessage.Type type) { } + /** + * Denotes if the destination fully supports Destinations V2. + */ + default boolean isV2Destination() { + return false; + } + } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/DestinationConfig.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/DestinationConfig.java index 280658683f7b..300a13d6a401 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/DestinationConfig.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/DestinationConfig.java @@ -20,18 +20,27 @@ public class DestinationConfig { private static DestinationConfig config; + // whether the destination fully supports Destinations V2 + private boolean isV2Destination; + @VisibleForTesting protected JsonNode root; private DestinationConfig() {} + @VisibleForTesting public static void initialize(final JsonNode root) { + initialize(root, false); + } + + public static void initialize(final JsonNode root, final boolean isV2Destination) { if (config == null) { if (root == null) { throw new IllegalArgumentException("Cannot create DestinationConfig from null."); } config = new DestinationConfig(); config.root = root; + config.isV2Destination = isV2Destination; } else { LOGGER.warn("Singleton was already initialized."); } @@ -44,6 +53,7 @@ public static DestinationConfig getInstance() { return config; } + @VisibleForTesting public static void clearInstance() { config = null; } @@ -67,7 +77,7 @@ public String getTextValue(final String key) { } // boolean value, otherwise false - public Boolean getBooleanValue(final String key) { + public boolean getBooleanValue(final String key) { final JsonNode node = getNodeValue(key); if (node == null || !node.isBoolean()) { LOGGER.debug("Cannot retrieve boolean value for node with key {}", key); @@ -76,4 +86,8 @@ public Boolean getBooleanValue(final String key) { return node.asBoolean(); } + public boolean getIsV2Destination() { + return isV2Destination; + } + } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java index 5887466c126d..8da1ff8588aa 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java @@ -140,7 +140,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { case CHECK -> { final JsonNode config = parseConfig(parsed.getConfigPath()); if (integration instanceof Destination) { - DestinationConfig.initialize(config); + DestinationConfig.initialize(config, ((Destination) integration).isV2Destination()); } try { validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK"); @@ -183,7 +183,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { final JsonNode config = parseConfig(parsed.getConfigPath()); validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); // save config to singleton - DestinationConfig.initialize(config); + DestinationConfig.initialize(config, ((Destination) integration).isV2Destination()); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/TypingAndDedupingFlag.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/TypingAndDedupingFlag.java index b59e757b62c4..8820b1d7017f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/TypingAndDedupingFlag.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/TypingAndDedupingFlag.java @@ -9,11 +9,12 @@ public class TypingAndDedupingFlag { public static boolean isDestinationV2() { - return DestinationConfig.getInstance().getBooleanValue("use_1s1t_format"); + return DestinationConfig.getInstance().getIsV2Destination() + || DestinationConfig.getInstance().getBooleanValue("use_1s1t_format"); } - public static Optional getRawNamespaceOverride(String option) { - String rawOverride = DestinationConfig.getInstance().getTextValue(option); + public static Optional getRawNamespaceOverride(final String option) { + final String rawOverride = DestinationConfig.getInstance().getTextValue(option); if (rawOverride == null || rawOverride.isEmpty()) { return Optional.empty(); } else { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.java index 878eef089be0..81d508b0dd2c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.java @@ -4,14 +4,9 @@ package io.airbyte.cdk.integrations.base.adaptive; -import io.airbyte.cdk.integrations.base.Command; import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.cdk.integrations.base.DestinationConfig; -import io.airbyte.cdk.integrations.base.IntegrationCliParser; -import io.airbyte.cdk.integrations.base.IntegrationConfig; import io.airbyte.cdk.integrations.base.IntegrationRunner; import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.commons.json.Jsons; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,15 +83,6 @@ private Destination getDestination() { } public void run(final String[] args) throws Exception { - // getDestination() sometimes depends on the singleton being initialized. - // Parse the CLI args just so we can accomplish that. - IntegrationConfig parsedArgs = new IntegrationCliParser().parse(args); - if (parsedArgs.getCommand() != Command.SPEC) { - DestinationConfig.initialize(IntegrationRunner.parseConfig(parsedArgs.getConfigPath())); - } else { - DestinationConfig.initialize(Jsons.emptyObject()); - } - final Destination destination = getDestination(); LOGGER.info("Starting destination: {}", destination.getClass().getName()); new IntegrationRunner(destination).run(args); diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index b18dfa7feb69..a4868348559c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.13.2 +version=0.13.3 diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/DestinationConfigTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/DestinationConfigTest.java index 2d06503baf20..68044162bb72 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/DestinationConfigTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/base/DestinationConfigTest.java @@ -33,18 +33,21 @@ public void testInitialization() { assertThrows(IllegalStateException.class, DestinationConfig::getInstance); // good initialization - DestinationConfig.initialize(NODE); + DestinationConfig.initialize(NODE, true); assertNotNull(DestinationConfig.getInstance()); assertEquals(NODE, DestinationConfig.getInstance().root); + assertEquals(true, DestinationConfig.getInstance().getIsV2Destination()); // initializing again doesn't change the config final JsonNode nodeUnused = Jsons.deserialize("{}"); - DestinationConfig.initialize(nodeUnused); + DestinationConfig.initialize(nodeUnused, false); assertEquals(NODE, DestinationConfig.getInstance().root); + assertEquals(true, DestinationConfig.getInstance().getIsV2Destination()); } @Test public void testValues() { + DestinationConfig.clearInstance(); DestinationConfig.initialize(NODE); assertEquals("bar", DestinationConfig.getInstance().getTextValue("foo")); @@ -60,6 +63,8 @@ public void testValues() { assertEquals(Jsons.deserialize("\"bar\""), DestinationConfig.getInstance().getNodeValue("foo")); assertEquals(Jsons.deserialize("true"), DestinationConfig.getInstance().getNodeValue("baz")); assertNull(DestinationConfig.getInstance().getNodeValue("blah")); + + assertEquals(false, DestinationConfig.getInstance().getIsV2Destination()); } } diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index aa75211ebf3e..a64ad383ffc1 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -4,7 +4,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.13.0' + cdkVersionRequired = '0.13.3' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml index b32972ffd65e..0b140882bae2 100644 --- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml +++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml @@ -5,22 +5,25 @@ data: connectorSubtype: database connectorType: destination definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc - dockerImageTag: 0.8.0 + dockerImageTag: 2.0.0 dockerRepository: airbyte/destination-redshift documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift githubIssueLabel: destination-redshift icon: redshift.svg license: MIT name: Redshift - normalizationConfig: - normalizationIntegrationType: redshift - normalizationRepository: airbyte/normalization-redshift - normalizationTag: 0.4.3 registries: cloud: enabled: true oss: enabled: true + releases: + breakingChanges: + 2.0.0: + message: > + This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations). + Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. You can manually sync existing connections prior to the next scheduled sync to start the upgrade early. + upgradeDeadline: "2024-03-15" releaseStage: beta resourceRequirements: jobSpecific: diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java index d10cabd45730..5b48a24ca115 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java @@ -69,6 +69,11 @@ public ConnectorSpecification spec() throws Exception { return originalSpec; } + @Override + public boolean isV2Destination() { + return true; + } + public static void main(final String[] args) throws Exception { final Destination destination = new RedshiftDestination(); LOGGER.info("starting destination: {}", RedshiftDestination.class); diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index 82af7555922b..6073186a51f4 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -42,7 +42,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser; import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper; import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations; -import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper; import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator; import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve; @@ -206,25 +205,6 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN of streams {} this will create more buffers than necessary, leading to nonexistent gains """, FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size()); } - // Short circuit old way of running things during transition. - if (!TypingAndDedupingFlag.isDestinationV2()) { - return new StagingConsumerFactory().createAsync( - outputRecordCollector, - getDatabase(getDataSource(config)), - new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig), - getNamingResolver(), - config, - catalog, - isPurgeStagingData(s3Options), - new TypeAndDedupeOperationValve(), - new NoopTyperDeduper(), - // The parsedcatalog is only used in v2 mode, so just pass null for now - null, - // Overwriting null namespace with null is perfectly safe - null, - // still using v1 table format - false); - } final String defaultNamespace = config.get("schema").asText(); for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index 28a274642bbf..55cb60c52a62 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -249,34 +249,27 @@ } ] }, - "use_1s1t_format": { - "type": "boolean", - "description": "(Early Access) Use Destinations V2.", - "title": "Use Destinations V2 (Early Access)", - "order": 9, - "group": "connection" - }, "raw_data_schema": { "type": "string", - "description": "(Early Access) The schema to write raw tables into", - "title": "Destinations V2 Raw Table Schema (Early Access)", - "order": 10, + "description": "The schema to write raw tables into", + "title": "Destinations V2 Raw Table Schema", + "order": 9, "group": "connection" }, "enable_incremental_final_table_updates": { "type": "boolean", "default": false, "description": "When enabled your data will load into your final tables incrementally while your data is still being synced. When Disabled (the default), your data loads into your final tables once at the end of a sync. Note that this option only applies if you elect to create Final tables", - "title": "Enable Loading Data Incrementally to Final Tables (Early Access)", - "order": 11, + "title": "Enable Loading Data Incrementally to Final Tables", + "order": 10, "group": "connection" }, "disable_type_dedupe": { "type": "boolean", "default": false, "description": "Disable Writing Final Tables. WARNING! The data format in _airbyte_data is likely stable but there are no guarantees that other metadata columns will remain the same in future versions", - "title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions) (Early Access)", - "order": 12, + "title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)", + "order": 11, "group": "connection" } }, diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java index 6ca0a17fce3c..59e9130af79f 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java @@ -9,10 +9,12 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import org.junit.jupiter.api.Disabled; /** * Integration test testing the {@link RedshiftInsertDestination}. */ +@Disabled public class RedshiftInsertDestinationAcceptanceTest extends RedshiftDestinationAcceptanceTest { public JsonNode getStaticConfig() throws IOException { diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java index fc054be51232..0732e6041454 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java @@ -8,11 +8,13 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import java.nio.file.Path; +import org.junit.jupiter.api.Disabled; /** * Integration test testing {@link RedshiftStagingS3Destination}. The default Redshift integration * test credentials contain S3 credentials - this automatically causes COPY to be selected. */ +@Disabled public class RedshiftS3StagingInsertDestinationAcceptanceTest extends RedshiftDestinationAcceptanceTest { public JsonNode getStaticConfig() { diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java index 1d80069433da..4c2da0a04ce7 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java @@ -10,11 +10,13 @@ import io.airbyte.commons.json.Jsons; import java.io.IOException; import java.nio.file.Path; +import org.junit.jupiter.api.Disabled; /* * SshKeyRedshiftInsertDestinationAcceptanceTest runs basic Redshift Destination Tests using the SQL * Insert mechanism for upload of data and "key" authentication for the SSH bastion configuration. */ +@Disabled public class SshKeyRedshiftInsertDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { @Override diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java index 6f423e5e43d1..72c992fdb183 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java @@ -10,12 +10,14 @@ import io.airbyte.commons.json.Jsons; import java.io.IOException; import java.nio.file.Path; +import org.junit.jupiter.api.Disabled; /* * SshPasswordRedshiftStagingDestinationAcceptanceTest runs basic Redshift Destination Tests using * the S3 Staging mechanism for upload of data and "password" authentication for the SSH bastion * configuration. */ +@Disabled public class SshPasswordRedshiftStagingDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { @Override diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index b30ba594a0cb..2c5990ede42f 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -210,10 +210,10 @@ tutorials: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 2.3.31 | 2024-01-22 | [34023](https://github.com/airbytehq/airbyte/pull/34023) | Combine DDL operations into a single execution | -| 2.3.30 | 2024-01-12 | [34226](https://github.com/airbytehq/airbyte/pull/34226) | Upgrade CDK to 0.12.0; Cleanup dependencies | -| 2.3.29 | 2024-01-09 | [34003](https://github.com/airbytehq/airbyte/pull/34003) | Fix loading credentials from GCP Env | -| 2.3.28 | 2024-01-08 | [34021](https://github.com/airbytehq/airbyte/pull/34021) | Add idempotency ids in dummy insert for check call | +| 2.3.31 | 2024-01-22 | [\#34023](https://github.com/airbytehq/airbyte/pull/34023) | Combine DDL operations into a single execution | +| 2.3.30 | 2024-01-12 | [\#34226](https://github.com/airbytehq/airbyte/pull/34226) | Upgrade CDK to 0.12.0; Cleanup dependencies | +| 2.3.29 | 2024-01-09 | [\#34003](https://github.com/airbytehq/airbyte/pull/34003) | Fix loading credentials from GCP Env | +| 2.3.28 | 2024-01-08 | [\#34021](https://github.com/airbytehq/airbyte/pull/34021) | Add idempotency ids in dummy insert for check call | | 2.3.27 | 2024-01-05 | [\#33948](https://github.com/airbytehq/airbyte/pull/33948) | Skip retrieving initial table state when setup fails | | 2.3.26 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | Internal code structure changes | | 2.3.25 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Update to java CDK 0.10.0 (no changes) | diff --git a/docs/integrations/destinations/redshift-migrations.md b/docs/integrations/destinations/redshift-migrations.md new file mode 100644 index 000000000000..324148a26e74 --- /dev/null +++ b/docs/integrations/destinations/redshift-migrations.md @@ -0,0 +1,14 @@ +# Redshift Migration Guide + +## Upgrading to 2.0.0 + +This version introduces [Destinations V2](/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. To review the breaking changes, and how to upgrade, see [here](/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations). Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. You can manually sync existing connections prior to the next scheduled sync to start the upgrade early. + +Worthy of specific mention, this version includes: + +- Per-record error handling +- Clearer table structure +- Removal of sub-tables for nested properties +- Removal of SCD tables + +Learn more about what's new in Destinations V2 [here](/understanding-airbyte/typing-deduping). diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 7a551b8ba696..2da5b0ce27f4 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -9,7 +9,7 @@ The Airbyte Redshift destination allows you to sync data to Redshift. This Redshift destination connector has two replication strategies: 1. INSERT: Replicates data via SQL INSERT queries. This is built on top of the destination-jdbc code - base and is configured to rely on JDBC 4.2 standard drivers provided by Amazon via Mulesoft + base and is configured to rely on JDBC 4.2 standard drivers provided by Amazon via Maven Central [here](https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42) as described in Redshift documentation [here](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-install.html). **Not recommended @@ -28,8 +28,8 @@ For INSERT strategy: 2. COPY: Replicates data by first uploading data to an S3 bucket and issuing a COPY command. This is the recommended loading approach described by Redshift - [best practices](https://docs.aws.amazon.com/redshift/latest/dg/c_loading-data-best-practices.html). - Requires an S3 bucket and credentials. + [best practices](https://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-single-copy-command.html). + Requires an S3 bucket and credentials. Data is copied into S3 as multiple files with a manifest file. Airbyte automatically picks an approach depending on the given configuration - if S3 configuration is present, Airbyte will use the COPY strategy and vice versa. @@ -76,8 +76,9 @@ Optional parameters: (`ab_id`, `data`, `emitted_at`). Normally these files are deleted after the `COPY` command completes; if you want to keep them for other purposes, set `purge_staging_data` to `false`. -NOTE: S3 staging does not use the SSH Tunnel option, if configured. SSH Tunnel supports the SQL -connection only. S3 is secured through public HTTPS access only. +NOTE: S3 staging does not use the SSH Tunnel option for copying data, if configured. SSH Tunnel supports the SQL +connection only. S3 is secured through public HTTPS access only. Subsequent typing and deduping queries on final table +are executed over using provided SSH Tunnel configuration. ## Step 1: Set up Redshift @@ -204,32 +205,40 @@ All Redshift connections are encrypted using SSL. Each stream will be output into its own raw table in Redshift. Each table will contain 3 columns: -- `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in +- `_airbyte_raw_id`: a uuid assigned by Airbyte to each event that is processed. The column type in Redshift is `VARCHAR`. -- `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. +- `_airbyte_extracted_at`: a timestamp representing when the event was pulled from the data source. The column type in Redshift is `TIMESTAMP WITH TIME ZONE`. +- `_airbyte_loaded_at`: a timestamp representing when the row was processed into final table. + The column type in Redshift is `TIMESTAMP WITH TIME ZONE`. - `_airbyte_data`: a json blob representing with the event data. The column type in Redshift is `SUPER`. -## Data type mapping - -| Redshift Type | Airbyte Type | Notes | -| :-------------------- | :------------------------ | :---- | -| `boolean` | `boolean` | | -| `int` | `integer` | | -| `float` | `number` | | -| `varchar` | `string` | | -| `date/varchar` | `date` | | -| `time/varchar` | `time` | | -| `timestamptz/varchar` | `timestamp_with_timezone` | | -| `varchar` | `array` | | -| `varchar` | `object` | | +## Data type map + +| Airbyte type | Redshift type | +|:------------------------------------|:---------------------------------------| +| STRING | VARCHAR | +| STRING (BASE64) | VARCHAR | +| STRING (BIG_NUMBER) | VARCHAR | +| STRING (BIG_INTEGER) | VARCHAR | +| NUMBER | DECIMAL / NUMERIC | +| INTEGER | BIGINT / INT8 | +| BOOLEAN | BOOLEAN / BOOL | +| STRING (TIMESTAMP_WITH_TIMEZONE) | TIMESTAMPTZ / TIMESTAMP WITH TIME ZONE | +| STRING (TIMESTAMP_WITHOUT_TIMEZONE) | TIMESTAMP | +| STRING (TIME_WITH_TIMEZONE) | TIMETZ / TIME WITH TIME ZONE | +| STRING (TIME_WITHOUT_TIMEZONE) | TIME | +| DATE | DATE | +| OBJECT | SUPER | +| ARRAY | SUPER | ## Changelog | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.8.0 | 2024-01-18 | [34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.0 | +| 2.0.0 | 2024-01-23 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Destinations V2 | +| 0.8.0 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.0 | | 0.7.15 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Update check method with svv_table_info permission check, fix bug where s3 staging files were not being deleted. | | 0.7.14 | 2024-01-08 | [\#34014](https://github.com/airbytehq/airbyte/pull/34014) | Update order of options in spec | | 0.7.13 | 2024-01-05 | [\#33948](https://github.com/airbytehq/airbyte/pull/33948) | Fix NPE when prepare tables fail; Add case sensitive session for super; Bastion heartbeats added | diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index aad1612b10c8..1286f1f133fa 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -246,8 +246,8 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.4.22 | 2024-01-12 | [34227](https://github.com/airbytehq/airbyte/pull/34227) | Upgrade CDK to 0.12.0; Cleanup unused dependencies | -| 3.4.21 | 2024-01-10 | [\#34083](https://github.com/airbytehq/airbte/pull/34083) | Emit destination stats as part of the state message | +| 3.4.22 | 2024-01-12 | [\#34227](https://github.com/airbytehq/airbyte/pull/34227) | Upgrade CDK to 0.12.0; Cleanup unused dependencies | +| 3.4.21 | 2024-01-10 | [\#34083](https://github.com/airbytehq/airbyte/pull/34083) | Emit destination stats as part of the state message | | 3.4.20 | 2024-01-05 | [\#33948](https://github.com/airbytehq/airbyte/pull/33948) | Skip retrieving initial table state when setup fails | | 3.4.19 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | Internal code structure changes | | 3.4.18 | 2024-01-02 | [\#33728](https://github.com/airbytehq/airbyte/pull/33728) | Add option to only type and dedupe at the end of the sync | diff --git a/docs/release_notes/upgrading_to_destinations_v2.md b/docs/release_notes/upgrading_to_destinations_v2.md index eee8ad098b47..f9764cac1afa 100644 --- a/docs/release_notes/upgrading_to_destinations_v2.md +++ b/docs/release_notes/upgrading_to_destinations_v2.md @@ -158,7 +158,7 @@ For each destination connector, Destinations V2 is effective as of the following | --------------------- | --------------------- | -------------------------- | ------------------------ | | BigQuery | 1.10.2 | 2.0.6+ | November 7, 2023 | | Snowflake | 2.1.7 | 3.1.0+ | November 7, 2023 | -| Redshift | 0.6.11 | [coming soon] 2.0.0+ | [coming soon] early 2024 | +| Redshift | 0.8.0 | 2.0.0+ | March 15, 2024 | | Postgres | 0.4.0 | [coming soon] 2.0.0+ | [coming soon] early 2024 | | MySQL | 0.2.0 | [coming soon] 2.0.0+ | [coming soon] early 2024 |