55
66package aws.sdk.kotlin.hll.s3transfermanager
77
8- import aws.sdk.kotlin.hll.s3transfermanager.model.MultiPartDownloadType
8+ import aws.sdk.kotlin.hll.s3transfermanager.model.MultipartDownloadType
99import aws.sdk.kotlin.hll.s3transfermanager.model.Part
1010import aws.sdk.kotlin.hll.s3transfermanager.model.UploadFileRequest
1111import aws.sdk.kotlin.hll.s3transfermanager.model.UploadFileResponse
12+ import aws.sdk.kotlin.hll.s3transfermanager.utils.S3TransferManagerException
1213import aws.sdk.kotlin.hll.s3transfermanager.utils.buildCompleteMultipartUploadRequest
1314import aws.sdk.kotlin.hll.s3transfermanager.utils.buildUploadPartRequest
1415import aws.sdk.kotlin.hll.s3transfermanager.utils.ceilDiv
@@ -41,9 +42,9 @@ import kotlinx.coroutines.coroutineScope
4142 */
4243public class S3TransferManager private constructor(
4344 public val client : S3Client ,
44- public val targePartSize : Long ,
45- public val multipartUploadThreshold : Long ,
46- public val multipartDownloadType : MultiPartDownloadType ,
45+ public val partSizeBytes : Long ,
46+ public val multipartUploadThresholdBytes : Long ,
47+ public val multipartDownloadType : MultipartDownloadType ,
4748 public val interceptors : MutableList <TransferInterceptor >,
4849) {
4950 internal var context: TransferContext = TransferContext ()
@@ -55,16 +56,16 @@ public class S3TransferManager private constructor(
5556
5657 public class Builder {
5758 public var client: S3Client ? = null
58- public var targePartSize : Long = 8_000_000
59- public var multipartUploadThreshold : Long = 16_000_000L
60- public var multipartDownloadType: MultiPartDownloadType = Part
59+ public var partSizeBytes : Long = 8_000_000
60+ public var multipartUploadThresholdBytes : Long = 16_000_000L
61+ public var multipartDownloadType: MultipartDownloadType = Part
6162 public var interceptors: MutableList <TransferInterceptor > = mutableListOf ()
6263
6364 internal fun build (): S3TransferManager =
6465 S3TransferManager (
65- client = client?.withConfig { interceptors + = BusinessMetricInterceptor } ? : error(" client must be set" ),
66- targePartSize = targePartSize ,
67- multipartUploadThreshold = multipartUploadThreshold ,
66+ client = client?.withConfig { interceptors + = S3TransferManagerBusinessMetricInterceptor } ? : error(" client must be set" ),
67+ partSizeBytes = partSizeBytes ,
68+ multipartUploadThresholdBytes = multipartUploadThresholdBytes ,
6869 multipartDownloadType = multipartDownloadType,
6970 interceptors = interceptors,
7071 )
@@ -117,17 +118,18 @@ public class S3TransferManager private constructor(
117118 * for large objects as needed.
118119 *
119120 * This function handles the complexity of splitting the data into parts,
120- * uploading each part, and completing the multipart upload. For object smaller than [multipartUploadThreshold ],
121+ * uploading each part, and completing the multipart upload. For object smaller than [multipartUploadThresholdBytes ],
121122 * a standard single-part upload is performed automatically.
122123 *
123- * If the specified [targePartSize ] for multipart uploads is too small to allow
124+ * If the specified [partSizeBytes ] for multipart uploads is too small to allow
124125 * all parts to fit within S3's limit of 10,000 parts, the part size will be
125126 * automatically increased so that exactly 10,000 parts are uploaded.
126127 */
127128 public suspend fun uploadFile (uploadFileRequest : UploadFileRequest ): Deferred <UploadFileResponse > = coroutineScope {
128- val multiPartUpload = uploadFileRequest.contentLength >= multipartUploadThreshold
129+ val contentLength = uploadFileRequest.body?.contentLength ? : throw S3TransferManagerException (" UploadFileRequest.body.contentLength must be set" )
130+ val multiPartUpload = contentLength >= multipartUploadThresholdBytes
129131 val uploadedParts = mutableListOf<CompletedPart >()
130- var mpuUploadId = " null "
132+ lateinit var mpuUploadId: String
131133
132134 val logger = coroutineContext.logger<S3TransferManager >()
133135
@@ -136,7 +138,7 @@ public class S3TransferManager private constructor(
136138 */
137139 suspend fun transferInitiated (multiPartUpload : Boolean ) {
138140 context.transferredBytes = 0L
139- context.transferableBytes = uploadFileRequest. contentLength
141+ context.transferableBytes = contentLength
140142 context.request = if (multiPartUpload) {
141143 uploadFileRequest.toCreateMultiPartUploadRequest()
142144 } else {
@@ -145,7 +147,7 @@ public class S3TransferManager private constructor(
145147 operationHook(TransferInitiated ) {
146148 if (multiPartUpload) {
147149 context.response = client.createMultipartUpload(context.request as CreateMultipartUploadRequest )
148- mpuUploadId = (context.response as CreateMultipartUploadResponse ).uploadId ? : throw Exception (" Missing upload id in create multipart upload response" )
150+ mpuUploadId = (context.response as CreateMultipartUploadResponse ).uploadId ? : throw S3TransferManagerException (" Missing upload id in create multipart upload response" )
149151 }
150152 }
151153 }
@@ -156,22 +158,22 @@ public class S3TransferManager private constructor(
156158 suspend fun transferBytes (multiPartUpload : Boolean ) {
157159 if (multiPartUpload) {
158160 try {
159- val partSize = resolvePartSize(uploadFileRequest , this @S3TransferManager, logger)
160- val numberOfParts = ceilDiv(uploadFileRequest. contentLength, partSize)
161+ val partSize = resolvePartSize(contentLength , this @S3TransferManager, logger)
162+ val numberOfParts = ceilDiv(contentLength, partSize)
161163 val partSource = when (uploadFileRequest.body) {
162164 is ByteStream .Buffer -> uploadFileRequest.body.bytes()
163165 is ByteStream .ChannelStream -> uploadFileRequest.body.readFrom()
164166 is ByteStream .SourceStream -> uploadFileRequest.body.readFrom()
165- else -> error (" Unhandled body type: ${uploadFileRequest.body?.let { it::class .simpleName } ? : " null" } " )
167+ else -> throw S3TransferManagerException (" Unhandled body type: ${uploadFileRequest.body?.let { it::class .simpleName } ? : " null" } " )
166168 }
167169 val partBuffer = SdkBuffer ()
168170 var currentPartNumber = 1L
169171
170172 while (context.transferredBytes!! < context.transferableBytes!! ) {
171173 partBuffer.getNextPart(partSource, partSize, this @S3TransferManager)
172174 if (currentPartNumber != numberOfParts) {
173- check (partBuffer.size = = partSize) {
174- " Part #$currentPartNumber size mismatch detected. Expected $partSize , actual: ${partBuffer.size} "
175+ if (partBuffer.size ! = partSize) {
176+ throw S3TransferManagerException ( " Part #$currentPartNumber size mismatch detected. Expected $partSize , actual: ${partBuffer.size} " )
175177 }
176178 }
177179
@@ -195,8 +197,8 @@ public class S3TransferManager private constructor(
195197 currentPartNumber + = 1
196198 }
197199
198- check (uploadedParts.size = = numberOfParts.toInt()) {
199- " The number of uploaded parts does not match the expected count. Expected $numberOfParts , actual: ${uploadedParts.size} "
200+ if (uploadedParts.size ! = numberOfParts.toInt()) {
201+ throw S3TransferManagerException ( " The number of uploaded parts does not match the expected count. Expected $numberOfParts , actual: ${uploadedParts.size} " )
200202 }
201203 } catch (uploadPartThrowable: Throwable ) {
202204 try {
@@ -207,9 +209,9 @@ public class S3TransferManager private constructor(
207209 requestPayer = uploadFileRequest.requestPayer
208210 uploadId = mpuUploadId
209211 }
210- throw Exception (" Multipart upload failed (ID: $mpuUploadId ). One or more parts could not be uploaded" , uploadPartThrowable)
212+ throw S3TransferManagerException (" Multipart upload failed (ID: $mpuUploadId ). One or more parts could not be uploaded" , uploadPartThrowable)
211213 } catch (abortThrowable: Throwable ) {
212- throw Exception (" Multipart upload failed (ID: $mpuUploadId ). Unable to abort multipart upload." , abortThrowable)
214+ throw S3TransferManagerException (" Multipart upload failed (ID: $mpuUploadId ). Unable to abort multipart upload." , abortThrowable)
213215 }
214216 }
215217 } else {
@@ -232,28 +234,21 @@ public class S3TransferManager private constructor(
232234 try {
233235 context.response = client.completeMultipartUpload(context.request as CompleteMultipartUploadRequest )
234236 } catch (t: Throwable ) {
235- throw Exception (" Unable to complete multipart upload with ID: $mpuUploadId " , t)
237+ throw S3TransferManagerException (" Unable to complete multipart upload with ID: $mpuUploadId " , t)
236238 }
237239 }
238240 }
239241 }
240242
241243 async {
242- checkNotNull(uploadFileRequest.body?.contentLength) {
243- " UploadFileRequest.body.contentLength must be set"
244- }
245- check(uploadFileRequest.body.contentLength == uploadFileRequest.contentLength) {
246- " contentLength mismatch. uploadFileRequest: ${uploadFileRequest.contentLength} , uploadFileRequest.body.contentLength: ${uploadFileRequest.body.contentLength} "
247- }
248-
249244 transferInitiated(multiPartUpload)
250245 transferBytes(multiPartUpload)
251246 transferComplete(multiPartUpload)
252247
253248 when (context.response) {
254249 is PutObjectResponse -> (context.response as PutObjectResponse ).toUploadFileResponse()
255250 is CompleteMultipartUploadResponse -> (context.response as CompleteMultipartUploadResponse ).toUploadFileResponse()
256- else -> error (" Unexpected response: ${context.response?.let { it::class .simpleName } ? : " null" } " )
251+ else -> throw S3TransferManagerException (" Unexpected response: ${context.response?.let { it::class .simpleName } ? : " null" } " )
257252 }
258253 }
259254 }
@@ -263,10 +258,10 @@ public class S3TransferManager private constructor(
263258 * for large objects as needed.
264259 *
265260 * This function handles the complexity of splitting the data into parts,
266- * uploading each part, and completing the multipart upload. For object smaller than [multipartUploadThreshold ],
261+ * uploading each part, and completing the multipart upload. For object smaller than [multipartUploadThresholdBytes ],
267262 * a standard single-part upload is performed automatically.
268263 *
269- * If the specified [targePartSize ] for multipart uploads is too small to allow
264+ * If the specified [partSizeBytes ] for multipart uploads is too small to allow
270265 * all parts to fit within S3's limit of 10,000 parts, the part size will be
271266 * automatically increased so that exactly 10,000 parts are uploaded.
272267 */
0 commit comments