diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 2304ee7d2fdd..cd902125b15e 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.44.4 +version=0.44.9 diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt index 8aaca609840e..358abe22d7c0 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt @@ -21,7 +21,9 @@ private val LOGGER = KotlinLogging.logger {} abstract class BaseS3Destination protected constructor( protected val configFactory: S3DestinationConfigFactory = S3DestinationConfigFactory(), - protected val environment: Map = System.getenv() + protected val environment: Map = System.getenv(), + private val memoryRatio: Double = 0.5, + private val nThreads: Int = 5 ) : BaseConnector(), Destination { private val nameTransformer: NamingConventionTransformer = S3NameTransformer() @@ -74,7 +76,9 @@ protected constructor( outputRecordCollector, S3StorageOperations(nameTransformer, s3Config.getS3Client(), s3Config), s3Config, - catalog + catalog, + memoryRatio, + nThreads ) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt index 9d32bf7b1b68..2a8ac4f9f820 100644 --- 
a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt @@ -24,6 +24,7 @@ import io.airbyte.commons.exceptions.ConfigErrorException import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.* import io.github.oshai.kotlinlogging.KotlinLogging +import java.util.concurrent.Executors import java.util.function.Consumer import java.util.function.Function import org.joda.time.DateTime @@ -158,7 +159,9 @@ class S3ConsumerFactory { outputRecordCollector: Consumer, storageOps: S3StorageOperations, s3Config: S3DestinationConfig, - catalog: ConfiguredAirbyteCatalog + catalog: ConfiguredAirbyteCatalog, + memoryRatio: Double, + nThreads: Int ): SerializedAirbyteMessageConsumer { val writeConfigs = createWriteConfigs(storageOps, s3Config, catalog) // Buffer creation function: yields a file buffer that converts @@ -190,7 +193,11 @@ class S3ConsumerFactory { // S3 has no concept of default namespace // In the "namespace from destination case", the namespace // is simply omitted from the path. 
- BufferManager(defaultNamespace = null) + BufferManager( + defaultNamespace = null, + maxMemory = (Runtime.getRuntime().maxMemory() * memoryRatio).toLong() + ), + workerPool = Executors.newFixedThreadPool(nThreads) ) } diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index dac6e37f252f..b33e1ccd744a 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -4,9 +4,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.44.0' + cdkVersionRequired = '0.44.9' features = ['db-destinations', 's3-destinations'] - useLocalCdk = true // TODO: Version CDK, bump required version, and set this to false + useLocalCdk = false // Uses the published CDK (see cdkVersionRequired); set to true only while developing against local CDK changes } airbyteJavaConnector.addCdkDependencies() diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 6e327752f5a8..1fc2ba920c5c 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 0.6.6 + dockerImageTag: 0.6.7 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt index 8c871eaa53d5..89173b904b09 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt +++ 
b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt @@ -10,7 +10,7 @@ import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory import io.airbyte.cdk.integrations.destination.s3.StorageProvider open class S3Destination : BaseS3Destination { - constructor() + constructor() : super(nThreads = 2, memoryRatio = 0.5) @VisibleForTesting constructor( diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 69be35fc5061..b5899113b082 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -514,14 +514,15 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.6.6 | 2024-08-06 | [43343](https://github.com/airbytehq/airbyte/pull/43343) | Use Kotlin 2.0.0 | -| 0.6.5 | 2024-08-01 | [42405](https://github.com/airbytehq/airbyte/pull/42405) | S3 parallelizes workloads, checkpoints, submits counts, support for generationId in metadata for refreshes. | -| 0.6.4 | 2024-04-16 | [42006](https://github.com/airbytehq/airbyte/pull/42006) | remove unnecessary zookeeper dependency | -| 0.6.3 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | convert all production code to kotlin | -| 0.6.2 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume role auth | -| 0.6.1 | 2024-04-08 | [37546](https://github.com/airbytehq/airbyte/pull/37546) | Adapt to CDK 0.30.8; | -| 0.6.0 | 2024-04-08 | [36869](https://github.com/airbytehq/airbyte/pull/36869) | Adapt to CDK 0.29.8; Kotlin converted code. 
| -| 0.5.9 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | +| 0.6.7 | 2024-08-11 | [43713](https://github.com/airbytehq/airbyte/issues/43713) | Decreased memory ratio (0.7 -> 0.5) and thread allocation (5 -> 2) for async S3 uploads. | +| 0.6.6 | 2024-08-06 | [43343](https://github.com/airbytehq/airbyte/pull/43343) | Use Kotlin 2.0.0 | +| 0.6.5 | 2024-08-01 | [42405](https://github.com/airbytehq/airbyte/pull/42405) | S3 parallelizes workloads, checkpoints, submits counts, support for generationId in metadata for refreshes. | +| 0.6.4 | 2024-04-16 | [42006](https://github.com/airbytehq/airbyte/pull/42006) | remove unnecessary zookeeper dependency | +| 0.6.3 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | convert all production code to kotlin | +| 0.6.2 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume role auth | +| 0.6.1 | 2024-04-08 | [37546](https://github.com/airbytehq/airbyte/pull/37546) | Adapt to CDK 0.30.8; | +| 0.6.0 | 2024-04-08 | [36869](https://github.com/airbytehq/airbyte/pull/36869) | Adapt to CDK 0.29.8; Kotlin converted code. | +| 0.5.9 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | | 0.5.8 | 2024-01-03 | [#33924](https://github.com/airbytehq/airbyte/pull/33924) | Add new ap-southeast-3 AWS region | | 0.5.7 | 2023-12-28 | [#33788](https://github.com/airbytehq/airbyte/pull/33788) | Thread-safe fix for file part names | | 0.5.6 | 2023-12-08 | [#33263](https://github.com/airbytehq/airbyte/pull/33263) | (incorrect filename format, do not use) Adopt java CDK version 0.7.0. |