diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt index e9ae6c33e39f..d74a0056a5cf 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt @@ -23,7 +23,7 @@ interface QueueWriter : CloseableCoroutine { interface MessageQueue : QueueReader, QueueWriter abstract class ChannelMessageQueue : MessageQueue { - open val channel = Channel(Channel.UNLIMITED) + open val channel: Channel = Channel(Channel.UNLIMITED) override suspend fun publish(message: T) = channel.send(message) override suspend fun consume(): Flow = channel.receiveAsFlow() diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index 5f362fcffa49..e1682dbe42ec 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -30,6 +30,9 @@ import java.io.File import java.io.OutputStream import java.nio.file.Path import java.util.concurrent.atomic.AtomicLong +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take @Singleton @Secondary diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt index b12dab4b4c52..6cc75f27ac15 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt @@ -20,6 +20,9 @@ import io.github.oshai.kotlinlogging.KotlinLogging import java.io.ByteArrayOutputStream import java.io.OutputStream import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.measureTime +import kotlin.time.measureTimedValue import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -51,6 +54,7 @@ class S3MultipartUpload( private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer) private val partQueue = Channel(Channel.UNLIMITED) private val isClosed = AtomicBoolean(false) + private val channelSize = AtomicLong(0L) /** * Run the upload using the provided block. This should only be used by the @@ -73,6 +77,7 @@ class S3MultipartUpload( launch { val uploadedParts = mutableListOf() for (bytes in partQueue) { + channelSize.decrementAndGet() val part = uploadPart(bytes, uploadedParts) uploadedParts.add(part) } @@ -117,7 +122,11 @@ class S3MultipartUpload( wrappingBuffer.flush() val bytes = underlyingBuffer.toByteArray() underlyingBuffer.reset() - runBlocking { partQueue.send(bytes) } + channelSize.incrementAndGet() + val duration = measureTime { runBlocking { partQueue.send(bytes) } } + log.info { + "Enqueued part in $duration (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})" + } } private suspend fun uploadPart( @@ -132,10 +141,13 @@ class S3MultipartUpload( body = ByteStream.fromBytes(bytes) this.partNumber = partNumber } - val uploadResponse = client.uploadPart(request) + val uploadResponse = measureTimedValue { client.uploadPart(request) } + log.info { + "Uploaded part $partNumber in ${uploadResponse.duration} (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})" + } return CompletedPart { this.partNumber = partNumber - this.eTag = uploadResponse.eTag + this.eTag = uploadResponse.value.eTag } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/build.gradle b/airbyte-integrations/connectors/destination-s3-v2/build.gradle index d370a2d14c0c..a48674517442 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/build.gradle +++ b/airbyte-integrations/connectors/destination-s3-v2/build.gradle @@ -17,14 +17,14 @@ application { // Uncomment to run locally: // '--add-opens', 'java.base/java.lang=ALL-UNNAMED' // Uncomment to enable remote profiling: -// '-XX:NativeMemoryTracking=detail', -// '-Djava.rmi.server.hostname=localhost', -// '-Dcom.sun.management.jmxremote=true', -// '-Dcom.sun.management.jmxremote.port=6000', -// '-Dcom.sun.management.jmxremote.rmi.port=6000', -// '-Dcom.sun.management.jmxremote.local.only=false', -// '-Dcom.sun.management.jmxremote.authenticate=false', -// '-Dcom.sun.management.jmxremote.ssl=false' + '-XX:NativeMemoryTracking=detail', + '-Djava.rmi.server.hostname=localhost', + '-Dcom.sun.management.jmxremote=true', + '-Dcom.sun.management.jmxremote.port=6000', + '-Dcom.sun.management.jmxremote.rmi.port=6000', + '-Dcom.sun.management.jmxremote.local.only=false', + '-Dcom.sun.management.jmxremote.authenticate=false', + '-Dcom.sun.management.jmxremote.ssl=false' ] } diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index 2dfbda00c691..68ff9306d770 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -1,27 +1,40 @@ data: connectorSubtype: file connectorType: destination - definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.3.4 - dockerRepository: airbyte/destination-s3-v2 - githubIssueLabel: destination-s3-v2 + definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 + dockerImageTag: 1.5.0 + dockerRepository: airbyte/destination-s3 + githubIssueLabel: destination-s3 icon: s3.svg license: ELv2 - name: S3 V2 Destination + name: S3 registryOverrides: cloud: - enabled: false + enabled: true oss: - enabled: false - releaseStage: alpha + enabled: true + releaseStage: generally_available + releases: + breakingChanges: + 1.0.0: + message: > + **This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.** + upgradeDeadline: "2024-10-08" + resourceRequirements: + jobSpecific: + - jobType: sync + resourceRequirements: + memory_limit: 2Gi + memory_request: 2Gi documentationUrl: https://docs.airbyte.com/integrations/destinations/s3 tags: - language:java ab_internal: - sl: 100 - ql: 100 - supportLevel: community + sl: 300 + ql: 300 + supportLevel: certified supportsRefreshes: true + supportsFileTransfer: true connectorTestSuitesOptions: - suite: unitTests - suite: integrationTests diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt index 536d5673d22e..8b216bbbfd0d 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.s3_v2 +import com.fasterxml.jackson.annotation.JsonProperty import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import io.airbyte.cdk.command.ConfigurationSpecification @@ -81,6 +82,12 @@ class S3V2Specification : // // @get:JsonSchemaInject(json = """{"examples":["__staging/data_sync/test"],"order":11}""") // override val s3StagingPrefix: String? = null + + @get:JsonProperty("num_process_records_workers") + val numProcessRecordsWorkers: Int? = 2 + + @get:JsonProperty("estimated_record_memory_overhead_ratio") + val estimatedRecordMemoryOverheadRatio: Double? = 5.0 } @Singleton diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index fda8dce24cb4..ebf35fe2dd3a 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -544,7 +544,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer | +| 1.5.0 | 2024-11-08 | []() | Migrate to Bulk Load CDK; adds opt-in support for staging | +| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer | | 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests | | 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | | 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |