From a37358d2e78729c1977688e2045328dd1797368b Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Sat, 23 Mar 2024 15:03:45 -0700 Subject: [PATCH] fix compiler errors --- .../src/main/resources/version.properties | 2 +- .../airbyte-cdk/gcs-destinations/build.gradle | 12 ++ .../destination/gcs/avro/GcsAvroWriter.java | 0 .../destination/gcs/csv/GcsCsvWriter.java | 0 .../gcs/parquet/GcsParquetWriter.java | 0 .../destination/gcs/BaseGcsDestination.kt | 65 +++++--- .../destination/gcs/GcsDestinationConfig.kt | 55 ++++--- .../destination/gcs/GcsStorageOperations.kt | 22 +-- .../destination/gcs/avro/GcsAvroWriter.kt | 63 +++++--- .../gcs/credential/GcsCredentialConfig.kt | 2 +- .../gcs/credential/GcsCredentialConfigs.kt | 5 +- .../credential/GcsHmacKeyCredentialConfig.kt | 2 +- .../destination/gcs/csv/GcsCsvWriter.kt | 47 +++--- .../destination/gcs/jsonl/GcsJsonlWriter.kt | 30 ++-- .../gcs/parquet/GcsParquetWriter.kt | 40 +++-- .../destination/gcs/util/GcsS3FileSystem.kt | 10 +- .../destination/gcs/util/GcsUtils.kt | 52 +++++-- .../destination/gcs/writer/BaseGcsWriter.kt | 60 ++++---- .../destination/jdbc/copy/gcs/GcsConfig.kt | 7 +- .../jdbc/copy/gcs/GcsStreamCopier.kt | 135 +++++++++++------ .../jdbc/copy/gcs/GcsStreamCopierFactory.kt | 62 ++++---- .../gcs/GcsDestinationConfigTest.java | 0 .../gcs/GcsDestinationConfigTest.kt | 2 +- .../gcs/avro/GcsAvroFormatConfigTest.kt | 65 ++++---- .../destination/gcs/avro/GcsAvroWriterTest.kt | 37 +++-- .../gcs/csv/GcsCsvFormatConfigTest.kt | 37 +++-- .../gcs/jsonl/GcsJsonlFormatConfigTest.kt | 31 ++-- .../destination/gcs/util/ConfigTestUtils.kt | 6 +- ...GcsAvroParquetDestinationAcceptanceTest.kt | 130 ++++++++++------ .../gcs/GcsAvroTestDataComparator.kt | 22 ++- .../GcsBaseAvroDestinationAcceptanceTest.kt | 62 ++++---- .../GcsBaseCsvDestinationAcceptanceTest.kt | 66 ++++---- ...GcsBaseCsvGzipDestinationAcceptanceTest.kt | 9 +- .../GcsBaseJsonlDestinationAcceptanceTest.kt | 26 ++-- ...sBaseJsonlGzipDestinationAcceptanceTest.kt | 9 +- ...GcsBaseParquetDestinationAcceptanceTest.kt | 69 +++++---- .../gcs/GcsDestinationAcceptanceTest.kt | 141 +++++++++++------- 37 files changed, 866 insertions(+), 517 deletions(-) delete mode 100644 airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.java delete mode 100644 airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.java delete mode 100644 airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.java delete mode 100644 airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/java/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfigTest.java diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index f324ef7f66bf..3a7b1b095571 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.28.9 +version=0.28.10 diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle index b7f5b88f9132..c4a2d83106ab 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle @@ -8,6 +8,18 @@ java { } } +compileKotlin { + compilerOptions { + 
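+        // Converted Kotlin sources still emit warnings; keep the build from failing on them.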
allWarningsAsErrors = false + } +} + +compileTestFixturesKotlin { + compilerOptions { + allWarningsAsErrors = false + } +} + dependencies { implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies') implementation project(':airbyte-cdk:java:airbyte-cdk:core') diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.java b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.java b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.java b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt index 66b6ac380cd1..249e5ce09a28 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.kt @@ -12,6 +12,7 @@ import io.airbyte.cdk.integrations.base.Destination import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage import io.airbyte.cdk.integrations.destination.NamingConventionTransformer import io.airbyte.cdk.integrations.destination.record_buffer.BufferStorage +import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testMultipartUpload import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testSingleUpload import io.airbyte.cdk.integrations.destination.s3.S3ConsumerFactory @@ -19,24 +20,29 @@ import io.airbyte.cdk.integrations.destination.s3.SerializedBufferFactory.Compan import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog -import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.util.function.Consumer import java.util.function.Function +import org.slf4j.Logger +import org.slf4j.LoggerFactory abstract class BaseGcsDestination : BaseConnector(), Destination { private val nameTransformer: NamingConventionTransformer = GcsNameTransformer() override fun check(config: JsonNode): AirbyteConnectionStatus? 
{
        try {
-            val destinationConfig: GcsDestinationConfig = GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
-            val s3Client = destinationConfig.s3Client
+            val destinationConfig: GcsDestinationConfig =
+                GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
+            val s3Client = destinationConfig.getS3Client()
 
             // Test single upload (for small files) permissions
             testSingleUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath)
 
             // Test multipart upload with stream transfer manager
-            testMultipartUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath)
+            testMultipartUpload(
+                s3Client,
+                destinationConfig.bucketName,
+                destinationConfig.bucketPath
+            )
 
             return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
         } catch (e: AmazonS3Exception) {
@@ -44,34 +50,51 @@ abstract class BaseGcsDestination : BaseConnector(), Destination {
             val message = getErrorMessage(e.errorCode, 0, e.message, e)
             emitConfigErrorTrace(e, message)
             return AirbyteConnectionStatus()
-                    .withStatus(AirbyteConnectionStatus.Status.FAILED)
-                    .withMessage(message)
+                .withStatus(AirbyteConnectionStatus.Status.FAILED)
+                .withMessage(message)
         } catch (e: Exception) {
-            LOGGER.error("Exception attempting to access the Gcs bucket: {}. Please make sure you account has all of these roles: " + EXPECTED_ROLES, e)
+            LOGGER.error(
+                "Exception attempting to access the Gcs bucket: {}. Please make sure your account has all of these roles: " +
+                    EXPECTED_ROLES,
+                e
+            )
             emitConfigErrorTrace(e, e.message)
             return AirbyteConnectionStatus()
-                    .withStatus(AirbyteConnectionStatus.Status.FAILED)
-                    .withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e
-                            .message)
+                .withStatus(AirbyteConnectionStatus.Status.FAILED)
+                .withMessage(
+                    "Could not connect to the Gcs bucket with the provided configuration. \n" +
+                        e.message
+                )
         }
     }
 
-    override fun getConsumer(config: JsonNode,
-                             configuredCatalog: ConfiguredAirbyteCatalog?,
-                             outputRecordCollector: Consumer<AirbyteMessage>?): AirbyteMessageConsumer? {
-        val gcsConfig: GcsDestinationConfig = GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
-        return S3ConsumerFactory().create(
+    override fun getConsumer(
+        config: JsonNode,
+        configuredCatalog: ConfiguredAirbyteCatalog,
+        outputRecordCollector: Consumer<AirbyteMessage>?
+    ): AirbyteMessageConsumer? {
+        val gcsConfig: GcsDestinationConfig =
+            GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
+        return S3ConsumerFactory()
+            .create(
                 outputRecordCollector,
-                GcsStorageOperations(nameTransformer, gcsConfig.s3Client, gcsConfig),
+                GcsStorageOperations(nameTransformer, gcsConfig.getS3Client(), gcsConfig),
                 nameTransformer,
-                getCreateFunction(gcsConfig, Function { fileExtension: String?
-> FileBuffer(fileExtension) }), + getCreateFunction( + gcsConfig, + Function { fileExtension: String -> + FileBuffer(fileExtension) + } + ), gcsConfig, - configuredCatalog) + configuredCatalog + ) } companion object { private val LOGGER: Logger = LoggerFactory.getLogger(BaseGcsDestination::class.java) - const val EXPECTED_ROLES: String = ("storage.multipartUploads.abort, storage.multipartUploads.create, " - + "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list") + const val EXPECTED_ROLES: String = + ("storage.multipartUploads.abort, storage.multipartUploads.create, " + + "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list") } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt index b8796d6378bf..77795cdf06c5 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt @@ -23,35 +23,47 @@ import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations * Currently we always reuse the S3 client for GCS. So the GCS config extends from the S3 config. * This may change in the future. */ -class GcsDestinationConfig(bucketName: String?, - bucketPath: String?, - bucketRegion: String?, - val gcsCredentialConfig: GcsCredentialConfig?, - formatConfig: S3FormatConfig?) : S3DestinationConfig(GCS_ENDPOINT, +class GcsDestinationConfig( + bucketName: String, + bucketPath: String, + bucketRegion: String?, + val gcsCredentialConfig: GcsCredentialConfig, + formatConfig: S3FormatConfig +) : + S3DestinationConfig( + GCS_ENDPOINT, bucketName!!, bucketPath!!, bucketRegion, S3DestinationConstants.DEFAULT_PATH_FORMAT, - gcsCredentialConfig.getS3CredentialConfig().orElseThrow(), + gcsCredentialConfig.s3CredentialConfig.orElseThrow(), formatConfig!!, null, null, false, - S3StorageOperations.DEFAULT_UPLOAD_THREADS) { + S3StorageOperations.DEFAULT_UPLOAD_THREADS + ) { override fun createS3Client(): AmazonS3 { - when (gcsCredentialConfig!!.credentialType) { + when (gcsCredentialConfig.credentialType) { GcsCredentialType.HMAC_KEY -> { - val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig? 
-                val awsCreds = BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret())
+                val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig
+                val awsCreds =
+                    BasicAWSCredentials(
+                        hmacKeyCredential.hmacKeyAccessId,
+                        hmacKeyCredential.hmacKeySecret
+                    )
 
                 return AmazonS3ClientBuilder.standard()
-                        .withEndpointConfiguration(
-                                AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, bucketRegion))
-                        .withCredentials(AWSStaticCredentialsProvider(awsCreds))
-                        .build()
+                    .withEndpointConfiguration(
+                        AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, bucketRegion)
+                    )
+                    .withCredentials(AWSStaticCredentialsProvider(awsCreds))
+                    .build()
             }
-
-            else -> throw IllegalArgumentException("Unsupported credential type: " + gcsCredentialConfig.credentialType!!.name)
+            else ->
+                throw IllegalArgumentException(
+                    "Unsupported credential type: " + gcsCredentialConfig.credentialType!!.name
+                )
         }
     }
 
@@ -60,11 +72,12 @@ class GcsDestinationConfig(bucketName: String?,
 
         fun getGcsDestinationConfig(config: JsonNode): GcsDestinationConfig {
             return GcsDestinationConfig(
-                    config["gcs_bucket_name"].asText(),
-                    config["gcs_bucket_path"].asText(),
-                    config["gcs_bucket_region"].asText(),
-                    GcsCredentialConfigs.getCredentialConfig(config),
-                    getS3FormatConfig(config))
+                config["gcs_bucket_name"].asText(),
+                config["gcs_bucket_path"].asText(),
+                config["gcs_bucket_region"].asText(),
+                GcsCredentialConfigs.getCredentialConfig(config),
+                getS3FormatConfig(config)
+            )
         }
     }
 }
diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsStorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsStorageOperations.kt
index 70c8cb4e874d..c1948134cba6 100644
--- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsStorageOperations.kt
+++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsStorageOperations.kt
@@ -11,21 +11,25 @@ import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
-class GcsStorageOperations(nameTransformer: NamingConventionTransformer?,
-                           s3Client: AmazonS3?,
-                           s3Config: S3DestinationConfig?) : S3StorageOperations(nameTransformer!!, s3Client!!, s3Config!!) {
-    /**
-     * GCS only supports the legacy AmazonS3#doesBucketExist method.
-     */
+class GcsStorageOperations(
+    nameTransformer: NamingConventionTransformer,
+    s3Client: AmazonS3,
+    s3Config: S3DestinationConfig
+) : S3StorageOperations(nameTransformer, s3Client, s3Config) {
+    /** GCS only supports the legacy AmazonS3#doesBucketExist method. */
     override fun doesBucketExist(bucket: String?): Boolean {
         return s3Client.doesBucketExist(bucket)
     }
 
     /**
-     * This method is overridden because GCS doesn't accept request to delete multiple objects. The only
-     * difference is that the AmazonS3#deleteObjects method is replaced with AmazonS3#deleteObject.
+     * This method is overridden because GCS doesn't accept requests to delete multiple objects. The
+     * only difference is that the AmazonS3#deleteObjects method is replaced with
+     * AmazonS3#deleteObject.
*/
-    override fun cleanUpObjects(bucket: String?, keysToDelete: List<DeleteObjectsRequest.KeyVersion>) {
+    override fun cleanUpObjects(
+        bucket: String?,
+        keysToDelete: List<DeleteObjectsRequest.KeyVersion>
+    ) {
         for (keyToDelete in keysToDelete) {
             LOGGER.info("Deleting object {}", keyToDelete.key)
             s3Client.deleteObject(bucket, keyToDelete.key)
diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt
index 1b91fec59671..c22c96e32697 100644
--- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt
+++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.kt
@@ -14,26 +14,31 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format
 import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
 import io.airbyte.cdk.integrations.destination.s3.avro.JsonToAvroSchemaConverter
 import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
+import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
 import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
 import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
 import io.airbyte.protocol.models.v0.AirbyteRecordMessage
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
+import java.io.IOException
+import java.sql.Timestamp
+import java.util.*
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.GenericData
 import org.apache.avro.generic.GenericDatumWriter
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
-import java.io.IOException
-import java.sql.Timestamp
-import java.util.*
 
-class GcsAvroWriter @JvmOverloads constructor(config: GcsDestinationConfig,
-                                              s3Client: AmazonS3,
-                                              configuredStream: ConfiguredAirbyteStream,
-                                              uploadTimestamp: Timestamp,
-                                              converter: JsonAvroConverter?,
-                                              jsonSchema: JsonNode? = null) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter {
+class GcsAvroWriter
+@JvmOverloads
+constructor(
+    config: GcsDestinationConfig,
+    s3Client: AmazonS3,
+    configuredStream: ConfiguredAirbyteStream,
+    uploadTimestamp: Timestamp,
+    converter: JsonAvroConverter?,
+    jsonSchema: JsonNode?
= null +) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter { private val avroRecordFactory: AvroRecordFactory private val uploadManager: StreamTransferManager private val outputStream: MultiPartOutputStream @@ -42,30 +47,48 @@ class GcsAvroWriter @JvmOverloads constructor(config: GcsDestinationConfig, override val outputPath: String init { - val schema = if (jsonSchema == null - ) GcsUtils.getDefaultAvroSchema(stream.name, stream.namespace, true, false) - else JsonToAvroSchemaConverter().getAvroSchema(jsonSchema, stream.name, - stream.namespace, true, false, false, true) + val schema = + if (jsonSchema == null) + GcsUtils.getDefaultAvroSchema(stream.name, stream.namespace, true, false) + else + JsonToAvroSchemaConverter() + .getAvroSchema( + jsonSchema, + stream.name, + stream.namespace, + true, + false, + false, + true + ) LOGGER.info("Avro schema for stream {}: {}", stream.name, schema!!.toString(false)) - val outputFilename: String = BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.AVRO) + val outputFilename: String = + BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.AVRO) outputPath = java.lang.String.join("/", outputPrefix, outputFilename) fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath) - LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.name, config.bucketName, - outputPath) + LOGGER.info( + "Full GCS path for stream '{}': {}/{}", + stream.name, + config.bucketName, + outputPath + ) this.avroRecordFactory = AvroRecordFactory(schema, converter) - this.uploadManager = create(config.bucketName, outputPath, s3Client) - .setPartSize(DEFAULT_PART_SIZE_MB.toLong()) + this.uploadManager = + create(config.bucketName, outputPath, s3Client) + .setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong()) .get() - // We only need one output stream as we only have one input stream. This is reasonably performant. + // We only need one output stream as we only have one input stream. This is reasonably + // performant. this.outputStream = uploadManager.multiPartOutputStreams[0] val formatConfig = config.formatConfig as S3AvroFormatConfig // The DataFileWriter always uses binary encoding. // If json encoding is needed in the future, use the GenericDatumWriter directly. 
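        // As an illustrative sketch only (not part of this change), a JSON-encoded
        // writer built on the same schema could look roughly like:
        //   val encoder = EncoderFactory.get().jsonEncoder(schema, outputStream)
        //   GenericDatumWriter<GenericData.Record>(schema).write(record, encoder)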
-        this.dataFileWriter = DataFileWriter(GenericDatumWriter<GenericData.Record>())
+        this.dataFileWriter =
+            DataFileWriter(GenericDatumWriter<GenericData.Record>())
                 .setCodec(formatConfig.codecFactory)
                 .create(schema, outputStream)
     }
diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt
index 6dccd873059c..1899cb84b52a 100644
--- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt
+++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt
@@ -8,5 +8,5 @@ import io.airbyte.cdk.integrations.destination.s3.credential.S3CredentialConfig
 import java.util.*
 
 interface GcsCredentialConfig : BlobStorageCredentialConfig<GcsCredentialType?> {
-    val s3CredentialConfig: Optional<S3CredentialConfig?>
+    val s3CredentialConfig: Optional<S3CredentialConfig>
 }
diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfigs.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfigs.kt
index 1006c84f8a43..62a2349df970 100644
--- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfigs.kt
+++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfigs.kt
@@ -10,7 +10,10 @@ object GcsCredentialConfigs {
     fun getCredentialConfig(config: JsonNode): GcsCredentialConfig {
         val credentialConfig = config["credential"]
-        val credentialType = GcsCredentialType.valueOf(credentialConfig["credential_type"].asText().uppercase(Locale.getDefault()))
+        val credentialType =
+            GcsCredentialType.valueOf(
+                credentialConfig["credential_type"].asText().uppercase(Locale.getDefault())
+            )
 
         if (credentialType == GcsCredentialType.HMAC_KEY) {
             return GcsHmacKeyCredentialConfig(credentialConfig)
diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt
index 71ba77cf4ac1..cbe8b32f6541 100644
--- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt
+++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.kt
@@ -25,6 +25,6 @@ class GcsHmacKeyCredentialConfig : GcsCredentialConfig {
     override val credentialType: GcsCredentialType
         get() = GcsCredentialType.HMAC_KEY
 
-    override val s3CredentialConfig: Optional<S3CredentialConfig?>
+    override val s3CredentialConfig: Optional<S3CredentialConfig>
         get() = Optional.of(S3AccessKeyCredentialConfig(hmacKeyAccessId, hmacKeySecret))
 }
diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt
index d5c4e90446c1..ac85087a0ea2 100644
---
a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.kt @@ -13,25 +13,28 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator.Factory.create import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig +import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter import io.airbyte.protocol.models.v0.AirbyteRecordMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream -import org.apache.commons.csv.CSVFormat -import org.apache.commons.csv.CSVPrinter -import org.apache.commons.csv.QuoteMode -import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.io.IOException import java.io.PrintWriter import java.nio.charset.StandardCharsets import java.sql.Timestamp import java.util.* +import org.apache.commons.csv.CSVFormat +import org.apache.commons.csv.CSVPrinter +import org.apache.commons.csv.QuoteMode +import org.slf4j.Logger +import org.slf4j.LoggerFactory -class GcsCsvWriter(config: GcsDestinationConfig, - s3Client: AmazonS3, - configuredStream: ConfiguredAirbyteStream, - uploadTimestamp: Timestamp) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter { +class GcsCsvWriter( + config: GcsDestinationConfig, + s3Client: AmazonS3, + configuredStream: ConfiguredAirbyteStream, + uploadTimestamp: Timestamp +) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter { private val csvSheetGenerator: CsvSheetGenerator private val uploadManager: StreamTransferManager private val outputStream: MultiPartOutputStream @@ -43,21 +46,31 @@ class GcsCsvWriter(config: GcsDestinationConfig, val formatConfig = config.formatConfig as S3CsvFormatConfig this.csvSheetGenerator = create(configuredStream.stream.jsonSchema, formatConfig) - val outputFilename: String = BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.CSV) + val outputFilename: String = + BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.CSV) outputPath = java.lang.String.join("/", outputPrefix, outputFilename) fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath) - LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.name, config.bucketName, - outputPath) + LOGGER.info( + "Full GCS path for stream '{}': {}/{}", + stream.name, + config.bucketName, + outputPath + ) - this.uploadManager = create(config.bucketName, outputPath, s3Client) - .setPartSize(DEFAULT_PART_SIZE_MB.toLong()) + this.uploadManager = + create(config.bucketName, outputPath, s3Client) + .setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong()) .get() - // We only need one output stream as we only have one input stream. This is reasonably performant. + // We only need one output stream as we only have one input stream. This is reasonably + // performant. 
this.outputStream = uploadManager.multiPartOutputStreams[0] - this.csvPrinter = CSVPrinter(PrintWriter(outputStream, true, StandardCharsets.UTF_8), + this.csvPrinter = + CSVPrinter( + PrintWriter(outputStream, true, StandardCharsets.UTF_8), CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL) - .withHeader(*csvSheetGenerator.headerRow.toTypedArray())) + .withHeader(*csvSheetGenerator.getHeaderRow().toTypedArray()) + ) } @Throws(IOException::class) diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlWriter.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlWriter.kt index 9a3f6aeb6009..0cd765543d46 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlWriter.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlWriter.kt @@ -18,18 +18,20 @@ import io.airbyte.commons.jackson.MoreMappers import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteRecordMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream -import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.io.IOException import java.io.PrintWriter import java.nio.charset.StandardCharsets import java.sql.Timestamp import java.util.* +import org.slf4j.Logger +import org.slf4j.LoggerFactory -class GcsJsonlWriter(config: GcsDestinationConfig, - s3Client: AmazonS3, - configuredStream: ConfiguredAirbyteStream, - uploadTimestamp: Timestamp) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter { +class GcsJsonlWriter( + config: GcsDestinationConfig, + s3Client: AmazonS3, + configuredStream: ConfiguredAirbyteStream, + uploadTimestamp: Timestamp +) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter { private val uploadManager: StreamTransferManager private val outputStream: MultiPartOutputStream private val printWriter: PrintWriter @@ -37,16 +39,22 @@ class GcsJsonlWriter(config: GcsDestinationConfig, override val outputPath: String init { - val outputFilename: String = BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.JSONL) + val outputFilename: String = + BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.JSONL) outputPath = java.lang.String.join("/", outputPrefix, outputFilename) fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath) - LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.name, config.bucketName, outputPath) + LOGGER.info( + "Full GCS path for stream '{}': {}/{}", + stream.name, + config.bucketName, + outputPath + ) - this.uploadManager = create(config.bucketName, outputPath, s3Client) - .get() + this.uploadManager = create(config.bucketName, outputPath, s3Client).get() - // We only need one output stream as we only have one input stream. This is reasonably performant. + // We only need one output stream as we only have one input stream. This is reasonably + // performant. 
this.outputStream = uploadManager.multiPartOutputStreams[0] this.printWriter = PrintWriter(outputStream, true, StandardCharsets.UTF_8) } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.kt index 62e548e4daf7..1cc78d4f7511 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.kt @@ -16,6 +16,10 @@ import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetFormatConfig import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter import io.airbyte.protocol.models.v0.AirbyteRecordMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import java.io.IOException +import java.net.URI +import java.sql.Timestamp +import java.util.* import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.hadoop.conf.Configuration @@ -26,28 +30,33 @@ import org.apache.parquet.hadoop.util.HadoopOutputFile import org.slf4j.Logger import org.slf4j.LoggerFactory import tech.allegro.schema.json2avro.converter.JsonAvroConverter -import java.io.IOException -import java.net.URI -import java.sql.Timestamp -import java.util.* -class GcsParquetWriter(config: GcsDestinationConfig, - s3Client: AmazonS3, - configuredStream: ConfiguredAirbyteStream, - uploadTimestamp: Timestamp, - schema: Schema?, - converter: JsonAvroConverter?) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter { +class GcsParquetWriter( + config: GcsDestinationConfig, + s3Client: AmazonS3, + configuredStream: ConfiguredAirbyteStream, + uploadTimestamp: Timestamp, + schema: Schema?, + converter: JsonAvroConverter? 
+) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter {
     private val parquetWriter: ParquetWriter<GenericData.Record>
     private val avroRecordFactory: AvroRecordFactory
     override val fileLocation: String
     override val outputPath: String
 
     init {
-        val outputFilename: String = BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.PARQUET)
+        val outputFilename: String =
+            BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.PARQUET)
         outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
-        LOGGER.info("Storage path for stream '{}': {}/{}", stream.name, config.bucketName, outputPath)
+        LOGGER.info(
+            "Storage path for stream '{}': {}/{}",
+            stream.name,
+            config.bucketName,
+            outputPath
+        )
 
-        fileLocation = String.format("s3a://%s/%s/%s", config.bucketName, outputPrefix, outputFilename)
+        fileLocation =
+            String.format("s3a://%s/%s/%s", config.bucketName, outputPrefix, outputFilename)
 
         val uri = URI(fileLocation)
         val path = Path(uri)
@@ -55,7 +64,10 @@ class GcsParquetWriter(config: GcsDestinationConfig,
         val formatConfig = config.formatConfig as S3ParquetFormatConfig
         val hadoopConfig = getHadoopConfig(config)
-        this.parquetWriter = AvroParquetWriter.builder<GenericData.Record>(HadoopOutputFile.fromPath(path, hadoopConfig))
+        this.parquetWriter =
+            AvroParquetWriter.builder<GenericData.Record>(
+                    HadoopOutputFile.fromPath(path, hadoopConfig)
+                )
                 .withSchema(schema)
                 .withCompressionCodec(formatConfig.compressionCodec)
                 .withRowGroupSize(formatConfig.blockSize)
diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/GcsS3FileSystem.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/GcsS3FileSystem.kt
index 9620b7e8a0ee..8c2ad5ae911c 100644
--- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/GcsS3FileSystem.kt
+++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/GcsS3FileSystem.kt
@@ -3,17 +3,15 @@
  */
 package io.airbyte.cdk.integrations.destination.gcs.util
 
+import java.io.IOException
 import org.apache.hadoop.fs.s3a.Retries
 import org.apache.hadoop.fs.s3a.S3AFileSystem
-import java.io.IOException
 
-/**
- * Patch [S3AFileSystem] to make it work for GCS.
- */
+/** Patch [S3AFileSystem] to make it work for GCS. */
 class GcsS3FileSystem : S3AFileSystem() {
     /**
-     * Method `doesBucketExistV2` used in the [S3AFileSystem.verifyBucketExistsV2] does not
-     * work for GCS.
+     * Method `doesBucketExistV2` used in the [S3AFileSystem.verifyBucketExistsV2] does not work for
+     * GCS.
*/ @Retries.RetryTranslated @Throws(IOException::class) diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/GcsUtils.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/GcsUtils.kt index 8b2461dba105..fb65d0b98f83 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/GcsUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/GcsUtils.kt @@ -13,14 +13,19 @@ import org.slf4j.LoggerFactory object GcsUtils { private val LOGGER: Logger = LoggerFactory.getLogger(GcsUtils::class.java) - private val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) - private val TIMESTAMP_MILLIS_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) - private val NULLABLE_TIMESTAMP_MILLIS: Schema = SchemaBuilder.builder().unionOf().nullType().and().type(TIMESTAMP_MILLIS_SCHEMA).endUnion() + private val UUID_SCHEMA: Schema = + LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) + private val TIMESTAMP_MILLIS_SCHEMA: Schema = + LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) + private val NULLABLE_TIMESTAMP_MILLIS: Schema = + SchemaBuilder.builder().unionOf().nullType().and().type(TIMESTAMP_MILLIS_SCHEMA).endUnion() - fun getDefaultAvroSchema(name: String?, - namespace: String?, - appendAirbyteFields: Boolean, - useDestinationsV2Columns: Boolean): Schema? { + fun getDefaultAvroSchema( + name: String, + namespace: String, + appendAirbyteFields: Boolean, + useDestinationsV2Columns: Boolean + ): Schema? { LOGGER.info("Default schema.") val stdName = AvroConstants.NAME_TRANSFORMER.getIdentifier(name!!) val stdNamespace = AvroConstants.NAME_TRANSFORMER.getNamespace(namespace!!) 
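(A rough usage sketch of the helper above; the column names come from JavaBaseConstants, and the call itself is illustrative rather than taken from this patch:)

    // Legacy (non-v2) columns, with the Airbyte bookkeeping fields appended:
    val schema = GcsUtils.getDefaultAvroSchema("users", "public", true, false)
    // Produces a record with roughly these fields:
    //   _airbyte_ab_id      : string (logical type uuid)
    //   _airbyte_emitted_at : long   (logical type timestamp-millis)
    //   _airbyte_data       : string (the serialized record payload)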
@@ -36,17 +41,38 @@ object GcsUtils { var assembler = builder.fields() if (useDestinationsV2Columns) { if (appendAirbyteFields) { - assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type(UUID_SCHEMA).noDefault() - assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type(TIMESTAMP_MILLIS_SCHEMA).noDefault() - assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT).type(NULLABLE_TIMESTAMP_MILLIS).withDefault(null) + assembler = + assembler + .name(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID) + .type(UUID_SCHEMA) + .noDefault() + assembler = + assembler + .name(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT) + .type(TIMESTAMP_MILLIS_SCHEMA) + .noDefault() + assembler = + assembler + .name(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT) + .type(NULLABLE_TIMESTAMP_MILLIS) + .withDefault(null) } } else { if (appendAirbyteFields) { - assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_AB_ID).type(UUID_SCHEMA).noDefault() - assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_EMITTED_AT).type(TIMESTAMP_MILLIS_SCHEMA).noDefault() + assembler = + assembler + .name(JavaBaseConstants.COLUMN_NAME_AB_ID) + .type(UUID_SCHEMA) + .noDefault() + assembler = + assembler + .name(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) + .type(TIMESTAMP_MILLIS_SCHEMA) + .noDefault() } } - assembler = assembler.name(JavaBaseConstants.COLUMN_NAME_DATA).type().stringType().noDefault() + assembler = + assembler.name(JavaBaseConstants.COLUMN_NAME_DATA).type().stringType().noDefault() return assembler.endRecord() } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/writer/BaseGcsWriter.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/writer/BaseGcsWriter.kt index 64e7379c4d75..636345ece2fa 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/writer/BaseGcsWriter.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/writer/BaseGcsWriter.kt @@ -14,33 +14,34 @@ import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.DestinationSyncMode -import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.io.IOException import java.sql.Timestamp import java.text.DateFormat import java.text.SimpleDateFormat import java.util.* +import org.slf4j.Logger +import org.slf4j.LoggerFactory /** * The base implementation takes care of the following: * - * * Create shared instance variables. - * * Create the bucket and prepare the bucket path. - * + * * Create shared instance variables. + * * Create the bucket and prepare the bucket path. */ -abstract class BaseGcsWriter protected constructor(protected val config: GcsDestinationConfig, - protected val s3Client: AmazonS3, - configuredStream: ConfiguredAirbyteStream) : DestinationFileWriter { +abstract class BaseGcsWriter +protected constructor( + protected val config: GcsDestinationConfig, + protected val s3Client: AmazonS3, + configuredStream: ConfiguredAirbyteStream +) : DestinationFileWriter { protected val stream: AirbyteStream = configuredStream.stream - protected val syncMode: DestinationSyncMode = configuredStream.destinationSyncMode + protected val syncMode: DestinationSyncMode? 
= configuredStream.destinationSyncMode
     protected val outputPrefix: String = getOutputPrefix(config.bucketPath, stream)
 
     /**
      *
-     * 1. Create bucket if necessary.
-     * 2. Under OVERWRITE mode, delete all objects with the output prefix.
-     *
+     * * 1. Create bucket if necessary.
+     * * 2. Under OVERWRITE mode, delete all objects with the output prefix.
      */
     @Throws(IOException::class)
     override fun initialize() {
@@ -55,20 +56,25 @@ abstract class BaseGcsWriter protected constructor(protected val config: GcsDest
         if (syncMode == DestinationSyncMode.OVERWRITE) {
             LOGGER.info("Overwrite mode")
             val keysToDelete: MutableList<DeleteObjectsRequest.KeyVersion> = LinkedList()
-            val objects = s3Client.listObjects(bucket, outputPrefix)
-                    .objectSummaries
+            val objects = s3Client.listObjects(bucket, outputPrefix).objectSummaries
             for (`object` in objects) {
                 keysToDelete.add(DeleteObjectsRequest.KeyVersion(`object`.key))
             }
 
             if (keysToDelete.size > 0) {
-                LOGGER.info("Purging non-empty output path for stream '{}' under OVERWRITE mode...", stream.name)
+                LOGGER.info(
+                    "Purging non-empty output path for stream '{}' under OVERWRITE mode...",
+                    stream.name
+                )
                 // Google Cloud Storage doesn't accept request to delete multiple objects
                 for (keyToDelete in keysToDelete) {
                     s3Client.deleteObject(bucket, keyToDelete.key)
                 }
-                LOGGER.info("Deleted {} file(s) for stream '{}'.", keysToDelete.size,
-                        stream.name)
+                LOGGER.info(
+                    "Deleted {} file(s) for stream '{}'.",
+                    keysToDelete.size,
+                    stream.name
+                )
             }
             LOGGER.info("Overwrite is finished")
         }
@@ -106,17 +112,13 @@ abstract class BaseGcsWriter protected constructor(protected val config: GcsDest
         }
     }
 
-    /**
-     * Operations that will run when the write succeeds.
-     */
+    /** Operations that will run when the write succeeds. */
     @Throws(IOException::class)
     protected open fun closeWhenSucceed() {
         // Do nothing by default
     }
 
-    /**
-     * Operations that will run when the write fails.
-     */
+    /** Operations that will run when the write fails. */
     @Throws(IOException::class)
     protected open fun closeWhenFail() {
         // Do nothing by default
@@ -127,13 +129,15 @@ abstract class BaseGcsWriter protected constructor(protected val config: GcsDest
         }
     }
 
        // Filename: <upload-date>_<upload-millis>_0.<format-extension>
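        // e.g., assuming a yyyy_MM_dd date pattern: 2021_12_01_1638316800000_0.avro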
fun getOutputFilename(timestamp: Timestamp, format: S3Format): String { - val formatter: DateFormat = SimpleDateFormat(S3DestinationConstants.YYYY_MM_DD_FORMAT_STRING) + val formatter: DateFormat = + SimpleDateFormat(S3DestinationConstants.YYYY_MM_DD_FORMAT_STRING) formatter.timeZone = TimeZone.getTimeZone("UTC") return String.format( - "%s_%d_0.%s", - formatter.format(timestamp), - timestamp.time, - format.fileExtension) + "%s_%d_0.%s", + formatter.format(timestamp), + timestamp.time, + format.fileExtension + ) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsConfig.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsConfig.kt index 4919c9696a70..f33bf8266d0d 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsConfig.kt @@ -9,9 +9,10 @@ class GcsConfig(val projectId: String, val bucketName: String, val credentialsJs companion object { fun getGcsConfig(config: JsonNode): GcsConfig { return GcsConfig( - config["loading_method"]["project_id"].asText(), - config["loading_method"]["bucket_name"].asText(), - config["loading_method"]["credentials_json"].asText()) + config["loading_method"]["project_id"].asText(), + config["loading_method"]["bucket_name"].asText(), + config["loading_method"]["credentials_json"].asText() + ) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt index 1ea9dc8537be..b6f23623eff6 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt @@ -19,10 +19,6 @@ import io.airbyte.cdk.integrations.destination.jdbc.copy.StreamCopier import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteRecordMessage import io.airbyte.protocol.models.v0.DestinationSyncMode -import org.apache.commons.csv.CSVFormat -import org.apache.commons.csv.CSVPrinter -import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.io.ByteArrayInputStream import java.io.IOException import java.io.InputStream @@ -33,25 +29,39 @@ import java.sql.SQLException import java.sql.Timestamp import java.time.Instant import java.util.* +import org.apache.commons.csv.CSVFormat +import org.apache.commons.csv.CSVPrinter +import org.slf4j.Logger +import org.slf4j.LoggerFactory -abstract class GcsStreamCopier(protected val stagingFolder: String, - private val destSyncMode: DestinationSyncMode, - protected val schemaName: String, - protected val streamName: String, - private val storageClient: Storage, - protected val db: JdbcDatabase, - protected val gcsConfig: GcsConfig, - private val nameTransformer: StandardNameTransformer, - private val sqlOperations: SqlOperations) : StreamCopier { - @get:VisibleForTesting - val tmpTableName: String = nameTransformer.getTmpTableName(streamName) +abstract class GcsStreamCopier( + protected val stagingFolder: String, + private 
val destSyncMode: DestinationSyncMode,
+    protected val schemaName: String,
+    protected val streamName: String,
+    private val storageClient: Storage,
+    protected val db: JdbcDatabase,
+    protected val gcsConfig: GcsConfig,
+    private val nameTransformer: StandardNameTransformer,
+    private val sqlOperations: SqlOperations
+) : StreamCopier {
+    @get:VisibleForTesting val tmpTableName: String = nameTransformer.getTmpTableName(streamName)
     protected val gcsStagingFiles: MutableSet<String> = HashSet()
-    protected var filenameGenerator: StagingFilenameGenerator = StagingFilenameGenerator(streamName, GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES.toLong())
+    protected var filenameGenerator: StagingFilenameGenerator =
+        StagingFilenameGenerator(
+            streamName,
+            GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES.toLong()
+        )
     private val channels = HashMap<String, WriteChannel>()
     private val csvPrinters = HashMap<String, CSVPrinter>()
 
     private fun prepareGcsStagingFile(): String {
-        return java.lang.String.join("/", stagingFolder, schemaName, filenameGenerator.stagingFilename)
+        return java.lang.String.join(
+            "/",
+            stagingFolder,
+            schemaName,
+            filenameGenerator.stagingFilename
+        )
     }
 
     override fun prepareStagingFile(): String? {
@@ -78,9 +88,11 @@ abstract class GcsStreamCopier(protected val stagingFolder: String,
     @Throws(Exception::class)
     override fun write(id: UUID?, recordMessage: AirbyteRecordMessage?, gcsFileName: String?) {
         if (csvPrinters.containsKey(gcsFileName)) {
-            csvPrinters[gcsFileName]!!.printRecord(id,
-                    Jsons.serialize(recordMessage!!.data),
-                    Timestamp.from(Instant.ofEpochMilli(recordMessage.emittedAt)))
+            csvPrinters[gcsFileName]!!.printRecord(
+                id,
+                Jsons.serialize(recordMessage!!.data),
+                Timestamp.from(Instant.ofEpochMilli(recordMessage.emittedAt))
+            )
         }
     }
 
@@ -103,11 +115,26 @@ abstract class GcsStreamCopier(protected val stagingFolder: String,
 
     @Throws(Exception::class)
     override fun copyStagingFileToTemporaryTable() {
-        LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}.", tmpTableName, streamName, schemaName)
+        LOGGER.info(
+            "Starting copy to tmp table: {} in destination for stream: {}, schema: {}.",
+            tmpTableName,
+            streamName,
+            schemaName
+        )
         for (gcsStagingFile in gcsStagingFiles) {
-            copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.bucketName, gcsStagingFile), schemaName, tmpTableName, gcsConfig)
+            copyGcsCsvFileIntoTable(
+                db,
+                getFullGcsPath(gcsConfig.bucketName, gcsStagingFile),
+                schemaName,
+                tmpTableName,
+                gcsConfig
+            )
         }
-        LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName)
+        LOGGER.info(
+            "Copy to tmp table {} in destination for stream {} complete.",
+            tmpTableName,
+            streamName
+        )
     }
 
     @Throws(Exception::class)
@@ -134,7 +161,12 @@ abstract class GcsStreamCopier(protected val stagingFolder: String,
 
     @Throws(Exception::class)
     override fun createTemporaryTable() {
-        LOGGER.info("Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.", streamName, schemaName, tmpTableName)
+        LOGGER.info(
+            "Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.",
+            streamName,
+            schemaName,
+            tmpTableName
+        )
         sqlOperations.createTableIfNotExists(db, schemaName, tmpTableName)
     }
 
@@ -150,40 +182,48 @@ abstract class GcsStreamCopier(protected val stagingFolder: String,
 
     @Throws(Exception::class)
     override fun generateMergeStatement(destTableName: String?): String?
{
-        LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.", tmpTableName, destTableName, schemaName)
+        LOGGER.info(
+            "Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.",
+            tmpTableName,
+            destTableName,
+            schemaName
+        )
         val queries = StringBuilder()
         if (destSyncMode == DestinationSyncMode.OVERWRITE) {
             queries.append(sqlOperations.truncateTableQuery(db, schemaName, destTableName))
-            LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, schema: {}, will be truncated.", destTableName, schemaName)
+            LOGGER.info(
+                "Destination OVERWRITE mode detected. Dest table: {}, schema: {}, will be truncated.",
+                destTableName,
+                schemaName
+            )
         }
         queries.append(sqlOperations.insertTableQuery(db, schemaName, tmpTableName, destTableName))
         return queries.toString()
     }
 
     override val currentFile: String?
-        get() =// TODO need to update this method when updating whole class for using GcsWriter
-            null
-
-    @VisibleForTesting
-    fun getGcsStagingFiles(): Set<String> {
-        return gcsStagingFiles
-    }
+        get() = // TODO need to update this method when updating whole class for using GcsWriter
+        null
 
     @Throws(SQLException::class)
-    abstract fun copyGcsCsvFileIntoTable(database: JdbcDatabase?,
-                                         gcsFileLocation: String?,
-                                         schema: String?,
-                                         tableName: String?,
-                                         gcsConfig: GcsConfig?)
+    abstract fun copyGcsCsvFileIntoTable(
+        database: JdbcDatabase?,
+        gcsFileLocation: String?,
+        schema: String?,
+        tableName: String?,
+        gcsConfig: GcsConfig?
+    )
 
     companion object {
         private val LOGGER: Logger = LoggerFactory.getLogger(GcsStreamCopier::class.java)
 
-        // It is optimal to write every 10,000,000 records (BATCH_SIZE * MAX_PER_FILE_PART_COUNT) to a new
+        // It is optimal to write every 10,000,000 records (BATCH_SIZE * MAX_PER_FILE_PART_COUNT) to
+        // a new
         // file.
         // The BATCH_SIZE is defined in CopyConsumerFactory.
         // The average size of such a file will be about 1 GB.
-        // This will make it easier to work with files and speed up the recording of large amounts of data.
+        // This will make it easier to work with files and speed up the recording of large amounts
+        // of data.
        // In addition, for a large number of records, we will not get a drop in the copy request to
        // QUERY_TIMEOUT when
        // the records from the file are copied to the staging table.
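(A hedged sketch of the copy flow these methods implement; the method names are the ones in this class, but the call sequence is an assumption based on the StreamCopier interface:)

    // Records accumulate in a staging CSV until the filename generator rotates it:
    val stagingFile = copier.prepareStagingFile()                // <stagingFolder>/<schema>/<generated name>
    copier.write(UUID.randomUUID(), recordMessage, stagingFile)  // CSV row: id, serialized JSON, emitted_at
    copier.copyStagingFileToTemporaryTable()                     // copy each staged GCS file into the tmp table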
@@ -195,7 +235,9 @@ abstract class GcsStreamCopier(protected val stagingFolder: String,
 
     @Throws(IOException::class)
     fun attemptWriteToPersistence(gcsConfig: GcsConfig) {
-        val outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replace("-".toRegex(), "")
+        val outputTableName =
+            "_airbyte_connection_test_" +
+                UUID.randomUUID().toString().replace("-".toRegex(), "")
         attemptWriteAndDeleteGcsObject(gcsConfig, outputTableName)
     }
 
@@ -211,13 +253,14 @@ abstract class GcsStreamCopier(protected val stagingFolder: String,
 
     @Throws(IOException::class)
     fun getStorageClient(gcsConfig: GcsConfig): Storage {
-        val credentialsInputStream: InputStream = ByteArrayInputStream(gcsConfig.credentialsJson.toByteArray(StandardCharsets.UTF_8))
+        val credentialsInputStream: InputStream =
+            ByteArrayInputStream(gcsConfig.credentialsJson.toByteArray(StandardCharsets.UTF_8))
         val credentials = GoogleCredentials.fromStream(credentialsInputStream)
         return StorageOptions.newBuilder()
-                .setCredentials(credentials)
-                .setProjectId(gcsConfig.projectId)
-                .build()
-                .service
+            .setCredentials(credentials)
+            .setProjectId(gcsConfig.projectId)
+            .build()
+            .service
        }
    }
}
diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt
index aab2ca4f7b22..e397e7022749 100644
--- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt
+++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt
@@ -19,46 +19,58 @@ import java.io.InputStream
 import java.nio.charset.StandardCharsets
 
 abstract class GcsStreamCopierFactory : StreamCopierFactory<GcsConfig> {
-    /**
-     * Used by the copy consumer.
-     */
-    override fun create(configuredSchema: String?,
-                        gcsConfig: GcsConfig,
-                        stagingFolder: String?,
-                        configuredStream: ConfiguredAirbyteStream?,
-                        nameTransformer: StandardNameTransformer?,
-                        db: JdbcDatabase?,
-                        sqlOperations: SqlOperations?): StreamCopier? {
+    /** Used by the copy consumer. */
+    fun create(
+        configuredSchema: String?,
+        gcsConfig: GcsConfig,
+        stagingFolder: String?,
+        configuredStream: ConfiguredAirbyteStream?,
+        nameTransformer: StandardNameTransformer?,
+        db: JdbcDatabase?,
+        sqlOperations: SqlOperations?
+    ): StreamCopier? {
        try {
            val stream = configuredStream!!.stream
            val syncMode = configuredStream.destinationSyncMode
            val schema = getSchema(stream.namespace, configuredSchema!!, nameTransformer!!)
- val credentialsInputStream: InputStream = ByteArrayInputStream(gcsConfig.credentialsJson.toByteArray(StandardCharsets.UTF_8)) + val credentialsInputStream: InputStream = + ByteArrayInputStream(gcsConfig.credentialsJson.toByteArray(StandardCharsets.UTF_8)) val credentials = GoogleCredentials.fromStream(credentialsInputStream) - val storageClient = StorageOptions.newBuilder() + val storageClient = + StorageOptions.newBuilder() .setCredentials(credentials) .setProjectId(gcsConfig.projectId) .build() .service - return create(stagingFolder, syncMode, schema, stream.name, storageClient, db, gcsConfig, nameTransformer, sqlOperations) + return create( + stagingFolder, + syncMode, + schema, + stream.name, + storageClient, + db, + gcsConfig, + nameTransformer, + sqlOperations + ) } catch (e: Exception) { throw RuntimeException(e) } } - /** - * For specific copier suppliers to implement. - */ + /** For specific copier suppliers to implement. */ @Throws(Exception::class) - abstract fun create(stagingFolder: String?, - syncMode: DestinationSyncMode?, - schema: String?, - streamName: String?, - storageClient: Storage?, - db: JdbcDatabase?, - gcsConfig: GcsConfig?, - nameTransformer: StandardNameTransformer?, - sqlOperations: SqlOperations?): StreamCopier? + abstract fun create( + stagingFolder: String?, + syncMode: DestinationSyncMode?, + schema: String?, + streamName: String?, + storageClient: Storage?, + db: JdbcDatabase?, + gcsConfig: GcsConfig?, + nameTransformer: StandardNameTransformer?, + sqlOperations: SqlOperations? + ): StreamCopier? } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/java/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfigTest.java b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/java/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfigTest.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfigTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfigTest.kt index 934cb22f8815..7ced8b69ac8e 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfigTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfigTest.kt @@ -7,9 +7,9 @@ import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredenti import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig import io.airbyte.commons.json.Jsons import io.airbyte.commons.resources.MoreResources +import java.io.IOException import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test -import java.io.IOException internal class GcsDestinationConfigTest { @Test diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.kt index fd6266998a7b..0f8713a11362 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.kt +++ 
b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.kt @@ -8,6 +8,7 @@ import com.google.common.collect.Lists import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig.Companion.parseCodecConfig +import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create import io.airbyte.commons.json.Jsons import org.apache.avro.file.DataFileConstants @@ -18,24 +19,27 @@ import org.junit.jupiter.api.Test internal class GcsAvroFormatConfigTest { @Test fun testParseCodecConfigNull() { - val nullConfigs: List = Lists.newArrayList("{}", "{ \"codec\": \"no compression\" }") + val nullConfigs: List = + Lists.newArrayList("{}", "{ \"codec\": \"no compression\" }") for (nullConfig in nullConfigs) { Assertions.assertEquals( - DataFileConstants.NULL_CODEC, - parseCodecConfig(Jsons.deserialize(nullConfig)).toString()) + DataFileConstants.NULL_CODEC, + parseCodecConfig(Jsons.deserialize(nullConfig)).toString() + ) } } @Test fun testParseCodecConfigDeflate() { // default compression level 0 - val codecFactory1 = parseCodecConfig( - Jsons.deserialize("{ \"codec\": \"deflate\" }")) + val codecFactory1 = parseCodecConfig(Jsons.deserialize("{ \"codec\": \"deflate\" }")) Assertions.assertEquals("deflate-0", codecFactory1.toString()) // compression level 5 - val codecFactory2 = parseCodecConfig( - Jsons.deserialize("{ \"codec\": \"deflate\", \"compression_level\": 5 }")) + val codecFactory2 = + parseCodecConfig( + Jsons.deserialize("{ \"codec\": \"deflate\", \"compression_level\": 5 }") + ) Assertions.assertEquals("deflate-5", codecFactory2.toString()) } @@ -49,28 +53,29 @@ internal class GcsAvroFormatConfigTest { @Test fun testParseCodecConfigXz() { // default compression level 6 - val codecFactory1 = parseCodecConfig( - Jsons.deserialize("{ \"codec\": \"xz\" }")) + val codecFactory1 = parseCodecConfig(Jsons.deserialize("{ \"codec\": \"xz\" }")) Assertions.assertEquals("xz-6", codecFactory1.toString()) // compression level 7 - val codecFactory2 = parseCodecConfig( - Jsons.deserialize("{ \"codec\": \"xz\", \"compression_level\": 7 }")) + val codecFactory2 = + parseCodecConfig(Jsons.deserialize("{ \"codec\": \"xz\", \"compression_level\": 7 }")) Assertions.assertEquals("xz-7", codecFactory2.toString()) } @Test fun testParseCodecConfigZstandard() { // default compression level 3 - val codecFactory1 = parseCodecConfig( - Jsons.deserialize("{ \"codec\": \"zstandard\" }")) + val codecFactory1 = parseCodecConfig(Jsons.deserialize("{ \"codec\": \"zstandard\" }")) // There is no way to verify the checksum; all relevant methods are private or protected... Assertions.assertEquals("zstandard[3]", codecFactory1.toString()) // compression level 20 - val codecFactory2 = parseCodecConfig( + val codecFactory2 = + parseCodecConfig( Jsons.deserialize( - "{ \"codec\": \"zstandard\", \"compression_level\": 20, \"include_checksum\": true }")) + "{ \"codec\": \"zstandard\", \"compression_level\": 20, \"include_checksum\": true }" + ) + ) // There is no way to verify the checksum; all relevant methods are private or protected... 
Assertions.assertEquals("zstandard[20]", codecFactory2.toString()) } @@ -93,39 +98,43 @@ internal class GcsAvroFormatConfigTest { @Test @Throws(IllegalAccessException::class) fun testHandlePartSizeConfig() { - val config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ + val config = + ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ "format_type": "AVRO" }""")) - val gcsDestinationConfig = GcsDestinationConfig - .getGcsDestinationConfig(config) + val gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config) ConfigTestUtils.assertBaseConfig(gcsDestinationConfig) - val formatConfig = gcsDestinationConfig.formatConfig + val formatConfig = gcsDestinationConfig.formatConfig!! Assertions.assertEquals("AVRO", formatConfig.format.name) // Assert that is set properly in config - val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null) - .get() + val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null).get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals( + Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, + partSizeBytes + ) } @Test @Throws(IllegalAccessException::class) fun testHandleAbsenceOfPartSizeConfig() { - val config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ + val config = + ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ "format_type": "AVRO" }""")) - val gcsDestinationConfig = GcsDestinationConfig - .getGcsDestinationConfig(config) + val gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config) ConfigTestUtils.assertBaseConfig(gcsDestinationConfig) - val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null) - .get() + val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null).get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals( + Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, + partSizeBytes + ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriterTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriterTest.kt index c436f0c0d34d..c5473698ef90 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriterTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriterTest.kt @@ -14,12 +14,12 @@ import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.SyncMode -import org.junit.jupiter.api.Assertions -import org.junit.jupiter.api.Test -import org.mockito.Mockito import java.io.IOException import java.sql.Timestamp import java.time.Instant +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.mockito.Mockito internal class GcsAvroWriterTest { @Test @@ -27,21 +27,30 @@ internal class GcsAvroWriterTest { fun generatesCorrectObjectPath() { initialize(Jsons.deserialize("{}")) - val writer = GcsAvroWriter( + val writer = + 
GcsAvroWriter( GcsDestinationConfig( - "fake-bucket", - "fake-bucketPath", - "fake-bucketRegion", - GcsHmacKeyCredentialConfig("fake-access-id", "fake-secret"), - S3AvroFormatConfig(ObjectMapper().createObjectNode())), + "fake-bucket", + "fake-bucketPath", + "fake-bucketRegion", + GcsHmacKeyCredentialConfig("fake-access-id", "fake-secret"), + S3AvroFormatConfig(ObjectMapper().createObjectNode()) + ), Mockito.mock(AmazonS3::class.java, Mockito.RETURNS_DEEP_STUBS), ConfiguredAirbyteStream() - .withStream(AirbyteStream() - .withNamespace("fake-namespace") - .withName("fake-stream").withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH))), + .withStream( + AirbyteStream() + .withNamespace("fake-namespace") + .withName("fake-stream") + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH)) + ), Timestamp.from(Instant.ofEpochMilli(1234)), - null) + null + ) - Assertions.assertEquals("fake-bucketPath/fake-namespace/fake-stream/1970_01_01_1234_0.avro", writer.outputPath) + Assertions.assertEquals( + "fake-bucketPath/fake-namespace/fake-stream/1970_01_01_1234_0.avro", + writer.outputPath + ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.kt index b863e001c814..74abb6ee1925 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.kt @@ -8,6 +8,7 @@ import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils import io.airbyte.cdk.integrations.destination.s3.util.Flattening import io.airbyte.cdk.integrations.destination.s3.util.Flattening.Companion.fromValue +import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create import io.airbyte.commons.json.Jsons import org.apache.commons.lang3.reflect.FieldUtils @@ -30,39 +31,53 @@ class GcsCsvFormatConfigTest { @Test @Throws(IllegalAccessException::class) fun testHandlePartSizeConfig() { - val config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ + val config = + ConfigTestUtils.getBaseConfig( + Jsons.deserialize( + """{ "format_type": "CSV", "flattening": "Root level flattening" -}""")) +}""" + ) + ) val gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config) ConfigTestUtils.assertBaseConfig(gcsDestinationConfig) - val formatConfig = gcsDestinationConfig.formatConfig + val formatConfig = gcsDestinationConfig.formatConfig!! 
Assertions.assertEquals("CSV", formatConfig.format.name) // Assert that is set properly in config - val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null) - .get() + val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null).get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals( + Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, + partSizeBytes + ) } @Test @Throws(IllegalAccessException::class) fun testHandleAbsenceOfPartSizeConfig() { - val config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ + val config = + ConfigTestUtils.getBaseConfig( + Jsons.deserialize( + """{ "format_type": "CSV", "flattening": "Root level flattening" -}""")) +}""" + ) + ) val gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config) ConfigTestUtils.assertBaseConfig(gcsDestinationConfig) - val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null) - .get() + val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null).get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals( + Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, + partSizeBytes + ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.kt index aa592a3496f6..0b0bc3c12247 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.gcs.jsonl import com.amazonaws.services.s3.internal.Constants import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils +import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create import io.airbyte.commons.json.Jsons import org.apache.commons.lang3.reflect.FieldUtils @@ -17,40 +18,44 @@ class GcsJsonlFormatConfigTest { @Test @Throws(IllegalAccessException::class) fun testHandlePartSizeConfig() { - val config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ + val config = + ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ "format_type": "JSONL" }""")) - val gcsDestinationConfig = GcsDestinationConfig - .getGcsDestinationConfig(config) + val gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config) ConfigTestUtils.assertBaseConfig(gcsDestinationConfig) - val formatConfig = gcsDestinationConfig.formatConfig + val formatConfig = gcsDestinationConfig.formatConfig!! 
Assertions.assertEquals("JSONL", formatConfig.format.name) // Assert that is set properly in config - val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null) - .get() + val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null).get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals( + Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, + partSizeBytes + ) } @Test @Throws(IllegalAccessException::class) fun testHandleAbsenceOfPartSizeConfig() { - val config = ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ + val config = + ConfigTestUtils.getBaseConfig(Jsons.deserialize("""{ "format_type": "JSONL" }""")) - val gcsDestinationConfig = GcsDestinationConfig - .getGcsDestinationConfig(config) + val gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config) ConfigTestUtils.assertBaseConfig(gcsDestinationConfig) - val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null) - .get() + val streamTransferManager = create(gcsDestinationConfig.bucketName, "objectKey", null).get() val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int - Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes) + Assertions.assertEquals( + Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, + partSizeBytes + ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/ConfigTestUtils.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/ConfigTestUtils.kt index b81d5309042f..85e00456c134 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/ConfigTestUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/gcs/util/ConfigTestUtils.kt @@ -10,14 +10,16 @@ import org.junit.jupiter.api.Assertions object ConfigTestUtils { fun getBaseConfig(formatConfig: JsonNode): JsonNode { - return Jsons.deserialize("""{ + return Jsons.deserialize( + """{ "gcs_bucket_name": "test-bucket-name", "gcs_bucket_path": "test_path", "gcs_bucket_region": "us-east-2", "credential": { "credential_type": "HMAC_KEY", "hmac_key_access_id": "some_hmac_key", "hmac_key_secret": "some_key_secret" - }, "format": $formatConfig}""") + }, "format": $formatConfig}""" + ) } fun assertBaseConfig(gcsDestinationConfig: GcsDestinationConfig) { diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt index e6b4042eed73..aebf1d02bf07 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt @@ -6,7 +6,6 @@ package io.airbyte.cdk.integrations.destination.gcs import com.fasterxml.jackson.databind.JsonNode import 
io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.avro.JsonSchemaType -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.cdk.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider import io.airbyte.commons.json.Jsons @@ -15,18 +14,19 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.CatalogHelpers -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.junit.jupiter.api.Assertions -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ArgumentsSource import java.io.IOException import java.util.* import java.util.function.Function import java.util.stream.Collectors import java.util.stream.StreamSupport +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ArgumentsSource -abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) : GcsDestinationAcceptanceTest(s3Format) { +abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) : + GcsDestinationAcceptanceTest(s3Format) { override fun getProtocolVersion(): ProtocolVersion { return ProtocolVersion.V1 } @@ -56,17 +56,25 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) : Gcs private fun retrieveExpectedDataTypes(stream: AirbyteStream): Map> { val iterableNames = Iterable { stream.jsonSchema["properties"].fieldNames() } - val nameToNode = StreamSupport.stream(iterableNames.spliterator(), false) - .collect(Collectors.toMap( + val nameToNode = + StreamSupport.stream(iterableNames.spliterator(), false) + .collect( + Collectors.toMap( Function.identity(), - Function { name: String -> getJsonNode(stream, name) })) - - return nameToNode - .entries - .stream() - .collect(Collectors.toMap( - Function { obj: Map.Entry -> obj.key }, - Function { entry: Map.Entry -> getExpectedSchemaType(entry.value) })) + Function { name: String -> getJsonNode(stream, name) } + ) + ) + + return nameToNode.entries + .stream() + .collect( + Collectors.toMap( + Function { obj: Map.Entry -> obj.key }, + Function { entry: Map.Entry -> + getExpectedSchemaType(entry.value) + } + ) + ) } private fun getJsonNode(stream: AirbyteStream, name: String): JsonNode { @@ -79,16 +87,24 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) : Gcs private fun getExpectedSchemaType(fieldDefinition: JsonNode): Set { // The $ref is a migration to V1 data type protocol see well_known_types.yaml - val typeProperty = if (fieldDefinition["type"] == null) fieldDefinition["\$ref"] else fieldDefinition["type"] + val typeProperty = + if (fieldDefinition["type"] == null) fieldDefinition["\$ref"] + else fieldDefinition["type"] val airbyteTypeProperty = fieldDefinition["airbyte_type"] val airbyteTypePropertyText = airbyteTypeProperty?.asText() return Arrays.stream(JsonSchemaType.entries.toTypedArray()) - .filter { value: JsonSchemaType -> value.jsonSchemaType == typeProperty.asText() && compareAirbyteTypes(airbyteTypePropertyText, value) } - .map(JsonSchemaType::avroType) - .collect(Collectors.toSet()) + .filter { value: JsonSchemaType -> + value.jsonSchemaType 
== typeProperty.asText() &&
+                    compareAirbyteTypes(airbyteTypePropertyText, value)
+            }
+            .map(JsonSchemaType::avroType)
+            .collect(Collectors.toSet())
     }
 
-    private fun compareAirbyteTypes(airbyteTypePropertyText: String?, value: JsonSchemaType): Boolean {
+    private fun compareAirbyteTypes(
+        airbyteTypePropertyText: String?,
+        value: JsonSchemaType
+    ): Boolean {
         if (airbyteTypePropertyText == null) {
             return value.jsonSchemaAirbyteType == null
         }
@@ -97,48 +113,68 @@
 
     @Throws(IOException::class)
     private fun readCatalogFromFile(catalogFilename: String): AirbyteCatalog {
-        return Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog::class.java)
+        return Jsons.deserialize(
+            MoreResources.readResource(catalogFilename),
+            AirbyteCatalog::class.java
+        )
     }
 
     @Throws(IOException::class)
     private fun readMessagesFromFile(messagesFilename: String): List<AirbyteMessage> {
-        return MoreResources.readResource(messagesFilename).lines()
-                .map(Function { record: String? -> Jsons.deserialize(record, AirbyteMessage::class.java) }).collect<List<AirbyteMessage>, Any>(Collectors.toList())
+        return MoreResources.readResource(messagesFilename).lines().map {
+            Jsons.deserialize(it, AirbyteMessage::class.java)
+        }
     }
 
     @Throws(Exception::class)
-    protected abstract fun retrieveDataTypesFromPersistedFiles(streamName: String?, namespace: String?): Map<String, Set<Schema.Type>?>
+    protected abstract fun retrieveDataTypesFromPersistedFiles(
+        streamName: String?,
+        namespace: String?
+    ): Map<String, Set<Schema.Type>?>
 
     protected fun getTypes(record: GenericData.Record): Map<String, Set<Schema.Type>> {
-        val fieldList = record
-                .schema
-                .fields
+        val fieldList =
+            record.schema.fields
                 .stream()
                 .filter { field: Schema.Field -> !field.name().startsWith("_airbyte") }
                 .toList()
 
         return if (fieldList.size == 1) {
             fieldList
-                    .stream()
-                    .collect(
-                            Collectors.toMap(
-                                    Function { obj: Schema.Field -> obj.name() },
-                                    Function { field: Schema.Field ->
-                                        field.schema().types.stream().map { obj: Schema -> obj.type }.filter { type: Schema.Type -> type != Schema.Type.NULL }
-                                                .collect(Collectors.toSet())
-                                    }))
+                .stream()
+                .collect(
+                    Collectors.toMap(
+                        Function { obj: Schema.Field -> obj.name() },
+                        Function { field: Schema.Field ->
+                            field
+                                .schema()
+                                .types
+                                .stream()
+                                .map { obj: Schema -> obj.type }
+                                .filter { type: Schema.Type -> type != Schema.Type.NULL }
+                                .collect(Collectors.toSet())
+                        }
+                    )
+                )
         } else {
             fieldList
-                    .stream()
-                    .collect(
-                            Collectors.toMap(
-                                    Function { obj: Schema.Field -> obj.name() },
-                                    Function { field: Schema.Field ->
-                                        field.schema().types
-                                                .stream().filter { type: Schema -> type.type != Schema.Type.NULL }
-                                                .flatMap { type: Schema -> type.elementType.types.stream() }.map { obj: Schema -> obj.type }.filter { type: Schema.Type -> type != Schema.Type.NULL }
-                                                .collect(Collectors.toSet())
-                                    }))
+                .stream()
+                .collect(
+                    Collectors.toMap(
+                        Function { obj: Schema.Field -> obj.name() },
+                        Function { field: Schema.Field ->
+                            field
+                                .schema()
+                                .types
+                                .stream()
+                                .filter { type: Schema -> type.type != Schema.Type.NULL }
+                                .flatMap { type: Schema -> type.elementType.types.stream() }
+                                .map { obj: Schema -> obj.type }
+                                .filter { type: Schema.Type -> type != Schema.Type.NULL }
+                                .collect(Collectors.toSet())
+                        }
+                    )
+                )
         }
     }
}
diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroTestDataComparator.kt
b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroTestDataComparator.kt index a201a635f32a..7792e2c88996 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroTestDataComparator.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroTestDataComparator.kt @@ -13,7 +13,8 @@ import java.util.* class GcsAvroTestDataComparator : AdvancedTestDataComparator() { override fun compareDateValues(expectedValue: String, actualValue: String): Boolean { val destinationDate = LocalDate.ofEpochDay(actualValue.toLong()) - val expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT)) + val expectedDate = + LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT)) return expectedDate == destinationDate } @@ -25,21 +26,30 @@ class GcsAvroTestDataComparator : AdvancedTestDataComparator() { return ZonedDateTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC) } - override fun compareDateTimeValues(airbyteMessageValue: String, destinationValue: String): Boolean { + override fun compareDateTimeValues( + airbyteMessageValue: String, + destinationValue: String + ): Boolean { val format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT) - val dateTime = LocalDateTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC) + val dateTime = + LocalDateTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC) return super.compareDateTimeValues(airbyteMessageValue, format.format(dateTime)) } - override fun compareTimeWithoutTimeZone(airbyteMessageValue: String, destinationValue: String): Boolean { - val destinationDate = LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC) + override fun compareTimeWithoutTimeZone( + airbyteMessageValue: String, + destinationValue: String + ): Boolean { + val destinationDate = + LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC) val expectedDate = LocalTime.parse(airbyteMessageValue, DateTimeFormatter.ISO_TIME) return expectedDate == destinationDate } override fun compareString(expectedValue: JsonNode, actualValue: JsonNode): Boolean { // to handle base64 encoded strings - return expectedValue.asText() == actualValue.asText() || decodeBase64(expectedValue.asText()) == actualValue.asText() + return expectedValue.asText() == actualValue.asText() || + decodeBase64(expectedValue.asText()) == actualValue.asText() } private fun decodeBase64(string: String): String { diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseAvroDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseAvroDestinationAcceptanceTest.kt index ec57d165fd9f..0ca10fc74aa9 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseAvroDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseAvroDestinationAcceptanceTest.kt @@ -9,33 +9,38 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants import 
io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator import io.airbyte.commons.json.Jsons +import java.util.* import org.apache.avro.Schema import org.apache.avro.file.DataFileReader import org.apache.avro.file.SeekableByteArrayInput import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericDatumReader -import java.util.* -abstract class GcsBaseAvroDestinationAcceptanceTest : GcsAvroParquetDestinationAcceptanceTest(S3Format.AVRO) { +abstract class GcsBaseAvroDestinationAcceptanceTest : + GcsAvroParquetDestinationAcceptanceTest(S3Format.AVRO) { override val formatConfig: JsonNode? - get() = Jsons.deserialize("""{ + get() = + Jsons.deserialize( + """{ "format_type": "Avro", "compression_codec": { "codec": "no compression", "compression_level": 5, "include_checksum": true } -}""") +}""" + ) override fun getTestDataComparator(): TestDataComparator { return GcsAvroTestDataComparator() } @Throws(Exception::class) - override fun retrieveRecords(testEnv: TestDestinationEnv, - streamName: String, - namespace: String, - streamSchema: JsonNode): List { + override fun retrieveRecords( + testEnv: TestDestinationEnv, + streamName: String, + namespace: String, + streamSchema: JsonNode + ): List { val nameUpdater = getFieldNameUpdater(streamName, namespace, streamSchema) val objectSummaries = getAllSyncedObjects(streamName, namespace) @@ -45,23 +50,28 @@ abstract class GcsBaseAvroDestinationAcceptanceTest : GcsAvroParquetDestinationA val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key) DataFileReader( SeekableByteArrayInput(`object`.objectContent.readAllBytes()), - GenericDatumReader()).use { dataFileReader -> - val jsonReader: ObjectReader = GcsDestinationAcceptanceTest.Companion.MAPPER.reader() - while (dataFileReader.hasNext()) { - val record = dataFileReader.next() - val jsonBytes = AvroConstants.JSON_CONVERTER.convertToJson(record) - var jsonRecord = jsonReader.readTree(jsonBytes) - jsonRecord = nameUpdater.getJsonWithOriginalFieldNames(jsonRecord!!) - jsonRecords.add(pruneAirbyteJson(jsonRecord)) + GenericDatumReader() + ) + .use { dataFileReader -> + val jsonReader: ObjectReader = MAPPER.reader() + while (dataFileReader.hasNext()) { + val record = dataFileReader.next() + val jsonBytes = AvroConstants.JSON_CONVERTER.convertToJson(record) + var jsonRecord = jsonReader.readTree(jsonBytes) + jsonRecord = nameUpdater.getJsonWithOriginalFieldNames(jsonRecord!!) + jsonRecords.add(pruneAirbyteJson(jsonRecord)) + } } - } } return jsonRecords } @Throws(Exception::class) - override fun retrieveDataTypesFromPersistedFiles(streamName: String?, namespace: String?): Map?> { + override fun retrieveDataTypesFromPersistedFiles( + streamName: String?, + namespace: String? 
+ ): Map?> { val objectSummaries = getAllSyncedObjects(streamName, namespace) val resultDataTypes: MutableMap?> = HashMap() @@ -69,13 +79,15 @@ abstract class GcsBaseAvroDestinationAcceptanceTest : GcsAvroParquetDestinationA val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key) DataFileReader( SeekableByteArrayInput(`object`.objectContent.readAllBytes()), - GenericDatumReader()).use { dataFileReader -> - while (dataFileReader.hasNext()) { - val record = dataFileReader.next() - val actualDataTypes = getTypes(record) - resultDataTypes.putAll(actualDataTypes!!) + GenericDatumReader() + ) + .use { dataFileReader -> + while (dataFileReader.hasNext()) { + val record = dataFileReader.next() + val actualDataTypes = getTypes(record) + resultDataTypes.putAll(actualDataTypes!!) + } } - } } return resultDataTypes } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvDestinationAcceptanceTest.kt index 82fa50964a2b..22ac5401041e 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvDestinationAcceptanceTest.kt @@ -9,18 +9,17 @@ import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.util.Flattening -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.commons.json.Jsons -import org.apache.commons.csv.CSVFormat -import org.apache.commons.csv.CSVRecord -import org.apache.commons.csv.QuoteMode import java.io.IOException import java.io.InputStreamReader import java.io.Reader import java.nio.charset.StandardCharsets import java.util.* import java.util.stream.StreamSupport +import org.apache.commons.csv.CSVFormat +import org.apache.commons.csv.CSVRecord +import org.apache.commons.csv.QuoteMode abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTest(S3Format.CSV) { override fun getProtocolVersion(): ProtocolVersion { @@ -28,16 +27,25 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes } override val formatConfig: JsonNode? 
- get() = Jsons.jsonNode(java.util.Map.of( - "format_type", outputFormat, - "flattening", Flattening.ROOT_LEVEL.value, - "compression", Jsons.jsonNode(java.util.Map.of("compression_type", "No Compression")))) + get() = + Jsons.jsonNode( + java.util.Map.of( + "format_type", + outputFormat, + "flattening", + Flattening.ROOT_LEVEL.value, + "compression", + Jsons.jsonNode(java.util.Map.of("compression_type", "No Compression")) + ) + ) @Throws(IOException::class) - override fun retrieveRecords(testEnv: TestDestinationEnv, - streamName: String, - namespace: String, - streamSchema: JsonNode): List { + override fun retrieveRecords( + testEnv: TestDestinationEnv, + streamName: String, + namespace: String, + streamSchema: JsonNode + ): List { val objectSummaries = getAllSyncedObjects(streamName, namespace) val fieldTypes = getFieldTypes(streamSchema) @@ -46,12 +54,13 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes for (objectSummary in objectSummaries!!) { s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key).use { `object` -> getReader(`object`).use { `in` -> - val records: Iterable = CSVFormat.DEFAULT - .withQuoteMode(QuoteMode.NON_NUMERIC) + val records: Iterable = + CSVFormat.DEFAULT.withQuoteMode(QuoteMode.NON_NUMERIC) .withFirstRecordAsHeader() .parse(`in`) - StreamSupport.stream(records.spliterator(), false) - .forEach { r: CSVRecord -> jsonRecords.add(getJsonNode(r.toMap(), fieldTypes)) } + StreamSupport.stream(records.spliterator(), false).forEach { r: CSVRecord -> + jsonRecords.add(getJsonNode(r.toMap(), fieldTypes)) + } } } } @@ -65,9 +74,7 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes } companion object { - /** - * Convert json_schema to a map from field name to field types. - */ + /** Convert json_schema to a map from field name to field types. */ private fun getFieldTypes(streamSchema: JsonNode): Map { val fieldTypes: MutableMap = HashMap() val fieldDefinitions = streamSchema["properties"] @@ -75,22 +82,28 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes while (iterator.hasNext()) { val entry = iterator.next() val fieldValue = entry.value - val typeValue = if (fieldValue["type"] == null) fieldValue["\$ref"] else fieldValue["type"] + val typeValue = + if (fieldValue["type"] == null) fieldValue["\$ref"] else fieldValue["type"] fieldTypes[entry.key] = typeValue.asText() } return fieldTypes } - private fun getJsonNode(input: Map, fieldTypes: Map): JsonNode { - val json: ObjectNode = GcsDestinationAcceptanceTest.Companion.MAPPER.createObjectNode() + private fun getJsonNode( + input: Map, + fieldTypes: Map + ): JsonNode { + val json: ObjectNode = MAPPER.createObjectNode() if (input.containsKey(JavaBaseConstants.COLUMN_NAME_DATA)) { return Jsons.deserialize(input[JavaBaseConstants.COLUMN_NAME_DATA]) } for ((key, value) in input) { - if (key == JavaBaseConstants.COLUMN_NAME_AB_ID || (key - == JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) { + if ( + key == JavaBaseConstants.COLUMN_NAME_AB_ID || + (key == JavaBaseConstants.COLUMN_NAME_EMITTED_AT) + ) { continue } if (value == null || value == "") { @@ -109,7 +122,10 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes } private fun addNoTypeValue(json: ObjectNode, key: String, value: String?) 
{ - if (value != null && (value.matches("^\\[.*\\]$".toRegex())) || value!!.matches("^\\{.*\\}$".toRegex())) { + if ( + value != null && (value.matches("^\\[.*\\]$".toRegex())) || + value!!.matches("^\\{.*\\}$".toRegex()) + ) { val newNode = Jsons.deserialize(value) json.set(key, newNode) } else { diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt index d2f1eb876e25..1c436609cf33 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseCsvGzipDestinationAcceptanceTest.kt @@ -6,7 +6,6 @@ package io.airbyte.cdk.integrations.destination.gcs import com.amazonaws.services.s3.model.S3Object import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.destination.s3.util.Flattening -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.commons.json.Jsons import java.io.IOException @@ -22,10 +21,10 @@ abstract class GcsBaseCsvGzipDestinationAcceptanceTest : GcsBaseCsvDestinationAc } override val formatConfig: JsonNode? - get() =// config without compression defaults to GZIP - Jsons.jsonNode(Map.of( - "format_type", outputFormat, - "flattening", Flattening.ROOT_LEVEL.value)) + get() = // config without compression defaults to GZIP + Jsons.jsonNode( + Map.of("format_type", outputFormat, "flattening", Flattening.ROOT_LEVEL.value) + ) @Throws(IOException::class) override fun getReader(s3Object: S3Object): Reader { diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlDestinationAcceptanceTest.kt index e14a03ab791e..eb87cf87a238 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlDestinationAcceptanceTest.kt @@ -7,7 +7,6 @@ import com.amazonaws.services.s3.model.S3Object import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.cdk.integrations.destination.s3.S3Format -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.commons.json.Jsons import java.io.BufferedReader @@ -19,21 +18,30 @@ import java.util.Map import kotlin.collections.List import kotlin.collections.MutableList -abstract class GcsBaseJsonlDestinationAcceptanceTest : GcsDestinationAcceptanceTest(S3Format.JSONL) { +abstract class GcsBaseJsonlDestinationAcceptanceTest : + GcsDestinationAcceptanceTest(S3Format.JSONL) { override fun getProtocolVersion(): ProtocolVersion { return ProtocolVersion.V1 } override val formatConfig: JsonNode? 
- get() = Jsons.jsonNode(Map.of( - "format_type", outputFormat, - "compression", Jsons.jsonNode(Map.of("compression_type", "No Compression")))) + get() = + Jsons.jsonNode( + Map.of( + "format_type", + outputFormat, + "compression", + Jsons.jsonNode(Map.of("compression_type", "No Compression")) + ) + ) @Throws(IOException::class) - override fun retrieveRecords(testEnv: TestDestinationEnv, - streamName: String, - namespace: String, - streamSchema: JsonNode): List { + override fun retrieveRecords( + testEnv: TestDestinationEnv, + streamName: String, + namespace: String, + streamSchema: JsonNode + ): List { val objectSummaries = getAllSyncedObjects(streamName, namespace) val jsonRecords: MutableList = LinkedList() diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlGzipDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlGzipDestinationAcceptanceTest.kt index 96b55404189f..8c57e6926010 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlGzipDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseJsonlGzipDestinationAcceptanceTest.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.gcs import com.amazonaws.services.s3.model.S3Object import com.fasterxml.jackson.databind.JsonNode -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.commons.json.Jsons import java.io.BufferedReader @@ -21,11 +20,13 @@ abstract class GcsBaseJsonlGzipDestinationAcceptanceTest : GcsBaseJsonlDestinati } override val formatConfig: JsonNode? 
- get() =// config without compression defaults to GZIP - Jsons.jsonNode(Map.of("format_type", outputFormat)) + get() = // config without compression defaults to GZIP + Jsons.jsonNode(Map.of("format_type", outputFormat)) @Throws(IOException::class) override fun getReader(s3Object: S3Object): BufferedReader { - return BufferedReader(InputStreamReader(GZIPInputStream(s3Object.objectContent), StandardCharsets.UTF_8)) + return BufferedReader( + InputStreamReader(GZIPInputStream(s3Object.objectContent), StandardCharsets.UTF_8) + ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseParquetDestinationAcceptanceTest.kt index 76236cdea83e..f5849f7e6b81 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsBaseParquetDestinationAcceptanceTest.kt @@ -11,39 +11,40 @@ import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetWriter.Companion.getHadoopConfig import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator import io.airbyte.commons.json.Jsons +import java.io.IOException +import java.net.URI +import java.net.URISyntaxException +import java.util.* import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroReadSupport import org.apache.parquet.hadoop.ParquetReader -import java.io.IOException -import java.net.URI -import java.net.URISyntaxException -import java.util.* -abstract class GcsBaseParquetDestinationAcceptanceTest : GcsAvroParquetDestinationAcceptanceTest(S3Format.PARQUET) { +abstract class GcsBaseParquetDestinationAcceptanceTest : + GcsAvroParquetDestinationAcceptanceTest(S3Format.PARQUET) { override fun getProtocolVersion(): ProtocolVersion { return ProtocolVersion.V1 } override val formatConfig: JsonNode? 
- get() = Jsons.jsonNode(java.util.Map.of( - "format_type", "Parquet", - "compression_codec", "GZIP")) + get() = + Jsons.jsonNode(java.util.Map.of("format_type", "Parquet", "compression_codec", "GZIP")) override fun getTestDataComparator(): TestDataComparator { return GcsAvroTestDataComparator() } @Throws(IOException::class, URISyntaxException::class) - override fun retrieveRecords(testEnv: TestDestinationEnv, - streamName: String, - namespace: String, - streamSchema: JsonNode): List { + override fun retrieveRecords( + testEnv: TestDestinationEnv, + streamName: String, + namespace: String, + streamSchema: JsonNode + ): List { val nameUpdater = getFieldNameUpdater(streamName, namespace, streamSchema) val objectSummaries = getAllSyncedObjects(streamName, namespace) @@ -56,24 +57,29 @@ abstract class GcsBaseParquetDestinationAcceptanceTest : GcsAvroParquetDestinati val hadoopConfig = GcsParquetWriter.getHadoopConfig(config) ParquetReader.builder(AvroReadSupport(), path) - .withConf(hadoopConfig) - .build().use { parquetReader -> - val jsonReader: ObjectReader = GcsDestinationAcceptanceTest.Companion.MAPPER.reader() - var record: GenericData.Record? - while ((parquetReader.read().also { record = it }) != null) { - val jsonBytes = AvroConstants.JSON_CONVERTER.convertToJson(record) - var jsonRecord = jsonReader.readTree(jsonBytes) - jsonRecord = nameUpdater.getJsonWithOriginalFieldNames(jsonRecord!!) - jsonRecords.add(pruneAirbyteJson(jsonRecord)) - } + .withConf(hadoopConfig) + .build() + .use { parquetReader -> + val jsonReader: ObjectReader = + GcsDestinationAcceptanceTest.Companion.MAPPER.reader() + var record: GenericData.Record? + while ((parquetReader.read().also { record = it }) != null) { + val jsonBytes = AvroConstants.JSON_CONVERTER.convertToJson(record) + var jsonRecord = jsonReader.readTree(jsonBytes) + jsonRecord = nameUpdater.getJsonWithOriginalFieldNames(jsonRecord!!) + jsonRecords.add(pruneAirbyteJson(jsonRecord)) } + } } return jsonRecords } @Throws(Exception::class) - override fun retrieveDataTypesFromPersistedFiles(streamName: String?, namespace: String?): Map?> { + override fun retrieveDataTypesFromPersistedFiles( + streamName: String?, + namespace: String? + ): Map?> { val objectSummaries = getAllSyncedObjects(streamName, namespace) val resultDataTypes: MutableMap?> = HashMap() @@ -84,14 +90,15 @@ abstract class GcsBaseParquetDestinationAcceptanceTest : GcsAvroParquetDestinati val hadoopConfig = getHadoopConfig(config!!) ParquetReader.builder(AvroReadSupport(), path) - .withConf(hadoopConfig) - .build().use { parquetReader -> - var record: GenericData.Record? - while ((parquetReader.read().also { record = it }) != null) { - val actualDataTypes = getTypes(record!!) - resultDataTypes.putAll(actualDataTypes!!) - } + .withConf(hadoopConfig) + .build() + .use { parquetReader -> + var record: GenericData.Record? + while ((parquetReader.read().also { record = it }) != null) { + val actualDataTypes = getTypes(record!!) + resultDataTypes.putAll(actualDataTypes!!) 
} + } } return resultDataTypes diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt index 31d7ff7751b5..926b26ec59df 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationAcceptanceTest.kt @@ -13,7 +13,6 @@ import com.google.common.collect.ImmutableMap import io.airbyte.cdk.integrations.destination.NamingConventionTransformer import io.airbyte.cdk.integrations.destination.s3.S3Format import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations -import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator @@ -23,6 +22,9 @@ import io.airbyte.commons.jackson.MoreMappers import io.airbyte.commons.json.Jsons import io.airbyte.configoss.StandardCheckConnectionOutput import io.airbyte.protocol.models.v0.AirbyteConnectionStatus +import java.nio.file.Path +import java.util.* +import java.util.stream.Collectors import org.apache.commons.lang3.RandomStringUtils import org.joda.time.DateTime import org.joda.time.DateTimeZone @@ -30,25 +32,23 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.nio.file.Path -import java.util.* -import java.util.stream.Collectors /** * When adding a new GCS destination acceptance test, extend this class and do the following: - * * Implement [.getFormatConfig] that returns a [S3FormatConfig] - * * Implement [.retrieveRecords] that returns the Json records for the test + * * Implement [.getFormatConfig] that returns a [S3FormatConfig] + * * Implement [.retrieveRecords] that returns the Json records for the test * * Under the hood, a [GcsDestinationConfig] is constructed as follows: - * * Retrieve the secrets from "secrets/config.json" - * * Get the GCS bucket path from the constructor - * * Get the format config from [.getFormatConfig] + * * Retrieve the secrets from "secrets/config.json" + * * Get the GCS bucket path from the constructor + * * Get the format config from [.getFormatConfig] */ -abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format) : DestinationAcceptanceTest() { +abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format) : + DestinationAcceptanceTest() { protected var configJson: JsonNode? = null - protected var config: GcsDestinationConfig? = null - protected var s3Client: AmazonS3? = null - protected var nameTransformer: NamingConventionTransformer? = null + protected lateinit var config: GcsDestinationConfig + protected lateinit var s3Client: AmazonS3 + protected lateinit var nameTransformer: NamingConventionTransformer protected var s3StorageOperations: S3StorageOperations? = null protected val baseConfigJson: JsonNode @@ -62,7 +62,7 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format return configJson!! 
} - override fun getDefaultSchema(config: JsonNode): String { + override fun getDefaultSchema(config: JsonNode): String? { if (config.has("gcs_bucket_path")) { return config["gcs_bucket_path"].asText() } @@ -94,20 +94,24 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format return failCheckJson } - /** - * Helper method to retrieve all synced objects inside the configured bucket path. - */ - protected fun getAllSyncedObjects(streamName: String?, namespace: String?): List { + /** Helper method to retrieve all synced objects inside the configured bucket path. */ + protected fun getAllSyncedObjects( + streamName: String?, + namespace: String? + ): List { val namespaceStr = nameTransformer!!.getNamespace(namespace!!) val streamNameStr = nameTransformer!!.getIdentifier(streamName!!) - val outputPrefix = s3StorageOperations!!.getBucketObjectPath( + val outputPrefix = + s3StorageOperations!!.getBucketObjectPath( namespaceStr, streamNameStr, DateTime.now(DateTimeZone.UTC), - config!!.pathFormat) + config!!.pathFormat + ) // the child folder contains a non-deterministic epoch timestamp, so use the parent folder val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1) - val objectSummaries = s3Client + val objectSummaries = + s3Client .listObjects(config!!.bucketName, parentFolder) .objectSummaries .stream() @@ -115,8 +119,12 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format .sorted(Comparator.comparingLong { o: S3ObjectSummary -> o.lastModified.time }) .collect(Collectors.toList()) LOGGER.info( - "All objects: {}", - objectSummaries.stream().map { o: S3ObjectSummary -> String.format("%s/%s", o.bucketName, o.key) }.collect(Collectors.toList())) + "All objects: {}", + objectSummaries + .stream() + .map { o: S3ObjectSummary -> String.format("%s/%s", o.bucketName, o.key) } + .collect(Collectors.toList()) + ) return objectSummaries } @@ -125,44 +133,45 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format /** * This method does the following: - * * Construct the GCS destination config. - * * Construct the GCS client. + * * Construct the GCS destination config. + * * Construct the GCS client. */ override fun setup(testEnv: TestDestinationEnv, TEST_SCHEMAS: HashSet) { val baseConfigJson = baseConfigJson // Set a random GCS bucket path for each integration test val configJson = Jsons.clone(baseConfigJson) - val testBucketPath = String.format( + val testBucketPath = + String.format( "%s_test_%s", outputFormat.name.lowercase(), - RandomStringUtils.randomAlphanumeric(5)) + RandomStringUtils.randomAlphanumeric(5) + ) (configJson as ObjectNode) - .put("gcs_bucket_path", testBucketPath) - .set("format", formatConfig) + .put("gcs_bucket_path", testBucketPath) + .set("format", formatConfig) this.configJson = configJson this.config = GcsDestinationConfig.getGcsDestinationConfig(configJson) LOGGER.info("Test full path: {}/{}", config.bucketName, config.bucketPath) - this.s3Client = config.s3Client + this.s3Client = config.getS3Client() this.nameTransformer = GcsNameTransformer() this.s3StorageOperations = S3StorageOperations(nameTransformer, s3Client!!, config) } - /** - * Remove all the S3 output from the tests. - */ + /** Remove all the S3 output from the tests. 
*/ override fun tearDown(testEnv: TestDestinationEnv) { val keysToDelete: MutableList = LinkedList() - val objects = s3Client - .listObjects(config!!.bucketName, config!!.bucketPath) - .objectSummaries + val objects = s3Client.listObjects(config!!.bucketName, config!!.bucketPath).objectSummaries for (`object` in objects) { keysToDelete.add(DeleteObjectsRequest.KeyVersion(`object`.key)) } if (keysToDelete.size > 0) { - LOGGER.info("Tearing down test bucket path: {}/{}", config!!.bucketName, - config!!.bucketPath) + LOGGER.info( + "Tearing down test bucket path: {}/{}", + config!!.bucketName, + config!!.bucketPath + ) // Google Cloud Storage doesn't accept request to delete multiple objects for (keyToDelete in keysToDelete) { s3Client!!.deleteObject(config!!.bucketName, keyToDelete.key) @@ -179,30 +188,38 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format @Test @Throws(Exception::class) fun testCheckConnectionInsufficientRoles() { - val baseConfigJson = Jsons.deserialize(IOs.readFile(Path.of( - SECRET_FILE_PATH_INSUFFICIENT_ROLES))) + val baseConfigJson = + Jsons.deserialize(IOs.readFile(Path.of(SECRET_FILE_PATH_INSUFFICIENT_ROLES))) // Set a random GCS bucket path for each integration test val configJson = Jsons.clone(baseConfigJson) - val testBucketPath = String.format( + val testBucketPath = + String.format( "%s_test_%s", outputFormat.name.lowercase(), - RandomStringUtils.randomAlphanumeric(5)) + RandomStringUtils.randomAlphanumeric(5) + ) (configJson as ObjectNode) - .put("gcs_bucket_path", testBucketPath) - .set("format", formatConfig) + .put("gcs_bucket_path", testBucketPath) + .set("format", formatConfig) - Assertions.assertEquals(StandardCheckConnectionOutput.Status.FAILED, runCheck(configJson).status) + Assertions.assertEquals( + StandardCheckConnectionOutput.Status.FAILED, + runCheck(configJson).status + ) } @Test fun testCheckIncorrectHmacKeyAccessIdCredential() { val baseJson = baseConfigJson - val credential = Jsons.jsonNode(ImmutableMap.builder() - .put("credential_type", "HMAC_KEY") - .put("hmac_key_access_id", "fake-key") - .put("hmac_key_secret", baseJson["credential"]["hmac_key_secret"].asText()) - .build()) + val credential = + Jsons.jsonNode( + ImmutableMap.builder() + .put("credential_type", "HMAC_KEY") + .put("hmac_key_access_id", "fake-key") + .put("hmac_key_secret", baseJson["credential"]["hmac_key_secret"].asText()) + .build() + ) (baseJson as ObjectNode).put("credential", credential) baseJson.set("format", formatConfig) @@ -216,11 +233,17 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format @Test fun testCheckIncorrectHmacKeySecretCredential() { val baseJson = baseConfigJson - val credential = Jsons.jsonNode(ImmutableMap.builder() - .put("credential_type", "HMAC_KEY") - .put("hmac_key_access_id", baseJson["credential"]["hmac_key_access_id"].asText()) - .put("hmac_key_secret", "fake-secret") - .build()) + val credential = + Jsons.jsonNode( + ImmutableMap.builder() + .put("credential_type", "HMAC_KEY") + .put( + "hmac_key_access_id", + baseJson["credential"]["hmac_key_access_id"].asText() + ) + .put("hmac_key_secret", "fake-secret") + .build() + ) (baseJson as ObjectNode).put("credential", credential) baseJson.set("format", formatConfig) @@ -244,10 +267,12 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format } companion object { - protected val LOGGER: Logger = LoggerFactory.getLogger(GcsDestinationAcceptanceTest::class.java) - protected val MAPPER: ObjectMapper = 
MoreMappers.initMapper() + protected val LOGGER: Logger = + LoggerFactory.getLogger(GcsDestinationAcceptanceTest::class.java) + @JvmStatic protected val MAPPER: ObjectMapper = MoreMappers.initMapper() protected const val SECRET_FILE_PATH: String = "secrets/config.json" - protected const val SECRET_FILE_PATH_INSUFFICIENT_ROLES: String = "secrets/insufficient_roles_config.json" + protected const val SECRET_FILE_PATH_INSUFFICIENT_ROLES: String = + "secrets/insufficient_roles_config.json" } }
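--

Illustrative sketches follow. They restate patterns this patch touches; none of them are code from the change itself, and any helper name below is invented for illustration.

GcsStreamCopier's connection check writes and deletes a throwaway object. A minimal sketch of the naming scheme, assuming only that names must be unique per check:

    import java.util.UUID

    // One throwaway object name per check; stripping the dashes keeps the
    // name compact and matches the "_airbyte_connection_test_<hex>" shape.
    fun connectionTestObjectName(): String =
        "_airbyte_connection_test_" + UUID.randomUUID().toString().replace("-", "")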
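getStorageClient builds a GCS client from the service-account JSON held in GcsConfig. A self-contained sketch of that pattern (the function name is invented; the calls mirror the google-cloud-storage API used in the diff):

    import com.google.auth.oauth2.GoogleCredentials
    import com.google.cloud.storage.Storage
    import com.google.cloud.storage.StorageOptions
    import java.io.ByteArrayInputStream
    import java.nio.charset.StandardCharsets

    // Parse a service-account key held in memory and bind it to a project.
    fun storageClientFor(credentialsJson: String, projectId: String): Storage {
        val credentialsStream =
            ByteArrayInputStream(credentialsJson.toByteArray(StandardCharsets.UTF_8))
        val credentials = GoogleCredentials.fromStream(credentialsStream)
        return StorageOptions.newBuilder()
            .setCredentials(credentials)
            .setProjectId(projectId)
            .build()
            .service
    }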
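GcsAvroFormatConfigTest compares codecs by their toString() form. That works because Avro's CodecFactory encodes the codec name and level there; a sketch, assuming an Avro version that ships zstandard support (the expected strings are taken from the assertions in the test above):

    import org.apache.avro.file.CodecFactory

    fun main() {
        // The strings below are exactly what the test assertions expect.
        println(CodecFactory.deflateCodec(5))    // deflate-5
        println(CodecFactory.xzCodec(7))         // xz-7
        println(CodecFactory.zstandardCodec(20)) // zstandard[20]
    }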
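The part-size tests read StreamTransferManager's private partSize field reflectively. A sketch of that one-liner, assuming only that the field is named "partSize" (which the tests above already rely on):

    import org.apache.commons.lang3.reflect.FieldUtils

    // forceAccess = true lets FieldUtils read the private field.
    fun partSizeOf(streamTransferManager: Any): Int =
        FieldUtils.readField(streamTransferManager, "partSize", true) as Int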
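GcsAvroWriterTest pins the object path to <bucketPath>/<namespace>/<stream>/<yyyy_MM_dd>_<epochMillis>_<part>.avro. A hypothetical helper (the real path logic lives in the writer, not here) that reproduces the string the test expects:

    import java.sql.Timestamp
    import java.time.Instant
    import java.time.ZoneOffset
    import java.time.format.DateTimeFormatter

    fun avroOutputPath(
        bucketPath: String,
        namespace: String,
        stream: String,
        uploadTime: Timestamp,
        partId: Int
    ): String {
        // UTC date prefix plus the raw epoch-millis of the upload timestamp.
        val datePrefix =
            DateTimeFormatter.ofPattern("yyyy_MM_dd")
                .withZone(ZoneOffset.UTC)
                .format(uploadTime.toInstant())
        return "$bucketPath/$namespace/$stream/${datePrefix}_${uploadTime.time}_$partId.avro"
    }

    // avroOutputPath("fake-bucketPath", "fake-namespace", "fake-stream",
    //     Timestamp.from(Instant.ofEpochMilli(1234)), 0)
    // == "fake-bucketPath/fake-namespace/fake-stream/1970_01_01_1234_0.avro"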
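The Avro acceptance test streams each synced object back through DataFileReader. A standalone sketch of that loop over a byte array:

    import org.apache.avro.file.DataFileReader
    import org.apache.avro.file.SeekableByteArrayInput
    import org.apache.avro.generic.GenericData
    import org.apache.avro.generic.GenericDatumReader

    fun readAvroRecords(avroBytes: ByteArray): List<GenericData.Record> {
        val records = mutableListOf<GenericData.Record>()
        // DataFileReader is Closeable, so .use closes it even on failure.
        DataFileReader(SeekableByteArrayInput(avroBytes), GenericDatumReader<GenericData.Record>())
            .use { reader ->
                while (reader.hasNext()) {
                    records.add(reader.next())
                }
            }
        return records
    }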
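The CSV test parses objects with header detection and non-numeric quoting, matching how the writer quoted values. A sketch of the same commons-csv configuration:

    import java.io.Reader
    import org.apache.commons.csv.CSVFormat
    import org.apache.commons.csv.QuoteMode

    // One Map per row, keyed by the header record.
    fun readCsvRows(reader: Reader): List<Map<String, String>> =
        CSVFormat.DEFAULT
            .withQuoteMode(QuoteMode.NON_NUMERIC)
            .withFirstRecordAsHeader()
            .parse(reader)
            .map { it.toMap() }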
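The gzip test variants differ from their base tests only in the reader, since omitting "compression" in the format config defaults to GZIP. The wrapper, as a sketch:

    import java.io.BufferedReader
    import java.io.InputStream
    import java.io.InputStreamReader
    import java.nio.charset.StandardCharsets
    import java.util.zip.GZIPInputStream

    // Decompress on the fly and read lines as UTF-8.
    fun gzipLineReader(objectContent: InputStream): BufferedReader =
        BufferedReader(InputStreamReader(GZIPInputStream(objectContent), StandardCharsets.UTF_8))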
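The Parquet test drives ParquetReader with AvroReadSupport over a Hadoop path. A sketch of the read loop, assuming the object URI and Hadoop configuration are already resolved:

    import java.net.URI
    import org.apache.avro.generic.GenericData
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroReadSupport
    import org.apache.parquet.hadoop.ParquetReader

    fun readParquetRecords(uri: URI, hadoopConfig: Configuration): List<GenericData.Record> {
        val records = mutableListOf<GenericData.Record>()
        ParquetReader.builder(AvroReadSupport<GenericData.Record>(), Path(uri))
            .withConf(hadoopConfig)
            .build()
            .use { reader ->
                // read() returns null once the file is exhausted.
                var record: GenericData.Record? = reader.read()
                while (record != null) {
                    records.add(record)
                    record = reader.read()
                }
            }
        return records
    }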