Skip to content

Commit

Permalink
✨ Destination Redshift: GA Destinations V2 (#34077)
Browse files Browse the repository at this point in the history
Signed-off-by: Gireesh Sreepathi <gisripa@gmail.com>
Co-authored-by: Gireesh Sreepathi <gisripa@gmail.com>
  • Loading branch information
cynthiaxyin and gisripa authored Jan 24, 2024
1 parent 3b2dbe0 commit 13e32a6
Show file tree
Hide file tree
Showing 22 changed files with 117 additions and 91 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.13.3 | 2024-01-23 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Denote if destinations fully support Destinations V2 |
| 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector |
| 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator |
| 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,11 @@ public void setType(final AirbyteMessage.Type type) {

}

/**
* Denotes if the destination fully supports Destinations V2.
*/
default boolean isV2Destination() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,27 @@ public class DestinationConfig {

private static DestinationConfig config;

// whether the destination fully supports Destinations V2
private boolean isV2Destination;

@VisibleForTesting
protected JsonNode root;

private DestinationConfig() {}

@VisibleForTesting
public static void initialize(final JsonNode root) {
initialize(root, false);
}

public static void initialize(final JsonNode root, final boolean isV2Destination) {
if (config == null) {
if (root == null) {
throw new IllegalArgumentException("Cannot create DestinationConfig from null.");
}
config = new DestinationConfig();
config.root = root;
config.isV2Destination = isV2Destination;
} else {
LOGGER.warn("Singleton was already initialized.");
}
Expand All @@ -44,6 +53,7 @@ public static DestinationConfig getInstance() {
return config;
}

@VisibleForTesting
public static void clearInstance() {
config = null;
}
Expand All @@ -67,7 +77,7 @@ public String getTextValue(final String key) {
}

// boolean value, otherwise false
public Boolean getBooleanValue(final String key) {
public boolean getBooleanValue(final String key) {
final JsonNode node = getNodeValue(key);
if (node == null || !node.isBoolean()) {
LOGGER.debug("Cannot retrieve boolean value for node with key {}", key);
Expand All @@ -76,4 +86,8 @@ public Boolean getBooleanValue(final String key) {
return node.asBoolean();
}

public boolean getIsV2Destination() {
return isV2Destination;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
case CHECK -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
if (integration instanceof Destination) {
DestinationConfig.initialize(config);
DestinationConfig.initialize(config, ((Destination) integration).isV2Destination());
}
try {
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
Expand Down Expand Up @@ -183,7 +183,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
// save config to singleton
DestinationConfig.initialize(config);
DestinationConfig.initialize(config, ((Destination) integration).isV2Destination());
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);

try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
public class TypingAndDedupingFlag {

public static boolean isDestinationV2() {
return DestinationConfig.getInstance().getBooleanValue("use_1s1t_format");
return DestinationConfig.getInstance().getIsV2Destination()
|| DestinationConfig.getInstance().getBooleanValue("use_1s1t_format");
}

public static Optional<String> getRawNamespaceOverride(String option) {
String rawOverride = DestinationConfig.getInstance().getTextValue(option);
public static Optional<String> getRawNamespaceOverride(final String option) {
final String rawOverride = DestinationConfig.getInstance().getTextValue(option);
if (rawOverride == null || rawOverride.isEmpty()) {
return Optional.empty();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@

package io.airbyte.cdk.integrations.base.adaptive;

import io.airbyte.cdk.integrations.base.Command;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.DestinationConfig;
import io.airbyte.cdk.integrations.base.IntegrationCliParser;
import io.airbyte.cdk.integrations.base.IntegrationConfig;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,15 +83,6 @@ private Destination getDestination() {
}

public void run(final String[] args) throws Exception {
// getDestination() sometimes depends on the singleton being initialized.
// Parse the CLI args just so we can accomplish that.
IntegrationConfig parsedArgs = new IntegrationCliParser().parse(args);
if (parsedArgs.getCommand() != Command.SPEC) {
DestinationConfig.initialize(IntegrationRunner.parseConfig(parsedArgs.getConfigPath()));
} else {
DestinationConfig.initialize(Jsons.emptyObject());
}

final Destination destination = getDestination();
LOGGER.info("Starting destination: {}", destination.getClass().getName());
new IntegrationRunner(destination).run(args);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.13.2
version=0.13.3
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@ public void testInitialization() {
assertThrows(IllegalStateException.class, DestinationConfig::getInstance);

// good initialization
DestinationConfig.initialize(NODE);
DestinationConfig.initialize(NODE, true);
assertNotNull(DestinationConfig.getInstance());
assertEquals(NODE, DestinationConfig.getInstance().root);
assertEquals(true, DestinationConfig.getInstance().getIsV2Destination());

// initializing again doesn't change the config
final JsonNode nodeUnused = Jsons.deserialize("{}");
DestinationConfig.initialize(nodeUnused);
DestinationConfig.initialize(nodeUnused, false);
assertEquals(NODE, DestinationConfig.getInstance().root);
assertEquals(true, DestinationConfig.getInstance().getIsV2Destination());
}

@Test
public void testValues() {
DestinationConfig.clearInstance();
DestinationConfig.initialize(NODE);

assertEquals("bar", DestinationConfig.getInstance().getTextValue("foo"));
Expand All @@ -60,6 +63,8 @@ public void testValues() {
assertEquals(Jsons.deserialize("\"bar\""), DestinationConfig.getInstance().getNodeValue("foo"));
assertEquals(Jsons.deserialize("true"), DestinationConfig.getInstance().getNodeValue("baz"));
assertNull(DestinationConfig.getInstance().getNodeValue("blah"));

assertEquals(false, DestinationConfig.getInstance().getIsV2Destination());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.13.0'
cdkVersionRequired = '0.13.3'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,25 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.8.0
dockerImageTag: 2.0.0
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
icon: redshift.svg
license: MIT
name: Redshift
normalizationConfig:
normalizationIntegrationType: redshift
normalizationRepository: airbyte/normalization-redshift
normalizationTag: 0.4.3
registries:
cloud:
enabled: true
oss:
enabled: true
releases:
breakingChanges:
2.0.0:
message: >
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. You can manually sync existing connections prior to the next scheduled sync to start the upgrade early.
upgradeDeadline: "2024-03-15"
releaseStage: beta
resourceRequirements:
jobSpecific:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public ConnectorSpecification spec() throws Exception {
return originalSpec;
}

@Override
public boolean isV2Destination() {
return true;
}

public static void main(final String[] args) throws Exception {
final Destination destination = new RedshiftDestination();
LOGGER.info("starting destination: {}", RedshiftDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations;
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
Expand Down Expand Up @@ -206,25 +205,6 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
of streams {} this will create more buffers than necessary, leading to nonexistent gains
""", FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size());
}
// Short circuit old way of running things during transition.
if (!TypingAndDedupingFlag.isDestinationV2()) {
return new StagingConsumerFactory().createAsync(
outputRecordCollector,
getDatabase(getDataSource(config)),
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
getNamingResolver(),
config,
catalog,
isPurgeStagingData(s3Options),
new TypeAndDedupeOperationValve(),
new NoopTyperDeduper(),
// The parsedcatalog is only used in v2 mode, so just pass null for now
null,
// Overwriting null namespace with null is perfectly safe
null,
// still using v1 table format
false);
}

final String defaultNamespace = config.get("schema").asText();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,34 +249,27 @@
}
]
},
"use_1s1t_format": {
"type": "boolean",
"description": "(Early Access) Use <a href=\"https://docs.airbyte.com/understanding-airbyte/typing-deduping\" target=\"_blank\">Destinations V2</a>.",
"title": "Use Destinations V2 (Early Access)",
"order": 9,
"group": "connection"
},
"raw_data_schema": {
"type": "string",
"description": "(Early Access) The schema to write raw tables into",
"title": "Destinations V2 Raw Table Schema (Early Access)",
"order": 10,
"description": "The schema to write raw tables into",
"title": "Destinations V2 Raw Table Schema",
"order": 9,
"group": "connection"
},
"enable_incremental_final_table_updates": {
"type": "boolean",
"default": false,
"description": "When enabled your data will load into your final tables incrementally while your data is still being synced. When Disabled (the default), your data loads into your final tables once at the end of a sync. Note that this option only applies if you elect to create Final tables",
"title": "Enable Loading Data Incrementally to Final Tables (Early Access)",
"order": 11,
"title": "Enable Loading Data Incrementally to Final Tables",
"order": 10,
"group": "connection"
},
"disable_type_dedupe": {
"type": "boolean",
"default": false,
"description": "Disable Writing Final Tables. WARNING! The data format in _airbyte_data is likely stable but there are no guarantees that other metadata columns will remain the same in future versions",
"title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions) (Early Access)",
"order": 12,
"title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)",
"order": 11,
"group": "connection"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.jupiter.api.Disabled;

/**
* Integration test testing the {@link RedshiftInsertDestination}.
*/
@Disabled
public class RedshiftInsertDestinationAcceptanceTest extends RedshiftDestinationAcceptanceTest {

public JsonNode getStaticConfig() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import java.nio.file.Path;
import org.junit.jupiter.api.Disabled;

/**
* Integration test testing {@link RedshiftStagingS3Destination}. The default Redshift integration
* test credentials contain S3 credentials - this automatically causes COPY to be selected.
*/
@Disabled
public class RedshiftS3StagingInsertDestinationAcceptanceTest extends RedshiftDestinationAcceptanceTest {

public JsonNode getStaticConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
import io.airbyte.commons.json.Jsons;
import java.io.IOException;
import java.nio.file.Path;
import org.junit.jupiter.api.Disabled;

/*
* SshKeyRedshiftInsertDestinationAcceptanceTest runs basic Redshift Destination Tests using the SQL
* Insert mechanism for upload of data and "key" authentication for the SSH bastion configuration.
*/
@Disabled
public class SshKeyRedshiftInsertDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import io.airbyte.commons.json.Jsons;
import java.io.IOException;
import java.nio.file.Path;
import org.junit.jupiter.api.Disabled;

/*
* SshPasswordRedshiftStagingDestinationAcceptanceTest runs basic Redshift Destination Tests using
* the S3 Staging mechanism for upload of data and "password" authentication for the SSH bastion
* configuration.
*/
@Disabled
public class SshPasswordRedshiftStagingDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest {

@Override
Expand Down
8 changes: 4 additions & 4 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,10 @@ tutorials:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.3.31 | 2024-01-22 | [34023](https://github.com/airbytehq/airbyte/pull/34023) | Combine DDL operations into a single execution |
| 2.3.30 | 2024-01-12 | [34226](https://github.com/airbytehq/airbyte/pull/34226) | Upgrade CDK to 0.12.0; Cleanup dependencies |
| 2.3.29 | 2024-01-09 | [34003](https://github.com/airbytehq/airbyte/pull/34003) | Fix loading credentials from GCP Env |
| 2.3.28 | 2024-01-08 | [34021](https://github.com/airbytehq/airbyte/pull/34021) | Add idempotency ids in dummy insert for check call |
| 2.3.31 | 2024-01-22 | [\#34023](https://github.com/airbytehq/airbyte/pull/34023) | Combine DDL operations into a single execution |
| 2.3.30 | 2024-01-12 | [\#34226](https://github.com/airbytehq/airbyte/pull/34226) | Upgrade CDK to 0.12.0; Cleanup dependencies |
| 2.3.29 | 2024-01-09 | [\#34003](https://github.com/airbytehq/airbyte/pull/34003) | Fix loading credentials from GCP Env |
| 2.3.28 | 2024-01-08 | [\#34021](https://github.com/airbytehq/airbyte/pull/34021) | Add idempotency ids in dummy insert for check call |
| 2.3.27 | 2024-01-05 | [\#33948](https://github.com/airbytehq/airbyte/pull/33948) | Skip retrieving initial table state when setup fails |
| 2.3.26 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | Internal code structure changes |
| 2.3.25 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Update to java CDK 0.10.0 (no changes) |
Expand Down
Loading

0 comments on commit 13e32a6

Please sign in to comment.