Bulk Load CDK: Nop Refactor: Formatted object writes to toolkit #47382

Merged
merged 1 commit on Oct 28, 2024
@@ -50,6 +50,7 @@ interface AirbyteSchemaIdentityMapper {
fun mapTimeTypeWithoutTimezone(schema: TimeTypeWithoutTimezone): AirbyteType = schema
fun mapTimestampTypeWithTimezone(schema: TimestampTypeWithTimezone): AirbyteType = schema
fun mapTimestampTypeWithoutTimezone(schema: TimestampTypeWithoutTimezone): AirbyteType = schema

fun mapUnknown(schema: UnknownType): AirbyteType = schema
fun mapField(field: FieldType): FieldType = FieldType(map(field.type), field.nullable)
}
@@ -46,7 +46,10 @@ open class AirbyteValueIdentityMapper(
is TimestampTypeWithoutTimezone ->
mapTimestampWithoutTimezone(value as TimestampValue, path)
is NullType -> mapNull(path)
is UnknownType -> mapUnknown(value as UnknownValue, path)
is UnknownType -> {
collectFailure(path)
mapNull(path)
}
}
} catch (e: Exception) {
collectFailure(path)
@@ -111,6 +114,4 @@ open class AirbyteValueIdentityMapper(
value

open fun mapNull(path: List<String>): AirbyteValue = NullValue

open fun mapUnknown(value: UnknownValue, path: List<String>): AirbyteValue = value
}
@@ -6,9 +6,13 @@ package io.airbyte.cdk.load.file.csv

import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.csv.toCsvHeader
import java.io.Writer
import java.io.OutputStream
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter

fun ObjectType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter =
CSVFormat.Builder.create().setHeader(*toCsvHeader()).setAutoFlush(true).build().print(writer)
fun ObjectType.toCsvPrinterWithHeader(outputStream: OutputStream): CSVPrinter =
CSVFormat.Builder.create()
.setHeader(*toCsvHeader())
.setAutoFlush(true)
.build()
.print(outputStream.writer(charset = Charsets.UTF_8))

Contributor:
curious: was the default charset breaking something?

Contributor Author:
Not that I know of, but have we even migrated the tests with special strings yet? Regardless, I'm erring on the side of being explicit and also following what we do in the old CDK.

Contributor:
https://github.com/airbytehq/airbyte/pull/46954/files#diff-d2107d506de1477d1d0df842ecf42a54dda1290922cb348991816aa14598222aR420 went in last week

(though I think we're almost at the point where you can copy the old DATs to s3-v2? which would give us equivalent coverage with the old connector, plus the extra stuff in the new test suite)
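
As an aside on the charset question above: a small self-contained sketch suggesting the explicit charset is behavior-neutral, since Kotlin's OutputStream.writer() extension already defaults to UTF-8 (illustrative only, not part of the diff):

import java.io.ByteArrayOutputStream

fun main() {
    val out = ByteArrayOutputStream()
    // kotlin.io declares OutputStream.writer(charset: Charset = Charsets.UTF_8),
    // so both writers below encode as UTF-8; the explicit form only documents the intent.
    val implicit = out.writer()
    val explicit = out.writer(charset = Charsets.UTF_8)
    println(implicit.encoding == explicit.encoding) // prints: true
}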

@@ -16,6 +16,14 @@ interface ObjectStorageClient<T : RemoteObject<*>> {
suspend fun <U> get(key: String, block: (InputStream) -> U): U
suspend fun put(key: String, bytes: ByteArray): T
suspend fun delete(remoteObject: T)

/**
* Streaming upload should provide an [OutputStream] managed within the lifecycle of [block].
* The stream should be closed after the block completes; however, it should be safe for users of
* the stream to close it early (some writers do this by default, especially those that write whole
* files). Specifically, the method should guarantee that no operations will be performed on the
* stream after [block] completes.
*/
suspend fun streamingUpload(key: String, block: suspend (OutputStream) -> Unit): T =
streamingUpload(key, NoopProcessor, block)
suspend fun <V : OutputStream> streamingUpload(
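To make the contract above concrete, here is a minimal, self-contained sketch of a streamingUpload implementation and caller. The client and object types are stand-ins invented for illustration (not the CDK's S3 client), and it assumes kotlinx-coroutines is available for runBlocking:

import java.io.ByteArrayOutputStream
import java.io.OutputStream
import kotlinx.coroutines.runBlocking

// Stand-in for the remote object handle; the real interface returns a RemoteObject subtype.
data class FakeRemoteObject(val key: String, val sizeBytes: Int)

class FakeStorageClient {
    // Mirrors the documented contract: the stream exists only for the lifecycle of [block],
    // is closed once the block completes, and closing it early inside the block is safe.
    suspend fun streamingUpload(key: String, block: suspend (OutputStream) -> Unit): FakeRemoteObject {
        val buffer = ByteArrayOutputStream()
        buffer.use { block(it) } // closed here after the block; an early close inside the block is a harmless no-op
        return FakeRemoteObject(key, buffer.size())
    }
}

fun main() = runBlocking {
    val client = FakeStorageClient()
    val obj = client.streamingUpload("prefix/part-0.jsonl") { out ->
        out.write("""{"id":1}""".toByteArray(Charsets.UTF_8))
        out.close() // e.g. a whole-file writer may close eagerly; the contract allows it
    }
    println("${obj.key}: ${obj.sizeBytes} bytes")
}

The point is that streamingUpload owns the stream's lifetime, so formatting writers that close their output eagerly don't need any extra coordination with the uploader.
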
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file.object_storage

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta
import io.airbyte.cdk.load.data.avro.toAvroRecord
import io.airbyte.cdk.load.data.avro.toAvroSchema
import io.airbyte.cdk.load.data.csv.toCsvRecord
import io.airbyte.cdk.load.data.json.toJson
import io.airbyte.cdk.load.file.avro.toAvroWriter
import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader
import io.airbyte.cdk.load.file.parquet.toParquetWriter
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.cdk.load.util.write
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.io.Closeable
import java.io.OutputStream

interface ObjectStorageFormattingWriter : Closeable {
fun accept(record: DestinationRecord)
}

@Singleton
@Secondary
class ObjectStorageFormattingWriterFactory(
private val recordDecorator: DestinationRecordToAirbyteValueWithMeta,
private val formatConfigProvider: ObjectStorageFormatConfigurationProvider,
) {
fun create(
stream: DestinationStream,
outputStream: OutputStream
): ObjectStorageFormattingWriter {
return when (formatConfigProvider.objectStorageFormatConfiguration) {
is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator)
is AvroFormatConfiguration ->
AvroFormattingWriter(
stream,
outputStream,
formatConfigProvider.objectStorageFormatConfiguration
as AvroFormatConfiguration,
recordDecorator
)
is ParquetFormatConfiguration ->
ParquetFormattingWriter(
stream,
outputStream,
formatConfigProvider.objectStorageFormatConfiguration
as ParquetFormatConfiguration,
recordDecorator
)
is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator)
}
Comment on lines +43 to +62
⚠️ Potential issue

Ensure exhaustive when expression to handle all format configurations.

The when expression in the create function does not include an else branch. If objectStorageFormatConfiguration is of an unexpected type, the code may throw an exception at runtime. Consider adding an else branch to handle unforeseen formats and provide a meaningful error message.

Apply this diff to handle unexpected formats:

         return when (formatConfigProvider.objectStorageFormatConfiguration) {
             is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator)
             is AvroFormatConfiguration ->
                 AvroFormattingWriter(
                     stream,
                     outputStream,
                     formatConfigProvider.objectStorageFormatConfiguration
                         as AvroFormatConfiguration,
                     recordDecorator
                 )
             is ParquetFormatConfiguration ->
                 ParquetFormattingWriter(
                     stream,
                     outputStream,
                     formatConfigProvider.objectStorageFormatConfiguration
                         as ParquetFormatConfiguration,
                     recordDecorator
                 )
             is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator)
+            else -> throw IllegalArgumentException("Unsupported format configuration: ${formatConfigProvider.objectStorageFormatConfiguration::class.simpleName}")
         }

}
}

class JsonFormattingWriter(
private val outputStream: OutputStream,
private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
override fun accept(record: DestinationRecord) {
outputStream.write(recordDecorator.decorate(record).toJson().serializeToString())
}

override fun close() {
outputStream.close()
}
}

class CSVFormattingWriter(
stream: DestinationStream,
outputStream: OutputStream,
private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
private val printer = stream.schemaWithMeta.toCsvPrinterWithHeader(outputStream)
override fun accept(record: DestinationRecord) {
printer.printRecord(*recordDecorator.decorate(record).toCsvRecord())
}
override fun close() {
printer.close()
}
}

class AvroFormattingWriter(
stream: DestinationStream,
outputStream: OutputStream,
formatConfig: AvroFormatConfiguration,
private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
private val writer =
outputStream.toAvroWriter(avroSchema, formatConfig.avroCompressionConfiguration)
override fun accept(record: DestinationRecord) {
writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
}

override fun close() {
writer.close()
}
}

class ParquetFormattingWriter(
stream: DestinationStream,
outputStream: OutputStream,
formatConfig: ParquetFormatConfiguration,
private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
private val writer =
outputStream.toParquetWriter(avroSchema, formatConfig.parquetWriterConfiguration)
override fun accept(record: DestinationRecord) {
writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
}

override fun close() {
writer.close()
}
}
Comment on lines +93 to +127
🛠️ Refactor suggestion

Refactor to reduce duplication between AvroFormattingWriter and ParquetFormattingWriter.

Both AvroFormattingWriter and ParquetFormattingWriter share similar code for schema creation and record writing. To enhance maintainability and reduce duplication, consider creating a common abstract base class or utilizing composition to share the common functionality.

Here's an example of how you might abstract the common code:

abstract class AvroBasedFormattingWriter(
    stream: DestinationStream,
    outputStream: OutputStream,
    formatConfig: Any,
    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
    protected val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
    protected val writer: SpecificDataWriter = createWriter(outputStream, formatConfig)

    abstract fun createWriter(outputStream: OutputStream, formatConfig: Any): SpecificDataWriter

    override fun accept(record: DestinationRecord) {
        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
    }

    override fun close() {
        writer.close()
    }
}

Then, AvroFormattingWriter and ParquetFormattingWriter can extend AvroBasedFormattingWriter and implement the createWriter method accordingly.

@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.1.12
dockerImageTag: 0.1.13
💡 Codebase verification

Missing changelog for version bump

The version has been bumped from 0.1.12 to 0.1.13, but there is no CHANGELOG.md file in the destination-s3-v2 connector directory to document these changes. A changelog entry should be added to track:

  • The internal refactoring of formatting writers
  • Moving formatting writers to toolkit
  • Changes to S3V2Writer.kt
🔗 Analysis chain

LGTM! Version bump is appropriate.

The increment from 0.1.12 to 0.1.13 aligns with the internal refactoring changes to move formatting writers into the toolkit.

Let's verify the changelog entry exists for this version:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if a changelog entry exists for version 0.1.13
# Expected: Find an entry documenting the formatting writers refactoring

rg -A 5 "0\.1\.13" airbyte-integrations/connectors/destination-s3-v2/CHANGELOG.md || echo "No changelog entry found"

Length of output: 245


Script:

#!/bin/bash
# Let's verify if the changelog exists in a different location or with a different name
fd -g "*CHANGELOG*" -g "!node_modules" airbyte-integrations/connectors/destination-s3-v2/

Length of output: 151


Script:

#!/bin/bash
# Let's try a different approach to find any changelog files
fd CHANGELOG --type f

Length of output: 310

dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
icon: s3.svg
@@ -6,26 +6,12 @@ package io.airbyte.integrations.destination.s3_v2

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta
import io.airbyte.cdk.load.data.avro.toAvroRecord
import io.airbyte.cdk.load.data.avro.toAvroSchema
import io.airbyte.cdk.load.data.csv.toCsvRecord
import io.airbyte.cdk.load.data.json.toJson
import io.airbyte.cdk.load.file.avro.toAvroWriter
import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader
import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriterFactory
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.parquet.toParquetWriter
import io.airbyte.cdk.load.file.s3.S3Client
import io.airbyte.cdk.load.file.s3.S3Object
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.cdk.load.util.write
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import jakarta.inject.Singleton
@@ -35,8 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
class S3V2Writer(
private val s3Client: S3Client,
private val pathFactory: ObjectStoragePathFactory,
private val recordDecorator: DestinationRecordToAirbyteValueWithMeta,
private val formatConfigProvider: ObjectStorageFormatConfigurationProvider
private val writerFactory: ObjectStorageFormattingWriterFactory,
) : DestinationWriter {
sealed interface S3V2Batch : Batch
data class StagedObject(
@@ -49,24 +34,13 @@ class S3V2Writer(
val s3Object: S3Object,
) : S3V2Batch

private val formatConfig = formatConfigProvider.objectStorageFormatConfiguration

override fun createStreamLoader(stream: DestinationStream): StreamLoader {
return S3V2StreamLoader(stream)
}

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
inner class S3V2StreamLoader(override val stream: DestinationStream) : StreamLoader {
private val partNumber = AtomicLong(0L) // TODO: Get from destination state
private val avroSchema =
if (
formatConfig is AvroFormatConfiguration ||
formatConfig is ParquetFormatConfiguration
) {
stream.schemaWithMeta.toAvroSchema(stream.descriptor)
} else {
null
}

override suspend fun processRecords(
records: Iterator<DestinationRecord>,
@@ -76,55 +50,8 @@ class S3V2Writer(
val key = pathFactory.getPathToFile(stream, partNumber, isStaging = true).toString()
val s3Object =
s3Client.streamingUpload(key) { outputStream ->
when (formatConfig) {
is JsonFormatConfiguration -> {
records.forEach {
val serialized =
recordDecorator.decorate(it).toJson().serializeToString()
outputStream.write(serialized)
outputStream.write("\n")
}
}
is CSVFormatConfiguration -> {
stream.schemaWithMeta
.toCsvPrinterWithHeader(outputStream.writer())
.use { printer ->
records.forEach {
printer.printRecord(
*recordDecorator.decorate(it).toCsvRecord()
)
}
}
}
is AvroFormatConfiguration -> {
outputStream
.toAvroWriter(
avroSchema!!,
formatConfig.avroCompressionConfiguration
)
.use { writer ->
records.forEach {
writer.write(
recordDecorator.decorate(it).toAvroRecord(avroSchema)
)
}
}
}
is ParquetFormatConfiguration -> {
outputStream
.toParquetWriter(
avroSchema!!,
formatConfig.parquetWriterConfiguration
)
.use { writer ->
records.forEach {
writer.write(
recordDecorator.decorate(it).toAvroRecord(avroSchema)
)
}
}
}
else -> throw IllegalStateException("Unsupported format")
writerFactory.create(stream, outputStream).use { writer ->
records.forEach { writer.accept(it) }
Comment on lines +53 to +54
⚠️ Potential issue

Handle exceptions during record processing to enhance robustness

While the use block ensures that the writer resource is properly closed, exceptions during writer.accept(it) could lead to incomplete data processing or unexpected behavior. Consider adding exception handling to manage potential errors during record processing.

Apply this diff to wrap the record processing in a try-catch block:

 writerFactory.create(stream, outputStream).use { writer ->
+    try {
         records.forEach { writer.accept(it) }
+    } catch (e: Exception) {
+        // Handle the exception appropriately, e.g., log and rethrow
+        throw e
+    }
 }

This addition ensures that any exceptions are caught and can be handled or logged, preventing silent failures and improving the reliability of the data writing process.

Committable suggestion was skipped due to low confidence.

}
}
return StagedObject(s3Object = s3Object, partNumber = partNumber)