From c062722d5ef4f88d0bf92d8afa95b59fab4f8576 Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Wed, 9 Oct 2024 12:38:50 -0700 Subject: [PATCH] simple split of DestinationAcceptanceTest --- .../BaseDestinationAcceptanceTest.kt | 275 ++++++++++++++++++ .../destination/DestinationAcceptanceTest.kt | 228 +-------------- 2 files changed, 284 insertions(+), 219 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt new file mode 100644 index 000000000000..8c647d37b7d3 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt @@ -0,0 +1,275 @@ +package io.airbyte.cdk.integrations.standardtest.destination + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.commons.features.EnvVariableFeatureFlags +import io.airbyte.commons.features.FeatureFlags +import io.airbyte.commons.features.FeatureFlagsWrapper +import io.airbyte.commons.lang.Exceptions +import io.airbyte.configoss.WorkerDestinationConfig +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteStateStats +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.airbyte.workers.exception.TestHarnessException +import io.airbyte.workers.helper.ConnectorConfigUpdater +import io.airbyte.workers.internal.AirbyteDestination +import io.airbyte.workers.internal.DefaultAirbyteDestination +import io.airbyte.workers.normalization.DefaultNormalizationRunner +import io.airbyte.workers.normalization.NormalizationRunner +import io.airbyte.workers.process.AirbyteIntegrationLauncher +import io.airbyte.workers.process.DockerProcessFactory +import io.airbyte.workers.process.ProcessFactory +import io.github.oshai.kotlinlogging.KotlinLogging +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.mockito.Mockito +import java.nio.file.Files +import java.nio.file.Path +import java.util.* +import java.util.function.Consumer + +private val LOGGER = KotlinLogging.logger {} + +abstract class BaseDestinationAcceptanceTest( + // If false, ignore counts and only verify the final state message. + protected val verifyIndividualStateAndCounts: Boolean = false, +) { + protected lateinit var processFactory: ProcessFactory + private set + protected lateinit var jobRoot: Path + private set + protected var localRoot: Path? = null + private set + protected lateinit var testEnv: DestinationAcceptanceTest.TestDestinationEnv + private set + protected var fileTransferMountSource: Path? = null + private set + protected open val supportsFileTransfer: Boolean = false + protected var testSchemas: HashSet = HashSet() + protected lateinit var mConnectorConfigUpdater: ConnectorConfigUpdater + private set + protected open val isCloudTest: Boolean = true + protected val featureFlags: FeatureFlags = + if (isCloudTest) { + FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "CLOUD") + } else { + FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "OSS") + } + protected abstract val imageName: String + /** + * Name of the docker image that the tests will run against. + * + * @return docker image name + */ + get + + /** + * Configuration specific to the integration. Will be passed to integration where appropriate in + * each test. Should be valid. + * + * @return integration-specific configuration + */ + @Throws(Exception::class) protected abstract fun getConfig(): JsonNode + + protected open fun supportsInDestinationNormalization(): Boolean { + return false + } + + protected fun inDestinationNormalizationFlags(shouldNormalize: Boolean): Map { + if (shouldNormalize && supportsInDestinationNormalization()) { + return java.util.Map.of("NORMALIZATION_TECHNIQUE", "LEGACY") + } + return emptyMap() + } + + + protected fun getDestinationConfig(config: JsonNode, catalog: ConfiguredAirbyteCatalog,): WorkerDestinationConfig { + return WorkerDestinationConfig() + .withConnectionId(UUID.randomUUID()) + .withCatalog( + DestinationAcceptanceTest.convertProtocolObject( + catalog, + io.airbyte.protocol.models.ConfiguredAirbyteCatalog::class.java + ) + ) + .withDestinationConnectionConfiguration(config) + } + + protected fun runSyncAndVerifyStateOutput( + config: JsonNode, + messages: List, + catalog: ConfiguredAirbyteCatalog, + runNormalization: Boolean, + ) { + runSyncAndVerifyStateOutput( + config, + messages, + catalog, + runNormalization, + imageName, + verifyIndividualStateAndCounts + ) + } + + @Throws(Exception::class) + protected fun runSyncAndVerifyStateOutput( + config: JsonNode, + messages: List, + catalog: ConfiguredAirbyteCatalog, + runNormalization: Boolean, + imageName: String, + verifyIndividualStateAndCounts: Boolean + ) { + val destinationOutput = runSync(config, messages, catalog, runNormalization, imageName) + + var expected = messages.filter { it.type == AirbyteMessage.Type.STATE } + var actual = destinationOutput.filter { it.type == AirbyteMessage.Type.STATE } + + if (verifyIndividualStateAndCounts) { + /* Collect the counts and add them to each expected state message */ + val stateToCount = mutableMapOf() + messages.fold(0) { acc, message -> + if (message.type == AirbyteMessage.Type.STATE) { + stateToCount[message.state.global.sharedState] = acc + 0 + } else { + acc + 1 + } + } + + expected.forEach { message -> + val clone = message.state + clone.destinationStats = + AirbyteStateStats().withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble()) + message.state = clone + } + } else { + /* Null the stats and collect only the final messages */ + val finalActual = + actual.lastOrNull() + ?: throw IllegalArgumentException( + "All message sets used for testing should include a state record" + ) + val clone = finalActual.state + clone.destinationStats = null + finalActual.state = clone + + expected = listOf(expected.last()) + actual = listOf(finalActual) + } + + Assertions.assertEquals(expected, actual) + } + + @Throws(Exception::class) + open protected fun runSync( + config: JsonNode, + messages: List, + catalog: ConfiguredAirbyteCatalog, + runNormalization: Boolean, + imageName: String, + ): List { + val destinationConfig = getDestinationConfig(config, catalog) + return runSync(messages, runNormalization, imageName, destinationConfig) + } + + @Throws(Exception::class) + protected fun runSync( + messages: List, + runNormalization: Boolean, + imageName: String, + destinationConfig: WorkerDestinationConfig, + ): List { + val destination = getDestination(imageName) + + destination.start( + destinationConfig, + jobRoot, + inDestinationNormalizationFlags(runNormalization) + ) + messages.forEach( + Consumer { message: AirbyteMessage -> + Exceptions.toRuntime { + destination.accept( + DestinationAcceptanceTest.convertProtocolObject( + message, + io.airbyte.protocol.models.AirbyteMessage::class.java + ) + ) + } + } + ) + destination.notifyEndOfInput() + + val destinationOutput: MutableList = ArrayList() + while (!destination.isFinished()) { + destination.attemptRead().ifPresent { + destinationOutput.add(DestinationAcceptanceTest.convertProtocolObject(it, AirbyteMessage::class.java)) + } + } + + destination.close() + + return destinationOutput + } + + protected fun getDestination(imageName: String): AirbyteDestination { + return DefaultAirbyteDestination( + integrationLauncher = + AirbyteIntegrationLauncher( + DestinationAcceptanceTest.JOB_ID, + DestinationAcceptanceTest.JOB_ATTEMPT, + imageName, + processFactory, + null, + null, + false, + featureFlags + ) + ) + } + + @BeforeEach + @Throws(Exception::class) + open fun setUpInternal() { + val testDir = Path.of("/tmp/airbyte_tests/") + Files.createDirectories(testDir) + val workspaceRoot = Files.createTempDirectory(testDir, "test") + jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")) + localRoot = Files.createTempDirectory(testDir, "output") + LOGGER.info { "${"jobRoot: {}"} $jobRoot" } + LOGGER.info { "${"localRoot: {}"} $localRoot" } + testEnv = DestinationAcceptanceTest.TestDestinationEnv(localRoot) + mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java) + testSchemas = HashSet() + setup(testEnv, testSchemas) + fileTransferMountSource = + if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null + + processFactory = + DockerProcessFactory( + workspaceRoot, + workspaceRoot.toString(), + localRoot.toString(), + fileTransferMountSource, + "host", + getConnectorEnv() + ) + } + + /** + * Function that performs any setup of external resources required for the test. e.g. + * instantiate a postgres database. This function will be called before EACH test. + * + * @param testEnv + * - information about the test environment. + * @param TEST_SCHEMAS + * @throws Exception + * - can throw any exception, test framework will handle. + */ + @Throws(Exception::class) + protected abstract fun setup(testEnv: DestinationAcceptanceTest.TestDestinationEnv, TEST_SCHEMAS: HashSet) + + open fun getConnectorEnv(): Map { + return emptyMap() + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index c975a0558818..93b84b7e6b46 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -91,56 +91,21 @@ private val LOGGER = KotlinLogging.logger {} abstract class DestinationAcceptanceTest( // If false, ignore counts and only verify the final state message. - private val verifyIndividualStateAndCounts: Boolean = false, + verifyIndividualStateAndCounts: Boolean = false, private val useV2Fields: Boolean = false, private val supportsChangeCapture: Boolean = false, private val expectNumericTimestamps: Boolean = false, private val expectSchemalessObjectsCoercedToStrings: Boolean = false, private val expectUnionsPromotedToDisjointRecords: Boolean = false +): BaseDestinationAcceptanceTest( + verifyIndividualStateAndCounts = verifyIndividualStateAndCounts ) { - protected var testSchemas: HashSet = HashSet() - - private lateinit var testEnv: TestDestinationEnv - protected var fileTransferMountSource: Path? = null - private set - protected open val isCloudTest: Boolean = true - protected val featureFlags: FeatureFlags = - if (isCloudTest) { - FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "CLOUD") - } else { - FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "OSS") - } - - private lateinit var jobRoot: Path - private lateinit var processFactory: ProcessFactory - private lateinit var mConnectorConfigUpdater: ConnectorConfigUpdater - - protected var localRoot: Path? = null open protected var _testDataComparator: TestDataComparator = getTestDataComparator() protected open fun getTestDataComparator(): TestDataComparator { return BasicTestDataComparator { @Suppress("deprecation") this.resolveIdentifier(it) } } - protected abstract val imageName: String - /** - * Name of the docker image that the tests will run against. - * - * @return docker image name - */ - get - - protected open fun supportsInDestinationNormalization(): Boolean { - return false - } - - protected fun inDestinationNormalizationFlags(shouldNormalize: Boolean): Map { - if (shouldNormalize && supportsInDestinationNormalization()) { - return java.util.Map.of("NORMALIZATION_TECHNIQUE", "LEGACY") - } - return emptyMap() - } - private val imageNameWithoutTag: String get() = if (imageName.contains(":")) @@ -167,14 +132,6 @@ abstract class DestinationAcceptanceTest( return normalizationRepository.asText() + ":" + NORMALIZATION_VERSION } - /** - * Configuration specific to the integration. Will be passed to integration where appropriate in - * each test. Should be valid. - * - * @return integration-specific configuration - */ - @Throws(Exception::class) protected abstract fun getConfig(): JsonNode - /** * Configuration specific to the integration. Will be passed to integration where appropriate in * tests that test behavior when configuration is invalid. e.g incorrect password. Should be @@ -390,19 +347,6 @@ abstract class DestinationAcceptanceTest( throw IllegalStateException("Not implemented") } - /** - * Function that performs any setup of external resources required for the test. e.g. - * instantiate a postgres database. This function will be called before EACH test. - * - * @param testEnv - * - information about the test environment. - * @param TEST_SCHEMAS - * @throws Exception - * - can throw any exception, test framework will handle. - */ - @Throws(Exception::class) - protected abstract fun setup(testEnv: TestDestinationEnv, TEST_SCHEMAS: HashSet) - /** * Function that performs any clean up of external resources required for the test. e.g. delete * a postgres database. This function will be called after EACH test. It MUST remove all data in @@ -423,38 +367,6 @@ abstract class DestinationAcceptanceTest( return listOf(identifier) } - @BeforeEach - @Throws(Exception::class) - fun setUpInternal() { - val testDir = Path.of("/tmp/airbyte_tests/") - Files.createDirectories(testDir) - val workspaceRoot = Files.createTempDirectory(testDir, "test") - jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")) - localRoot = Files.createTempDirectory(testDir, "output") - LOGGER.info { "${"jobRoot: {}"} $jobRoot" } - LOGGER.info { "${"localRoot: {}"} $localRoot" } - testEnv = TestDestinationEnv(localRoot) - mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java) - testSchemas = HashSet() - setup(testEnv, testSchemas) - fileTransferMountSource = - if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null - - processFactory = - DockerProcessFactory( - workspaceRoot, - workspaceRoot.toString(), - localRoot.toString(), - fileTransferMountSource, - "host", - getConnectorEnv() - ) - } - - open fun getConnectorEnv(): Map { - return emptyMap() - } - @AfterEach @Throws(Exception::class) fun tearDownInternal() { @@ -2036,136 +1948,16 @@ abstract class DestinationAcceptanceTest( ) } - private fun getDestination(imageName: String): AirbyteDestination { - return DefaultAirbyteDestination( - integrationLauncher = - AirbyteIntegrationLauncher( - JOB_ID, - JOB_ATTEMPT, - imageName, - processFactory, - null, - null, - false, - featureFlags - ) - ) - } - - protected fun runSyncAndVerifyStateOutput( - config: JsonNode, - messages: List, - catalog: ConfiguredAirbyteCatalog, - runNormalization: Boolean, - ) { - runSyncAndVerifyStateOutput( - config, - messages, - catalog, - runNormalization, - imageName, - verifyIndividualStateAndCounts - ) - } - - @Throws(Exception::class) - protected fun runSyncAndVerifyStateOutput( - config: JsonNode, - messages: List, - catalog: ConfiguredAirbyteCatalog, - runNormalization: Boolean, - imageName: String, - verifyIndividualStateAndCounts: Boolean - ) { - val destinationOutput = runSync(config, messages, catalog, runNormalization, imageName) - - var expected = messages.filter { it.type == Type.STATE } - var actual = destinationOutput.filter { it.type == Type.STATE } - - if (verifyIndividualStateAndCounts) { - /* Collect the counts and add them to each expected state message */ - val stateToCount = mutableMapOf() - messages.fold(0) { acc, message -> - if (message.type == Type.STATE) { - stateToCount[message.state.global.sharedState] = acc - 0 - } else { - acc + 1 - } - } - - expected.forEach { message -> - val clone = message.state - clone.destinationStats = - AirbyteStateStats().withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble()) - message.state = clone - } - } else { - /* Null the stats and collect only the final messages */ - val finalActual = - actual.lastOrNull() - ?: throw IllegalArgumentException( - "All message sets used for testing should include a state record" - ) - val clone = finalActual.state - clone.destinationStats = null - finalActual.state = clone - - expected = listOf(expected.last()) - actual = listOf(finalActual) - } - - Assertions.assertEquals(expected, actual) - } - @Throws(Exception::class) - private fun runSync( + override fun runSync( config: JsonNode, messages: List, catalog: ConfiguredAirbyteCatalog, runNormalization: Boolean, imageName: String, ): List { - val destinationConfig = - WorkerDestinationConfig() - .withConnectionId(UUID.randomUUID()) - .withCatalog( - convertProtocolObject( - catalog, - io.airbyte.protocol.models.ConfiguredAirbyteCatalog::class.java - ) - ) - .withDestinationConnectionConfiguration(config) - - val destination = getDestination(imageName) - - destination.start( - destinationConfig, - jobRoot, - inDestinationNormalizationFlags(runNormalization) - ) - messages.forEach( - Consumer { message: AirbyteMessage -> - Exceptions.toRuntime { - destination.accept( - convertProtocolObject( - message, - io.airbyte.protocol.models.AirbyteMessage::class.java - ) - ) - } - } - ) - destination.notifyEndOfInput() - - val destinationOutput: MutableList = ArrayList() - while (!destination.isFinished()) { - destination.attemptRead().ifPresent { - destinationOutput.add(convertProtocolObject(it, AirbyteMessage::class.java)) - } - } - - destination.close() + val destinationConfig = getDestinationConfig(config, catalog) + val destinationOutput = super.runSync(messages, runNormalization, imageName, destinationConfig) if (!runNormalization || (supportsInDestinationNormalization())) { return destinationOutput @@ -2689,14 +2481,12 @@ abstract class DestinationAcceptanceTest( return supportsInDestinationNormalization() || normalizationFromDefinition() } - protected open val supportsFileTransfer: Boolean = false - companion object { private val RANDOM = Random() private const val NORMALIZATION_VERSION = "dev" - private const val JOB_ID = "0" - private const val JOB_ATTEMPT = 0 + public const val JOB_ID = "0" + public const val JOB_ATTEMPT = 0 private const val DUMMY_CATALOG_NAME = "DummyCatalog" @@ -2833,7 +2623,7 @@ abstract class DestinationAcceptanceTest( return airbyteMessages } - private fun convertProtocolObject(v1: V1, klass: Class): V0 { + fun convertProtocolObject(v1: V1, klass: Class): V0 { return Jsons.`object`(Jsons.jsonNode(v1), klass)!! } }