destination-s3: add file transfer
stephane-airbyte committed Oct 3, 2024
1 parent 52c0fe1 commit 2dd1a87
Showing 11 changed files with 271 additions and 34 deletions.
@@ -63,7 +63,7 @@ class BufferDequeue(

// otherwise pull records until we hit the memory limit.
val newSize: Long = (memoryItem.size) + bytesRead.get()
-                if (newSize <= optimalBytesToRead) {
+                if (newSize <= optimalBytesToRead || output.isEmpty()) {
memoryItem.size.let { bytesRead.addAndGet(it) }
queue.poll()?.item?.let { output.add(it) }
} else {
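
Note on this change: the added "|| output.isEmpty()" guard guarantees that a dequeue always yields at least one record, even when that single record is larger than optimalBytesToRead. Without it, a flush function with a very small batch budget, such as the 1-byte budget the raw-files path below uses, could stall on batches that never fill. A standalone sketch of the loop invariant, with hypothetical names:

// Standalone model of the dequeue loop above: item sizes stand in for
// queued records. The first item is always taken, so a batch is never
// empty, no matter how small the byte budget is.
fun takeBatch(sizes: List<Long>, optimalBytesToRead: Long): List<Long> {
    val output = mutableListOf<Long>()
    var bytesRead = 0L
    for (size in sizes) {
        if (bytesRead + size <= optimalBytesToRead || output.isEmpty()) {
            bytesRead += size
            output.add(size)
        } else {
            break
        }
    }
    return output
}

fun main() {
    println(takeBatch(listOf(10, 10, 10), optimalBytesToRead = 1))  // [10]
    println(takeBatch(listOf(10, 10, 10), optimalBytesToRead = 25)) // [10, 10]
}
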
@@ -95,4 +95,35 @@ class AirbyteMessageDeserializer(

return partial
}

    fun checkRecordMessageValid(partial: PartialAirbyteMessage) {
        if (partial.record?.data != null) {
            if (hasSeenRecordsWithFile) {
                throw RuntimeException(
                    "Received a record with inline data after a record with a file reference; a sync cannot mix both"
                )
            } else {
                synchronized(javaClass) {
                    if (hasSeenRecordsWithFile) {
                        throw RuntimeException(
                            "Received a record with inline data after a record with a file reference; a sync cannot mix both"
                        )
                    }
                    hasSeenRecordsWithData = true
                }
            }
        } else {
            if (hasSeenRecordsWithData) {
                throw RuntimeException(
                    "Received a record with a file reference after a record with inline data; a sync cannot mix both"
                )
            } else {
                synchronized(javaClass) {
                    if (hasSeenRecordsWithData) {
                        throw RuntimeException(
                            "Received a record with a file reference after a record with inline data; a sync cannot mix both"
                        )
                    }
                    hasSeenRecordsWithFile = true
                }
            }
        }
    }

companion object {
var hasSeenRecordsWithData = false
var hasSeenRecordsWithFile = false
}
}
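
The guard above makes the two record kinds mutually exclusive for the lifetime of the process: once a record carrying inline data has been seen, any later record carrying only a file reference fails the sync, and vice versa, since the two kinds take incompatible write paths. A standalone model of the same double-checked flag pattern, simplified to a single lock (names hypothetical):

// Hypothetical, simplified model of the mutual-exclusion guard: the
// first record kind observed wins, and the other kind then always throws.
object RecordKindGuard {
    private var seenData = false
    private var seenFile = false

    fun register(hasData: Boolean) =
        synchronized(this) {
            if (hasData) {
                check(!seenFile) { "Cannot mix data records with file-transfer records" }
                seenData = true
            } else {
                check(!seenData) { "Cannot mix file-transfer records with data records" }
                seenFile = true
            }
        }
}

fun main() {
    RecordKindGuard.register(hasData = false) // first record fixes the mode: file transfer
    RecordKindGuard.register(hasData = true)  // throws IllegalStateException
}
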
@@ -41,6 +41,11 @@ class PartialAirbyteRecordMessage {
@JsonProperty("meta")
var meta: AirbyteRecordMessageMeta? = null

@get:JsonProperty("file")
@set:JsonProperty("file")
@JsonProperty("file")
var file: String? = null

fun withNamespace(namespace: String?): PartialAirbyteRecordMessage {
this.namespace = namespace
return this
@@ -66,6 +71,11 @@
return this
}

fun withFile(file: String): PartialAirbyteRecordMessage {
this.file = file
return this
}

override fun equals(other: Any?): Boolean {
if (this === other) {
return true
@@ -98,6 +108,9 @@
", meta='" +
meta +
'\'' +
", file='" +
file +
'\'' +
'}'
}

@@ -9,4 +9,5 @@ enum class FileUploadFormat(val fileExtension: String) {
CSV("csv"),
JSONL("jsonl"),
PARQUET("parquet"),
RAW_FILES("raw_files")
}
@@ -5,11 +5,14 @@ package io.airbyte.cdk.integrations.destination.s3

import com.fasterxml.jackson.databind.JsonNode
import com.google.common.base.Preconditions
import io.airbyte.cdk.integrations.BaseConnector
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
@@ -27,6 +30,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.Executors
import java.util.function.Consumer
import java.util.function.Function
import java.util.stream.Stream
import org.joda.time.DateTime
import org.joda.time.DateTimeZone

@@ -203,15 +207,6 @@ class S3ConsumerFactory {
descriptor to Pair(stream.generationId, stream.syncId)
}

-        val createFunction =
-            getCreateFunction(
-                s3Config,
-                Function<String, BufferStorage> { fileExtension: String ->
-                    FileBuffer(fileExtension)
-                },
-                useV2FieldNames = true
-            )
-
// Parquet has significantly higher overhead. This small adjustment
// results in a ~5x performance improvement.
val adjustedMemoryRatio =
@@ -221,25 +216,73 @@
memoryRatio
}

// This needs to be called before the creation of the flush function because it updates
// writeConfigs!
val onStartFunction = onStartFunction(storageOps, writeConfigs)

val streamDescriptorToWriteConfig =
writeConfigs.associateBy {
StreamDescriptor().withNamespace(it.namespace).withName(it.streamName)
}
val flushFunction =
if (s3Config.formatConfig.format == FileUploadFormat.RAW_FILES) {
object : DestinationFlushFunction {
override fun flush(
streamDescriptor: StreamDescriptor,
stream: Stream<PartialAirbyteMessage>
) {
val records = stream.toList()
val writeConfig = streamDescriptorToWriteConfig.getValue(streamDescriptor)
if (records.isEmpty()) {
return
}
                        if (records.size > 1) {
                            throw RuntimeException(
                                "The DestinationFlushFunction for RAW_FILES should be called with exactly one record"
                            )
                        }
val relativePath = records[0].record!!.file!!
storageOps.loadDataIntoBucket(
fullObjectKey = relativePath,
dataFilePath =
BaseConnector.FILE_TRANSFER_DIRECTORY.resolve(relativePath),
generationId = writeConfig.generationId
)
}

override val optimalBatchSizeBytes: Long = 1L
}
} else {
val createFunction =
getCreateFunction(
s3Config,
Function<String, BufferStorage> { fileExtension: String ->
FileBuffer(fileExtension)
},
useV2FieldNames = true
)
S3DestinationFlushFunction(
// Ensure the file buffer is always larger than the memory buffer,
// as the file buffer will be flushed at the end of the memory flush.
optimalBatchSizeBytes =
(FileBuffer.MAX_PER_STREAM_BUFFER_SIZE_BYTES * 0.9).toLong(),
{
// Yield a new BufferingStrategy every time we flush (for thread-safety).
SerializedBufferingStrategy(
createFunction,
catalog,
flushBufferFunction(storageOps, writeConfigs, catalog)
)
},
generationAndSyncIds
)
}

return AsyncStreamConsumer(
outputRecordCollector,
-            onStartFunction(storageOps, writeConfigs),
+            onStartFunction,
onCloseFunction(storageOps, writeConfigs),
-            S3DestinationFlushFunction(
-                // Ensure the file buffer is always larger than the memory buffer,
-                // as the file buffer will be flushed at the end of the memory flush.
-                optimalBatchSizeBytes =
-                    (FileBuffer.MAX_PER_STREAM_BUFFER_SIZE_BYTES * 0.9).toLong(),
-                {
-                    // Yield a new BufferingStrategy every time we flush (for thread-safety).
-                    SerializedBufferingStrategy(
-                        createFunction,
-                        catalog,
-                        flushBufferFunction(storageOps, writeConfigs, catalog)
-                    )
-                },
-                generationAndSyncIds
-            ),
+            flushFunction,
catalog,
// S3 has no concept of default namespace
// In the "namespace from destination case", the namespace
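
For RAW_FILES the consumer bypasses serialization buffers entirely: with optimalBatchSizeBytes = 1, every queued record overflows the batch budget, so together with the output.isEmpty() guard added to BufferDequeue above, each flush call receives exactly one record. That record's file field is read as a path relative to BaseConnector.FILE_TRANSFER_DIRECTORY, and the same relative path becomes the S3 object key. A small illustration; the directory value and relative path are assumed placeholders:

import java.nio.file.Path
import java.nio.file.Paths

fun main() {
    val fileTransferDirectory: Path = Paths.get("/tmp/file_transfer") // stand-in for FILE_TRANSFER_DIRECTORY
    val relativePath = "my_stream/part-0001.csv"                      // stand-in for record.file
    val dataFilePath = fileTransferDirectory.resolve(relativePath)
    // The relative path doubles as the object key, so the bucket
    // mirrors the staged directory layout.
    println("upload $dataFilePath -> s3 object key '$relativePath'")
}
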
@@ -27,13 +27,13 @@ class S3DestinationFlushFunction(
strategyProvider().use { strategy ->
for (partialMessage in stream) {
val partialRecord = partialMessage.record!!
-                val data =
                /**
                 * This should always be null, but if something changes upstream to trigger a clone
                 * of the record, then `null` becomes `JsonNull` and `data == null` goes from `true`
                 * to `false`
                 */
-                if (partialRecord.data == null || partialRecord.data!!.isNull) {
+                val data =
+                    if (partialRecord.data == null || partialRecord.data!!.isNull) {
Jsons.deserialize(partialMessage.serialized)
} else {
partialRecord.data
@@ -22,8 +22,9 @@ import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil
import io.airbyte.commons.exceptions.ConfigErrorException
import io.github.oshai.kotlinlogging.KotlinLogging
-import java.io.IOException
-import java.io.OutputStream
+import java.io.*
+import java.nio.file.Files
+import java.nio.file.Path
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
@@ -194,15 +195,27 @@ open class S3StorageOperations(
} while (objectNameByPrefix.getValue(objectPath).contains(fullObjectKey))
return fullObjectKey
}

@Throws(IOException::class)
private fun loadDataIntoBucket(
objectPath: String,
recordsData: SerializableBuffer,
generationId: Long
): String {
val fullObjectKey: String = getFileName(objectPath, recordsData)
val dataFileName = recordsData.file?.toPath()!!
return loadDataIntoBucket(fullObjectKey, dataFileName, generationId)
}

@Throws(IOException::class)
public fun loadDataIntoBucket(
fullObjectKey: String,
dataFilePath: Path,
generationId: Long
): String {
val partSize: Long = DEFAULT_PART_SIZE.toLong()
val bucket: String? = s3Config.bucketName
-        val fullObjectKey: String = getFileName(objectPath, recordsData)

val metadata: MutableMap<String, String> = HashMap()
for (blobDecorator: BlobDecorator in blobDecorators) {
blobDecorator.updateMetadata(metadata, getMetadataMapping())
@@ -232,13 +245,13 @@

try {
rawOutputStream.use { outputStream ->
-                recordsData.inputStream!!.use { dataStream ->
+                Files.newInputStream(dataFilePath).use { dataStream ->
dataStream.transferTo(outputStream)
succeeded = true
}
}
} catch (e: Exception) {
-            logger.error(e) { "Failed to load data into storage $objectPath" }
+            logger.error(e) { "Failed to load data into storage $fullObjectKey" }
throw RuntimeException(e)
} finally {
if (!succeeded) {
@@ -253,7 +266,7 @@
}
val newFilename: String = getFilename(fullObjectKey)
logger.info {
"Uploaded buffer file to storage: ${recordsData.filename} -> $fullObjectKey (filename: $newFilename)"
"Uploaded buffer file to storage: $dataFilePath -> $fullObjectKey (filename: $newFilename)"
}
return newFilename
}
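
The upload refactor separates naming from transfer: the buffer-based loadDataIntoBucket now only computes the object key and delegates to the new path-based overload, which streams any local file into the multipart upload instead of reading from an in-memory buffer. The copy idiom in isolation, as a sketch:

import java.io.OutputStream
import java.nio.file.Files
import java.nio.file.Path

// Stream a staged file into an already-open upload stream; use closes
// both ends even if the copy fails, and transferTo (JDK 9+) copies in
// chunks without loading the whole file into memory.
fun streamFileInto(dataFilePath: Path, rawOutputStream: OutputStream) {
    rawOutputStream.use { outputStream ->
        Files.newInputStream(dataFilePath).use { dataStream ->
            dataStream.transferTo(outputStream)
        }
    }
}
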
@@ -36,6 +36,11 @@ object UploadFormatConfigFactory {
FileUploadFormat.PARQUET -> {
UploadParquetFormatConfig(formatConfig)
}
FileUploadFormat.RAW_FILES ->
object : UploadFormatConfig {
override val format: FileUploadFormat = FileUploadFormat.RAW_FILES
override val fileExtension: String = "DUMMY"
}
}
}
}
@@ -375,6 +375,37 @@
"default": true
}
}
},
{
"title": "Raw Files",
"required": [
"format_type"
],
"properties": {
"format_type": {
"title": "Format Type",
"type": "string",
"enum": [
"RAW_FILES"
],
"default": "RAW_FILES"
},
"compression_codec": {
"title": "Compression Codec",
"description": "The compression algorithm used to compress data pages.",
"type": "string",
"enum": [
"UNCOMPRESSED",
"SNAPPY",
"GZIP",
"LZO",
"BROTLI",
"LZ4",
"ZSTD"
],
"default": "UNCOMPRESSED"
}
}
}
],
"order": 6
@@ -6,6 +6,7 @@ package io.airbyte.integrations.destination.s3
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvDestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import org.junit.jupiter.api.Test

class S3CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
override fun getProtocolVersion(): ProtocolVersion {
@@ -14,4 +15,9 @@ class S3CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {

override val baseConfigJson: JsonNode
get() = S3DestinationTestUtils.baseConfigJsonFilePath

@Test
override fun testAirbyteTimeTypes() {
super.testAirbyteTimeTypes()
}
}