Skip to content

Commit

Permalink
Bulk Load CDK: S3V2: Concurrency limits for streaming multipart upload (
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Nov 13, 2024
1 parent 7253ebd commit bb880c4
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@

package io.airbyte.cdk.load.command.object_storage

data class ObjectStorageUploadConfiguration(val streamingUploadPartSize: Long)
data class ObjectStorageUploadConfiguration(
val streamingUploadPartSize: Long = DEFAULT_STREAMING_UPLOAD_PART_SIZE,
val maxNumConcurrentUploads: Int = DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
) {
companion object {
const val DEFAULT_STREAMING_UPLOAD_PART_SIZE = 5L * 1024L * 1024L
const val DEFAULT_MAX_NUM_CONCURRENT_UPLOADS = 5
}
}

interface ObjectStorageUploadConfigurationProvider {
val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import io.airbyte.cdk.load.file.NoopProcessor
import io.airbyte.cdk.load.file.StreamProcessor
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

data class S3Object(override val key: String, override val storageConfig: S3BucketConfiguration) :
RemoteObject<S3BucketConfiguration> {
Expand All @@ -44,6 +47,8 @@ class S3Client(
val bucketConfig: S3BucketConfiguration,
private val uploadConfig: ObjectStorageUploadConfiguration?,
) : ObjectStorageClient<S3Object> {
private val log = KotlinLogging.logger {}
private val uploadPermits = uploadConfig?.maxNumConcurrentUploads?.let { Semaphore(it) }

override suspend fun list(prefix: String) = flow {
var request = ListObjectsRequest {
Expand Down Expand Up @@ -129,6 +134,24 @@ class S3Client(
metadata: Map<String, String>,
streamProcessor: StreamProcessor<U>?,
block: suspend (OutputStream) -> Unit
): S3Object {
if (uploadPermits != null) {
uploadPermits.withPermit {
log.info {
"Attempting to acquire upload permit for $key (${uploadPermits.availablePermits} available)"
}
return streamingUploadInner(key, metadata, streamProcessor, block)
}
} else {
return streamingUploadInner(key, metadata, streamProcessor, block)
}
}

private suspend fun <U : OutputStream> streamingUploadInner(
key: String,
metadata: Map<String, String>,
streamProcessor: StreamProcessor<U>?,
block: suspend (OutputStream) -> Unit
): S3Object {
val request = CreateMultipartUploadRequest {
this.bucket = bucketConfig.s3BucketName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ data class S3V2Configuration<T : OutputStream>(
override val objectStorageCompressionConfiguration: ObjectStorageCompressionConfiguration<T>,

// Internal configuration
override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration =
ObjectStorageUploadConfiguration(5L * 1024 * 1024),
override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration,
override val recordBatchSizeBytes: Long = 200L * 1024 * 1024,
) :
DestinationConfiguration(),
Expand All @@ -52,7 +51,14 @@ class S3V2ConfigurationFactory :
s3BucketConfiguration = pojo.toS3BucketConfiguration(),
objectStoragePathConfiguration = pojo.toObjectStoragePathConfiguration(),
objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration(),
objectStorageCompressionConfiguration = pojo.toCompressionConfiguration()
objectStorageCompressionConfiguration = pojo.toCompressionConfiguration(),
objectStorageUploadConfiguration =
ObjectStorageUploadConfiguration(
pojo.uploadPartSize
?: ObjectStorageUploadConfiguration.DEFAULT_STREAMING_UPLOAD_PART_SIZE,
pojo.maxConcurrentUploads
?: ObjectStorageUploadConfiguration.DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

package io.airbyte.integrations.destination.s3_v2

import com.fasterxml.jackson.annotation.JsonProperty
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.load.command.aws.AWSAccessKeySpecification
import io.airbyte.cdk.load.command.object_storage.JsonFormatSpecification
import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatSpecification
import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatSpecificationProvider
import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration
import io.airbyte.cdk.load.command.s3.S3BucketRegion
import io.airbyte.cdk.load.command.s3.S3BucketSpecification
import io.airbyte.cdk.load.command.s3.S3PathSpecification
Expand All @@ -36,6 +38,12 @@ class S3V2Specification :
override val fileNamePattern: String? = null
override val useStagingDirectory: Boolean? = null
override val s3StagingPrefix: String? = null

@JsonProperty("max_concurrent_uploads")
val maxConcurrentUploads: Int? =
ObjectStorageUploadConfiguration.DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
@JsonProperty("upload_part_size")
val uploadPartSize: Long? = ObjectStorageUploadConfiguration.DEFAULT_STREAMING_UPLOAD_PART_SIZE
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@
"description" : "Path to use when staging data in the bucket directory. Airbyte will stage data here during sync and/or write small manifest/recovery files.",
"title" : "S3 Staging Prefix",
"examples" : [ "__staging/data_sync/test" ]
},
"max_concurrent_uploads" : {
"type" : "integer"
},
"upload_part_size" : {
"type" : "integer"
}
},
"required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@
"description" : "Path to use when staging data in the bucket directory. Airbyte will stage data here during sync and/or write small manifest/recovery files.",
"title" : "S3 Staging Prefix",
"examples" : [ "__staging/data_sync/test" ]
},
"max_concurrent_uploads" : {
"type" : "integer"
},
"upload_part_size" : {
"type" : "integer"
}
},
"required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ]
Expand Down

0 comments on commit bb880c4

Please sign in to comment.