Skip to content

Commit

Permalink
Minimal Base S3V2 w/ Bulk Load CDK (#46742)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Oct 12, 2024
1 parent cd3afe0 commit 9cd5ea2
Show file tree
Hide file tree
Showing 15 changed files with 268 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airbyte-integrations/connectors/destination-s3-v2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# S3 V2 (Bulk CDK) Destination

30 changes: 30 additions & 0 deletions airbyte-integrations/connectors/destination-s3-v2/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
plugins {
id 'application'
id 'airbyte-bulk-connector'
}

airbyteBulkConnector {
core = 'load'
toolkits = []
cdk = 'local'
}

application {
mainClass = 'io.airbyte.integrations.destination.s3_v2.S3V2Destination'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']

// Uncomment and replace to run locally
//applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', '--add-opens', 'java.base/sun.security.action=ALL-UNNAMED', '--add-opens', 'java.base/java.lang=ALL-UNNAMED']
}

// Uncomment to run locally
//run {
// standardInput = System.in
//}

dependencies {
// temporary dependencies so that we can continue running the legacy test suite.
// eventually we should remove those tests + rely solely on the bulk CDK tests.
// integrationTestLegacyImplementation testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-db-destinations"))
// integrationTestLegacyImplementation testFixtures("io.airbyte.cdk:airbyte-cdk-db-destinations:0.47.0")
}
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/destination-s3-v2/icon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
28 changes: 28 additions & 0 deletions airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.1.0
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
icon: s3.svg
license: ELv2
name: S3 V2 Destination
registryOverrides:
cloud:
enabled: false
oss:
enabled: false
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
tags:
- language:java
ab_internal:
sl: 100
ql: 100
supportLevel: community
supportsRefreshes: true
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_v2

import io.airbyte.cdk.load.check.DestinationChecker
import jakarta.inject.Singleton

@Singleton
class S3V2Checker : DestinationChecker<S3V2Configuration> {
override fun check(config: S3V2Configuration) {
// TODO: validate that the configuration works
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_v2

import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationConfigurationFactory
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

data object S3V2Configuration : DestinationConfiguration()

@Singleton
class S3V2ConfigurationFactory :
DestinationConfigurationFactory<S3V2Specification, S3V2Configuration> {
override fun makeWithoutExceptionHandling(pojo: S3V2Specification): S3V2Configuration {
return S3V2Configuration
}
}

@Factory
class S3V2ConfigurationProvider(private val config: DestinationConfiguration) {
@Singleton
fun get(): S3V2Configuration {
return config as S3V2Configuration
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_v2

import io.airbyte.cdk.AirbyteDestinationRunner

class S3V2Destination {
companion object {
@JvmStatic
fun main(args: Array<String>) {
AirbyteDestinationRunner.run(*args)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_v2

import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.load.spec.DestinationSpecificationExtension
import io.airbyte.protocol.models.v0.DestinationSyncMode
import jakarta.inject.Singleton

@Singleton
@JsonSchemaTitle("S3 V2 Destination Spec")
class S3V2Specification : ConfigurationSpecification()

@Singleton
class S3V2SpecificationExtension : DestinationSpecificationExtension {
override val supportedSyncModes =
listOf(
DestinationSyncMode.OVERWRITE,
DestinationSyncMode.APPEND,
DestinationSyncMode.APPEND_DEDUP,
)
override val supportsIncremental = true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_v2

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import jakarta.inject.Singleton

@Singleton
class S3V2Writer : DestinationWriter {
override fun createStreamLoader(stream: DestinationStream): StreamLoader {
return S3V2StreamLoader(stream)
}

inner class S3V2StreamLoader(override val stream: DestinationStream) : StreamLoader {
override suspend fun processRecords(
records: Iterator<DestinationRecord>,
totalSizeBytes: Long
): Batch = SimpleBatch(state = Batch.State.COMPLETE)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_v2

import io.airbyte.cdk.load.check.CheckIntegrationTest
import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.cdk.load.test.util.destination_process.TestDeploymentMode
import org.junit.jupiter.api.Test

class S3V2CheckTest :
CheckIntegrationTest<S3V2Specification>(
S3V2Specification::class.java,
successConfigFilenames =
listOf(CheckTestConfig("test-configs/default.json", TestDeploymentMode.CLOUD)),
failConfigFilenamesAndFailureReasons = emptyMap()
) {
@Test
override fun testSuccessConfigs() {
super.testSuccessConfigs()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_v2

import io.airbyte.cdk.load.spec.SpecTest

class S3V2SpecTest : SpecTest()
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_v2

import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import org.junit.jupiter.api.Test

class S3V2WriteTest :
BasicFunctionalityIntegrationTest(
S3V2Specification(),
S3V2DataDumper,
NoopDestinationCleaner,
NoopExpectedRecordMapper,
verifyDataWriting = false
) {
@Test
override fun testBasicWrite() {
super.testBasicWrite()
}
}

object S3V2DataDumper : DestinationDataDumper {
override fun dumpRecords(streamName: String, streamNamespace: String?): List<OutputRecord> {
// E2e destination doesn't actually write records, so we shouldn't even
// have tests that try to read back the records
throw NotImplementedError()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/s3",
"connectionSpecification" : {
"$schema" : "http://json-schema.org/draft-07/schema#",
"title" : "S3 V2 Destination Spec",
"type" : "object",
"additionalProperties" : true,
"properties" : { }
},
"supportsIncremental" : true,
"supportsNormalization" : false,
"supportsDBT" : false,
"supported_destination_sync_modes" : [ "overwrite", "append", "append_dedup" ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/s3",
"connectionSpecification" : {
"$schema" : "http://json-schema.org/draft-07/schema#",
"title" : "S3 V2 Destination Spec",
"type" : "object",
"additionalProperties" : true,
"properties" : { }
},
"supportsIncremental" : true,
"supportsNormalization" : false,
"supportsDBT" : false,
"supported_destination_sync_modes" : [ "overwrite", "append", "append_dedup" ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}

0 comments on commit 9cd5ea2

Please sign in to comment.