Skip to content

Commit

Permalink
[WIP] Prerelease S3V2 Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Nov 15, 2024
1 parent e47f10b commit d79bee4
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 19 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/bulk/toolkits/load-csv/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')

api("org.apache.commons:commons-csv:1.10.0")
api("org.apache.commons:commons-csv:1.11.0")

testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load"))
}
18 changes: 14 additions & 4 deletions airbyte-integrations/connectors/destination-s3-v2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,20 @@ airbyteBulkConnector {
application {
mainClass = 'io.airbyte.integrations.destination.s3_v2.S3V2Destination'

applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']

// Uncomment and replace to run locally
//applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', '--add-opens', 'java.base/sun.security.action=ALL-UNNAMED', '--add-opens', 'java.base/java.lang=ALL-UNNAMED']
applicationDefaultJvmArgs = [
'-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0',
// Uncomment to run locally:
// '--add-opens', 'java.base/java.lang=ALL-UNNAMED'
// Remote profiling flags (currently ENABLED; comment out to disable; JMX below is unauthenticated and non-SSL):
'-XX:NativeMemoryTracking=detail',
'-Djava.rmi.server.hostname=localhost',
'-Dcom.sun.management.jmxremote=true',
'-Dcom.sun.management.jmxremote.port=6000',
'-Dcom.sun.management.jmxremote.rmi.port=6000',
'-Dcom.sun.management.jmxremote.local.only=false',
'-Dcom.sun.management.jmxremote.authenticate=false',
'-Dcom.sun.management.jmxremote.ssl=false'
]
}

// Uncomment to run locally
Expand Down
35 changes: 24 additions & 11 deletions airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.2.2
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
license: ELv2
name: S3 V2 Destination
name: S3
registryOverrides:
cloud:
enabled: false
enabled: true
oss:
enabled: false
releaseStage: alpha
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
1.0.0:
message: >
**This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.**
upgradeDeadline: "2024-10-08"
resourceRequirements:
jobSpecific:
- jobType: sync
resourceRequirements:
memory_limit: 2Gi
memory_request: 2Gi
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
tags:
- language:java
ab_internal:
sl: 100
ql: 100
supportLevel: community
sl: 300
ql: 300
supportLevel: certified
supportsRefreshes: true
supportsFileTransfer: true
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ data class S3V2Configuration<T : OutputStream>(
override val objectStorageFormatConfiguration: ObjectStorageFormatConfiguration,
override val objectStorageCompressionConfiguration: ObjectStorageCompressionConfiguration<T>,

// Internal configuration
// Temporary: exposes internal config for tuning
override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration,
override val recordBatchSizeBytes: Long = 200L * 1024 * 1024,
override val maxMessageQueueMemoryUsageRatio: Double = 0.2,
override val estimatedRecordMemoryOverheadRatio: Double = 1.1,
) :
DestinationConfiguration(),
AWSAccessKeyConfigurationProvider,
Expand All @@ -52,13 +54,16 @@ class S3V2ConfigurationFactory :
objectStoragePathConfiguration = pojo.toObjectStoragePathConfiguration(),
objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration(),
objectStorageCompressionConfiguration = pojo.toCompressionConfiguration(),
// Temporary: exposes internal config for tuning
objectStorageUploadConfiguration =
ObjectStorageUploadConfiguration(
pojo.uploadPartSize
?: ObjectStorageUploadConfiguration.DEFAULT_STREAMING_UPLOAD_PART_SIZE,
pojo.maxConcurrentUploads
?: ObjectStorageUploadConfiguration.DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
)
),
maxMessageQueueMemoryUsageRatio = pojo.maxMessageQueueMemoryUsageRatio ?: 0.2,
estimatedRecordMemoryOverheadRatio = pojo.estimatedRecordMemoryOverheadRatio ?: 1.1
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@ class S3V2Specification :
override val useStagingDirectory: Boolean? = null
override val s3StagingPrefix: String? = null

// Temporary: exposes internal config for tuning
@JsonProperty("max_concurrent_uploads")
val maxConcurrentUploads: Int? =
ObjectStorageUploadConfiguration.DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
@JsonProperty("upload_part_size")
val uploadPartSize: Long? = ObjectStorageUploadConfiguration.DEFAULT_STREAMING_UPLOAD_PART_SIZE
@JsonProperty("max_message_queue_memory_usage_ratio")
val maxMessageQueueMemoryUsageRatio: Double? = 0.2
@JsonProperty("estimated_record_memory_overhead_ratio")
val estimatedRecordMemoryOverheadRatio: Double? = 1.1
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@
},
"upload_part_size" : {
"type" : "integer"
},
"max_message_queue_memory_usage_ratio" : {
"type" : "number"
},
"estimated_record_memory_overhead_ratio" : {
"type" : "number"
}
},
"required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@
},
"upload_part_size" : {
"type" : "integer"
},
"max_message_queue_memory_usage_ratio" : {
"type" : "number"
},
"estimated_record_memory_overhead_ratio" : {
"type" : "number"
}
},
"required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ]
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.5.0 | 2024-11-08 | []() | Migrate to Bulk Load CDK; adds opt-in support for staging |
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
Expand Down

0 comments on commit d79bee4

Please sign in to comment.