diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt
index c91b8afb3601..a36c286a03a5 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt
@@ -50,6 +50,7 @@ interface AirbyteSchemaIdentityMapper {
     fun mapTimeTypeWithoutTimezone(schema: TimeTypeWithoutTimezone): AirbyteType = schema
     fun mapTimestampTypeWithTimezone(schema: TimestampTypeWithTimezone): AirbyteType = schema
     fun mapTimestampTypeWithoutTimezone(schema: TimestampTypeWithoutTimezone): AirbyteType = schema
+    fun mapUnknown(schema: UnknownType): AirbyteType = schema
 
     fun mapField(field: FieldType): FieldType = FieldType(map(field.type), field.nullable)
 }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt
index ecea54bcc7f7..eaab8330f276 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt
@@ -46,7 +46,10 @@ open class AirbyteValueIdentityMapper(
                 is TimestampTypeWithoutTimezone ->
                     mapTimestampWithoutTimezone(value as TimestampValue, path)
                 is NullType -> mapNull(path)
-                is UnknownType -> mapUnknown(value as UnknownValue, path)
+                is UnknownType -> {
+                    collectFailure(path)
+                    mapNull(path)
+                }
             }
         } catch (e: Exception) {
             collectFailure(path)
@@ -111,6 +114,4 @@ open class AirbyteValueIdentityMapper(
             value
 
     open fun mapNull(path: List<String>): AirbyteValue = NullValue
-
-    open fun mapUnknown(value: UnknownValue, path: List<String>): AirbyteValue = value
 }
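Note on the two hunks above: the value-level mapUnknown hook is removed, so values of unknown type are now nulled with the failure recorded via collectFailure, while type-level handling stays overridable through the schema mapper's new mapUnknown. A minimal sketch of such an override (illustrative only; it assumes the StringType member defined alongside the other AirbyteType variants in this package):

    import io.airbyte.cdk.load.data.AirbyteSchemaIdentityMapper
    import io.airbyte.cdk.load.data.AirbyteType
    import io.airbyte.cdk.load.data.StringType
    import io.airbyte.cdk.load.data.UnknownType

    // Hypothetical mapper: a destination that cannot persist unknown types can
    // widen them to strings at the schema level; every other method keeps the
    // identity default from AirbyteSchemaIdentityMapper.
    class StringifyUnknownsMapper : AirbyteSchemaIdentityMapper {
        override fun mapUnknown(schema: UnknownType): AirbyteType = StringType
    }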
diff --git a/airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/file/csv/CSVWriter.kt b/airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/file/csv/CSVWriter.kt
index 044b3f8afdf6..60d293470ad9 100644
--- a/airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/file/csv/CSVWriter.kt
+++ b/airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/file/csv/CSVWriter.kt
@@ -6,9 +6,13 @@ package io.airbyte.cdk.load.file.csv
 
 import io.airbyte.cdk.load.data.ObjectType
 import io.airbyte.cdk.load.data.csv.toCsvHeader
-import java.io.Writer
+import java.io.OutputStream
 import org.apache.commons.csv.CSVFormat
 import org.apache.commons.csv.CSVPrinter
 
-fun ObjectType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter =
-    CSVFormat.Builder.create().setHeader(*toCsvHeader()).setAutoFlush(true).build().print(writer)
+fun ObjectType.toCsvPrinterWithHeader(outputStream: OutputStream): CSVPrinter =
+    CSVFormat.Builder.create()
+        .setHeader(*toCsvHeader())
+        .setAutoFlush(true)
+        .build()
+        .print(outputStream.writer(charset = Charsets.UTF_8))
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt
index de5af60f96f0..099d5979a261 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt
@@ -16,6 +16,14 @@ interface ObjectStorageClient<T : RemoteObject<*>> {
     suspend fun <U> get(key: String, block: (InputStream) -> U): U
     suspend fun put(key: String, bytes: ByteArray): T
     suspend fun delete(remoteObject: T)
+
+    /**
+     * Streaming upload should provide an [OutputStream] managed within the lifecycle of [block].
+     * The stream should be closed after the block completes; however, it should be safe for users
+     * of the stream to close it early (some writers do this by default, especially those that
+     * write whole files). Specifically, the method should guarantee that no operations will be
+     * performed on the stream after [block] completes.
+     */
     suspend fun streamingUpload(key: String, block: suspend (OutputStream) -> Unit): T =
         streamingUpload(key, NoopProcessor, block)
     suspend fun streamingUpload(
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt
new file mode 100644
index 000000000000..99a97afa7e17
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.file.object_storage
+
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
+import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
+import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta
+import io.airbyte.cdk.load.data.avro.toAvroRecord
+import io.airbyte.cdk.load.data.avro.toAvroSchema
+import io.airbyte.cdk.load.data.csv.toCsvRecord
+import io.airbyte.cdk.load.data.json.toJson
+import io.airbyte.cdk.load.file.avro.toAvroWriter
+import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader
+import io.airbyte.cdk.load.file.parquet.toParquetWriter
+import io.airbyte.cdk.load.message.DestinationRecord
+import io.airbyte.cdk.load.util.serializeToString
+import io.airbyte.cdk.load.util.write
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+import java.io.Closeable
+import java.io.OutputStream
+
+interface ObjectStorageFormattingWriter : Closeable {
+    fun accept(record: DestinationRecord)
+}
+
+@Singleton
+@Secondary
+class ObjectStorageFormattingWriterFactory(
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta,
+    private val formatConfigProvider: ObjectStorageFormatConfigurationProvider,
+) {
+    fun create(
+        stream: DestinationStream,
+        outputStream: OutputStream
+    ): ObjectStorageFormattingWriter {
+        return when (formatConfigProvider.objectStorageFormatConfiguration) {
+            is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator)
+            is AvroFormatConfiguration ->
+                AvroFormattingWriter(
+                    stream,
+                    outputStream,
+                    formatConfigProvider.objectStorageFormatConfiguration
+                        as AvroFormatConfiguration,
+                    recordDecorator
+                )
+            is ParquetFormatConfiguration ->
+                ParquetFormattingWriter(
+                    stream,
+                    outputStream,
+                    formatConfigProvider.objectStorageFormatConfiguration
+                        as ParquetFormatConfiguration,
+                    recordDecorator
+                )
+            is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator)
+        }
+    }
+}
+
+class JsonFormattingWriter(
+    private val outputStream: OutputStream,
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
+) : ObjectStorageFormattingWriter {
+    override fun accept(record: DestinationRecord) {
+        outputStream.write(recordDecorator.decorate(record).toJson().serializeToString())
+        // Keep the one-record-per-line (JSONL) framing the inlined S3V2Writer code emitted.
+        outputStream.write("\n")
+    }
+
+    override fun close() {
+        outputStream.close()
+    }
+}
+
+class CSVFormattingWriter(
+    stream: DestinationStream,
+    outputStream: OutputStream,
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
+) : ObjectStorageFormattingWriter {
+    private val printer = stream.schemaWithMeta.toCsvPrinterWithHeader(outputStream)
+
+    override fun accept(record: DestinationRecord) {
+        printer.printRecord(*recordDecorator.decorate(record).toCsvRecord())
+    }
+
+    override fun close() {
+        printer.close()
+    }
+}
+
+class AvroFormattingWriter(
+    stream: DestinationStream,
+    outputStream: OutputStream,
+    formatConfig: AvroFormatConfiguration,
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
+) : ObjectStorageFormattingWriter {
+    private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
+    private val writer =
+        outputStream.toAvroWriter(avroSchema, formatConfig.avroCompressionConfiguration)
+
+    override fun accept(record: DestinationRecord) {
+        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
+    }
+
+    override fun close() {
+        writer.close()
+    }
+}
+
+class ParquetFormattingWriter(
+    stream: DestinationStream,
+    outputStream: OutputStream,
+    formatConfig: ParquetFormatConfiguration,
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
+) : ObjectStorageFormattingWriter {
+    private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
+    private val writer =
+        outputStream.toParquetWriter(avroSchema, formatConfig.parquetWriterConfiguration)
+
+    override fun accept(record: DestinationRecord) {
+        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
+    }
+
+    override fun close() {
+        writer.close()
+    }
+}
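With format selection moved behind ObjectStorageFormattingWriterFactory, an object-storage destination's upload loop collapses to a single pattern; the S3V2Writer hunks below apply exactly this. A hedged sketch of the call site (assuming injected client and writerFactory instances; uploadOne is illustrative, not a CDK method):

    import io.airbyte.cdk.load.command.DestinationStream
    import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
    import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriterFactory
    import io.airbyte.cdk.load.message.DestinationRecord

    // Stream all records into one remote object; use {} closes the writer
    // (flushing any format footer) before the streamingUpload block returns.
    suspend fun uploadOne(
        client: ObjectStorageClient<*>,
        writerFactory: ObjectStorageFormattingWriterFactory,
        stream: DestinationStream,
        records: Iterator<DestinationRecord>,
        key: String,
    ) {
        client.streamingUpload(key) { outputStream ->
            writerFactory.create(stream, outputStream).use { writer ->
                records.forEach { writer.accept(it) }
            }
        }
    }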
diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
index 2b7b01267a9c..8726a1097f18 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
-  dockerImageTag: 0.1.12
+  dockerImageTag: 0.1.13
   dockerRepository: airbyte/destination-s3-v2
   githubIssueLabel: destination-s3-v2
   icon: s3.svg
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt
index 4c7caa97577e..7d028777d2d0 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt
@@ -6,26 +6,12 @@ package io.airbyte.integrations.destination.s3_v2
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.load.command.DestinationStream
-import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration
-import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
-import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
-import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
-import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
-import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta
-import io.airbyte.cdk.load.data.avro.toAvroRecord
-import io.airbyte.cdk.load.data.avro.toAvroSchema
-import io.airbyte.cdk.load.data.csv.toCsvRecord
-import io.airbyte.cdk.load.data.json.toJson
-import io.airbyte.cdk.load.file.avro.toAvroWriter
-import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader
+import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriterFactory
 import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
-import io.airbyte.cdk.load.file.parquet.toParquetWriter
 import io.airbyte.cdk.load.file.s3.S3Client
 import io.airbyte.cdk.load.file.s3.S3Object
 import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.DestinationRecord
-import io.airbyte.cdk.load.util.serializeToString
-import io.airbyte.cdk.load.util.write
 import io.airbyte.cdk.load.write.DestinationWriter
 import io.airbyte.cdk.load.write.StreamLoader
 import jakarta.inject.Singleton
@@ -35,8 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
 class S3V2Writer(
     private val s3Client: S3Client,
     private val pathFactory: ObjectStoragePathFactory,
-    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta,
-    private val formatConfigProvider: ObjectStorageFormatConfigurationProvider
+    private val writerFactory: ObjectStorageFormattingWriterFactory,
 ) : DestinationWriter {
     sealed interface S3V2Batch : Batch
     data class StagedObject(
@@ -49,8 +34,6 @@ class S3V2Writer(
         val s3Object: S3Object,
     ) : S3V2Batch
 
-    private val formatConfig = formatConfigProvider.objectStorageFormatConfiguration
-
     override fun createStreamLoader(stream: DestinationStream): StreamLoader {
        return S3V2StreamLoader(stream)
     }
@@ -58,15 +41,6 @@ class S3V2Writer(
     @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
     inner class S3V2StreamLoader(override val stream: DestinationStream) : StreamLoader {
         private val partNumber = AtomicLong(0L) // TODO: Get from destination state
-        private val avroSchema =
-            if (
-                formatConfig is AvroFormatConfiguration ||
-                    formatConfig is ParquetFormatConfiguration
-            ) {
-                stream.schemaWithMeta.toAvroSchema(stream.descriptor)
-            } else {
-                null
-            }
 
         override suspend fun processRecords(
             records: Iterator<DestinationRecord>,
@@ -76,55 +50,8 @@ class S3V2Writer(
             val key = pathFactory.getPathToFile(stream, partNumber, isStaging = true).toString()
             val s3Object =
                 s3Client.streamingUpload(key) { outputStream ->
-                    when (formatConfig) {
-                        is JsonFormatConfiguration -> {
-                            records.forEach {
-                                val serialized =
-                                    recordDecorator.decorate(it).toJson().serializeToString()
-                                outputStream.write(serialized)
-                                outputStream.write("\n")
-                            }
-                        }
-                        is CSVFormatConfiguration -> {
-                            stream.schemaWithMeta
-                                .toCsvPrinterWithHeader(outputStream.writer())
-                                .use { printer ->
-                                    records.forEach {
-                                        printer.printRecord(
-                                            *recordDecorator.decorate(it).toCsvRecord()
-                                        )
-                                    }
-                                }
-                        }
-                        is AvroFormatConfiguration -> {
-                            outputStream
-                                .toAvroWriter(
-                                    avroSchema!!,
-                                    formatConfig.avroCompressionConfiguration
-                                )
-                                .use { writer ->
-                                    records.forEach {
-                                        writer.write(
-                                            recordDecorator.decorate(it).toAvroRecord(avroSchema)
-                                        )
-                                    }
-                                }
-                        }
-                        is ParquetFormatConfiguration -> {
-                            outputStream
-                                .toParquetWriter(
-                                    avroSchema!!,
-                                    formatConfig.parquetWriterConfiguration
-                                )
-                                .use { writer ->
-                                    records.forEach {
-                                        writer.write(
-                                            recordDecorator.decorate(it).toAvroRecord(avroSchema)
-                                        )
-                                    }
-                                }
-                        }
-                        else -> throw IllegalStateException("Unsupported format")
-                    }
+                    writerFactory.create(stream, outputStream).use { writer ->
+                        records.forEach { writer.accept(it) }
+                    }
                 }
             return StagedObject(s3Object = s3Object, partNumber = partNumber)
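Worth noting about the simplified loop above: every formatting writer is Closeable, so use {} guarantees close() runs even if iteration throws, and the streamingUpload contract makes an early close of the underlying stream harmless. A self-contained sketch of that interplay (LineWriter is a hypothetical stand-in for any formatting writer):

    import java.io.ByteArrayOutputStream
    import java.io.Closeable
    import java.io.OutputStream

    // Stand-in formatting writer: owns the stream and closes it itself,
    // as whole-file writers (e.g. Avro/Parquet) tend to do.
    class LineWriter(private val out: OutputStream) : Closeable {
        fun accept(line: String) = out.write((line + "\n").toByteArray(Charsets.UTF_8))
        override fun close() = out.close() // early close: allowed by the contract
    }

    fun main() {
        val out = ByteArrayOutputStream()
        LineWriter(out).use { w -> listOf("a", "b").forEach(w::accept) }
        // use {} has already closed the writer; nothing touches the stream after.
        check(String(out.toByteArray(), Charsets.UTF_8) == "a\nb\n")
    }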