Commit 4ecc6ac
24153 Destination Databricks: Fix support for external tables on S3 (airbytehq#24657)

* 24153 Destination Databricks: Update s3 source external

* 24153 Destination Databricks: Fixed databricks.md formatting

* 24153 Destination Databricks: Update the version

* auto-bump connector version

---------

Co-authored-by: Evan Tahler <evan@airbyte.io>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people authored and btkcodedev committed Apr 26, 2023
1 parent 62d417e commit 4ecc6ac
Showing 27 changed files with 177 additions and 36 deletions.
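The functional change is small but load-bearing: every stream copier now receives the Databricks Unity Catalog name and emits fully qualified three-part catalog.schema.table identifiers, where the previous two-part schema.table form resolved against whatever catalog the session defaulted to and broke external tables on S3. A minimal sketch of the before/after identifier construction follows; the names main, airbyte_raw, and users are illustrative, not from the commit.

public class TableIdentifierSketch {

  public static void main(String[] args) {
    // Hypothetical values for illustration; in the connector these come
    // from DatabricksDestinationConfig and the configured stream.
    final String catalog = "main";
    final String schema = "airbyte_raw";
    final String table = "users";

    // Before this commit: two-part name, resolved against the session's
    // current catalog -- wrong when the workspace default is not the
    // catalog the user configured.
    final String before = String.format("%s.%s", schema, table);

    // After this commit: fully qualified three-part name.
    final String after = String.format("%s.%s.%s", catalog, schema, table);

    System.out.println(before); // airbyte_raw.users
    System.out.println(after);  // main.airbyte_raw.users
  }
}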
@@ -112,7 +112,7 @@
 - name: Databricks Lakehouse
   destinationDefinitionId: 072d5540-f236-4294-ba7c-ade8fd918496
   dockerRepository: airbyte/destination-databricks
-  dockerImageTag: 1.0.0
+  dockerImageTag: 1.0.1
   documentationUrl: https://docs.airbyte.com/integrations/destinations/databricks
   icon: databricks.svg
   releaseStage: alpha
@@ -1752,7 +1752,7 @@
       - "overwrite"
       - "append"
       - "append_dedup"
-- dockerImage: "airbyte/destination-databricks:1.0.0"
+- dockerImage: "airbyte/destination-databricks:1.0.1"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/destinations/databricks"
     connectionSpecification:
@@ -1630,7 +1630,7 @@
   "destinationDefinitionId": "072d5540-f236-4294-ba7c-ade8fd918496",
   "name": "Databricks Lakehouse",
   "dockerRepository": "airbyte/destination-databricks",
-  "dockerImageTag": "1.0.0",
+  "dockerImageTag": "1.0.1",
   "documentationUrl": "https://docs.airbyte.com/integrations/destinations/databricks",
   "icon": "databricks.svg",
   "spec": {
Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-databricks

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.0
+LABEL io.airbyte.version=1.0.1
 LABEL io.airbyte.name=airbyte/destination-databricks
DatabricksStreamCopier.java
@@ -31,6 +31,7 @@ public abstract class DatabricksStreamCopier implements StreamCopier {
   private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class);

   protected final String schemaName;
+  protected final String catalogName;
   protected final String streamName;
   protected final DestinationSyncMode destinationSyncMode;
   private final boolean purgeStagingData;
@@ -45,13 +46,15 @@ public abstract class DatabricksStreamCopier implements StreamCopier {
   protected final DatabricksDestinationConfig databricksConfig;

   public DatabricksStreamCopier(final String stagingFolder,
+                                final String catalog,
                                 final String schema,
                                 final ConfiguredAirbyteStream configuredStream,
                                 final JdbcDatabase database,
                                 final DatabricksDestinationConfig databricksConfig,
                                 final StandardNameTransformer nameTransformer,
                                 final SqlOperations sqlOperations) {
     this.schemaName = schema;
+    this.catalogName = catalog;
     this.streamName = configuredStream.getStream().getName();
     this.destinationSyncMode = configuredStream.getDestinationSyncMode();
     this.purgeStagingData = databricksConfig.isPurgeStagingData();
@@ -110,15 +113,15 @@ public String createDestinationTable() throws Exception {
         : "CREATE TABLE IF NOT EXISTS";

     final String createTable = String.format(
-        "%s %s.%s " +
+        "%s %s.%s.%s " +
             "USING delta " +
             "LOCATION '%s' " +
             "COMMENT 'Created from stream %s' " +
             "TBLPROPERTIES ('airbyte.destinationSyncMode' = '%s', %s) " +
             // create the table based on the schema of the tmp table
             "AS SELECT * FROM %s.%s LIMIT 0",
         createStatement,
-        schemaName, destTableName,
+        catalogName, schemaName, destTableName,
         getDestTableLocation(),
         streamName,
         destinationSyncMode.value(),
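Filled in with hypothetical values, the format call above produces DDL along these lines. This is a sketch, not connector output: the catalog, schema, stream, and S3 location are invented, the second TBLPROPERTIES placeholder is abbreviated away, and the trailing SELECT arguments (which the diff truncates before showing) are assumed to be the schema and temp table names.

public class CreateTableSketch {

  public static void main(String[] args) {
    // Hypothetical inputs; in the connector these are the stream
    // copier's fields.
    final String createStatement = "CREATE OR REPLACE TABLE";
    final String catalogName = "main";
    final String schemaName = "airbyte_raw";
    final String destTableName = "users";
    final String location = "s3://my-bucket/airbyte_raw/users";
    final String streamName = "users";
    final String syncMode = "overwrite";
    final String tmpTableName = "_airbyte_tmp_users";

    final String createTable = String.format(
        "%s %s.%s.%s " +
            "USING delta " +
            "LOCATION '%s' " +
            "COMMENT 'Created from stream %s' " +
            "TBLPROPERTIES ('airbyte.destinationSyncMode' = '%s') " +
            "AS SELECT * FROM %s.%s LIMIT 0",
        createStatement,
        catalogName, schemaName, destTableName,
        location, streamName, syncMode,
        schemaName, tmpTableName);

    // Prints, on one line:
    // CREATE OR REPLACE TABLE main.airbyte_raw.users USING delta
    //   LOCATION 's3://my-bucket/airbyte_raw/users' ...
    //   AS SELECT * FROM airbyte_raw._airbyte_tmp_users LIMIT 0
    System.out.println(createTable);
  }
}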
DatabricksAzureBlobStorageStreamCopier.java
@@ -57,6 +57,7 @@ public class DatabricksAzureBlobStorageStreamCopier extends DatabricksStreamCopier {
   protected String currentFile;

   public DatabricksAzureBlobStorageStreamCopier(final String stagingFolder,
+                                                final String catalog,
                                                 final String schema,
                                                 final ConfiguredAirbyteStream configuredStream,
                                                 final JdbcDatabase database,
@@ -65,7 +66,7 @@ public DatabricksAzureBlobStorageStreamCopier(final String stagingFolder,
                                                 final SqlOperations sqlOperations,
                                                 final SpecializedBlobClientBuilder specializedBlobClientBuilder,
                                                 final AzureBlobStorageConfig azureConfig) {
-    super(stagingFolder, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
+    super(stagingFolder, catalog, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);

     this.specializedBlobClientBuilder = specializedBlobClientBuilder;
     this.azureConfig = azureConfig;
@@ -29,13 +29,14 @@ public StreamCopier create(final String configuredSchema,
     try {
       final AirbyteStream stream = configuredStream.getStream();
       final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
+      String catalog = databricksConfig.catalog();

       final AzureBlobStorageConfig azureConfig = databricksConfig.storageConfig().getAzureBlobStorageConfigOrThrow();
       final SpecializedBlobClientBuilder specializedBlobClientBuilder = new SpecializedBlobClientBuilder()
           .endpoint(azureConfig.getEndpointUrl())
           .sasToken(azureConfig.getSasToken())
           .containerName(azureConfig.getContainerName());
-      return new DatabricksAzureBlobStorageStreamCopier(stagingFolder, schema, configuredStream, database,
+      return new DatabricksAzureBlobStorageStreamCopier(stagingFolder, catalog, schema, configuredStream, database,
           databricksConfig, nameTransformer, sqlOperations, specializedBlobClientBuilder, azureConfig);
     } catch (final Exception e) {
       throw new RuntimeException(e);
DatabricksS3StreamCopier.java
@@ -52,6 +52,7 @@ public class DatabricksS3StreamCopier extends DatabricksStreamCopier {
   private final S3ParquetWriter parquetWriter;

   public DatabricksS3StreamCopier(final String stagingFolder,
+                                  final String catalog,
                                   final String schema,
                                   final ConfiguredAirbyteStream configuredStream,
                                   final AmazonS3 s3Client,
@@ -62,7 +63,7 @@ public DatabricksS3StreamCopier(final String stagingFolder,
                                  final S3WriterFactory writerFactory,
                                  final Timestamp uploadTime)
       throws Exception {
-    super(stagingFolder, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
+    super(stagingFolder, catalog, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
     this.s3Client = s3Client;
     this.s3Config = databricksConfig.storageConfig().getS3DestinationConfigOrThrow();
     final S3DestinationConfig stagingS3Config = getStagingS3DestinationConfig(s3Config, stagingFolder);
@@ -105,17 +106,19 @@ protected String getDestTableLocation() {

   @Override
   protected String getCreateTempTableStatement() {
-    return String.format("CREATE TABLE %s.%s USING parquet LOCATION '%s';", schemaName, tmpTableName, getTmpTableLocation());
+    return String.format("CREATE TABLE %s.%s.%s USING parquet LOCATION '%s';", catalogName, schemaName, tmpTableName, getTmpTableLocation());
   }

   @Override
   public String generateMergeStatement(final String destTableName) {
     final String copyData = String.format(
-        "COPY INTO %s.%s " +
+        "COPY INTO %s.%s.%s " +
             "FROM '%s' " +
             "FILEFORMAT = PARQUET " +
             "PATTERN = '%s'",
-        schemaName, destTableName,
+        catalogName,
+        schemaName,
+        destTableName,
         getTmpTableLocation(),
         parquetWriter.getOutputFilename());
     LOGGER.info(copyData);
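Taken together, the S3 copier's staging flow is: write Parquet to a staging prefix, expose it as a catalog-qualified temp table, then COPY INTO the catalog-qualified destination. A sketch of the two generated statements under invented values follows; the bucket, filenames, and object names are all hypothetical.

public class S3CopySketch {

  public static void main(String[] args) {
    // Hypothetical values; the connector derives these from its config,
    // the staging folder, and the Parquet writer.
    final String catalogName = "main";
    final String schemaName = "airbyte_raw";
    final String tmpTableName = "_airbyte_tmp_users";
    final String destTableName = "users";
    final String tmpLocation = "s3://my-bucket/staging/users";
    final String outputFilename = "2023_04_26_000000_0.parquet";

    // Step 1: point a temp table at the staged Parquet files.
    final String createTmp = String.format(
        "CREATE TABLE %s.%s.%s USING parquet LOCATION '%s';",
        catalogName, schemaName, tmpTableName, tmpLocation);

    // Step 2: load the staged files into the destination table.
    final String copyData = String.format(
        "COPY INTO %s.%s.%s FROM '%s' FILEFORMAT = PARQUET PATTERN = '%s'",
        catalogName, schemaName, destTableName, tmpLocation, outputFilename);

    System.out.println(createTmp);
    System.out.println(copyData);
  }
}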
@@ -30,13 +30,14 @@ public StreamCopier create(final String configuredSchema,
                            final SqlOperations sqlOperations) {
     try {
       final AirbyteStream stream = configuredStream.getStream();
+      final String catalogName = databricksConfig.catalog();
       final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);

       S3DestinationConfig s3Config = databricksConfig.storageConfig().getS3DestinationConfigOrThrow();
       final AmazonS3 s3Client = s3Config.getS3Client();
       final ProductionWriterFactory writerFactory = new ProductionWriterFactory();
       final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());
-      return new DatabricksS3StreamCopier(stagingFolder, schema, configuredStream, s3Client, database,
+      return new DatabricksS3StreamCopier(stagingFolder, catalogName, schema, configuredStream, s3Client, database,
           databricksConfig, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
     } catch (final Exception e) {
       throw new RuntimeException(e);
@@ -48,12 +48,13 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
       throws SQLException {
     final String tableName = nameTransformer.getIdentifier(streamName);
     final String schemaName = StreamCopierFactory.getSchema(namespace, databricksConfig.schema(), nameTransformer);
+    final String catalog = databricksConfig.catalog();
     final JsonFieldNameUpdater nameUpdater = AvroRecordHelper.getFieldNameUpdater(streamName, namespace, streamSchema);

     try (final DSLContext dslContext = DatabricksUtilTest.getDslContext(databricksConfig)) {
       final Database database = new Database(dslContext);
       return database.query(ctx -> ctx.select(asterisk())
-          .from(String.format("%s.%s", schemaName, tableName))
+          .from(String.format("%s.%s.%s", catalog, schemaName, tableName))
           .orderBy(field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT).asc())
           .fetch().stream()
           .map(record -> {
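For reference, the jOOQ expression above should render to a query of roughly this shape, modulo identifier quoting. The catalog, schema, and table values here are hypothetical, and _airbyte_emitted_at is assumed to be the value of JavaBaseConstants.COLUMN_NAME_EMITTED_AT.

public class RetrieveRecordsQuerySketch {

  public static void main(String[] args) {
    final String catalog = "main";                      // hypothetical
    final String schemaName = "integration_test_ab12c"; // hypothetical
    final String tableName = "users";                   // hypothetical

    // Roughly what ctx.select(asterisk()).from(...).orderBy(...) renders.
    final String query = String.format(
        "SELECT * FROM %s.%s.%s ORDER BY _airbyte_emitted_at ASC",
        catalog, schemaName, tableName);

    System.out.println(query);
  }
}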
DatabricksS3DestinationAcceptanceTest.java
@@ -7,7 +7,6 @@
 import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_DATA_SOURCE_KEY;
 import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_SCHEMA_KEY;
 import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_ACCESS_KEY_ID;
-import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_BUCKET_PATH;
 import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_SECRET_ACCESS_KEY;

 import com.amazonaws.services.s3.AmazonS3;
@@ -25,11 +24,9 @@
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.commons.lang3.RandomStringUtils;
-import org.junit.jupiter.api.Disabled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-@Disabled
 public class DatabricksS3DestinationAcceptanceTest extends DatabricksDestinationAcceptanceTest {

   private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksS3DestinationAcceptanceTest.class);
@@ -56,8 +53,6 @@ protected void setup(final TestDestinationEnv testEnv) {
     final String randomString = RandomStringUtils.randomAlphanumeric(5);
     final JsonNode configJson = Jsons.clone(baseConfigJson);
     ((ObjectNode) configJson).put(DATABRICKS_SCHEMA_KEY, "integration_test_" + randomString);
-    final JsonNode dataSource = configJson.get(DATABRICKS_DATA_SOURCE_KEY);
-    ((ObjectNode) dataSource).put(S_3_BUCKET_PATH, "test_" + randomString);

     this.configJson = configJson;
     this.databricksConfig = DatabricksDestinationConfig.get(configJson);
connectors.md (1 addition, 1 deletion)
@@ -304,7 +304,7 @@
 | **Cloudflare R2** | <img alt="Cloudflare R2 icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/cloudflare-r2.svg" height="30" height="30"/> | Destination | airbyte/destination-r2:0.1.0 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/r2) | [connectors/destination/r2](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/r2) | [destination-r2](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-r2) | <small>`0fb07be9-7c3b-4336-850d-5efc006152ee`</small> |
 | **Convex** | <img alt="Convex icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/convex.svg" height="30" height="30"/> | Destination | airbyte/destination-convex:0.1.0 | alpha | [docs](https://docs.airbyte.io/integrations/destinations/convex) | [connectors/destination/convex](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/convex) | [destination-convex](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-convex) | <small>`3eb4d99c-11fa-4561-a259-fc88e0c2f8f4`</small> |
 | **Databend** | <img alt="Databend icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/databend.svg" height="30" height="30"/> | Destination | airbyte/destination-databend:0.1.2 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/databend) | [connectors/destination/databend](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/databend) | [destination-databend](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-databend) | <small>`302e4d8e-08d3-4098-acd4-ac67ca365b88`</small> |
-| **Databricks Lakehouse** | <img alt="Databricks Lakehouse icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/databricks.svg" height="30" height="30"/> | Destination | airbyte/destination-databricks:1.0.0 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/databricks) | [connectors/destination/databricks](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/databricks) | [destination-databricks](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-databricks) | <small>`072d5540-f236-4294-ba7c-ade8fd918496`</small> |
+| **Databricks Lakehouse** | <img alt="Databricks Lakehouse icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/databricks.svg" height="30" height="30"/> | Destination | airbyte/destination-databricks:1.0.1 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/databricks) | [connectors/destination/databricks](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/databricks) | [destination-databricks](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-databricks) | <small>`072d5540-f236-4294-ba7c-ade8fd918496`</small> |
 | **DuckDB** | <img alt="DuckDB icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/duckdb.svg" height="30" height="30"/> | Destination | airbyte/destination-duckdb:0.1.0 | alpha | [docs](https://docs.airbyte.io/integrations/destinations/duckdb) | [connectors/destination/duckdb](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/duckdb) | [destination-duckdb](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-duckdb) | <small>`94bd199c-2ff0-4aa2-b98e-17f0acb72610`</small> |
 | **DynamoDB** | <img alt="DynamoDB icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/dynamodb.svg" height="30" height="30"/> | Destination | airbyte/destination-dynamodb:0.1.7 | alpha | [docs](https://docs.airbyte.com/integrations/destinations/dynamodb) | [connectors/destination/dynamodb](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/dynamodb) | [destination-dynamodb](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-dynamodb) | <small>`8ccd8909-4e99-4141-b48d-4984b70b2d89`</small> |
 | **E2E Testing** | <img alt="E2E Testing icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config-oss/init-oss/src/main/resources/icons/airbyte.svg" height="30" height="30"/> | Destination | airbyte/destination-e2e-test:0.2.4 | unknown | [docs](https://docs.airbyte.com/integrations/destinations/e2e-test) | [connectors/destination/e2e-test](https://github.com/airbytehq/airbyte/issues?q=is:open+is:issue+label:connectors/destination/e2e-test) | [destination-e2e-test](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-e2e-test) | <small>`2eb65e87-983a-4fd7-b3e3-9d9dc6eb8537`</small> |
(The remaining changed files in this commit could not be rendered by the diff viewer.)
