Bulk Load CDK: Nop Refactor: Formatted object writes to toolkit (#47382)
johnny-schmidt authored Oct 28, 2024
1 parent d33d4a0 commit 9106d24
Showing 7 changed files with 152 additions and 84 deletions.
----------------------------------------
@@ -50,6 +50,7 @@ interface AirbyteSchemaIdentityMapper {
     fun mapTimeTypeWithoutTimezone(schema: TimeTypeWithoutTimezone): AirbyteType = schema
     fun mapTimestampTypeWithTimezone(schema: TimestampTypeWithTimezone): AirbyteType = schema
     fun mapTimestampTypeWithoutTimezone(schema: TimestampTypeWithoutTimezone): AirbyteType = schema
+
     fun mapUnknown(schema: UnknownType): AirbyteType = schema
     fun mapField(field: FieldType): FieldType = FieldType(map(field.type), field.nullable)
 }
----------------------------------------
@@ -46,7 +46,10 @@ open class AirbyteValueIdentityMapper(
             is TimestampTypeWithoutTimezone ->
                 mapTimestampWithoutTimezone(value as TimestampValue, path)
             is NullType -> mapNull(path)
-            is UnknownType -> mapUnknown(value as UnknownValue, path)
+            is UnknownType -> {
+                collectFailure(path)
+                mapNull(path)
+            }
         }
     } catch (e: Exception) {
         collectFailure(path)
@@ -111,6 +114,4 @@
         value

     open fun mapNull(path: List<String>): AirbyteValue = NullValue
-
-    open fun mapUnknown(value: UnknownValue, path: List<String>): AirbyteValue = value
 }
----------------------------------------
@@ -6,9 +6,13 @@ package io.airbyte.cdk.load.file.csv

 import io.airbyte.cdk.load.data.ObjectType
 import io.airbyte.cdk.load.data.csv.toCsvHeader
-import java.io.Writer
+import java.io.OutputStream
 import org.apache.commons.csv.CSVFormat
 import org.apache.commons.csv.CSVPrinter

-fun ObjectType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter =
-    CSVFormat.Builder.create().setHeader(*toCsvHeader()).setAutoFlush(true).build().print(writer)
+fun ObjectType.toCsvPrinterWithHeader(outputStream: OutputStream): CSVPrinter =
+    CSVFormat.Builder.create()
+        .setHeader(*toCsvHeader())
+        .setAutoFlush(true)
+        .build()
+        .print(outputStream.writer(charset = Charsets.UTF_8))
----------------------------------------
@@ -16,6 +16,14 @@ interface ObjectStorageClient<T : RemoteObject<*>> {
     suspend fun <U> get(key: String, block: (InputStream) -> U): U
     suspend fun put(key: String, bytes: ByteArray): T
     suspend fun delete(remoteObject: T)
+
+    /**
+     * Streaming upload should provide an [OutputStream] managed within the lifecycle of [block].
+     * The stream should be closed after the block completes; however, it should be safe for users
+     * of the stream to close it early (some writers do this by default, especially those that
+     * write whole files). Specifically, the method should guarantee that no operations will be
+     * performed on the stream after [block] completes.
+     */
     suspend fun streamingUpload(key: String, block: suspend (OutputStream) -> Unit): T =
         streamingUpload(key, NoopProcessor, block)
     suspend fun <V : OutputStream> streamingUpload(
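As exercised by the S3V2Writer change below, the caller hands the whole write to the block and lets a formatting writer own the stream; per the contract above, this stays safe even when the writer closes the stream before the block returns. A minimal sketch — the key is illustrative; s3Client, writerFactory, stream, and records come from the surrounding diff:

    val s3Object = s3Client.streamingUpload("staging/part-0") { outputStream ->
        // Whole-file formats (e.g. Parquet) may close outputStream themselves;
        // the contract guarantees nothing touches the stream after the block.
        writerFactory.create(stream, outputStream).use { writer ->
            records.forEach { writer.accept(it) }
        }
    }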
----------------------------------------
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.file.object_storage
+
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
+import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
+import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta
+import io.airbyte.cdk.load.data.avro.toAvroRecord
+import io.airbyte.cdk.load.data.avro.toAvroSchema
+import io.airbyte.cdk.load.data.csv.toCsvRecord
+import io.airbyte.cdk.load.data.json.toJson
+import io.airbyte.cdk.load.file.avro.toAvroWriter
+import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader
+import io.airbyte.cdk.load.file.parquet.toParquetWriter
+import io.airbyte.cdk.load.message.DestinationRecord
+import io.airbyte.cdk.load.util.serializeToString
+import io.airbyte.cdk.load.util.write
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+import java.io.Closeable
+import java.io.OutputStream
+
+interface ObjectStorageFormattingWriter : Closeable {
+    fun accept(record: DestinationRecord)
+}
+
+@Singleton
+@Secondary
+class ObjectStorageFormattingWriterFactory(
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta,
+    private val formatConfigProvider: ObjectStorageFormatConfigurationProvider,
+) {
+    fun create(
+        stream: DestinationStream,
+        outputStream: OutputStream
+    ): ObjectStorageFormattingWriter {
+        return when (formatConfigProvider.objectStorageFormatConfiguration) {
+            is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator)
+            is AvroFormatConfiguration ->
+                AvroFormattingWriter(
+                    stream,
+                    outputStream,
+                    formatConfigProvider.objectStorageFormatConfiguration
+                        as AvroFormatConfiguration,
+                    recordDecorator
+                )
+            is ParquetFormatConfiguration ->
+                ParquetFormattingWriter(
+                    stream,
+                    outputStream,
+                    formatConfigProvider.objectStorageFormatConfiguration
+                        as ParquetFormatConfiguration,
+                    recordDecorator
+                )
+            is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator)
+        }
+    }
+}
+
+class JsonFormattingWriter(
+    private val outputStream: OutputStream,
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
+) : ObjectStorageFormattingWriter {
+    override fun accept(record: DestinationRecord) {
+        outputStream.write(recordDecorator.decorate(record).toJson().serializeToString())
+    }
+
+    override fun close() {
+        outputStream.close()
+    }
+}
+
+class CSVFormattingWriter(
+    stream: DestinationStream,
+    outputStream: OutputStream,
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
+) : ObjectStorageFormattingWriter {
+    private val printer = stream.schemaWithMeta.toCsvPrinterWithHeader(outputStream)
+    override fun accept(record: DestinationRecord) {
+        printer.printRecord(*recordDecorator.decorate(record).toCsvRecord())
+    }
+    override fun close() {
+        printer.close()
+    }
+}
+
+class AvroFormattingWriter(
+    stream: DestinationStream,
+    outputStream: OutputStream,
+    formatConfig: AvroFormatConfiguration,
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
+) : ObjectStorageFormattingWriter {
+    private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
+    private val writer =
+        outputStream.toAvroWriter(avroSchema, formatConfig.avroCompressionConfiguration)
+    override fun accept(record: DestinationRecord) {
+        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
+    }
+
+    override fun close() {
+        writer.close()
+    }
+}
+
+class ParquetFormattingWriter(
+    stream: DestinationStream,
+    outputStream: OutputStream,
+    formatConfig: ParquetFormatConfiguration,
+    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
+) : ObjectStorageFormattingWriter {
+    private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
+    private val writer =
+        outputStream.toParquetWriter(avroSchema, formatConfig.parquetWriterConfiguration)
+    override fun accept(record: DestinationRecord) {
+        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
+    }
+
+    override fun close() {
+        writer.close()
+    }
+}
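Because the toolkit dispatches only through accept/close, supporting an additional format means one more ObjectStorageFormattingWriter implementation plus a factory branch. A hypothetical sketch — PassthroughFormattingWriter and its payload are invented for illustration:

    class PassthroughFormattingWriter(
        private val outputStream: OutputStream,
    ) : ObjectStorageFormattingWriter {
        override fun accept(record: DestinationRecord) {
            // A real writer would decorate and format the record here.
            outputStream.write("<formatted record>\n".toByteArray(Charsets.UTF_8))
        }

        override fun close() {
            outputStream.close() // release the stream when the upload block finishes
        }
    }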
----------------------------------------
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
-  dockerImageTag: 0.1.12
+  dockerImageTag: 0.1.13
   dockerRepository: airbyte/destination-s3-v2
   githubIssueLabel: destination-s3-v2
   icon: s3.svg
----------------------------------------
@@ -6,26 +6,12 @@ package io.airbyte.integrations.destination.s3_v2

 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.load.command.DestinationStream
-import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration
-import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
-import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
-import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
-import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
-import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta
-import io.airbyte.cdk.load.data.avro.toAvroRecord
-import io.airbyte.cdk.load.data.avro.toAvroSchema
-import io.airbyte.cdk.load.data.csv.toCsvRecord
-import io.airbyte.cdk.load.data.json.toJson
-import io.airbyte.cdk.load.file.avro.toAvroWriter
-import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader
+import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriterFactory
 import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
-import io.airbyte.cdk.load.file.parquet.toParquetWriter
 import io.airbyte.cdk.load.file.s3.S3Client
 import io.airbyte.cdk.load.file.s3.S3Object
 import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.DestinationRecord
-import io.airbyte.cdk.load.util.serializeToString
-import io.airbyte.cdk.load.util.write
 import io.airbyte.cdk.load.write.DestinationWriter
 import io.airbyte.cdk.load.write.StreamLoader
 import jakarta.inject.Singleton
@@ -35,8 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
 class S3V2Writer(
     private val s3Client: S3Client,
     private val pathFactory: ObjectStoragePathFactory,
-    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta,
-    private val formatConfigProvider: ObjectStorageFormatConfigurationProvider
+    private val writerFactory: ObjectStorageFormattingWriterFactory,
 ) : DestinationWriter {
     sealed interface S3V2Batch : Batch
     data class StagedObject(
@@ -49,24 +34,13 @@
         val s3Object: S3Object,
     ) : S3V2Batch

-    private val formatConfig = formatConfigProvider.objectStorageFormatConfiguration
-
     override fun createStreamLoader(stream: DestinationStream): StreamLoader {
         return S3V2StreamLoader(stream)
     }

    @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
     inner class S3V2StreamLoader(override val stream: DestinationStream) : StreamLoader {
         private val partNumber = AtomicLong(0L) // TODO: Get from destination state
-        private val avroSchema =
-            if (
-                formatConfig is AvroFormatConfiguration ||
-                    formatConfig is ParquetFormatConfiguration
-            ) {
-                stream.schemaWithMeta.toAvroSchema(stream.descriptor)
-            } else {
-                null
-            }

         override suspend fun processRecords(
             records: Iterator<DestinationRecord>,
@@ -76,55 +50,8 @@
             val key = pathFactory.getPathToFile(stream, partNumber, isStaging = true).toString()
             val s3Object =
                 s3Client.streamingUpload(key) { outputStream ->
-                    when (formatConfig) {
-                        is JsonFormatConfiguration -> {
-                            records.forEach {
-                                val serialized =
-                                    recordDecorator.decorate(it).toJson().serializeToString()
-                                outputStream.write(serialized)
-                                outputStream.write("\n")
-                            }
-                        }
-                        is CSVFormatConfiguration -> {
-                            stream.schemaWithMeta
-                                .toCsvPrinterWithHeader(outputStream.writer())
-                                .use { printer ->
-                                    records.forEach {
-                                        printer.printRecord(
-                                            *recordDecorator.decorate(it).toCsvRecord()
-                                        )
-                                    }
-                                }
-                        }
-                        is AvroFormatConfiguration -> {
-                            outputStream
-                                .toAvroWriter(
-                                    avroSchema!!,
-                                    formatConfig.avroCompressionConfiguration
-                                )
-                                .use { writer ->
-                                    records.forEach {
-                                        writer.write(
-                                            recordDecorator.decorate(it).toAvroRecord(avroSchema)
-                                        )
-                                    }
-                                }
-                        }
-                        is ParquetFormatConfiguration -> {
-                            outputStream
-                                .toParquetWriter(
-                                    avroSchema!!,
-                                    formatConfig.parquetWriterConfiguration
-                                )
-                                .use { writer ->
-                                    records.forEach {
-                                        writer.write(
-                                            recordDecorator.decorate(it).toAvroRecord(avroSchema)
-                                        )
-                                    }
-                                }
-                        }
-                        else -> throw IllegalStateException("Unsupported format")
+                    writerFactory.create(stream, outputStream).use { writer ->
+                        records.forEach { writer.accept(it) }
                     }
                 }
             return StagedObject(s3Object = s3Object, partNumber = partNumber)
