From 9cd5ea2ae1b3db527333e4a966c49794f4b83efc Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 11 Oct 2024 17:08:25 -0700 Subject: [PATCH] Minimal Base S3V2 w/ Bulk Load CDK (#46742) --- .../connectors/destination-s3-v2/README.md | 2 ++ .../connectors/destination-s3-v2/build.gradle | 30 ++++++++++++++++ .../connectors/destination-s3-v2/icon.svg | 1 + .../destination-s3-v2/metadata.yaml | 28 +++++++++++++++ .../src/main/kotlin/S3V2Checker.kt | 15 ++++++++ .../src/main/kotlin/S3V2Configuration.kt | 28 +++++++++++++++ .../src/main/kotlin/S3V2Destination.kt | 16 +++++++++ .../src/main/kotlin/S3V2Specification.kt | 26 ++++++++++++++ .../src/main/kotlin/S3V2Writer.kt | 27 +++++++++++++++ .../destination/s3_v2/S3V2CheckTest.kt | 23 +++++++++++++ .../destination/s3_v2/S3V2SpecTest.kt | 9 +++++ .../destination/s3_v2/S3V2WriteTest.kt | 34 +++++++++++++++++++ .../resources/expected-spec-cloud.json | 14 ++++++++ .../resources/expected-spec-oss.json | 14 ++++++++ .../test-configs/default.json | 1 + 15 files changed, 268 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-s3-v2/README.md create mode 100644 airbyte-integrations/connectors/destination-s3-v2/build.gradle create mode 100644 airbyte-integrations/connectors/destination-s3-v2/icon.svg create mode 100644 airbyte-integrations/connectors/destination-s3-v2/metadata.yaml create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Destination.kt create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2SpecTest.kt create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json create mode 100644 airbyte-integrations/connectors/destination-s3-v2/test-configs/default.json diff --git a/airbyte-integrations/connectors/destination-s3-v2/README.md b/airbyte-integrations/connectors/destination-s3-v2/README.md new file mode 100644 index 000000000000..beeddb894cb1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/README.md @@ -0,0 +1,2 @@ +# S3 V2 (Bulk CDK) Destination + diff --git a/airbyte-integrations/connectors/destination-s3-v2/build.gradle b/airbyte-integrations/connectors/destination-s3-v2/build.gradle new file mode 100644 index 000000000000..347cb34a5460 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/build.gradle @@ -0,0 +1,30 @@ +plugins { + id 'application' + id 'airbyte-bulk-connector' +} + +airbyteBulkConnector { + core = 'load' + toolkits = [] + cdk = 'local' +} + +application { + mainClass = 'io.airbyte.integrations.destination.s3_v2.S3V2Destination' + applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] + + // Uncomment and replace to run locally + //applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', '--add-opens', 'java.base/sun.security.action=ALL-UNNAMED', '--add-opens', 'java.base/java.lang=ALL-UNNAMED'] +} + +// Uncomment to run locally +//run { +// standardInput = System.in +//} + +dependencies { + // temporary dependencies so that we can continue running the legacy test suite. + // eventually we should remove those tests + rely solely on the bulk CDK tests. + // integrationTestLegacyImplementation testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-db-destinations")) + // integrationTestLegacyImplementation testFixtures("io.airbyte.cdk:airbyte-cdk-db-destinations:0.47.0") +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/icon.svg b/airbyte-integrations/connectors/destination-s3-v2/icon.svg new file mode 100644 index 000000000000..85e50f25587f --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/icon.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml new file mode 100644 index 000000000000..c1a9e6fd7874 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -0,0 +1,28 @@ +data: + connectorSubtype: file + connectorType: destination + definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 + dockerImageTag: 0.1.0 + dockerRepository: airbyte/destination-s3-v2 + githubIssueLabel: destination-s3-v2 + icon: s3.svg + license: ELv2 + name: S3 V2 Destination + registryOverrides: + cloud: + enabled: false + oss: + enabled: false + releaseStage: alpha + documentationUrl: https://docs.airbyte.com/integrations/destinations/s3 + tags: + - language:java + ab_internal: + sl: 100 + ql: 100 + supportLevel: community + supportsRefreshes: true + connectorTestSuitesOptions: + - suite: unitTests + - suite: integrationTests +metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt new file mode 100644 index 000000000000..7a5d42b6b9ad --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import io.airbyte.cdk.load.check.DestinationChecker +import jakarta.inject.Singleton + +@Singleton +class S3V2Checker : DestinationChecker { + override fun check(config: S3V2Configuration) { + // TODO: validate that the configuration works + } +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt new file mode 100644 index 000000000000..85e53c5ca254 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import io.airbyte.cdk.load.command.DestinationConfiguration +import io.airbyte.cdk.load.command.DestinationConfigurationFactory +import io.micronaut.context.annotation.Factory +import jakarta.inject.Singleton + +data object S3V2Configuration : DestinationConfiguration() + +@Singleton +class S3V2ConfigurationFactory : + DestinationConfigurationFactory { + override fun makeWithoutExceptionHandling(pojo: S3V2Specification): S3V2Configuration { + return S3V2Configuration + } +} + +@Factory +class S3V2ConfigurationProvider(private val config: DestinationConfiguration) { + @Singleton + fun get(): S3V2Configuration { + return config as S3V2Configuration + } +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Destination.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Destination.kt new file mode 100644 index 000000000000..aad5c3520860 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Destination.kt @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import io.airbyte.cdk.AirbyteDestinationRunner + +class S3V2Destination { + companion object { + @JvmStatic + fun main(args: Array) { + AirbyteDestinationRunner.run(*args) + } + } +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt new file mode 100644 index 000000000000..c111ca986a7b --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import io.airbyte.cdk.command.ConfigurationSpecification +import io.airbyte.cdk.load.spec.DestinationSpecificationExtension +import io.airbyte.protocol.models.v0.DestinationSyncMode +import jakarta.inject.Singleton + +@Singleton +@JsonSchemaTitle("S3 V2 Destination Spec") +class S3V2Specification : ConfigurationSpecification() + +@Singleton +class S3V2SpecificationExtension : DestinationSpecificationExtension { + override val supportedSyncModes = + listOf( + DestinationSyncMode.OVERWRITE, + DestinationSyncMode.APPEND, + DestinationSyncMode.APPEND_DEDUP, + ) + override val supportsIncremental = true +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt new file mode 100644 index 000000000000..fb34ea5500e2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.SimpleBatch +import io.airbyte.cdk.load.write.DestinationWriter +import io.airbyte.cdk.load.write.StreamLoader +import jakarta.inject.Singleton + +@Singleton +class S3V2Writer : DestinationWriter { + override fun createStreamLoader(stream: DestinationStream): StreamLoader { + return S3V2StreamLoader(stream) + } + + inner class S3V2StreamLoader(override val stream: DestinationStream) : StreamLoader { + override suspend fun processRecords( + records: Iterator, + totalSizeBytes: Long + ): Batch = SimpleBatch(state = Batch.State.COMPLETE) + } +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt new file mode 100644 index 000000000000..2db7fd3e592b --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import io.airbyte.cdk.load.check.CheckIntegrationTest +import io.airbyte.cdk.load.check.CheckTestConfig +import io.airbyte.cdk.load.test.util.destination_process.TestDeploymentMode +import org.junit.jupiter.api.Test + +class S3V2CheckTest : + CheckIntegrationTest( + S3V2Specification::class.java, + successConfigFilenames = + listOf(CheckTestConfig("test-configs/default.json", TestDeploymentMode.CLOUD)), + failConfigFilenamesAndFailureReasons = emptyMap() + ) { + @Test + override fun testSuccessConfigs() { + super.testSuccessConfigs() + } +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2SpecTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2SpecTest.kt new file mode 100644 index 000000000000..e77e8378a7f7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2SpecTest.kt @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import io.airbyte.cdk.load.spec.SpecTest + +class S3V2SpecTest : SpecTest() diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt new file mode 100644 index 000000000000..0068da59f22f --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import io.airbyte.cdk.load.test.util.DestinationDataDumper +import io.airbyte.cdk.load.test.util.NoopDestinationCleaner +import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper +import io.airbyte.cdk.load.test.util.OutputRecord +import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest +import org.junit.jupiter.api.Test + +class S3V2WriteTest : + BasicFunctionalityIntegrationTest( + S3V2Specification(), + S3V2DataDumper, + NoopDestinationCleaner, + NoopExpectedRecordMapper, + verifyDataWriting = false + ) { + @Test + override fun testBasicWrite() { + super.testBasicWrite() + } +} + +object S3V2DataDumper : DestinationDataDumper { + override fun dumpRecords(streamName: String, streamNamespace: String?): List { + // E2e destination doesn't actually write records, so we shouldn't even + // have tests that try to read back the records + throw NotImplementedError() + } +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json new file mode 100644 index 000000000000..0562b559e6eb --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json @@ -0,0 +1,14 @@ +{ + "documentationUrl" : "https://docs.airbyte.com/integrations/destinations/s3", + "connectionSpecification" : { + "$schema" : "http://json-schema.org/draft-07/schema#", + "title" : "S3 V2 Destination Spec", + "type" : "object", + "additionalProperties" : true, + "properties" : { } + }, + "supportsIncremental" : true, + "supportsNormalization" : false, + "supportsDBT" : false, + "supported_destination_sync_modes" : [ "overwrite", "append", "append_dedup" ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json new file mode 100644 index 000000000000..0562b559e6eb --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json @@ -0,0 +1,14 @@ +{ + "documentationUrl" : "https://docs.airbyte.com/integrations/destinations/s3", + "connectionSpecification" : { + "$schema" : "http://json-schema.org/draft-07/schema#", + "title" : "S3 V2 Destination Spec", + "type" : "object", + "additionalProperties" : true, + "properties" : { } + }, + "supportsIncremental" : true, + "supportsNormalization" : false, + "supportsDBT" : false, + "supported_destination_sync_modes" : [ "overwrite", "append", "append_dedup" ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-s3-v2/test-configs/default.json b/airbyte-integrations/connectors/destination-s3-v2/test-configs/default.json new file mode 100644 index 000000000000..0967ef424bce --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/test-configs/default.json @@ -0,0 +1 @@ +{}