From 807867431250b43ee866b51f08be931343f9e21f Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Sat, 23 Mar 2024 15:03:45 -0700 Subject: [PATCH] fix compiler errors --- .../destination/jdbc/copy/StreamCopier.kt | 2 +- .../java/airbyte-cdk/gcs-destinations/build.gradle | 12 ++++++++++++ .../destination/gcs/BaseGcsDestination.kt | 3 ++- .../destination/gcs/GcsDestinationConfig.kt | 10 +++++----- .../destination/gcs/avro/GcsAvroWriter.kt | 3 ++- .../gcs/credential/GcsCredentialConfig.kt | 2 +- .../gcs/credential/GcsHmacKeyCredentialConfig.kt | 2 +- .../destination/gcs/csv/GcsCsvWriter.kt | 3 ++- .../destination/jdbc/copy/gcs/GcsStreamCopier.kt | 5 ----- .../jdbc/copy/gcs/GcsStreamCopierFactory.kt | 2 +- .../gcs/avro/GcsAvroFormatConfigTest.kt | 5 +++-- .../destination/gcs/csv/GcsCsvFormatConfigTest.kt | 5 +++-- .../gcs/jsonl/GcsJsonlFormatConfigTest.kt | 5 +++-- .../gcs/GcsAvroParquetDestinationAcceptanceTest.kt | 4 +--- .../gcs/GcsBaseAvroDestinationAcceptanceTest.kt | 3 +-- .../gcs/GcsBaseCsvDestinationAcceptanceTest.kt | 3 +-- .../gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt | 1 - .../gcs/GcsBaseJsonlDestinationAcceptanceTest.kt | 1 - .../GcsBaseJsonlGzipDestinationAcceptanceTest.kt | 1 - .../gcs/GcsBaseParquetDestinationAcceptanceTest.kt | 1 - .../gcs/GcsDestinationAcceptanceTest.kt | 10 +++++----- .../destination/s3/S3DestinationConfig.kt | 14 ++++++++------ 22 files changed, 52 insertions(+), 45 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt index f17214751644..76ae3262702f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt @@ -65,5 +65,5 @@ interface StreamCopier { fun prepareStagingFile(): String? /** @return current staging file name */ - val currentFile: String + val currentFile: String? } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle index b7f5b88f9132..c4a2d83106ab 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle @@ -8,6 +8,18 @@ java { } } +compileKotlin { + compilerOptions { + allWarningsAsErrors = false + } +} + +compileTestFixturesKotlin { + compilerOptions { + allWarningsAsErrors = false + } +} + dependencies { implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies') implementation project(':airbyte-cdk:java:airbyte-cdk:core') diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt index 66b6ac380cd1..bd557e1aa53e 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt @@ -12,6 +12,7 @@ import io.airbyte.cdk.integrations.base.Destination import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage import io.airbyte.cdk.integrations.destination.NamingConventionTransformer import io.airbyte.cdk.integrations.destination.record_buffer.BufferStorage +import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testMultipartUpload import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testSingleUpload import io.airbyte.cdk.integrations.destination.s3.S3ConsumerFactory @@ -64,7 +65,7 @@ abstract class BaseGcsDestination : BaseConnector(), Destination { outputRecordCollector, GcsStorageOperations(nameTransformer, gcsConfig.s3Client, gcsConfig), nameTransformer, - getCreateFunction(gcsConfig, Function { fileExtension: String? -> FileBuffer(fileExtension) }), + getCreateFunction(gcsConfig, Function { fileExtension: String -> FileBuffer(fileExtension) }), gcsConfig, configuredCatalog) } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt index b8796d6378bf..53465157d658 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt @@ -26,23 +26,23 @@ import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations class GcsDestinationConfig(bucketName: String?, bucketPath: String?, bucketRegion: String?, - val gcsCredentialConfig: GcsCredentialConfig?, + val gcsCredentialConfig: GcsCredentialConfig, formatConfig: S3FormatConfig?) : S3DestinationConfig(GCS_ENDPOINT, bucketName!!, bucketPath!!, bucketRegion, S3DestinationConstants.DEFAULT_PATH_FORMAT, - gcsCredentialConfig.getS3CredentialConfig().orElseThrow(), + gcsCredentialConfig.s3CredentialConfig.orElseThrow(), formatConfig!!, null, null, false, S3StorageOperations.DEFAULT_UPLOAD_THREADS) { override fun createS3Client(): AmazonS3 { - when (gcsCredentialConfig!!.credentialType) { + when (gcsCredentialConfig.credentialType) { GcsCredentialType.HMAC_KEY -> { - val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig? - val awsCreds = BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret()) + val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig + val awsCreds = BasicAWSCredentials(hmacKeyCredential.hmacKeyAccessId, hmacKeyCredential.hmacKeySecret) return AmazonS3ClientBuilder.standard() .withEndpointConfiguration( diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt index 1b91fec59671..78a2004857cd 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt @@ -14,6 +14,7 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory import io.airbyte.cdk.integrations.destination.s3.avro.JsonToAvroSchemaConverter import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig +import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter import io.airbyte.protocol.models.v0.AirbyteRecordMessage @@ -57,7 +58,7 @@ class GcsAvroWriter @JvmOverloads constructor(config: GcsDestinationConfig, this.avroRecordFactory = AvroRecordFactory(schema, converter) this.uploadManager = create(config.bucketName, outputPath, s3Client) - .setPartSize(DEFAULT_PART_SIZE_MB.toLong()) + .setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong()) .get() // We only need one output stream as we only have one input stream. This is reasonably performant. this.outputStream = uploadManager.multiPartOutputStreams[0] diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt index 6dccd873059c..1899cb84b52a 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt @@ -8,5 +8,5 @@ import io.airbyte.cdk.integrations.destination.s3.credential.S3CredentialConfig import java.util.* interface GcsCredentialConfig : BlobStorageCredentialConfig { - val s3CredentialConfig: Optional + val s3CredentialConfig: Optional } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt index 71ba77cf4ac1..cbe8b32f6541 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt @@ -25,6 +25,6 @@ class GcsHmacKeyCredentialConfig : GcsCredentialConfig { override val credentialType: GcsCredentialType get() = GcsCredentialType.HMAC_KEY - override val s3CredentialConfig: Optional + override val s3CredentialConfig: Optional get() = Optional.of(S3AccessKeyCredentialConfig(hmacKeyAccessId, hmacKeySecret)) } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt index d5c4e90446c1..6d8f2524d56a 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt @@ -13,6 +13,7 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator.Factory.create import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig +import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter import io.airbyte.protocol.models.v0.AirbyteRecordMessage @@ -51,7 +52,7 @@ class GcsCsvWriter(config: GcsDestinationConfig, outputPath) this.uploadManager = create(config.bucketName, outputPath, s3Client) - .setPartSize(DEFAULT_PART_SIZE_MB.toLong()) + .setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong()) .get() // We only need one output stream as we only have one input stream. This is reasonably performant. this.outputStream = uploadManager.multiPartOutputStreams[0] diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt index 1ea9dc8537be..857fc51b384c 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt @@ -164,11 +164,6 @@ abstract class GcsStreamCopier(protected val stagingFolder: String, get() =// TODO need to update this method when updating whole class for using GcsWriter null - @VisibleForTesting - fun getGcsStagingFiles(): Set { - return gcsStagingFiles - } - @Throws(SQLException::class) abstract fun copyGcsCsvFileIntoTable(database: JdbcDatabase?, gcsFileLocation: String?, diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt index aab2ca4f7b22..4fdd124dc916 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt @@ -22,7 +22,7 @@ abstract class GcsStreamCopierFactory : StreamCopierFactory { /** * Used by the copy consumer. */ - override fun create(configuredSchema: String?, + fun create(configuredSchema: String?, gcsConfig: GcsConfig, stagingFolder: String?, configuredStream: ConfiguredAirbyteStream?, diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.kt index fd6266998a7b..bb6bf3156afe 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.kt @@ -8,6 +8,7 @@ import com.google.common.collect.Lists import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig.Companion.parseCodecConfig +import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create import io.airbyte.commons.json.Jsons import org.apache.avro.file.DataFileConstants @@ -108,7 +109,7 @@ internal class GcsAvroFormatConfigTest { .get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes) } @Test @@ -126,6 +127,6 @@ internal class GcsAvroFormatConfigTest { .get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes) } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.kt index b863e001c814..6962d89f3322 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.kt @@ -8,6 +8,7 @@ import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils import io.airbyte.cdk.integrations.destination.s3.util.Flattening import io.airbyte.cdk.integrations.destination.s3.util.Flattening.Companion.fromValue +import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create import io.airbyte.commons.json.Jsons import org.apache.commons.lang3.reflect.FieldUtils @@ -45,7 +46,7 @@ class GcsCsvFormatConfigTest { .get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes) } @Test @@ -63,6 +64,6 @@ class GcsCsvFormatConfigTest { .get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes) } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.kt index aa592a3496f6..65bc58382cca 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.gcs.jsonl import com.amazonaws.services.s3.internal.Constants import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils +import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create import io.airbyte.commons.json.Jsons import org.apache.commons.lang3.reflect.FieldUtils @@ -33,7 +34,7 @@ class GcsJsonlFormatConfigTest { .get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes) } @Test @@ -51,6 +52,6 @@ class GcsJsonlFormatConfigTest { .get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes) } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt index e6b4042eed73..67b681d258bc 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt @@ -6,7 +6,6 @@ package io.airbyte.cdk.integrations.destination.gcs import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.avro.JsonSchemaType -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.cdk.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider import io.airbyte.commons.json.Jsons @@ -102,8 +101,7 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) : Gcs @Throws(IOException::class) private fun readMessagesFromFile(messagesFilename: String): List { - return MoreResources.readResource(messagesFilename).lines() - .map(Function { record: String? -> Jsons.deserialize(record, AirbyteMessage::class.java) }).collect, Any>(Collectors.toList()) + return MoreResources.readResource(messagesFilename).lines().map { Jsons.deserialize(it, AirbyteMessage::class.java) } } @Throws(Exception::class) diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseAvroDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseAvroDestinationAcceptanceTest.kt index ec57d165fd9f..90b9ba441f9e 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseAvroDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseAvroDestinationAcceptanceTest.kt @@ -9,7 +9,6 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator import io.airbyte.commons.json.Jsons @@ -46,7 +45,7 @@ abstract class GcsBaseAvroDestinationAcceptanceTest : GcsAvroParquetDestinationA DataFileReader( SeekableByteArrayInput(`object`.objectContent.readAllBytes()), GenericDatumReader()).use { dataFileReader -> - val jsonReader: ObjectReader = GcsDestinationAcceptanceTest.Companion.MAPPER.reader() + val jsonReader: ObjectReader = MAPPER.reader() while (dataFileReader.hasNext()) { val record = dataFileReader.next() val jsonBytes = AvroConstants.JSON_CONVERTER.convertToJson(record) diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvDestinationAcceptanceTest.kt index 82fa50964a2b..e93b14a779a6 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvDestinationAcceptanceTest.kt @@ -9,7 +9,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.util.Flattening -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.commons.json.Jsons import org.apache.commons.csv.CSVFormat @@ -82,7 +81,7 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes } private fun getJsonNode(input: Map, fieldTypes: Map): JsonNode { - val json: ObjectNode = GcsDestinationAcceptanceTest.Companion.MAPPER.createObjectNode() + val json: ObjectNode = MAPPER.createObjectNode() if (input.containsKey(JavaBaseConstants.COLUMN_NAME_DATA)) { return Jsons.deserialize(input[JavaBaseConstants.COLUMN_NAME_DATA]) diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt index d2f1eb876e25..12e48b5b24bb 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt @@ -6,7 +6,6 @@ package io.airbyte.cdk.integrations.destination.gcs import com.amazonaws.services.s3.model.S3Object import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.destination.s3.util.Flattening -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.commons.json.Jsons import java.io.IOException diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlDestinationAcceptanceTest.kt index e14a03ab791e..246d2c78c4c3 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlDestinationAcceptanceTest.kt @@ -7,7 +7,6 @@ import com.amazonaws.services.s3.model.S3Object import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.cdk.integrations.destination.s3.S3Format -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.commons.json.Jsons import java.io.BufferedReader diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlGzipDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlGzipDestinationAcceptanceTest.kt index 96b55404189f..03e80f206a8f 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlGzipDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlGzipDestinationAcceptanceTest.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.gcs import com.amazonaws.services.s3.model.S3Object import com.fasterxml.jackson.databind.JsonNode -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.commons.json.Jsons import java.io.BufferedReader diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseParquetDestinationAcceptanceTest.kt index 76236cdea83e..a0496011fa9b 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseParquetDestinationAcceptanceTest.kt @@ -11,7 +11,6 @@ import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetWriter.Companion.getHadoopConfig import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator import io.airbyte.commons.json.Jsons diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt index 31d7ff7751b5..29c234a1f966 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt @@ -13,7 +13,6 @@ import com.google.common.collect.ImmutableMap import io.airbyte.cdk.integrations.destination.NamingConventionTransformer import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator @@ -46,9 +45,9 @@ import java.util.stream.Collectors */ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format) : DestinationAcceptanceTest() { protected var configJson: JsonNode? = null - protected var config: GcsDestinationConfig? = null - protected var s3Client: AmazonS3? = null - protected var nameTransformer: NamingConventionTransformer? = null + protected lateinit var config: GcsDestinationConfig + protected lateinit var s3Client: AmazonS3 + protected lateinit var nameTransformer: NamingConventionTransformer protected var s3StorageOperations: S3StorageOperations? = null protected val baseConfigJson: JsonNode @@ -62,7 +61,7 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format return configJson!! } - override fun getDefaultSchema(config: JsonNode): String { + override fun getDefaultSchema(config: JsonNode): String? { if (config.has("gcs_bucket_path")) { return config["gcs_bucket_path"].asText() } @@ -245,6 +244,7 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format companion object { protected val LOGGER: Logger = LoggerFactory.getLogger(GcsDestinationAcceptanceTest::class.java) + @JvmStatic protected val MAPPER: ObjectMapper = MoreMappers.initMapper() protected const val SECRET_FILE_PATH: String = "secrets/config.json" diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt index 5ab98274589f..5815ff4f5114 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt @@ -44,12 +44,14 @@ open class S3DestinationConfig { private val lock = Any() var s3Client: AmazonS3 get() { - synchronized(lock) { - if (s3Client == null) { - return resetS3Client() + if (s3Client == null) { + synchronized(lock) { + if (s3Client == null) { + s3Client = resetS3Client() + } } - return s3Client } + return s3Client } private set @@ -84,7 +86,7 @@ open class S3DestinationConfig { pathFormat: String, credentialConfig: S3CredentialConfig, formatConfig: S3FormatConfig, - s3Client: AmazonS3, + s3Client: AmazonS3?, fileNamePattern: String?, checkIntegrity: Boolean, uploadThreadsCount: Int) { @@ -95,7 +97,7 @@ open class S3DestinationConfig { this.pathFormat = pathFormat this.s3CredentialConfig = credentialConfig this.formatConfig = formatConfig - this.s3Client = s3Client + this.s3Client = s3Client ?: resetS3Client() this.fileNamePattern = fileNamePattern this.isCheckIntegrity = checkIntegrity this.uploadThreadsCount = uploadThreadsCount