Skip to content

Commit

Permalink
convert missing s3-destinations tests (airbytehq#36570)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte authored and nurikk-sa committed Apr 4, 2024
1 parent 8bafd63 commit 247e46d
Show file tree
Hide file tree
Showing 37 changed files with 1,391 additions and 1,272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ abstract class BaseGcsDestination : BaseConnector(), Destination {
val s3Client = destinationConfig.getS3Client()

// Test single upload (for small files) permissions
testSingleUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath)
testSingleUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath!!)

// Test multipart upload with stream transfer manager
testMultipartUpload(
s3Client,
destinationConfig.bucketName,
destinationConfig.bucketPath
destinationConfig.bucketPath!!
)

return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format
namespaceStr,
streamNameStr,
DateTime.now(DateTimeZone.UTC),
config!!.pathFormat
config!!.pathFormat!!
)
// the child folder contains a non-deterministic epoch timestamp, so use the parent folder
val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected constructor(
S3BaseChecks.testSingleUpload(
s3Client,
destinationConfig.bucketName,
destinationConfig.bucketPath
destinationConfig.bucketPath!!
)
S3BaseChecks.testMultipartUpload(
s3Client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object S3BaseChecks {
fun attemptS3WriteAndDelete(
storageOperations: S3StorageOperations,
s3Config: S3DestinationConfig,
bucketPath: String
bucketPath: String?
) {
attemptS3WriteAndDelete(storageOperations, s3Config, bucketPath, s3Config.getS3Client())
}
Expand Down Expand Up @@ -104,11 +104,11 @@ object S3BaseChecks {
fun attemptS3WriteAndDelete(
storageOperations: S3StorageOperations,
s3Config: S3DestinationConfig,
bucketPath: String,
bucketPath: String?,
s3: AmazonS3
) {
val prefix =
if (Strings.isNullOrEmpty(bucketPath)) {
if (bucketPath.isNullOrEmpty()) {
""
} else if (bucketPath.endsWith("/")) {
bucketPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import org.slf4j.LoggerFactory
*/
open class S3DestinationConfig {
@JvmField val endpoint: String?
@JvmField val bucketName: String
@JvmField val bucketPath: String
@JvmField val bucketName: String?
@JvmField val bucketPath: String?
@JvmField val bucketRegion: String?
@JvmField val pathFormat: String
val s3CredentialConfig: S3CredentialConfig
@JvmField val pathFormat: String?
val s3CredentialConfig: S3CredentialConfig?
@JvmField val formatConfig: S3FormatConfig?
var fileNamePattern: String? = null
private set
Expand All @@ -55,10 +55,10 @@ open class S3DestinationConfig {
constructor(
endpoint: String?,
bucketName: String,
bucketPath: String,
bucketPath: String?,
bucketRegion: String?,
pathFormat: String,
credentialConfig: S3CredentialConfig,
pathFormat: String?,
credentialConfig: S3CredentialConfig?,
formatConfig: S3FormatConfig?,
s3Client: AmazonS3
) {
Expand All @@ -74,11 +74,11 @@ open class S3DestinationConfig {

constructor(
endpoint: String?,
bucketName: String,
bucketPath: String,
bucketName: String?,
bucketPath: String?,
bucketRegion: String?,
pathFormat: String,
credentialConfig: S3CredentialConfig,
pathFormat: String?,
credentialConfig: S3CredentialConfig?,
formatConfig: S3FormatConfig?,
s3Client: AmazonS3?,
fileNamePattern: String?,
Expand Down Expand Up @@ -110,8 +110,8 @@ open class S3DestinationConfig {
protected open fun createS3Client(): AmazonS3 {
LOGGER.info("Creating S3 client...")

val credentialsProvider = s3CredentialConfig.s3CredentialsProvider
val credentialType = s3CredentialConfig.credentialType
val credentialsProvider = s3CredentialConfig!!.s3CredentialsProvider
val credentialType = s3CredentialConfig!!.credentialType

if (S3CredentialType.DEFAULT_PROFILE == credentialType) {
return AmazonS3ClientBuilder.standard()
Expand Down Expand Up @@ -173,7 +173,7 @@ open class S3DestinationConfig {
}

class Builder(
private var bucketName: String,
private var bucketName: String?,
private var bucketPath: String,
private var bucketRegion: String?
) {
Expand Down Expand Up @@ -271,15 +271,15 @@ open class S3DestinationConfig {
private const val R2_INSTANCE_URL = "https://%s.r2.cloudflarestorage.com"

@JvmStatic
fun create(bucketName: String, bucketPath: String, bucketRegion: String?): Builder {
fun create(bucketName: String?, bucketPath: String, bucketRegion: String?): Builder {
return Builder(bucketName, bucketPath, bucketRegion)
}

@JvmStatic
fun create(config: S3DestinationConfig): Builder {
return Builder(config.bucketName, config.bucketPath, config.bucketRegion)
return Builder(config.bucketName, config.bucketPath!!, config.bucketRegion)
.withEndpoint(config.endpoint)
.withCredentialConfig(config.s3CredentialConfig)
.withCredentialConfig(config.s3CredentialConfig!!)
.withFormatConfig(config.formatConfig)
}

Expand Down Expand Up @@ -358,9 +358,9 @@ open class S3DestinationConfig {
return builder.get()
}

private fun getProperty(config: JsonNode, @Nonnull key: String): String {
val node = config.get(key)
return node.asText()
private fun getProperty(config: JsonNode, @Nonnull key: String): String? {
val node: JsonNode? = config.get(key)
return node?.asText()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ open class S3StorageOperations(

/** Create a directory object at the specified location. Creates the bucket if necessary. */
override fun createBucketIfNotExists() {
val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
if (!doesBucketExist(bucket)) {
logger.info { "Bucket $bucket does not exist; creating..." }
s3Client.createBucket(bucket)
Expand Down Expand Up @@ -170,7 +170,7 @@ open class S3StorageOperations(
@Throws(IOException::class)
private fun loadDataIntoBucket(objectPath: String, recordsData: SerializableBuffer): String {
val partSize: Long = DEFAULT_PART_SIZE.toLong()
val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
val partId: String = getPartId(objectPath)
val fileExtension: String = getExtension(recordsData.filename)
val fullObjectKey: String =
Expand Down Expand Up @@ -261,7 +261,7 @@ open class S3StorageOperations(
var objects: ObjectListing?
var objectCount = 0

val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
objects = s3Client.listObjects(bucket, objectPath)

if (objects != null) {
Expand Down Expand Up @@ -296,7 +296,7 @@ open class S3StorageOperations(
objectPath: String,
pathFormat: String
) {
val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
var objects: ObjectListing =
s3Client.listObjects(
ListObjectsRequest()
Expand Down Expand Up @@ -360,7 +360,7 @@ open class S3StorageOperations(
}

override fun cleanUpBucketObject(objectPath: String, stagedFiles: List<String>) {
val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
var objects: ObjectListing = s3Client.listObjects(bucket, objectPath)
while (objects.objectSummaries.size > 0) {
val keysToDelete: List<DeleteObjectsRequest.KeyVersion> =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.integrations.destination.s3

import java.util.Map
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

class BlobDecoratorTest {
@Test
fun testOverwriteMetadata() {
val metadata: MutableMap<String, String> = HashMap()
metadata["amz-foo"] = "oldValue"

BlobDecorator.insertMetadata(metadata, Map.of("foo", "amz-foo"), "foo", "newValue")

Assertions.assertEquals(Map.of("amz-foo", "newValue"), metadata)
}

@Test
fun testNewMetadata() {
val metadata: MutableMap<String, String> = HashMap()
metadata["amz-foo"] = "oldValue"

BlobDecorator.insertMetadata(metadata, Map.of("bar", "amz-bar"), "bar", "newValue")

Assertions.assertEquals(Map.of("amz-foo", "oldValue", "amz-bar", "newValue"), metadata)
}

@Test
fun testSkipMetadata() {
val metadata: MutableMap<String, String> = HashMap()
metadata["amz-foo"] = "oldValue"

BlobDecorator.insertMetadata(metadata, Map.of("foo", "amz-foo"), "bar", "newValue")

Assertions.assertEquals(Map.of("amz-foo", "oldValue"), metadata)
}
}

This file was deleted.

Loading

0 comments on commit 247e46d

Please sign in to comment.