diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt
index 31f80de484d5..60ab054a2fb6 100644
--- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt
+++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt
@@ -63,7 +63,7 @@ class BufferDequeue(
                     // otherwise pull records until we hit the memory limit.
                     val newSize: Long = (memoryItem.size) + bytesRead.get()
-                    if (newSize <= optimalBytesToRead) {
+                    if (newSize <= optimalBytesToRead || output.isEmpty()) {
                         memoryItem.size.let { bytesRead.addAndGet(it) }
                         queue.poll()?.item?.let { output.add(it) }
                     } else {
diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt
index 6fa8bce200d9..b3dd2f349f84 100644
--- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt
+++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt
@@ -95,4 +95,35 @@ class AirbyteMessageDeserializer(
         return partial
     }
+
+    fun checkRecordMessageValid(partial: PartialAirbyteMessage) {
+        if (partial.record?.data != null) {
+            if (hasSeenRecordsWithFile) {
+                throw RuntimeException()
+            } else {
+                synchronized(javaClass) {
+                    if (hasSeenRecordsWithFile) {
+                        throw RuntimeException()
+                    }
+                    hasSeenRecordsWithData = true
+                }
+            }
+        } else {
+            if (hasSeenRecordsWithData) {
+                throw RuntimeException()
+            } else {
+                synchronized(javaClass) {
+                    if (hasSeenRecordsWithData) {
+                        throw RuntimeException()
+                    }
+                    hasSeenRecordsWithFile = true
+                }
+            }
+        }
+    }
+
+    companion object {
+        var hasSeenRecordsWithData = false
+        var hasSeenRecordsWithFile = false
+    }
 }
diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt
index fd26f6ad5747..4bc5b2312b2b 100644
--- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt
+++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt
@@ -41,6 +41,11 @@ class PartialAirbyteRecordMessage {
     @JsonProperty("meta")
     var meta: AirbyteRecordMessageMeta? = null
 
+    @get:JsonProperty("file")
+    @set:JsonProperty("file")
+    @JsonProperty("file")
+    var file: String? = null
+
     fun withNamespace(namespace: String?): PartialAirbyteRecordMessage {
         this.namespace = namespace
         return this
@@ -66,6 +71,11 @@ class PartialAirbyteRecordMessage {
         return this
     }
 
+    fun withFile(file: String): PartialAirbyteRecordMessage {
+        this.file = file
+        return this
+    }
+
     override fun equals(other: Any?): Boolean {
         if (this === other) {
             return true
@@ -98,6 +108,9 @@ class PartialAirbyteRecordMessage {
             ", meta='" +
             meta +
             '\'' +
+            ", file='" +
+            file +
+            '\'' +
             '}'
     }
 
diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/FileUploadFormat.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/FileUploadFormat.kt
index 1a79a574d7d3..8527aa3f5b64 100644
--- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/FileUploadFormat.kt
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/FileUploadFormat.kt
@@ -9,4 +9,5 @@ enum class FileUploadFormat(val fileExtension: String) {
     CSV("csv"),
     JSONL("jsonl"),
     PARQUET("parquet"),
+    RAW_FILES("raw_files")
 }
diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt
index fdf25cf0c6f7..5dcf01a73f26 100644
--- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt
@@ -5,11 +5,14 @@ package io.airbyte.cdk.integrations.destination.s3
 
 import com.fasterxml.jackson.databind.JsonNode
 import com.google.common.base.Preconditions
+import io.airbyte.cdk.integrations.BaseConnector
 import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer
 import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
 import io.airbyte.cdk.integrations.destination.StreamSyncSummary
 import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
 import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager
+import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
@@ -27,6 +30,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
 import java.util.concurrent.Executors
 import java.util.function.Consumer
 import java.util.function.Function
+import java.util.stream.Stream
 import org.joda.time.DateTime
 import org.joda.time.DateTimeZone
 
@@ -203,15 +207,6 @@ class S3ConsumerFactory {
                 descriptor to Pair(stream.generationId, stream.syncId)
             }
 
-        val createFunction =
-            getCreateFunction(
-                s3Config,
-                Function { fileExtension: String ->
-                    FileBuffer(fileExtension)
-                },
-                useV2FieldNames = true
-            )
-
         // Parquet has significantly higher overhead. This small adjustment
         // results in a ~5x performance improvement.
         val adjustedMemoryRatio =
@@ -221,25 +216,73 @@ class S3ConsumerFactory {
                 memoryRatio
             }
 
+        // This needs to be called before the creation of the flush function because it updates
+        // writeConfigs!
+        val onStartFunction = onStartFunction(storageOps, writeConfigs)
+
+        val streamDescriptorToWriteConfig =
+            writeConfigs.associateBy {
+                StreamDescriptor().withNamespace(it.namespace).withName(it.streamName)
+            }
+        val flushFunction =
+            if (s3Config.formatConfig.format == FileUploadFormat.RAW_FILES) {
+                object : DestinationFlushFunction {
+                    override fun flush(
+                        streamDescriptor: StreamDescriptor,
+                        stream: Stream<PartialAirbyteMessage>
+                    ) {
+                        val records = stream.toList()
+                        val writeConfig = streamDescriptorToWriteConfig.getValue(streamDescriptor)
+                        if (records.isEmpty()) {
+                            return
+                        }
+                        if (records.size > 1) {
+                            throw RuntimeException(
+                                "the destinationFlushFunction for RAW_FILES should be called with only 1 record"
+                            )
+                        }
+                        val relativePath = records[0].record!!.file!!
+                        storageOps.loadDataIntoBucket(
+                            fullObjectKey = relativePath,
+                            dataFilePath =
+                                BaseConnector.FILE_TRANSFER_DIRECTORY.resolve(relativePath),
+                            generationId = writeConfig.generationId
+                        )
+                    }
+
+                    override val optimalBatchSizeBytes: Long = 1L
+                }
+            } else {
+                val createFunction =
+                    getCreateFunction(
+                        s3Config,
+                        Function { fileExtension: String ->
+                            FileBuffer(fileExtension)
+                        },
+                        useV2FieldNames = true
+                    )
+                S3DestinationFlushFunction(
+                    // Ensure the file buffer is always larger than the memory buffer,
+                    // as the file buffer will be flushed at the end of the memory flush.
+                    optimalBatchSizeBytes =
+                        (FileBuffer.MAX_PER_STREAM_BUFFER_SIZE_BYTES * 0.9).toLong(),
+                    {
+                        // Yield a new BufferingStrategy every time we flush (for thread-safety).
+                        SerializedBufferingStrategy(
+                            createFunction,
+                            catalog,
+                            flushBufferFunction(storageOps, writeConfigs, catalog)
+                        )
+                    },
+                    generationAndSyncIds
+                )
+            }
+
         return AsyncStreamConsumer(
             outputRecordCollector,
-            onStartFunction(storageOps, writeConfigs),
+            onStartFunction,
             onCloseFunction(storageOps, writeConfigs),
-            S3DestinationFlushFunction(
-                // Ensure the file buffer is always larger than the memory buffer,
-                // as the file buffer will be flushed at the end of the memory flush.
-                optimalBatchSizeBytes =
-                    (FileBuffer.MAX_PER_STREAM_BUFFER_SIZE_BYTES * 0.9).toLong(),
-                {
-                    // Yield a new BufferingStrategy every time we flush (for thread-safety).
-                    SerializedBufferingStrategy(
-                        createFunction,
-                        catalog,
-                        flushBufferFunction(storageOps, writeConfigs, catalog)
-                    )
-                },
-                generationAndSyncIds
-            ),
+            flushFunction,
             catalog,
             // S3 has no concept of default namespace
             // In the "namespace from destination case", the namespace
diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt
index cb89081ddd0b..038fd7bbcd8e 100644
--- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt
@@ -27,13 +27,13 @@ class S3DestinationFlushFunction(
         strategyProvider().use { strategy ->
             for (partialMessage in stream) {
                 val partialRecord = partialMessage.record!!
-                val data =
                 /**
                  * This should always be null, but if something changes upstream to trigger a clone
                  * of the record, then `null` becomes `JsonNull` and `data == null` goes from `true`
                  * to `false`
                  */
-                if (partialRecord.data == null || partialRecord.data!!.isNull) {
+                val data =
+                    if (partialRecord.data == null || partialRecord.data!!.isNull) {
                         Jsons.deserialize(partialMessage.serialized)
                     } else {
                         partialRecord.data
diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt
index 894e53789973..2a27c1762efa 100644
--- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt
@@ -22,8 +22,9 @@ import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFact
 import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil
 import io.airbyte.commons.exceptions.ConfigErrorException
 import io.github.oshai.kotlinlogging.KotlinLogging
-import java.io.IOException
-import java.io.OutputStream
+import java.io.*
+import java.nio.file.Files
+import java.nio.file.Path
 import java.util.*
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ConcurrentMap
@@ -194,15 +195,27 @@ open class S3StorageOperations(
         } while (objectNameByPrefix.getValue(objectPath).contains(fullObjectKey))
         return fullObjectKey
     }
+
     @Throws(IOException::class)
     private fun loadDataIntoBucket(
         objectPath: String,
         recordsData: SerializableBuffer,
         generationId: Long
+    ): String {
+        val fullObjectKey: String = getFileName(objectPath, recordsData)
+        val dataFileName = recordsData.file?.toPath()!!
+        return loadDataIntoBucket(fullObjectKey, dataFileName, generationId)
+    }
+
+    @Throws(IOException::class)
+    public fun loadDataIntoBucket(
+        fullObjectKey: String,
+        dataFilePath: Path,
+        generationId: Long
     ): String {
         val partSize: Long = DEFAULT_PART_SIZE.toLong()
         val bucket: String? = s3Config.bucketName
-        val fullObjectKey: String = getFileName(objectPath, recordsData)
+
         val metadata: MutableMap<String, String> = HashMap()
         for (blobDecorator: BlobDecorator in blobDecorators) {
             blobDecorator.updateMetadata(metadata, getMetadataMapping())
         }
@@ -232,13 +245,13 @@ open class S3StorageOperations(
 
         try {
             rawOutputStream.use { outputStream ->
-                recordsData.inputStream!!.use { dataStream ->
+                Files.newInputStream(dataFilePath).use { dataStream ->
                     dataStream.transferTo(outputStream)
                     succeeded = true
                 }
             }
         } catch (e: Exception) {
-            logger.error(e) { "Failed to load data into storage $objectPath" }
+            logger.error(e) { "Failed to load data into storage $fullObjectKey" }
             throw RuntimeException(e)
         } finally {
             if (!succeeded) {
@@ -253,7 +266,7 @@ open class S3StorageOperations(
         }
         val newFilename: String = getFilename(fullObjectKey)
         logger.info {
-            "Uploaded buffer file to storage: ${recordsData.filename} -> $fullObjectKey (filename: $newFilename)"
+            "Uploaded buffer file to storage: $dataFilePath -> $fullObjectKey (filename: $newFilename)"
         }
         return newFilename
     }
diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/UploadFormatConfigFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/UploadFormatConfigFactory.kt
index 084126cce5bd..0f19eecac8ce 100644
--- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/UploadFormatConfigFactory.kt
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/UploadFormatConfigFactory.kt
@@ -36,6 +36,11 @@ object UploadFormatConfigFactory {
             FileUploadFormat.PARQUET -> {
                 UploadParquetFormatConfig(formatConfig)
             }
+            FileUploadFormat.RAW_FILES ->
+                object : UploadFormatConfig {
+                    override val format: FileUploadFormat = FileUploadFormat.RAW_FILES
+                    override val fileExtension: String = "DUMMY"
+                }
         }
     }
 }
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json
index a9e9889df1c0..5e938a43f5fc 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json
+++ b/airbyte-integrations/connectors/destination-s3/src/main/resources/spec.json
@@ -375,6 +375,37 @@
             "default": true
           }
         }
+      },
+      {
+        "title": "Raw Files",
+        "required": [
+          "format_type"
+        ],
+        "properties": {
+          "format_type": {
+            "title": "Format Type",
+            "type": "string",
+            "enum": [
+              "RAW_FILES"
+            ],
+            "default": "RAW_FILES"
+          },
+          "compression_codec": {
+            "title": "Compression Codec",
+            "description": "The compression algorithm used to compress data pages.",
+            "type": "string",
+            "enum": [
+              "UNCOMPRESSED",
+              "SNAPPY",
+              "GZIP",
+              "LZO",
+              "BROTLI",
+              "LZ4",
+              "ZSTD"
+            ],
+            "default": "UNCOMPRESSED"
+          }
+        }
       }
     ],
     "order": 6
diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.kt
index 76ff3e210a9f..007f0fdea538 100644
--- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.kt
+++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.kt
@@ -6,6 +6,7 @@ package io.airbyte.integrations.destination.s3
 import com.fasterxml.jackson.databind.JsonNode
 import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvDestinationAcceptanceTest
 import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
+import org.junit.jupiter.api.Test
 
 class S3CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
     override fun getProtocolVersion(): ProtocolVersion {
@@ -14,4 +15,9 @@ class S3CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
 
     override val baseConfigJson: JsonNode
         get() = S3DestinationTestUtils.baseConfigJsonFilePath
+
+    @Test
+    override fun testAirbyteTimeTypes() {
+        super.testAirbyteTimeTypes()
+    }
 }
diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt
new file mode 100644
index 000000000000..e97e5e5be3df
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt
@@ -0,0 +1,94 @@
+package io.airbyte.integrations.destination.s3
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.JsonNodeFactory
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
+import io.airbyte.cdk.integrations.destination.s3.S3DestinationAcceptanceTest
+import io.airbyte.cdk.integrations.destination.s3.util.Flattening
+import io.airbyte.commons.json.Jsons
+import io.airbyte.protocol.models.v0.*
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.time.Instant
+import kotlin.io.path.createFile
+import kotlin.io.path.writeText
+import org.apache.commons.lang3.RandomStringUtils
+import org.junit.jupiter.api.Test
+
+private val LOGGER = KotlinLogging.logger {}
+
+class S3FileTransferDestinationTest : S3DestinationAcceptanceTest(FileUploadFormat.RAW_FILES) {
+    override val formatConfig: JsonNode?
+        get() =
+            Jsons.jsonNode(
+                java.util.Map.of(
+                    "format_type",
+                    outputFormat,
+                    "flattening",
+                    Flattening.ROOT_LEVEL.value,
+                    "compression",
+                    Jsons.jsonNode(java.util.Map.of("compression_type", "No Compression"))
+                )
+            )
+
+    override fun retrieveRecords(
+        testEnv: TestDestinationEnv?,
+        streamName: String,
+        namespace: String,
+        streamSchema: JsonNode
+    ): List<JsonNode> {
+        TODO("Not yet implemented")
+    }
+
+    @Test
+    open fun testFakeFileTransfer() {
+        fileTransferMountSource!!.resolve("fakeFile").createFile().writeText("file text content!!!")
+        LOGGER.info("/file-transfer/ is mounted from $fileTransferMountSource")
+        val streamSchema = JsonNodeFactory.instance.objectNode()
+        streamSchema.set("properties", JsonNodeFactory.instance.objectNode())
+        val streamName = "str" + RandomStringUtils.randomAlphanumeric(5)
+        val catalog =
+            ConfiguredAirbyteCatalog()
+                .withStreams(
+                    java.util.List.of(
+                        ConfiguredAirbyteStream()
+                            .withSyncMode(SyncMode.INCREMENTAL)
+                            .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
+                            .withGenerationId(0)
+                            .withMinimumGenerationId(0)
+                            .withSyncId(0)
+                            .withStream(
+                                AirbyteStream().withName(streamName).withJsonSchema(streamSchema)
+                            ),
+                    ),
+                )
+
+        val recordMessage =
+            AirbyteMessage()
+                .withType(AirbyteMessage.Type.RECORD)
+                .withRecord(
+                    AirbyteRecordMessage()
+                        .withStream(streamName)
+                        .withEmittedAt(Instant.now().toEpochMilli())
+                        .withAdditionalProperty("file", "fakeFile")
+                )
+        val streamCompleteMessage =
+            AirbyteMessage()
+                .withType(AirbyteMessage.Type.TRACE)
+                .withTrace(
+                    AirbyteTraceMessage()
+                        .withStreamStatus(
+                            AirbyteStreamStatusTraceMessage()
+                                .withStatus(
+                                    AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE
+                                )
+                                .withStreamDescriptor(StreamDescriptor().withName(streamName))
+                        )
+                )
+        runSyncAndVerifyStateOutput(
+            getConfig(),
+            listOf(recordMessage, streamCompleteMessage),
+            catalog,
+            false
+        )
+    }
+}