Skip to content

Commit

Permalink
cdk-java: add file transfer mount to DestinationAcceptanceTest
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Oct 7, 2024
1 parent bc093d8 commit df089e5
Show file tree
Hide file tree
Showing 13 changed files with 40 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.airbyte.commons.features.FeatureFlags
import io.airbyte.commons.json.Jsons
import io.airbyte.commons.resources.MoreResources
import io.airbyte.protocol.models.v0.ConnectorSpecification
import java.nio.file.Path

abstract class BaseConnector : Integration {
open val featureFlags: FeatureFlags = EnvVariableFeatureFlags()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ abstract class DestinationAcceptanceTest(
protected var testSchemas: HashSet<String> = HashSet()

private lateinit var testEnv: TestDestinationEnv
protected var fileTransferMountSource: Path? = null
private set
protected open val isCloudTest: Boolean = true
protected val featureFlags: FeatureFlags =
if (isCloudTest) {
Expand Down Expand Up @@ -435,12 +437,15 @@ abstract class DestinationAcceptanceTest(
mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java)
testSchemas = HashSet()
setup(testEnv, testSchemas)
fileTransferMountSource =
if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null

processFactory =
DockerProcessFactory(
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource,
"host",
getConnectorEnv()
)
Expand Down Expand Up @@ -2684,6 +2689,8 @@ abstract class DestinationAcceptanceTest(
return supportsInDestinationNormalization() || normalizationFromDefinition()
}

protected open val supportsFileTransfer: Boolean = false

companion object {
private val RANDOM = Random()
private const val NORMALIZATION_VERSION = "dev"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ abstract class AbstractSourceConnectorTest {
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource = null,
"host",
envMap
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.airbyte.commons.features

import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Path
import java.util.function.Function

private val log = KotlinLogging.logger {}
Expand Down Expand Up @@ -46,6 +47,10 @@ class EnvVariableFeatureFlags : FeatureFlags {
return getEnvOrDefault(DEPLOYMENT_MODE, "") { arg: String -> arg }
}

override fun airbyteStagingDirectory(): Path {
return getEnvOrDefault(AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME, DEFAULT_AIRBYTE_STAGING_DIRECTORY) { arg: String -> Path.of(arg) }
}

// TODO: refactor in order to use the same method than the ones in EnvConfigs.java
fun <T> getEnvOrDefault(key: String?, defaultValue: T, parser: Function<String, T>): T {
val value = System.getenv(key)
Expand Down Expand Up @@ -73,5 +78,7 @@ class EnvVariableFeatureFlags : FeatureFlags {
const val STRICT_COMPARISON_NORMALIZATION_TAG: String =
"STRICT_COMPARISON_NORMALIZATION_TAG"
const val DEPLOYMENT_MODE: String = "DEPLOYMENT_MODE"
val DEFAULT_AIRBYTE_STAGING_DIRECTORY: Path = Path.of("/staging/files")
const val AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME: String = "AIRBYTE_STAGING_DIRECTORY"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.airbyte.commons.features

import java.nio.file.Path

/**
* Interface that describe which features are activated in airbyte. Currently, the only
* implementation relies on env. Ideally it should be on some DB.
Expand Down Expand Up @@ -51,4 +53,6 @@ interface FeatureFlags {
* @return empty string for the default deployment mode, "CLOUD" for cloud deployment mode.
*/
fun deploymentMode(): String?

fun airbyteStagingDirectory(): Path
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.airbyte.commons.features

import java.nio.file.Path

open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags {
override fun autoDetectSchema(): Boolean {
return wrapped.autoDetectSchema()
Expand Down Expand Up @@ -36,6 +38,10 @@ open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags
return wrapped.deploymentMode()
}

override fun airbyteStagingDirectory(): Path {
return wrapped.airbyteStagingDirectory()
}

companion object {
/** Overrides the [FeatureFlags.deploymentMode] method in the feature flags. */
@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.base.Joiner
import com.google.common.base.Strings
import com.google.common.collect.Lists
import io.airbyte.cdk.integrations.BaseConnector
import io.airbyte.commons.features.EnvVariableFeatureFlags
import io.airbyte.commons.io.IOs
import io.airbyte.commons.io.LineGobbler
import io.airbyte.commons.map.MoreMaps
Expand All @@ -30,6 +32,7 @@ class DockerProcessFactory(
private val workspaceRoot: Path,
private val workspaceMountSource: String?,
private val localMountSource: String?,
private val fileTransferMountSource: Path?,
private val networkName: String?,
private val envMap: Map<String, String>
) : ProcessFactory {
Expand Down Expand Up @@ -125,6 +128,11 @@ class DockerProcessFactory(
cmd.add(String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION))
}

if (fileTransferMountSource != null) {
cmd.add("-v")
cmd.add("$fileTransferMountSource:${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}")
}

val allEnvMap = MoreMaps.merge(jobMetadata, envMap, additionalEnvironmentVariables)
for ((key, value) in allEnvMap) {
cmd.add("-e")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ protected constructor(
override val imageName: String
get() = "airbyte/destination-s3:dev"

override val supportsFileTransfer = true

override fun getDefaultSchema(config: JsonNode): String? {
if (config.has("s3_bucket_path")) {
return config["s3_bucket_path"].asText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,7 @@ abstract class BaseTypingDedupingTest {
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource = null,
"host",
emptyMap()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.46.1'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
useLocalCdk = true
}

airbyteJavaConnector.addCdkDependencies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.2.1
dockerImageTag: 1.3.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.airbyte.integrations.destination.s3

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.base.IntegrationRunner
import io.airbyte.cdk.integrations.destination.s3.BaseS3Destination
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory
import io.airbyte.cdk.integrations.destination.s3.StorageProvider
Expand All @@ -21,12 +20,4 @@ open class S3Destination : BaseS3Destination {
override fun storageProvider(): StorageProvider {
return StorageProvider.AWS_S3
}

companion object {
@Throws(Exception::class)
@JvmStatic
fun main(args: Array<String>) {
IntegrationRunner(S3Destination()).run(args)
}
}
}
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.3.0 | 2024-10-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | enable file transfer |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |
Expand Down

0 comments on commit df089e5

Please sign in to comment.