diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/BaseConnector.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/BaseConnector.kt index ef1d80ea635f..a937f4c911d2 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/BaseConnector.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/BaseConnector.kt @@ -10,6 +10,7 @@ import io.airbyte.commons.features.FeatureFlags import io.airbyte.commons.json.Jsons import io.airbyte.commons.resources.MoreResources import io.airbyte.protocol.models.v0.ConnectorSpecification +import java.nio.file.Path abstract class BaseConnector : Integration { open val featureFlags: FeatureFlags = EnvVariableFeatureFlags() diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index 8c2e3444336d..de373715c739 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -101,6 +101,8 @@ abstract class DestinationAcceptanceTest( protected var testSchemas: HashSet = HashSet() private lateinit var testEnv: TestDestinationEnv + protected var fileTransferMountSource: Path? = null + private set protected open val isCloudTest: Boolean = true protected val featureFlags: FeatureFlags = if (isCloudTest) { @@ -435,12 +437,15 @@ abstract class DestinationAcceptanceTest( mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java) testSchemas = HashSet() setup(testEnv, testSchemas) + fileTransferMountSource = + if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null processFactory = DockerProcessFactory( workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource, "host", getConnectorEnv() ) @@ -2684,6 +2689,8 @@ abstract class DestinationAcceptanceTest( return supportsInDestinationNormalization() || normalizationFromDefinition() } + protected open val supportsFileTransfer: Boolean = false + companion object { private val RANDOM = Random() private const val NORMALIZATION_VERSION = "dev" diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt index 4761398a496c..a71aca115d2e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt @@ -117,6 +117,7 @@ abstract class AbstractSourceConnectorTest { workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource = null, "host", envMap ) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt index 5106ad19f598..1bc37ac4afd6 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt @@ -4,6 +4,7 @@ package io.airbyte.commons.features import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Path import java.util.function.Function private val log = KotlinLogging.logger {} @@ -46,6 +47,10 @@ class EnvVariableFeatureFlags : FeatureFlags { return getEnvOrDefault(DEPLOYMENT_MODE, "") { arg: String -> arg } } + override fun airbyteStagingDirectory(): Path { + return getEnvOrDefault(AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME, DEFAULT_AIRBYTE_STAGING_DIRECTORY) { arg: String -> Path.of(arg) } + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java fun getEnvOrDefault(key: String?, defaultValue: T, parser: Function): T { val value = System.getenv(key) @@ -73,5 +78,7 @@ class EnvVariableFeatureFlags : FeatureFlags { const val STRICT_COMPARISON_NORMALIZATION_TAG: String = "STRICT_COMPARISON_NORMALIZATION_TAG" const val DEPLOYMENT_MODE: String = "DEPLOYMENT_MODE" + val DEFAULT_AIRBYTE_STAGING_DIRECTORY: Path = Path.of("/staging/files") + const val AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME: String = "AIRBYTE_STAGING_DIRECTORY" } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt index a8626b46ec64..91fd2f028a7a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt @@ -3,6 +3,8 @@ */ package io.airbyte.commons.features +import java.nio.file.Path + /** * Interface that describe which features are activated in airbyte. Currently, the only * implementation relies on env. Ideally it should be on some DB. @@ -51,4 +53,6 @@ interface FeatureFlags { * @return empty string for the default deployment mode, "CLOUD" for cloud deployment mode. */ fun deploymentMode(): String? + + fun airbyteStagingDirectory(): Path } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt index 056c6730332c..f339de179b17 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt @@ -3,6 +3,8 @@ */ package io.airbyte.commons.features +import java.nio.file.Path + open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags { override fun autoDetectSchema(): Boolean { return wrapped.autoDetectSchema() @@ -36,6 +38,10 @@ open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags return wrapped.deploymentMode() } + override fun airbyteStagingDirectory(): Path { + return wrapped.airbyteStagingDirectory() + } + companion object { /** Overrides the [FeatureFlags.deploymentMode] method in the feature flags. */ @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt index e58319655d9d..9ca95e9720a6 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt @@ -7,6 +7,8 @@ import com.google.common.annotations.VisibleForTesting import com.google.common.base.Joiner import com.google.common.base.Strings import com.google.common.collect.Lists +import io.airbyte.cdk.integrations.BaseConnector +import io.airbyte.commons.features.EnvVariableFeatureFlags import io.airbyte.commons.io.IOs import io.airbyte.commons.io.LineGobbler import io.airbyte.commons.map.MoreMaps @@ -30,6 +32,7 @@ class DockerProcessFactory( private val workspaceRoot: Path, private val workspaceMountSource: String?, private val localMountSource: String?, + private val fileTransferMountSource: Path?, private val networkName: String?, private val envMap: Map ) : ProcessFactory { @@ -125,6 +128,11 @@ class DockerProcessFactory( cmd.add(String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION)) } + if (fileTransferMountSource != null) { + cmd.add("-v") + cmd.add("$fileTransferMountSource:${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}") + } + val allEnvMap = MoreMaps.merge(jobMetadata, envMap, additionalEnvironmentVariables) for ((key, value) in allEnvMap) { cmd.add("-e") diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index 3bd38d2e0857..d767b47695b8 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -75,6 +75,8 @@ protected constructor( override val imageName: String get() = "airbyte/destination-s3:dev" + override val supportsFileTransfer = true + override fun getDefaultSchema(config: JsonNode): String? { if (config.has("s3_bucket_path")) { return config["s3_bucket_path"].asText() diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index 1c7c08350d01..dcfdda420636 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -1525,6 +1525,7 @@ abstract class BaseTypingDedupingTest { workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource = null, "host", emptyMap() ) diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index d47914ebb801..b5faddcddae5 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -6,7 +6,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.46.1' features = ['db-destinations', 's3-destinations'] - useLocalCdk = false + useLocalCdk = true } airbyteJavaConnector.addCdkDependencies() diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 3dfdd600bf72..727d7c75c210 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.2.1 + dockerImageTag: 1.3.0 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt index 89173b904b09..9968bd366d9b 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt +++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.s3 import com.google.common.annotations.VisibleForTesting -import io.airbyte.cdk.integrations.base.IntegrationRunner import io.airbyte.cdk.integrations.destination.s3.BaseS3Destination import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory import io.airbyte.cdk.integrations.destination.s3.StorageProvider @@ -21,12 +20,4 @@ open class S3Destination : BaseS3Destination { override fun storageProvider(): StorageProvider { return StorageProvider.AWS_S3 } - - companion object { - @Throws(Exception::class) - @JvmStatic - fun main(args: Array) { - IntegrationRunner(S3Destination()).run(args) - } - } } diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 25dfc19257da..02a69c64cb21 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.3.0 | 2024-10-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | enable file transfer | | 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | | 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams | | 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |