Skip to content

Commit

Permalink
S3 Destination: Decreased thread allocation & memory ratio for Async…
Browse files Browse the repository at this point in the history
…Consumer (#43714)
  • Loading branch information
johnny-schmidt authored Aug 13, 2024
1 parent b775298 commit 4c4a105
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.4
version=0.44.9
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ private val LOGGER = KotlinLogging.logger {}
abstract class BaseS3Destination
protected constructor(
protected val configFactory: S3DestinationConfigFactory = S3DestinationConfigFactory(),
protected val environment: Map<String, String> = System.getenv()
protected val environment: Map<String, String> = System.getenv(),
private val memoryRatio: Double = 0.5,
private val nThreads: Int = 5
) : BaseConnector(), Destination {
private val nameTransformer: NamingConventionTransformer = S3NameTransformer()

Expand Down Expand Up @@ -74,7 +76,9 @@ protected constructor(
outputRecordCollector,
S3StorageOperations(nameTransformer, s3Config.getS3Client(), s3Config),
s3Config,
catalog
catalog,
memoryRatio,
nThreads
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.*
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.Executors
import java.util.function.Consumer
import java.util.function.Function
import org.joda.time.DateTime
Expand Down Expand Up @@ -158,7 +159,9 @@ class S3ConsumerFactory {
outputRecordCollector: Consumer<AirbyteMessage>,
storageOps: S3StorageOperations,
s3Config: S3DestinationConfig,
catalog: ConfiguredAirbyteCatalog
catalog: ConfiguredAirbyteCatalog,
memoryRatio: Double,
nThreads: Int
): SerializedAirbyteMessageConsumer {
val writeConfigs = createWriteConfigs(storageOps, s3Config, catalog)
// Buffer creation function: yields a file buffer that converts
Expand Down Expand Up @@ -190,7 +193,11 @@ class S3ConsumerFactory {
// S3 has no concept of default namespace
// In the "namespace from destination case", the namespace
// is simply omitted from the path.
BufferManager(defaultNamespace = null)
BufferManager(
defaultNamespace = null,
maxMemory = (Runtime.getRuntime().maxMemory() * memoryRatio).toLong()
),
workerPool = Executors.newFixedThreadPool(nThreads)
)
}

Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/destination-s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.0'
cdkVersionRequired = '0.44.9'
features = ['db-destinations', 's3-destinations']
useLocalCdk = true // TODO: Version CDK, bump required version, and set this to false
useLocalCdk = false // TODO: Version CDK, bump required version, and set this to false
}

airbyteJavaConnector.addCdkDependencies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 0.6.6
dockerImageTag: 0.6.7
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory
import io.airbyte.cdk.integrations.destination.s3.StorageProvider

open class S3Destination : BaseS3Destination {
constructor()
constructor() : super(nThreads = 2, memoryRatio = 0.5)

@VisibleForTesting
constructor(
Expand Down
17 changes: 9 additions & 8 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -514,14 +514,15 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.6.6 | 2024-08-06 | [43343](https://github.com/airbytehq/airbyte/pull/43343) | Use Kotlin 2.0.0 |
| 0.6.5 | 2024-08-01 | [42405](https://github.com/airbytehq/airbyte/pull/42405) | S3 parallelizes workloads, checkpoints, submits counts, support for generationId in metadata for refreshes. |
| 0.6.4 | 2024-04-16 | [42006](https://github.com/airbytehq/airbyte/pull/42006) | remove unnecessary zookeeper dependency |
| 0.6.3 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | convert all production code to kotlin |
| 0.6.2 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume role auth |
| 0.6.1 | 2024-04-08 | [37546](https://github.com/airbytehq/airbyte/pull/37546) | Adapt to CDK 0.30.8; |
| 0.6.0 | 2024-04-08 | [36869](https://github.com/airbytehq/airbyte/pull/36869) | Adapt to CDK 0.29.8; Kotlin converted code. |
| 0.5.9 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. |
| 0.6.7 | 2024-08-11 | [43713](https://github.com/airbytehq/airbyte/issues/43713) | Decreased memory ratio (0.7 -> 0.5) and thread allocation (5 -> 2) for async S3 uploads. |
| 0.6.6 | 2024-08-06 | [43343](https://github.com/airbytehq/airbyte/pull/43343) | Use Kotlin 2.0.0 |
| 0.6.5 | 2024-08-01 | [42405](https://github.com/airbytehq/airbyte/pull/42405) | S3 parallelizes workloads, checkpoints, submits counts, support for generationId in metadata for refreshes. |
| 0.6.4 | 2024-04-16 | [42006](https://github.com/airbytehq/airbyte/pull/42006) | remove unnecessary zookeeper dependency |
| 0.6.3 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | convert all production code to kotlin |
| 0.6.2 | 2024-04-15 | [38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume role auth |
| 0.6.1 | 2024-04-08 | [37546](https://github.com/airbytehq/airbyte/pull/37546) | Adapt to CDK 0.30.8; |
| 0.6.0 | 2024-04-08 | [36869](https://github.com/airbytehq/airbyte/pull/36869) | Adapt to CDK 0.29.8; Kotlin converted code. |
| 0.5.9 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. |
| 0.5.8 | 2024-01-03 | [#33924](https://github.com/airbytehq/airbyte/pull/33924) | Add new ap-southeast-3 AWS region |
| 0.5.7 | 2023-12-28 | [#33788](https://github.com/airbytehq/airbyte/pull/33788) | Thread-safe fix for file part names |
| 0.5.6 | 2023-12-08 | [#33263](https://github.com/airbytehq/airbyte/pull/33263) | (incorrect filename format, do not use) Adopt java CDK version 0.7.0. |
Expand Down

0 comments on commit 4c4a105

Please sign in to comment.