diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageUploadConfiguration.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageUploadConfiguration.kt
index d868ca1c9e9c..bce3284eb365 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageUploadConfiguration.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageUploadConfiguration.kt
@@ -4,7 +4,28 @@
 
 package io.airbyte.cdk.load.command.object_storage
 
-data class ObjectStorageUploadConfiguration(val streamingUploadPartSize: Long)
+/**
+ * Settings for streaming uploads to object storage.
+ *
+ * @property streamingUploadPartSize part size for streaming uploads (presumably bytes -- confirm)
+ * @property maxNumConcurrentUploads cap on simultaneously running uploads; must be positive
+ */
+data class ObjectStorageUploadConfiguration(
+    val streamingUploadPartSize: Long = DEFAULT_STREAMING_UPLOAD_PART_SIZE,
+    val maxNumConcurrentUploads: Int = DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
+) {
+    init {
+        // The value sizes a coroutine Semaphore, which rejects non-positive permits; fail early.
+        require(maxNumConcurrentUploads > 0) {
+            "maxNumConcurrentUploads must be positive, but was $maxNumConcurrentUploads"
+        }
+    }
+
+    companion object {
+        const val DEFAULT_STREAMING_UPLOAD_PART_SIZE = 5L * 1024L * 1024L
+        const val DEFAULT_MAX_NUM_CONCURRENT_UPLOADS = 5
+    }
+}
 
 interface ObjectStorageUploadConfigurationProvider {
     val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration
diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt
index c5826554ed6f..638a97182346 100644
--- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt
+++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt
@@ -24,6 +24,7 @@ import io.airbyte.cdk.load.file.NoopProcessor
 import io.airbyte.cdk.load.file.StreamProcessor
 import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
 import io.airbyte.cdk.load.file.object_storage.RemoteObject
+import io.github.oshai.kotlinlogging.KotlinLogging
 import io.micronaut.context.annotation.Factory
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Singleton
@@ -31,6 +32,8 @@ import java.io.ByteArrayOutputStream
 import java.io.InputStream
 import java.io.OutputStream
 import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.sync.Semaphore
+import kotlinx.coroutines.sync.withPermit
 
 data class S3Object(override val key: String, override val storageConfig: S3BucketConfiguration) :
     RemoteObject {
@@ -44,6 +47,9 @@ class S3Client(
     val bucketConfig: S3BucketConfiguration,
     private val uploadConfig: ObjectStorageUploadConfiguration?,
 ) : ObjectStorageClient {
+    private val log = KotlinLogging.logger {}
+    // Limits concurrent streaming uploads; null (no limit) when no upload config is supplied.
+    private val uploadPermits = uploadConfig?.maxNumConcurrentUploads?.let { Semaphore(it) }
 
     override suspend fun list(prefix: String) = flow {
         var request = ListObjectsRequest {
@@ -129,6 +135,26 @@ class S3Client(
         metadata: Map,
         streamProcessor: StreamProcessor?,
         block: suspend (OutputStream) -> Unit
+    ): S3Object {
+        if (uploadPermits == null) {
+            // No upload configuration was supplied, so concurrency is not capped.
+            return streamingUploadInner(key, metadata, streamProcessor, block)
+        }
+        // Log before suspending on the semaphore so the message describes what is happening.
+        log.info {
+            "Attempting to acquire upload permit for $key (${uploadPermits.availablePermits} available)"
+        }
+        return uploadPermits.withPermit {
+            streamingUploadInner(key, metadata, streamProcessor, block)
+        }
+    }
+
+    /** Upload body without throttling; [streamingUpload] gates it behind [uploadPermits]. */
+    private suspend fun streamingUploadInner(
+        key: String,
+        metadata: Map,
+        streamProcessor: StreamProcessor?,
+        block: suspend (OutputStream) -> Unit
     ): S3Object {
         val request = CreateMultipartUploadRequest {
             this.bucket = bucketConfig.s3BucketName
diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
index e379c6530b07..482d5314665f 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
-  dockerImageTag: 0.2.1
+  dockerImageTag: 0.2.2
   dockerRepository: airbyte/destination-s3-v2
   githubIssueLabel: destination-s3-v2
   icon: s3.svg
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt
index 8c4be0f6c6d5..5e3aed6af2ec 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt
@@ -31,8 +31,7 @@ data class S3V2Configuration(
     override val objectStorageCompressionConfiguration: ObjectStorageCompressionConfiguration,
 
     // Internal configuration
-    override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration =
-        ObjectStorageUploadConfiguration(5L * 1024 * 1024),
+    override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration,
     override val recordBatchSizeBytes: Long = 200L * 1024 * 1024,
 ) :
     DestinationConfiguration(),
@@ -52,7 +51,14 @@ class S3V2ConfigurationFactory :
             s3BucketConfiguration = pojo.toS3BucketConfiguration(),
             objectStoragePathConfiguration = pojo.toObjectStoragePathConfiguration(),
             objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration(),
-            objectStorageCompressionConfiguration = pojo.toCompressionConfiguration()
+            objectStorageCompressionConfiguration = pojo.toCompressionConfiguration(),
+            objectStorageUploadConfiguration =
+                ObjectStorageUploadConfiguration(
+                    pojo.uploadPartSize
+                        ?: ObjectStorageUploadConfiguration.DEFAULT_STREAMING_UPLOAD_PART_SIZE,
+                    pojo.maxConcurrentUploads
+                        ?: ObjectStorageUploadConfiguration.DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
+                )
         )
     }
 }
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt
index 240d1299665b..8eb521b747d5 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt
@@ -4,12 +4,14 @@
 
 package io.airbyte.integrations.destination.s3_v2
 
+import com.fasterxml.jackson.annotation.JsonProperty
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
 import io.airbyte.cdk.command.ConfigurationSpecification
 import io.airbyte.cdk.load.command.aws.AWSAccessKeySpecification
 import io.airbyte.cdk.load.command.object_storage.JsonFormatSpecification
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatSpecification
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatSpecificationProvider
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration
 import io.airbyte.cdk.load.command.s3.S3BucketRegion
 import io.airbyte.cdk.load.command.s3.S3BucketSpecification
 import io.airbyte.cdk.load.command.s3.S3PathSpecification
@@ -36,6 +38,13 @@ class S3V2Specification :
     override val fileNamePattern: String? = null
     override val useStagingDirectory: Boolean? = null
     override val s3StagingPrefix: String? = null
+
+    // Optional upload tuning; the configuration factory applies the same defaults when null.
+    @JsonProperty("max_concurrent_uploads")
+    val maxConcurrentUploads: Int? =
+        ObjectStorageUploadConfiguration.DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
+    @JsonProperty("upload_part_size")
+    val uploadPartSize: Long? = ObjectStorageUploadConfiguration.DEFAULT_STREAMING_UPLOAD_PART_SIZE
 }
 
 @Singleton
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json
index 587e25a241de..05e36a7fdc9f 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json
@@ -316,6 +316,12 @@
         "description" : "Path to use when staging data in the bucket directory. Airbyte will stage data here during sync and/or write small manifest/recovery files.",
         "title" : "S3 Staging Prefix",
         "examples" : [ "__staging/data_sync/test" ]
+      },
+      "max_concurrent_uploads" : {
+        "type" : "integer"
+      },
+      "upload_part_size" : {
+        "type" : "integer"
       }
     },
     "required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ]
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json
index 587e25a241de..05e36a7fdc9f 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json
@@ -316,6 +316,12 @@
         "description" : "Path to use when staging data in the bucket directory. Airbyte will stage data here during sync and/or write small manifest/recovery files.",
         "title" : "S3 Staging Prefix",
         "examples" : [ "__staging/data_sync/test" ]
+      },
+      "max_concurrent_uploads" : {
+        "type" : "integer"
+      },
+      "upload_part_size" : {
+        "type" : "integer"
       }
     },
     "required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ]