Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Prerelease S3V2 Connector #48519

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ interface QueueWriter<T> : CloseableCoroutine {
interface MessageQueue<T> : QueueReader<T>, QueueWriter<T>

abstract class ChannelMessageQueue<T> : MessageQueue<T> {
open val channel = Channel<T>(Channel.UNLIMITED)
open val channel: Channel<T> = Channel(Channel.UNLIMITED)

override suspend fun publish(message: T) = channel.send(message)
override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import java.io.File
import java.io.OutputStream
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take

@Singleton
@Secondary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlin.time.measureTime
import kotlin.time.measureTimedValue
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -51,6 +54,7 @@ class S3MultipartUpload<T : OutputStream>(
private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer)
private val partQueue = Channel<ByteArray>(Channel.UNLIMITED)
private val isClosed = AtomicBoolean(false)
private val channelSize = AtomicLong(0L)

/**
* Run the upload using the provided block. This should only be used by the
Expand All @@ -73,6 +77,7 @@ class S3MultipartUpload<T : OutputStream>(
launch {
val uploadedParts = mutableListOf<CompletedPart>()
for (bytes in partQueue) {
channelSize.decrementAndGet()
val part = uploadPart(bytes, uploadedParts)
uploadedParts.add(part)
}
Expand Down Expand Up @@ -117,7 +122,11 @@ class S3MultipartUpload<T : OutputStream>(
wrappingBuffer.flush()
val bytes = underlyingBuffer.toByteArray()
underlyingBuffer.reset()
runBlocking { partQueue.send(bytes) }
channelSize.incrementAndGet()
val duration = measureTime { runBlocking { partQueue.send(bytes) } }
log.info {
"Enqueued part in $duration (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})"
}
}

private suspend fun uploadPart(
Expand All @@ -132,10 +141,13 @@ class S3MultipartUpload<T : OutputStream>(
body = ByteStream.fromBytes(bytes)
this.partNumber = partNumber
}
val uploadResponse = client.uploadPart(request)
val uploadResponse = measureTimedValue { client.uploadPart(request) }
log.info {
"Uploaded part $partNumber in ${uploadResponse.duration} (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})"
}
return CompletedPart {
this.partNumber = partNumber
this.eTag = uploadResponse.eTag
this.eTag = uploadResponse.value.eTag
}
}

Expand Down
16 changes: 8 additions & 8 deletions airbyte-integrations/connectors/destination-s3-v2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ application {
// Uncomment to run locally:
// '--add-opens', 'java.base/java.lang=ALL-UNNAMED'
// Uncomment to enable remote profiling:
// '-XX:NativeMemoryTracking=detail',
// '-Djava.rmi.server.hostname=localhost',
// '-Dcom.sun.management.jmxremote=true',
// '-Dcom.sun.management.jmxremote.port=6000',
// '-Dcom.sun.management.jmxremote.rmi.port=6000',
// '-Dcom.sun.management.jmxremote.local.only=false',
// '-Dcom.sun.management.jmxremote.authenticate=false',
// '-Dcom.sun.management.jmxremote.ssl=false'
'-XX:NativeMemoryTracking=detail',
'-Djava.rmi.server.hostname=localhost',
'-Dcom.sun.management.jmxremote=true',
'-Dcom.sun.management.jmxremote.port=6000',
'-Dcom.sun.management.jmxremote.rmi.port=6000',
'-Dcom.sun.management.jmxremote.local.only=false',
'-Dcom.sun.management.jmxremote.authenticate=false',
'-Dcom.sun.management.jmxremote.ssl=false'
]
}

Expand Down
35 changes: 24 additions & 11 deletions airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.3.4
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
license: ELv2
name: S3 V2 Destination
name: S3
registryOverrides:
cloud:
enabled: false
enabled: true
oss:
enabled: false
releaseStage: alpha
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
1.0.0:
message: >
**This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.**
upgradeDeadline: "2024-10-08"
resourceRequirements:
jobSpecific:
- jobType: sync
resourceRequirements:
memory_limit: 2Gi
memory_request: 2Gi
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
tags:
- language:java
ab_internal:
sl: 100
ql: 100
supportLevel: community
sl: 300
ql: 300
supportLevel: certified
supportsRefreshes: true
supportsFileTransfer: true
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.s3_v2

import com.fasterxml.jackson.annotation.JsonProperty
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.command.ConfigurationSpecification
Expand Down Expand Up @@ -81,6 +82,12 @@ class S3V2Specification :
//
// @get:JsonSchemaInject(json = """{"examples":["__staging/data_sync/test"],"order":11}""")
// override val s3StagingPrefix: String? = null

@get:JsonProperty("num_process_records_workers")
val numProcessRecordsWorkers: Int? = 2

@get:JsonProperty("estimated_record_memory_overhead_ratio")
val estimatedRecordMemoryOverheadRatio: Double? = 5.0
}

@Singleton
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.5.0 | 2024-11-08 | []() | Migrate to Bulk Load CDK; adds opt-in support for staging |
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
Expand Down
Loading