Skip to content

Commit

Permalink
[WIP] Prerelease S3V2 Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Dec 16, 2024
1 parent 052be6f commit ccf187b
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ interface QueueWriter<T> : CloseableCoroutine {
interface MessageQueue<T> : QueueReader<T>, QueueWriter<T>

abstract class ChannelMessageQueue<T> : MessageQueue<T> {
open val channel = Channel<T>(Channel.UNLIMITED)
open val channel: Channel<T> = Channel(Channel.UNLIMITED)

override suspend fun publish(message: T) = channel.send(message)
override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import java.io.File
import java.io.OutputStream
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take

@Singleton
@Secondary
Expand Down Expand Up @@ -84,6 +87,18 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
val nextPartNumber = state.nextPartNumber
log.info { "Got next part number from destination state: $nextPartNumber" }
partNumber.set(nextPartNumber)
if (stream.descriptor.name == "products") {
throw RuntimeException("Synthetic exception (product stream only)")
}
}

fun test(recordsIn: Flow<DestinationRecord>): Flow<Flow<DestinationRecord>> {
    // Splits `recordsIn` into a series of flows of at most 100 records each.
    // kotlinx.coroutines has no `chunked` operator for Flow, so we buffer
    // manually. NOTE(fix): the previous implementation emitted only the FIRST
    // 100 records (`recordsIn.take(100)`) and dropped the remainder of the
    // stream; this version consumes the entire input and also emits a final,
    // possibly smaller, trailing chunk.
    return flow {
        val buffer = mutableListOf<DestinationRecord>()
        recordsIn.collect { record ->
            buffer.add(record)
            if (buffer.size == 100) {
                // Snapshot before clearing so the emitted inner flow is
                // immutable and safe to collect lazily later.
                val chunk = buffer.toList()
                buffer.clear()
                emit(flow { chunk.forEach { emit(it) } })
            }
        }
        if (buffer.isNotEmpty()) {
            val chunk = buffer.toList()
            emit(flow { chunk.forEach { emit(it) } })
        }
    }
}

override suspend fun processRecords(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlin.time.measureTime
import kotlin.time.measureTimedValue
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -51,6 +54,7 @@ class S3MultipartUpload<T : OutputStream>(
private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer)
private val partQueue = Channel<ByteArray>(Channel.UNLIMITED)
private val isClosed = AtomicBoolean(false)
private val channelSize = AtomicLong(0L)

/**
* Run the upload using the provided block. This should only be used by the
Expand All @@ -73,6 +77,7 @@ class S3MultipartUpload<T : OutputStream>(
launch {
val uploadedParts = mutableListOf<CompletedPart>()
for (bytes in partQueue) {
channelSize.decrementAndGet()
val part = uploadPart(bytes, uploadedParts)
uploadedParts.add(part)
}
Expand Down Expand Up @@ -117,7 +122,11 @@ class S3MultipartUpload<T : OutputStream>(
wrappingBuffer.flush()
val bytes = underlyingBuffer.toByteArray()
underlyingBuffer.reset()
runBlocking { partQueue.send(bytes) }
channelSize.incrementAndGet()
val duration = measureTime { runBlocking { partQueue.send(bytes) } }
log.info {
"Enqueued part in $duration (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})"
}
}

private suspend fun uploadPart(
Expand All @@ -132,10 +141,13 @@ class S3MultipartUpload<T : OutputStream>(
body = ByteStream.fromBytes(bytes)
this.partNumber = partNumber
}
val uploadResponse = client.uploadPart(request)
val uploadResponse = measureTimedValue { client.uploadPart(request) }
log.info {
"Uploaded part $partNumber in ${uploadResponse.duration} (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})"
}
return CompletedPart {
this.partNumber = partNumber
this.eTag = uploadResponse.eTag
this.eTag = uploadResponse.value.eTag
}
}

Expand Down
16 changes: 8 additions & 8 deletions airbyte-integrations/connectors/destination-s3-v2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ application {
// Uncomment to run locally:
// '--add-opens', 'java.base/java.lang=ALL-UNNAMED'
// Uncomment to enable remote profiling:
// '-XX:NativeMemoryTracking=detail',
// '-Djava.rmi.server.hostname=localhost',
// '-Dcom.sun.management.jmxremote=true',
// '-Dcom.sun.management.jmxremote.port=6000',
// '-Dcom.sun.management.jmxremote.rmi.port=6000',
// '-Dcom.sun.management.jmxremote.local.only=false',
// '-Dcom.sun.management.jmxremote.authenticate=false',
// '-Dcom.sun.management.jmxremote.ssl=false'
'-XX:NativeMemoryTracking=detail',
'-Djava.rmi.server.hostname=localhost',
'-Dcom.sun.management.jmxremote=true',
'-Dcom.sun.management.jmxremote.port=6000',
'-Dcom.sun.management.jmxremote.rmi.port=6000',
'-Dcom.sun.management.jmxremote.local.only=false',
'-Dcom.sun.management.jmxremote.authenticate=false',
'-Dcom.sun.management.jmxremote.ssl=false'
]
}

Expand Down
35 changes: 24 additions & 11 deletions airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.3.2
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
license: ELv2
name: S3 V2 Destination
name: S3
registryOverrides:
cloud:
enabled: false
enabled: true
oss:
enabled: false
releaseStage: alpha
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
1.0.0:
message: >
**This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.**
upgradeDeadline: "2024-10-08"
resourceRequirements:
jobSpecific:
- jobType: sync
resourceRequirements:
memory_limit: 2Gi
memory_request: 2Gi
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
tags:
- language:java
ab_internal:
sl: 100
ql: 100
supportLevel: community
sl: 300
ql: 300
supportLevel: certified
supportsRefreshes: true
supportsFileTransfer: true
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ data class S3V2Configuration<T : OutputStream>(
override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration =
ObjectStorageUploadConfiguration(),
override val recordBatchSizeBytes: Long,
override val numProcessRecordsWorkers: Int = 2
override val numProcessRecordsWorkers: Int = 2,
override val estimatedRecordMemoryOverheadRatio: Double = 5.0
) :
DestinationConfiguration(),
AWSAccessKeyConfigurationProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.s3_v2

import com.fasterxml.jackson.annotation.JsonProperty
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.command.ConfigurationSpecification
Expand Down Expand Up @@ -81,6 +82,12 @@ class S3V2Specification :
//
// @get:JsonSchemaInject(json = """{"examples":["__staging/data_sync/test"],"order":11}""")
// override val s3StagingPrefix: String? = null

@get:JsonProperty("num_process_records_workers")
val numProcessRecordsWorkers: Int? = 2

@get:JsonProperty("estimated_record_memory_overhead_ratio")
val estimatedRecordMemoryOverheadRatio: Double? = 5.0
}

@Singleton
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.5.0 | 2024-11-08 | \[TODO: add PR link before release\] | Migrate to Bulk Load CDK; adds opt-in support for staging |
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
Expand Down

0 comments on commit ccf187b

Please sign in to comment.