diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 66c459fa2ce3..659a873e9251 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.14.0 | 2024-01-23 | [\#34461](https://github.com/airbytehq/airbyte/pull/34461) | Revert non backward compatible signature changes from 0.13.1 | | 0.13.3 | 2024-01-23 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Denote if destinations fully support Destinations V2 | | 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector | | 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index a4868348559c..3cbaf2aad32b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.13.3 +version=0.14.0 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java index b01962a17bc7..0eef0c5343bf 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java @@ -45,6 +45,7 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database, final String schema = writeConfig.getOutputSchemaName(); final String stream = writeConfig.getStreamName(); final String dstTableName = writeConfig.getOutputTableName(); + final String stageName = stagingOperations.getStageName(schema, dstTableName); final String stagingPath = stagingOperations.getStagingPath(SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schema, stream, writeConfig.getOutputTableName(), writeConfig.getWriteDatetime()); @@ -54,7 +55,7 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database, stagingOperations.createSchemaIfNotExists(database, schema); stagingOperations.createTableIfNotExists(database, schema, dstTableName); - stagingOperations.createStageIfNotExists(); + stagingOperations.createStageIfNotExists(database, stageName); /* * When we're in OVERWRITE, clear out the table at the start of a sync, this is an expected side @@ -78,6 +79,7 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database, * upload was unsuccessful */ public static void copyIntoTableFromStage(final JdbcDatabase database, + final String stageName, final String stagingPath, final List stagedFiles, final String tableName, @@ -92,7 +94,7 @@ public static void copyIntoTableFromStage(final JdbcDatabase database, final Lock rawTableInsertLock = typerDeduper.getRawTableInsertLock(streamNamespace, streamName); rawTableInsertLock.lock(); try { - stagingOperations.copyIntoTableFromStage(database, stagingPath, stagedFiles, + stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles, tableName, schemaName); } finally { rawTableInsertLock.unlock(); @@ -131,6 +133,7 @@ public static OnCloseFunction onCloseFunction(final JdbcDatabase database, for (final WriteConfig writeConfig : writeConfigs) { final String schemaName = writeConfig.getOutputSchemaName(); if (purgeStagingData) { + final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName()); final String stagePath = stagingOperations.getStagingPath( RANDOM_CONNECTION_ID, schemaName, @@ -139,7 +142,9 @@ public static OnCloseFunction onCloseFunction(final JdbcDatabase database, writeConfig.getWriteDatetime()); log.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName, stagePath); - stagingOperations.dropStageIfExists(database, stagePath); + // TODO: This is another weird manifestation of Redshift vs Snowflake using either or variables from + // stageName/StagingPath. + stagingOperations.dropStageIfExists(database, stageName, stagePath); } } typerDeduper.commitFinalTables(); diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/SerialFlush.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/SerialFlush.java index a4cb0c5fdaf3..767eea233364 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/SerialFlush.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/SerialFlush.java @@ -81,14 +81,15 @@ public static FlushBufferFunction function( final WriteConfig writeConfig = pairToWriteConfig.get(pair); final String schemaName = writeConfig.getOutputSchemaName(); + final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName()); final String stagingPath = stagingOperations.getStagingPath( SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schemaName, writeConfig.getStreamName(), writeConfig.getOutputTableName(), writeConfig.getWriteDatetime()); try (writer) { writer.flush(); - final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stagingPath); - GeneralStagingFunctions.copyIntoTableFromStage(database, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(), + final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stageName, stagingPath); + GeneralStagingFunctions.copyIntoTableFromStage(database, stageName, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(), schemaName, stagingOperations, writeConfig.getNamespace(), diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/StagingOperations.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/StagingOperations.java index aac9351b4b7d..ae1b132de43f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/StagingOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/StagingOperations.java @@ -15,6 +15,10 @@ * Staging operations focuses on the SQL queries that are needed to success move data into a staging * environment like GCS or S3. In general, the reference of staging is the usage of an object * storage for the purposes of efficiently uploading bulk data to destinations + * + * TODO: This interface is shared between Snowflake and Redshift connectors where the staging + * mechanism is different wire protocol. Make the interface more Generic and have sub interfaces to + * support BlobStorageOperations or Jdbc based staging operations. */ public interface StagingOperations extends SqlOperations { @@ -25,10 +29,19 @@ public interface StagingOperations extends SqlOperations { */ String getStagingPath(UUID connectionId, String namespace, String streamName, String outputTableName, DateTime writeDatetime); + /** + * Returns the staging environment's name + * + * @param namespace Name of schema + * @param streamName Name of the stream + * @return Fully qualified name of the staging environment + */ + String getStageName(String namespace, String streamName); + /** * Create a staging folder where to upload temporary files before loading into the final destination */ - void createStageIfNotExists() throws Exception; + void createStageIfNotExists(JdbcDatabase database, String stageName) throws Exception; /** * Upload the data file into the stage area. @@ -39,7 +52,7 @@ public interface StagingOperations extends SqlOperations { * @param stagingPath path of staging folder to data files * @return the name of the file that was uploaded. */ - String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stagingPath) + String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stageName, String stagingPath) throws Exception; /** @@ -52,6 +65,7 @@ String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsDat * @param schemaName name of schema */ void copyIntoTableFromStage(JdbcDatabase database, + String stageName, String stagingPath, List stagedFiles, String tableName, @@ -64,6 +78,6 @@ void copyIntoTableFromStage(JdbcDatabase database, * @param database database used for syncing * @param stageName Name of the staging area used to store files */ - void dropStageIfExists(JdbcDatabase database, String stageName) throws Exception; + void dropStageIfExists(JdbcDatabase database, String stageName, String stagingPath) throws Exception; } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.java index 564e3d3ade85..3e9f7f05132c 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.java +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.java @@ -103,6 +103,7 @@ public void flush(final StreamDescriptor decs, final Stream