Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert missing s3-destinations tests #36570

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ abstract class BaseGcsDestination : BaseConnector(), Destination {
val s3Client = destinationConfig.getS3Client()

// Test single upload (for small files) permissions
testSingleUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath)
testSingleUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath!!)

// Test multipart upload with stream transfer manager
testMultipartUpload(
s3Client,
destinationConfig.bucketName,
destinationConfig.bucketPath
destinationConfig.bucketPath!!
)

return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format
namespaceStr,
streamNameStr,
DateTime.now(DateTimeZone.UTC),
config!!.pathFormat
config!!.pathFormat!!
)
// the child folder contains a non-deterministic epoch timestamp, so use the parent folder
val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected constructor(
S3BaseChecks.testSingleUpload(
s3Client,
destinationConfig.bucketName,
destinationConfig.bucketPath
destinationConfig.bucketPath!!
)
S3BaseChecks.testMultipartUpload(
s3Client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object S3BaseChecks {
fun attemptS3WriteAndDelete(
storageOperations: S3StorageOperations,
s3Config: S3DestinationConfig,
bucketPath: String
bucketPath: String?
) {
attemptS3WriteAndDelete(storageOperations, s3Config, bucketPath, s3Config.getS3Client())
}
Expand Down Expand Up @@ -104,11 +104,11 @@ object S3BaseChecks {
fun attemptS3WriteAndDelete(
storageOperations: S3StorageOperations,
s3Config: S3DestinationConfig,
bucketPath: String,
bucketPath: String?,
s3: AmazonS3
) {
val prefix =
if (Strings.isNullOrEmpty(bucketPath)) {
if (bucketPath.isNullOrEmpty()) {
""
} else if (bucketPath.endsWith("/")) {
bucketPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import org.slf4j.LoggerFactory
*/
open class S3DestinationConfig {
@JvmField val endpoint: String?
@JvmField val bucketName: String
@JvmField val bucketPath: String
@JvmField val bucketName: String?
@JvmField val bucketPath: String?
@JvmField val bucketRegion: String?
@JvmField val pathFormat: String
val s3CredentialConfig: S3CredentialConfig
@JvmField val pathFormat: String?
val s3CredentialConfig: S3CredentialConfig?
@JvmField val formatConfig: S3FormatConfig?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to convert these to property ? since they are accessed using getters. Or as a subsequent PR.

var fileNamePattern: String? = null
private set
Expand All @@ -55,10 +55,10 @@ open class S3DestinationConfig {
constructor(
endpoint: String?,
bucketName: String,
bucketPath: String,
bucketPath: String?,
bucketRegion: String?,
pathFormat: String,
credentialConfig: S3CredentialConfig,
pathFormat: String?,
credentialConfig: S3CredentialConfig?,
formatConfig: S3FormatConfig?,
s3Client: AmazonS3
) {
Expand All @@ -74,11 +74,11 @@ open class S3DestinationConfig {

constructor(
endpoint: String?,
bucketName: String,
bucketPath: String,
bucketName: String?,
bucketPath: String?,
bucketRegion: String?,
pathFormat: String,
credentialConfig: S3CredentialConfig,
pathFormat: String?,
credentialConfig: S3CredentialConfig?,
formatConfig: S3FormatConfig?,
s3Client: AmazonS3?,
fileNamePattern: String?,
Expand Down Expand Up @@ -110,8 +110,8 @@ open class S3DestinationConfig {
protected open fun createS3Client(): AmazonS3 {
LOGGER.info("Creating S3 client...")

val credentialsProvider = s3CredentialConfig.s3CredentialsProvider
val credentialType = s3CredentialConfig.credentialType
val credentialsProvider = s3CredentialConfig!!.s3CredentialsProvider
val credentialType = s3CredentialConfig!!.credentialType

if (S3CredentialType.DEFAULT_PROFILE == credentialType) {
return AmazonS3ClientBuilder.standard()
Expand Down Expand Up @@ -173,7 +173,7 @@ open class S3DestinationConfig {
}

class Builder(
private var bucketName: String,
private var bucketName: String?,
private var bucketPath: String,
private var bucketRegion: String?
) {
Expand Down Expand Up @@ -271,15 +271,15 @@ open class S3DestinationConfig {
private const val R2_INSTANCE_URL = "https://%s.r2.cloudflarestorage.com"

@JvmStatic
fun create(bucketName: String, bucketPath: String, bucketRegion: String?): Builder {
fun create(bucketName: String?, bucketPath: String, bucketRegion: String?): Builder {
return Builder(bucketName, bucketPath, bucketRegion)
}

@JvmStatic
fun create(config: S3DestinationConfig): Builder {
return Builder(config.bucketName, config.bucketPath, config.bucketRegion)
return Builder(config.bucketName, config.bucketPath!!, config.bucketRegion)
.withEndpoint(config.endpoint)
.withCredentialConfig(config.s3CredentialConfig)
.withCredentialConfig(config.s3CredentialConfig!!)
.withFormatConfig(config.formatConfig)
}

Expand Down Expand Up @@ -358,9 +358,9 @@ open class S3DestinationConfig {
return builder.get()
}

private fun getProperty(config: JsonNode, @Nonnull key: String): String {
val node = config.get(key)
return node.asText()
private fun getProperty(config: JsonNode, @Nonnull key: String): String? {
val node: JsonNode? = config.get(key)
return node?.asText()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ open class S3StorageOperations(

/** Create a directory object at the specified location. Creates the bucket if necessary. */
override fun createBucketIfNotExists() {
val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
if (!doesBucketExist(bucket)) {
logger.info { "Bucket $bucket does not exist; creating..." }
s3Client.createBucket(bucket)
Expand Down Expand Up @@ -170,7 +170,7 @@ open class S3StorageOperations(
@Throws(IOException::class)
private fun loadDataIntoBucket(objectPath: String, recordsData: SerializableBuffer): String {
val partSize: Long = DEFAULT_PART_SIZE.toLong()
val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
val partId: String = getPartId(objectPath)
val fileExtension: String = getExtension(recordsData.filename)
val fullObjectKey: String =
Expand Down Expand Up @@ -261,7 +261,7 @@ open class S3StorageOperations(
var objects: ObjectListing?
var objectCount = 0

val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
objects = s3Client.listObjects(bucket, objectPath)

if (objects != null) {
Expand Down Expand Up @@ -296,7 +296,7 @@ open class S3StorageOperations(
objectPath: String,
pathFormat: String
) {
val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
var objects: ObjectListing =
s3Client.listObjects(
ListObjectsRequest()
Expand Down Expand Up @@ -360,7 +360,7 @@ open class S3StorageOperations(
}

override fun cleanUpBucketObject(objectPath: String, stagedFiles: List<String>) {
val bucket: String = s3Config.bucketName
val bucket: String? = s3Config.bucketName
var objects: ObjectListing = s3Client.listObjects(bucket, objectPath)
while (objects.objectSummaries.size > 0) {
val keysToDelete: List<DeleteObjectsRequest.KeyVersion> =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.integrations.destination.s3

import java.util.Map
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

class BlobDecoratorTest {
@Test
fun testOverwriteMetadata() {
val metadata: MutableMap<String, String> = HashMap()
metadata["amz-foo"] = "oldValue"

BlobDecorator.insertMetadata(metadata, Map.of("foo", "amz-foo"), "foo", "newValue")

Assertions.assertEquals(Map.of("amz-foo", "newValue"), metadata)
}

@Test
fun testNewMetadata() {
val metadata: MutableMap<String, String> = HashMap()
metadata["amz-foo"] = "oldValue"

BlobDecorator.insertMetadata(metadata, Map.of("bar", "amz-bar"), "bar", "newValue")

Assertions.assertEquals(Map.of("amz-foo", "oldValue", "amz-bar", "newValue"), metadata)
}

@Test
fun testSkipMetadata() {
val metadata: MutableMap<String, String> = HashMap()
metadata["amz-foo"] = "oldValue"

BlobDecorator.insertMetadata(metadata, Map.of("foo", "amz-foo"), "bar", "newValue")

Assertions.assertEquals(Map.of("amz-foo", "oldValue"), metadata)
}
}

This file was deleted.

Loading
Loading