Skip to content

Commit

Permalink
Bulk Load CDK: Async part upload dispatch per part, not per byte/array
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Nov 15, 2024
1 parent 33cecdd commit e47f10b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,23 @@ class S3MultipartUpload<T : OutputStream>(
uploadConfig: ObjectStorageUploadConfiguration?,
) {
private val log = KotlinLogging.logger {}

private val uploadedParts = mutableListOf<CompletedPart>()
private val partSize =
uploadConfig?.streamingUploadPartSize
?: throw IllegalStateException("Streaming upload part size is not configured")
private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer)
private val workQueue = Channel<suspend () -> Unit>(Channel.UNLIMITED)
private val closeOnce = AtomicBoolean(false)
private val partQueue = Channel<ByteArray>(Channel.UNLIMITED)
private val isClosed = AtomicBoolean(false)

/**
* Run the upload using the provided block. This should only be used by the
* [S3Client.streamingUpload] method. Work items are processed asynchronously in the [launch]
* block. The for loop will suspend until [workQueue] is closed, after which the call to
* [complete] will finish the upload.
* [S3Client.streamingUpload] method. Completed parts are processed asynchronously in the
* [launch] block. The for loop will suspend until [partQueue] is closed, after which the call
* to [complete] will finish the upload.
*
* Moreover, [runUsing] will not return until the launch block exits. This ensures
* - work items are processed in order
* - minimal work is done in [runBlocking] (just enough to enqueue the work items)
* - parts are processed in order
* - minimal work is done in [runBlocking] (just enough to enqueue the parts, and only once per
* part)
* - the upload will not complete until the [OutputStream.close] is called (either by the user
* in [block] or when the [use] block terminates).
* - the upload will not complete until all the work is done
Expand All @@ -68,10 +67,17 @@ class S3MultipartUpload<T : OutputStream>(
"Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}"
}
launch {
for (item in workQueue) {
item()
val uploadedParts = mutableListOf<CompletedPart>()
for (bytes in partQueue) {
val part = uploadPart(bytes, uploadedParts)
uploadedParts.add(part)
}
streamProcessor.partFinisher.invoke(wrappingBuffer)
if (underlyingBuffer.size() > 0) {
val part = uploadPart(underlyingBuffer.toByteArray(), uploadedParts)
uploadedParts.add(part)
}
complete()
complete(uploadedParts)
}
UploadStream().use { block(it) }
log.info {
Expand All @@ -80,57 +86,56 @@ class S3MultipartUpload<T : OutputStream>(
}

inner class UploadStream : OutputStream() {
override fun close() = runBlocking {
if (closeOnce.setOnce()) {
workQueue.send { workQueue.close() }
override fun close() {
if (isClosed.setOnce()) {
partQueue.close()
}
}

override fun flush() = runBlocking { workQueue.send { wrappingBuffer.flush() } }
override fun flush() = wrappingBuffer.flush()

override fun write(b: Int) = runBlocking {
workQueue.send {
wrappingBuffer.write(b)
if (underlyingBuffer.size() >= partSize) {
uploadPart()
}
override fun write(b: Int) {
wrappingBuffer.write(b)
if (underlyingBuffer.size() >= partSize) {
enqueuePart()
}
}

override fun write(b: ByteArray) = runBlocking {
workQueue.send {
wrappingBuffer.write(b)
if (underlyingBuffer.size() >= partSize) {
uploadPart()
}
override fun write(b: ByteArray) {
wrappingBuffer.write(b)
if (underlyingBuffer.size() >= partSize) {
enqueuePart()
}
}
}

private suspend fun uploadPart() {
streamProcessor.partFinisher.invoke(wrappingBuffer)
// Snapshot the currently buffered bytes as a single upload part and hand it to
// the consumer coroutine, then reset the buffer so writes can continue filling
// the next part.
private fun enqueuePart() {
// Flush the wrapping (e.g. compressing) stream so all processed bytes land in
// the underlying buffer before we copy it.
wrappingBuffer.flush()
val bytes = underlyingBuffer.toByteArray()
underlyingBuffer.reset()
// partQueue is Channel.UNLIMITED, so send() should not suspend on capacity;
// runBlocking only bridges from this non-suspending OutputStream API.
// NOTE(review): assumes this is never invoked from inside a coroutine —
// runBlocking there would risk blocking the dispatcher; confirm with callers.
runBlocking { partQueue.send(bytes) }
}

private suspend fun uploadPart(
bytes: ByteArray,
uploadedParts: List<CompletedPart>
): CompletedPart {
val partNumber = uploadedParts.size + 1
val request = UploadPartRequest {
uploadId = response.uploadId
bucket = response.bucket
key = response.key
body = ByteStream.fromBytes(underlyingBuffer.toByteArray())
body = ByteStream.fromBytes(bytes)
this.partNumber = partNumber
}
val uploadResponse = client.uploadPart(request)
uploadedParts.add(
CompletedPart {
this.partNumber = partNumber
this.eTag = uploadResponse.eTag
}
)
underlyingBuffer.reset()
return CompletedPart {
this.partNumber = partNumber
this.eTag = uploadResponse.eTag
}
}

private suspend fun complete() {
if (underlyingBuffer.size() > 0) {
uploadPart()
}
private suspend fun complete(uploadedParts: List<CompletedPart>) {
val request = CompleteMultipartUploadRequest {
uploadId = response.uploadId
bucket = response.bucket
Expand Down
15 changes: 15 additions & 0 deletions airbyte-integrations/connectors/destination-s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ application {

// uncomment and replace to run locally
//applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/java.lang=ALL-UNNAMED']
applicationDefaultJvmArgs = [
'-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'
// Uncomment to enable remote profiling:
// '-XX:NativeMemoryTracking=detail',
// '-Djava.rmi.server.hostname=localhost',
// '-Dcom.sun.management.jmxremote=true',
// '-Dcom.sun.management.jmxremote.port=6000',
// '-Dcom.sun.management.jmxremote.rmi.port=6000',
// '-Dcom.sun.management.jmxremote.local.only=false',
// '-Dcom.sun.management.jmxremote.authenticate=false',
// '-Dcom.sun.management.jmxremote.ssl=false',
]

// Uncomment and replace to run locally
//applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', '--add-opens', 'java.base/sun.security.action=ALL-UNNAMED', '--add-opens', 'java.base/java.lang=ALL-UNNAMED']
}

// uncomment to run locally
Expand Down

0 comments on commit e47f10b

Please sign in to comment.