Skip to content

Commit

Permalink
[WIP] Prerelease S3V2 Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Nov 27, 2024
1 parent f5d7cdb commit 4e222c7
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 18 deletions.
18 changes: 14 additions & 4 deletions airbyte-integrations/connectors/destination-s3-v2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,20 @@ airbyteBulkConnector {
application {
mainClass = 'io.airbyte.integrations.destination.s3_v2.S3V2Destination'

applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']

// Uncomment and replace to run locally
//applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', '--add-opens', 'java.base/sun.security.action=ALL-UNNAMED', '--add-opens', 'java.base/java.lang=ALL-UNNAMED']
applicationDefaultJvmArgs = [
'-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0',
// Uncomment to run locally:
// '--add-opens', 'java.base/java.lang=ALL-UNNAMED',
// Uncomment to enable remote profiling:
// '-XX:NativeMemoryTracking=detail',
// '-Djava.rmi.server.hostname=localhost',
// '-Dcom.sun.management.jmxremote=true',
// '-Dcom.sun.management.jmxremote.port=6000',
// '-Dcom.sun.management.jmxremote.rmi.port=6000',
// '-Dcom.sun.management.jmxremote.local.only=false',
// '-Dcom.sun.management.jmxremote.authenticate=false',
// '-Dcom.sun.management.jmxremote.ssl=false'
]
}

// Uncomment to run locally
Expand Down
35 changes: 24 additions & 11 deletions airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.2.7
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
license: ELv2
name: S3 V2 Destination
name: S3
registryOverrides:
cloud:
enabled: false
enabled: true
oss:
enabled: false
releaseStage: alpha
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
1.0.0:
message: >
**This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.**
upgradeDeadline: "2024-10-08"
resourceRequirements:
jobSpecific:
- jobType: sync
resourceRequirements:
memory_limit: 2Gi
memory_request: 2Gi
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
tags:
- language:java
ab_internal:
sl: 100
ql: 100
supportLevel: community
sl: 300
ql: 300
supportLevel: certified
supportsRefreshes: true
supportsFileTransfer: true
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ data class S3V2Configuration<T : OutputStream>(
override val objectStorageFormatConfiguration: ObjectStorageFormatConfiguration,
override val objectStorageCompressionConfiguration: ObjectStorageCompressionConfiguration<T>,

// Internal configuration
// Temporary: exposes internal configuration so it can be tuned
override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration,
override val recordBatchSizeBytes: Long,
override val maxMessageQueueMemoryUsageRatio: Double = 0.2,
override val estimatedRecordMemoryOverheadRatio: Double = 1.1,
) :
DestinationConfiguration(),
AWSAccessKeyConfigurationProvider,
Expand All @@ -63,14 +65,17 @@ class S3V2ConfigurationFactory(
objectStoragePathConfiguration = pojo.toObjectStoragePathConfiguration(),
objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration(),
objectStorageCompressionConfiguration = pojo.toCompressionConfiguration(),
// Temporary: exposes internal configuration so it can be tuned
objectStorageUploadConfiguration =
ObjectStorageUploadConfiguration(
pojo.uploadPartSize
?: ObjectStorageUploadConfiguration.DEFAULT_STREAMING_UPLOAD_PART_SIZE,
pojo.maxConcurrentUploads
?: ObjectStorageUploadConfiguration.DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
),
recordBatchSizeBytes = recordBatchSizeBytes
recordBatchSizeBytes = recordBatchSizeBytes,
maxMessageQueueMemoryUsageRatio = pojo.maxMessageQueueMemoryUsageRatio ?: 0.2,
estimatedRecordMemoryOverheadRatio = pojo.estimatedRecordMemoryOverheadRatio ?: 1.1
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,16 @@ class S3V2Specification :
@get:JsonSchemaInject(json = """{"examples":["__staging/data_sync/test"],"order":11}""")
override val s3StagingPrefix: String? = null

// Temporary: exposes internal configuration so it can be tuned
@JsonProperty("max_concurrent_uploads")
val maxConcurrentUploads: Int? =
ObjectStorageUploadConfiguration.DEFAULT_MAX_NUM_CONCURRENT_UPLOADS
@JsonProperty("upload_part_size")
val uploadPartSize: Long? = ObjectStorageUploadConfiguration.DEFAULT_STREAMING_UPLOAD_PART_SIZE
@JsonProperty("max_message_queue_memory_usage_ratio")
val maxMessageQueueMemoryUsageRatio: Double? = 0.2
@JsonProperty("estimated_record_memory_overhead_ratio")
val estimatedRecordMemoryOverheadRatio: Double? = 1.1
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@
},
"upload_part_size" : {
"type" : "integer"
},
"max_message_queue_memory_usage_ratio" : {
"type" : "number"
},
"estimated_record_memory_overhead_ratio" : {
"type" : "number"
}
},
"required" : [ "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@
},
"upload_part_size" : {
"type" : "integer"
},
"max_message_queue_memory_usage_ratio" : {
"type" : "number"
},
"estimated_record_memory_overhead_ratio" : {
"type" : "number"
}
},
"required" : [ "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ]
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.5.0 | 2024-11-08 | []() | Migrate to Bulk Load CDK; add opt-in support for staging |
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
Expand Down

0 comments on commit 4e222c7

Please sign in to comment.