Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aws-runtime/aws-http/api/aws-http.api
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ public final class aws/sdk/kotlin/runtime/http/interceptors/IgnoreCompositeFlexi
public final class aws/sdk/kotlin/runtime/http/interceptors/businessmetrics/AwsBusinessMetric : java/lang/Enum, aws/smithy/kotlin/runtime/businessmetrics/BusinessMetric {
public static final field DDB_MAPPER Laws/sdk/kotlin/runtime/http/interceptors/businessmetrics/AwsBusinessMetric;
public static final field S3_EXPRESS_BUCKET Laws/sdk/kotlin/runtime/http/interceptors/businessmetrics/AwsBusinessMetric;
public static final field S3_TRANSFER Laws/sdk/kotlin/runtime/http/interceptors/businessmetrics/AwsBusinessMetric;
public static final field S3_TRANSFER_DOWNLOAD_DIRECTORY Laws/sdk/kotlin/runtime/http/interceptors/businessmetrics/AwsBusinessMetric;
public static final field S3_TRANSFER_UPLOAD_DIRECTORY Laws/sdk/kotlin/runtime/http/interceptors/businessmetrics/AwsBusinessMetric;
public static fun getEntries ()Lkotlin/enums/EnumEntries;
public fun getIdentifier ()Ljava/lang/String;
public fun toString ()Ljava/lang/String;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ internal fun formatMetrics(metrics: MutableSet<BusinessMetric>, logger: Logger):
public enum class AwsBusinessMetric(public override val identifier: String) : BusinessMetric {
S3_EXPRESS_BUCKET("J"),
DDB_MAPPER("d"),
S3_TRANSFER("G"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two new features S3_TRANSFER_UPLOAD_DIRECTORY and S3_TRANSFER_DOWNLOAD_DIRECTORY we should add here

S3_TRANSFER_UPLOAD_DIRECTORY("9"),
S3_TRANSFER_DOWNLOAD_DIRECTORY("+"),
;

@InternalApi
Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ smithy-kotlin-telemetry-provider-micrometer = { module = "aws.smithy.kotlin:tele
smithy-kotlin-telemetry-provider-otel = { module = "aws.smithy.kotlin:telemetry-provider-otel", version.ref = "smithy-kotlin-runtime-version" }
smithy-kotlin-test-suite = { module = "aws.smithy.kotlin:test-suite", version.ref = "smithy-kotlin-runtime-version" }
smithy-kotlin-testing = { module = "aws.smithy.kotlin:testing", version.ref = "smithy-kotlin-runtime-version" }
smithy-kotlin-test-jvm = { module = "aws.smithy.kotlin:http-test-jvm", version.ref = "smithy-kotlin-runtime-version" }
smithy-kotlin-testing-jvm = { module = "aws.smithy.kotlin:testing-jvm", version.ref = "smithy-kotlin-runtime-version" }

smithy-kotlin-codegen = { module = "software.amazon.smithy.kotlin:smithy-kotlin-codegen", version.ref = "smithy-kotlin-codegen-version" }
smithy-kotlin-codegen-testutils = { module = "software.amazon.smithy.kotlin:smithy-kotlin-codegen-testutils", version.ref = "smithy-kotlin-codegen-version" }
Expand Down
2 changes: 1 addition & 1 deletion hll/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ val hllPreviewVersion = if (sdkVersion.contains("-SNAPSHOT")) { // e.g. 1.3.29-b

subprojects {
group = "aws.sdk.kotlin"
version = hllPreviewVersion
version = if (name == "s3-transfer-manager") sdkVersion else hllPreviewVersion
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can override the version in the subproject rather than hacking it in here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or better, override / set hllPreviewVersion in the submodules which need it, and use sdkVersion as the default

// TODO Use configurePublishing when migrating to Sonatype Publisher API / JReleaser
configurePublishing("aws-sdk-kotlin")
}
Expand Down
303 changes: 303 additions & 0 deletions hll/s3-transfer-manager/api/s3-transfer-manager.api

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions hll/s3-transfer-manager/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

// NOTE: the license header was previously duplicated here; keep a single copy.
import org.gradle.kotlin.dsl.dependencies
import org.gradle.kotlin.dsl.sourceSets

description = "S3 Transfer Manager for the AWS SDK for Kotlin"
extra["displayName"] = "AWS :: SDK :: Kotlin :: HLL :: S3 Transfer Manager"
extra["moduleName"] = "aws.sdk.kotlin.hll.s3transfermanager"

kotlin {
    sourceSets {
        commonMain {
            dependencies {
                implementation(project(":aws-runtime:aws-http"))
                implementation(project(":services:s3"))
            }
        }
        jvmTest {
            dependencies {
                implementation(libs.smithy.kotlin.test.jvm)
                implementation(libs.smithy.kotlin.testing.jvm)
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package aws.sdk.kotlin.hll.s3transfermanager

import aws.sdk.kotlin.hll.s3transfermanager.model.MultipartDownloadType
import aws.sdk.kotlin.hll.s3transfermanager.model.Part
import aws.sdk.kotlin.hll.s3transfermanager.model.UploadFileRequest
import aws.sdk.kotlin.hll.s3transfermanager.model.UploadFileResponse
import aws.sdk.kotlin.hll.s3transfermanager.utils.S3TransferManagerException
import aws.sdk.kotlin.hll.s3transfermanager.utils.buildCompleteMultipartUploadRequest
import aws.sdk.kotlin.hll.s3transfermanager.utils.buildUploadPartRequest
import aws.sdk.kotlin.hll.s3transfermanager.utils.ceilDiv
import aws.sdk.kotlin.hll.s3transfermanager.utils.getNextPart
import aws.sdk.kotlin.hll.s3transfermanager.utils.resolvePartSize
import aws.sdk.kotlin.hll.s3transfermanager.utils.toCreateMultiPartUploadRequest
import aws.sdk.kotlin.hll.s3transfermanager.utils.toPutObjectRequest
import aws.sdk.kotlin.hll.s3transfermanager.utils.toUploadFileResponse
import aws.sdk.kotlin.services.s3.S3Client
import aws.sdk.kotlin.services.s3.abortMultipartUpload
import aws.sdk.kotlin.services.s3.model.CompleteMultipartUploadRequest
import aws.sdk.kotlin.services.s3.model.CompleteMultipartUploadResponse
import aws.sdk.kotlin.services.s3.model.CompletedPart
import aws.sdk.kotlin.services.s3.model.CreateMultipartUploadRequest
import aws.sdk.kotlin.services.s3.model.CreateMultipartUploadResponse
import aws.sdk.kotlin.services.s3.model.PutObjectRequest
import aws.sdk.kotlin.services.s3.model.PutObjectResponse
import aws.sdk.kotlin.services.s3.model.UploadPartRequest
import aws.sdk.kotlin.services.s3.model.UploadPartResponse
import aws.sdk.kotlin.services.s3.withConfig
import aws.smithy.kotlin.runtime.content.ByteStream
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.telemetry.logging.logger
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope

/**
* High level utility for managing transfers to Amazon S3.
*/
public class S3TransferManager private constructor(
Comment on lines +40 to +43
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: This class has too much logic inside of it and that won't be sustainable once we add more operations and variants. We should refactor this, possibly so that individual operations are located in a single file and maybe even represented by a class.

public val client: S3Client,
public val partSizeBytes: Long,
public val multipartUploadThresholdBytes: Long,
public val multipartDownloadType: MultipartDownloadType,
public val interceptors: MutableList<TransferInterceptor>,
) {
internal var context: TransferContext = TransferContext()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: We cannot share a single mutable context across an S3TM instance. Multiple transfers may be happening at the same time. Contexts should either be local to an operation in progress just like with the SDK itself (i.e., ExecutionContext is related to a call, not to the overall client) or local to a hook invocation.


public companion object {
    /**
     * DSL-style factory: creates an [S3TransferManager] from a [Builder] configured by [block].
     */
    public operator fun invoke(block: Builder.() -> Unit): S3TransferManager {
        val builder = Builder()
        builder.block()
        return builder.build()
    }
}

public class Builder {
    /** The low-level [S3Client] used to perform transfers. Required. */
    public var client: S3Client? = null

    /** Size of each part in a multipart upload, in bytes. */
    public var partSizeBytes: Long = 8_000_000L

    /** Objects at or above this size are uploaded via multipart upload. */
    public var multipartUploadThresholdBytes: Long = 16_000_000L

    /** Multipart download behavior; defaults to [Part]. */
    public var multipartDownloadType: MultipartDownloadType = Part

    /** Transfer-level interceptors invoked around transfer hooks. */
    public var interceptors: MutableList<TransferInterceptor> = mutableListOf()

    internal fun build(): S3TransferManager {
        // Attach the business-metric interceptor to a configured copy of the client
        // so every request made by the transfer manager is tagged.
        val configuredClient = checkNotNull(client) { "client must be set" }
            .withConfig { interceptors += S3TransferManagerBusinessMetricInterceptor }
        return S3TransferManager(
            client = configuredClient,
            partSizeBytes = partSizeBytes,
            multipartUploadThresholdBytes = multipartUploadThresholdBytes,
            multipartDownloadType = multipartDownloadType,
            interceptors = interceptors,
        )
    }
}

/**
 * Executes a sequence of operations around a hook.
 *
 * The execution flow is as follows:
 * 1. Runs all interceptors scheduled to execute **before** the hook.
 * 2. Executes the main hook logic.
 * 3. Runs all interceptors scheduled to execute **after** the hook.
 *
 * Each "modify" pass may replace the shared [context]; later interceptors in the
 * same pass observe the replacement.
 *
 * NOTE(review): an exception thrown by any read or modify interceptor call aborts
 * the remaining interceptors and (for the "before" passes) the hook body itself.
 * Unlike low-level SDK interceptors, failures are not collected and deferred —
 * confirm this is the intended contract.
 */
private suspend fun operationHook(hook: TransferHook, block: suspend () -> Any) {
    when (hook) {
        is TransferInitiated -> {
            // Read-only pass, then a pass that may replace the shared context.
            interceptors.forEach { it.readBeforeTransferInitiated(context) }
            interceptors.forEach { context = it.modifyBeforeTransferInitiated(context) }
            block.invoke()
            interceptors.forEach { it.readAfterTransferInitiated(context) }
            interceptors.forEach { context = it.modifyAfterTransferInitiated(context) }
        }
        is BytesTransferred -> {
            interceptors.forEach { it.readBeforeBytesTransferred(context) }
            interceptors.forEach { context = it.modifyBeforeBytesTransferred(context) }
            block.invoke()
            interceptors.forEach { it.readAfterBytesTransferred(context) }
            interceptors.forEach { context = it.modifyAfterBytesTransferred(context) }
        }
        is FileTransferred -> {
            interceptors.forEach { it.readBeforeFileTransferred(context) }
            interceptors.forEach { context = it.modifyBeforeFileTransferred(context) }
            block.invoke()
            interceptors.forEach { it.readAfterFileTransferred(context) }
            interceptors.forEach { context = it.modifyAfterFileTransferred(context) }
        }
        is TransferCompleted -> {
            interceptors.forEach { it.readBeforeTransferCompleted(context) }
            interceptors.forEach { context = it.modifyBeforeTransferCompleted(context) }
            block.invoke()
            interceptors.forEach { it.readAfterTransferCompleted(context) }
            interceptors.forEach { context = it.modifyAfterTransferCompleted(context) }
        }
        // Defensive default: fail loudly if a new hook type is added without a branch.
        else -> error("TransferHook not implemented: ${hook::class.simpleName}")
    }
}
Comment on lines +82 to +114
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: This doesn't follow the same pattern as low-level interceptors or DDB Mapper interceptors in that a single failure in any of the read* calls will prevent later interceptors/hooks from running.


/**
* Uploads a byte stream to Amazon S3, automatically using multipart uploads
* for large objects as needed.
*
* This function handles the complexity of splitting the data into parts,
uploading each part, and completing the multipart upload. For objects smaller than [multipartUploadThresholdBytes],
* a standard single-part upload is performed automatically.
*
* If the specified [partSizeBytes] for multipart uploads is too small to allow
* all parts to fit within S3's limit of 10,000 parts, the part size will be
* automatically increased so that exactly 10,000 parts are uploaded.
*/
public suspend fun uploadFile(uploadFileRequest: UploadFileRequest): Deferred<UploadFileResponse> = coroutineScope {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: Method is too long and should be refactored.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: This method shouldn't return Deferred<UploadFileResponse>, it should return UploadFileResponse just like our low-level APIs do. If users want this to happen asynchronously, they can wrap it in async { } themselves.

Note that coroutineScope does not return until all the work inside is completed (including child coroutines) so the Deferred value would already be filled anyway.

val contentLength = uploadFileRequest.body?.contentLength ?: throw S3TransferManagerException("UploadFileRequest.body.contentLength must be set")
val multiPartUpload = contentLength >= multipartUploadThresholdBytes
val uploadedParts = mutableListOf<CompletedPart>()
lateinit var mpuUploadId: String

val logger = coroutineContext.logger<S3TransferManager>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: How/when is a logger set in the CoroutineContext?


/*
Handles transfer initiated hook.
Seeds the transfer context with byte counters and the initial request, then —
for multipart uploads only — creates the MPU and captures its upload ID.
*/
suspend fun transferInitiated(multiPartUpload: Boolean) {
    // Reset progress counters for this transfer.
    context.transferredBytes = 0L
    context.transferableBytes = contentLength
    // Pre-build the request so "before" interceptors can observe/modify it.
    context.request = if (multiPartUpload) {
        uploadFileRequest.toCreateMultiPartUploadRequest()
    } else {
        uploadFileRequest.toPutObjectRequest()
    }
    operationHook(TransferInitiated) {
        if (multiPartUpload) {
            // CreateMultipartUpload happens here; single-part uploads defer all
            // network work to the BytesTransferred hook.
            context.response = client.createMultipartUpload(context.request as CreateMultipartUploadRequest)
            mpuUploadId = (context.response as CreateMultipartUploadResponse).uploadId ?: throw S3TransferManagerException("Missing upload id in create multipart upload response")
        }
    }
}

/*
Handles bytes transferred hook.
Multipart path: streams the body part-by-part via UploadPart, tracking progress
in the shared context. On any failure the MPU is aborted (best effort) so
already-uploaded parts do not accrue storage costs.
Single-part path: one PutObject call transfers everything.
*/
suspend fun transferBytes(multiPartUpload: Boolean) {
    if (multiPartUpload) {
        try {
            // Part size may be increased by resolvePartSize so the object fits
            // within S3's 10,000-part limit.
            val partSize = resolvePartSize(contentLength, this@S3TransferManager, logger)
            val numberOfParts = ceilDiv(contentLength, partSize)
            val partSource = when (uploadFileRequest.body) {
                is ByteStream.Buffer -> uploadFileRequest.body.bytes()
                is ByteStream.ChannelStream -> uploadFileRequest.body.readFrom()
                is ByteStream.SourceStream -> uploadFileRequest.body.readFrom()
                else -> throw S3TransferManagerException("Unhandled body type: ${uploadFileRequest.body?.let { it::class.simpleName } ?: "null"}")
            }
            val partBuffer = SdkBuffer()
            var currentPartNumber = 1L

            while (context.transferredBytes!! < context.transferableBytes!!) {
                partBuffer.getNextPart(partSource, partSize, this@S3TransferManager)
                // Every part except the last must be exactly partSize bytes.
                if (currentPartNumber != numberOfParts && partBuffer.size != partSize) {
                    throw S3TransferManagerException("Part #$currentPartNumber size mismatch detected. Expected $partSize, actual: ${partBuffer.size}")
                }

                context.request =
                    buildUploadPartRequest(
                        uploadFileRequest,
                        partBuffer,
                        currentPartNumber,
                        mpuUploadId,
                    )

                operationHook(BytesTransferred) {
                    context.response = client.uploadPart(context.request as UploadPartRequest)
                    // NOTE(review): the final part may be smaller than partSize, so this
                    // counter can overshoot transferableBytes; the loop still terminates
                    // because the condition uses `<`. Confirm progress reporting intent.
                    context.transferredBytes = context.transferredBytes!! + partSize
                }

                // Record the ETag needed later by CompleteMultipartUpload.
                uploadedParts += CompletedPart {
                    partNumber = currentPartNumber.toInt()
                    eTag = (context.response as UploadPartResponse).eTag
                }
                currentPartNumber += 1
            }

            if (uploadedParts.size != numberOfParts.toInt()) {
                throw S3TransferManagerException("The number of uploaded parts does not match the expected count. Expected $numberOfParts, actual: ${uploadedParts.size}")
            }
        } catch (uploadPartThrowable: Throwable) {
            // NOTE(review): catching Throwable also catches CancellationException;
            // the abort below still runs as cleanup, but cancellation semantics
            // should be confirmed.
            // Bug fix: previously the "parts could not be uploaded" exception was
            // thrown *inside* the try block wrapping abortMultipartUpload, so it was
            // immediately caught and re-wrapped as "Unable to abort" even when the
            // abort succeeded. The rethrow now happens after the inner try.
            try {
                client.abortMultipartUpload {
                    bucket = uploadFileRequest.bucket
                    expectedBucketOwner = uploadFileRequest.expectedBucketOwner
                    key = uploadFileRequest.key
                    requestPayer = uploadFileRequest.requestPayer
                    uploadId = mpuUploadId
                }
            } catch (abortThrowable: Throwable) {
                // Keep the original upload failure visible alongside the abort failure.
                abortThrowable.addSuppressed(uploadPartThrowable)
                throw S3TransferManagerException("Multipart upload failed (ID: $mpuUploadId). Unable to abort multipart upload.", abortThrowable)
            }
            throw S3TransferManagerException("Multipart upload failed (ID: $mpuUploadId). One or more parts could not be uploaded", uploadPartThrowable)
        }
    } else {
        // Single-part upload: PutObject moves the whole body in one request.
        operationHook(BytesTransferred) {
            context.response = client.putObject(context.request as PutObjectRequest)
            context.transferredBytes = context.transferableBytes
        }
    }
}

/*
Handles transfer completed hook.
For multipart uploads, builds and sends CompleteMultipartUpload using the parts
collected during transferBytes; single-part uploads have nothing left to send,
so only the interceptor hooks fire.
*/
suspend fun transferComplete(multiPartUpload: Boolean) {
    if (multiPartUpload) {
        // Expose the completion request to "before" interceptors via the context.
        context.request = buildCompleteMultipartUploadRequest(uploadFileRequest, mpuUploadId, uploadedParts)
    }
    operationHook(TransferCompleted) {
        if (multiPartUpload) {
            try {
                context.response = client.completeMultipartUpload(context.request as CompleteMultipartUploadRequest)
            } catch (t: Throwable) {
                // Wrap so callers see which MPU failed to complete.
                throw S3TransferManagerException("Unable to complete multipart upload with ID: $mpuUploadId", t)
            }
        }
    }
}

async {
transferInitiated(multiPartUpload)
transferBytes(multiPartUpload)
transferComplete(multiPartUpload)

when (context.response) {
is PutObjectResponse -> (context.response as PutObjectResponse).toUploadFileResponse()
is CompleteMultipartUploadResponse -> (context.response as CompleteMultipartUploadResponse).toUploadFileResponse()
else -> throw S3TransferManagerException("Unexpected response: ${context.response?.let { it::class.simpleName } ?: "null"}")
}
}
}

/**
 * Uploads a byte stream to Amazon S3, automatically using multipart uploads
 * for large objects as needed.
 *
 * This function handles the complexity of splitting the data into parts,
 * uploading each part, and completing the multipart upload. For objects smaller than [multipartUploadThresholdBytes],
 * a standard single-part upload is performed automatically.
 *
 * If the specified [partSizeBytes] for multipart uploads is too small to allow
 * all parts to fit within S3's limit of 10,000 parts, the part size will be
 * automatically increased so that exactly 10,000 parts are uploaded.
 *
 * This overload builds the [UploadFileRequest] from the supplied DSL [block]
 * and delegates to the [uploadFile] overload taking a request object.
 */
public suspend inline fun uploadFile(crossinline block: UploadFileRequest.Builder.() -> Unit): Deferred<UploadFileResponse> = uploadFile(UploadFileRequest.Builder().apply(block).build())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package aws.sdk.kotlin.hll.s3transfermanager

import aws.sdk.kotlin.runtime.http.interceptors.businessmetrics.AwsBusinessMetric
import aws.smithy.kotlin.runtime.businessmetrics.emitBusinessMetric
import aws.smithy.kotlin.runtime.client.RequestInterceptorContext
import aws.smithy.kotlin.runtime.http.interceptors.HttpInterceptor

/**
 * An interceptor that emits the S3 Transfer Manager business metric
 * ([AwsBusinessMetric.S3_TRANSFER]) on every request made through a client it
 * is attached to.
 */
internal object S3TransferManagerBusinessMetricInterceptor : HttpInterceptor {
    // Runs before request serialization; emitting into the execution context is
    // additive and the request itself is returned unchanged.
    override suspend fun modifyBeforeSerialization(context: RequestInterceptorContext<Any>): Any {
        context.executionContext.emitBusinessMetric(AwsBusinessMetric.S3_TRANSFER)
        return context.request
    }
}
Loading
Loading