From e4fec505367ca9b350aef2813b98548c7df1608d Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Wed, 11 Sep 2024 05:41:38 -0700 Subject: [PATCH] bulk-cdk: improve AirbyteConnectorRunner and CliRunner (#45374) --- .../io/airbyte/cdk/AirbyteConnectorRunner.kt | 30 +++--- .../ConnectorCommandLinePropertySource.kt | 2 - .../io/airbyte/cdk/output/OutputConsumer.kt | 11 --- .../io/airbyte/cdk/command/CliRunnable.kt | 27 ++++++ .../io/airbyte/cdk/command/CliRunner.kt | 92 ++++++++++++------- .../cdk/command/CliRunnerOutputStream.kt | 50 ++++++++++ .../cdk/output/BufferingOutputConsumer.kt | 7 +- .../airbyte/cdk/command/SyncsTestFixture.kt | 10 +- .../MysqlSourceDatatypeIntegrationTest.kt | 5 +- 9 files changed, 164 insertions(+), 70 deletions(-) create mode 100644 airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunnable.kt create mode 100644 airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunnerOutputStream.kt diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt index 1ebccfac61e3..4dd986691199 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt @@ -5,6 +5,7 @@ import io.airbyte.cdk.command.ConnectorCommandLinePropertySource import io.airbyte.cdk.command.MetadataYamlPropertySource import io.micronaut.configuration.picocli.MicronautFactory import io.micronaut.context.ApplicationContext +import io.micronaut.context.RuntimeBeanDefinition import io.micronaut.context.env.CommandLinePropertySource import io.micronaut.context.env.Environment import io.micronaut.core.cli.CommandLine as MicronautCommandLine @@ -17,8 +18,11 @@ import picocli.CommandLine.Model.UsageMessageSpec /** Source connector entry point. */ class AirbyteSourceRunner( + /** CLI args. */ args: Array, -) : AirbyteConnectorRunner("source", args) { + /** Micronaut bean definition overrides, used only for tests. */ + vararg testBeanDefinitions: RuntimeBeanDefinition<*>, +) : AirbyteConnectorRunner("source", args, testBeanDefinitions) { companion object { @JvmStatic fun run(vararg args: String) { @@ -29,8 +33,11 @@ class AirbyteSourceRunner( /** Destination connector entry point. */ class AirbyteDestinationRunner( + /** CLI args. */ args: Array, -) : AirbyteConnectorRunner("destination", args) { + /** Micronaut bean definition overrides, used only for tests. */ + vararg testBeanDefinitions: RuntimeBeanDefinition<*>, +) : AirbyteConnectorRunner("destination", args, testBeanDefinitions) { companion object { @JvmStatic fun run(vararg args: String) { @@ -46,6 +53,7 @@ class AirbyteDestinationRunner( sealed class AirbyteConnectorRunner( val connectorType: String, val args: Array, + val testBeanDefinitions: Array>, ) { val envs: Array = arrayOf(Environment.CLI, connectorType) @@ -65,11 +73,12 @@ sealed class AirbyteConnectorRunner( commandLinePropertySource, MetadataYamlPropertySource(), ) + .beanDefinitions(*testBeanDefinitions) .start() val isTest: Boolean = ctx.environment.activeNames.contains(Environment.TEST) val picocliFactory: CommandLine.IFactory = MicronautFactory(ctx) val picocliCommandLine: CommandLine = - picocliCommandLineFactory.build(picocliFactory, isTest) + picocliCommandLineFactory.build(picocliFactory) val exitCode: Int = picocliCommandLine.execute(*args) if (!isTest) { // Required by the platform, otherwise syncs may hang. @@ -82,10 +91,7 @@ sealed class AirbyteConnectorRunner( class PicocliCommandLineFactory( val runner: AirbyteConnectorRunner, ) { - inline fun build( - factory: CommandLine.IFactory, - isTest: Boolean, - ): CommandLine { + inline fun build(factory: CommandLine.IFactory): CommandLine { val commandSpec: CommandLine.Model.CommandSpec = CommandLine.Model.CommandSpec.wrapWithoutInspection(R::class.java, factory) .name("airbyte-${runner.connectorType}-connector") @@ -95,10 +101,6 @@ class PicocliCommandLineFactory( .addOption(config) .addOption(catalog) .addOption(state) - - if (isTest) { - commandSpec.addOption(output) - } return CommandLine(commandSpec, factory) } @@ -168,10 +170,4 @@ class PicocliCommandLineFactory( "path to the json-encoded state file", "Required by the following commands: read", ) - val output: OptionSpec = - fileOption( - "output", - "path to the output file", - "When present, the connector writes to this file instead of stdout", - ) } diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/ConnectorCommandLinePropertySource.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/ConnectorCommandLinePropertySource.kt index d4fd9c5a4527..0dd3f8bb3a98 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/ConnectorCommandLinePropertySource.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/ConnectorCommandLinePropertySource.kt @@ -23,7 +23,6 @@ class ConnectorCommandLinePropertySource( const val CONNECTOR_CONFIG_PREFIX: String = "airbyte.connector.config" const val CONNECTOR_CATALOG_PREFIX: String = "airbyte.connector.catalog" const val CONNECTOR_STATE_PREFIX: String = "airbyte.connector.state" -const val CONNECTOR_OUTPUT_FILE = "airbyte.connector.output.file" private fun resolveValues( commandLine: CommandLine, @@ -39,7 +38,6 @@ private fun resolveValues( } val values: MutableMap = mutableMapOf() values[Operation.PROPERTY] = ops.first() - commandLine.optionValue("output")?.let { values[CONNECTOR_OUTPUT_FILE] = it } for ((cliOptionKey, prefix) in mapOf( "config" to CONNECTOR_CONFIG_PREFIX, diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt index 29849b1408f4..71c0e2025a4b 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt @@ -25,9 +25,7 @@ import io.micronaut.context.annotation.Value import io.micronaut.context.env.Environment import jakarta.inject.Singleton import java.io.ByteArrayOutputStream -import java.io.FileOutputStream import java.io.PrintStream -import java.nio.file.Path import java.time.Clock import java.time.Instant import java.util.concurrent.ConcurrentHashMap @@ -104,9 +102,6 @@ interface OutputConsumer : Consumer, AutoCloseable { /** Configuration properties prefix for [StdoutOutputConsumer]. */ const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output" -// Used for integration tests. -const val CONNECTOR_OUTPUT_FILE = "$CONNECTOR_OUTPUT_PREFIX.file" - /** Default implementation of [OutputConsumer]. */ @Singleton @Secondary @@ -293,10 +288,4 @@ private class RecordTemplate( private class PrintStreamFactory { @Singleton @Requires(notEnv = [Environment.TEST]) fun stdout(): PrintStream = System.out - - @Singleton - @Requires(env = [Environment.TEST]) - @Requires(property = CONNECTOR_OUTPUT_FILE) - fun file(@Value("\${$CONNECTOR_OUTPUT_FILE}") filePath: Path): PrintStream = - PrintStream(FileOutputStream(filePath.toFile()), false, Charsets.UTF_8) } diff --git a/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunnable.kt b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunnable.kt new file mode 100644 index 000000000000..a78d7624182a --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunnable.kt @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.command + +import io.airbyte.cdk.output.BufferingOutputConsumer +import io.airbyte.protocol.models.v0.AirbyteMessage + +/** Convenience object for return values in [CliRunner]. */ +data class CliRunnable( + val runnable: Runnable, + val results: BufferingOutputConsumer, +) { + + /** Decorates the [BufferingOutputConsumer] with a callback, which should return quickly. */ + fun withCallback(nonBlockingFn: (AirbyteMessage) -> Unit): CliRunnable { + results.callback = nonBlockingFn + return this + } + + /** Runs the [Runnable]. */ + fun run(): BufferingOutputConsumer { + runnable.run() + return results + } +} diff --git a/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunner.kt b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunner.kt index dd5eba61417d..fa7666a8f240 100644 --- a/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunner.kt +++ b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunner.kt @@ -5,19 +5,22 @@ import io.airbyte.cdk.AirbyteConnectorRunnable import io.airbyte.cdk.AirbyteConnectorRunner import io.airbyte.cdk.AirbyteDestinationRunner import io.airbyte.cdk.AirbyteSourceRunner -import io.airbyte.cdk.ClockFactory import io.airbyte.cdk.output.BufferingOutputConsumer import io.airbyte.cdk.util.Jsons import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.micronaut.context.RuntimeBeanDefinition +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream +import java.io.InputStream import java.nio.file.Files import java.nio.file.Path import kotlin.io.path.deleteIfExists data object CliRunner { /** - * Runs a source connector with the given arguments and returns the results. + * Builds a [CliRunnable] which runs a source connector with the given arguments. * * This is useful for writing connector integration tests: * - the [config], [catalog] and [state] get written to temporary files; @@ -26,61 +29,88 @@ data object CliRunner { * - that file name gets passed with the test-only `--output` CLI argument; * - [AirbyteSourceRunner] takes the CLI arguments and runs them in a new Micronaut context; * - after it's done, the output file contents are read and parsed into [AirbyteMessage]s. - * - those are stored in a [BufferingOutputConsumer] which is returned. + * - those are stored in the [BufferingOutputConsumer] which is returned in the [CliRunnable]. */ - fun runSource( + fun source( op: String, config: ConfigurationJsonObjectBase? = null, catalog: ConfiguredAirbyteCatalog? = null, state: List? = null, - ): BufferingOutputConsumer = - runConnector(op, config, catalog, state) { args: Array -> - AirbyteSourceRunner(args) - } + ): CliRunnable { + val out = CliRunnerOutputStream() + val runnable: Runnable = + makeRunnable(op, config, catalog, state) { args: Array -> + AirbyteSourceRunner(args, out.beanDefinition) + } + return CliRunnable(runnable, out.results) + } - /** Same as [runSource] but for destinations. */ - fun runDestination( + /** Same as [source] but for destinations. */ + fun destination( op: String, config: ConfigurationJsonObjectBase? = null, catalog: ConfiguredAirbyteCatalog? = null, state: List? = null, - ): BufferingOutputConsumer = - runConnector(op, config, catalog, state) { args: Array -> - AirbyteDestinationRunner(args) - } + inputStream: InputStream, + ): CliRunnable { + val inputBeanDefinition: RuntimeBeanDefinition = + RuntimeBeanDefinition.builder(InputStream::class.java) { inputStream } + .singleton(true) + .build() + val out = CliRunnerOutputStream() + val runnable: Runnable = + makeRunnable(op, config, catalog, state) { args: Array -> + AirbyteDestinationRunner(args, inputBeanDefinition, out.beanDefinition) + } + return CliRunnable(runnable, out.results) + } + + /** Same as the other [destination] but with [AirbyteMessage] input. */ + fun destination( + op: String, + config: ConfigurationJsonObjectBase? = null, + catalog: ConfiguredAirbyteCatalog? = null, + state: List? = null, + vararg input: AirbyteMessage, + ): CliRunnable { + val inputJsonBytes: ByteArray = + ByteArrayOutputStream().use { baos -> + for (msg in input) { + Jsons.writeValue(baos, msg) + baos.write('\n'.code) + } + baos.toByteArray() + } + val inputStream: InputStream = ByteArrayInputStream(inputJsonBytes) + return destination(op, config, catalog, state, inputStream) + } - private fun runConnector( + private fun makeRunnable( op: String, config: ConfigurationJsonObjectBase?, catalog: ConfiguredAirbyteCatalog?, state: List?, connectorRunnerConstructor: (Array) -> AirbyteConnectorRunner, - ): BufferingOutputConsumer { - val result = BufferingOutputConsumer(ClockFactory().fixed()) + ): Runnable { val configFile: Path? = inputFile(config) val catalogFile: Path? = inputFile(catalog) val stateFile: Path? = inputFile(state) - val outputFile: Path = Files.createTempFile(null, null) val args: List = listOfNotNull( "--$op", configFile?.let { "--config=$it" }, catalogFile?.let { "--catalog=$it" }, stateFile?.let { "--state=$it" }, - "--output=$outputFile", ) - try { - connectorRunnerConstructor(args.toTypedArray()).run() - Files.readAllLines(outputFile) - .filter { it.isNotBlank() } - .map { Jsons.readValue(it, AirbyteMessage::class.java) } - .forEach { result.accept(it) } - return result - } finally { - configFile?.deleteIfExists() - catalogFile?.deleteIfExists() - stateFile?.deleteIfExists() - outputFile.deleteIfExists() + val runner: AirbyteConnectorRunner = connectorRunnerConstructor(args.toTypedArray()) + return Runnable { + try { + runner.run() + } finally { + configFile?.deleteIfExists() + catalogFile?.deleteIfExists() + stateFile?.deleteIfExists() + } } } diff --git a/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunnerOutputStream.kt b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunnerOutputStream.kt new file mode 100644 index 000000000000..4472699622dc --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunnerOutputStream.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.command + +import io.airbyte.cdk.ClockFactory +import io.airbyte.cdk.output.BufferingOutputConsumer +import io.airbyte.cdk.util.Jsons +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.micronaut.context.RuntimeBeanDefinition +import java.io.ByteArrayOutputStream +import java.io.OutputStream +import java.io.PrintStream + +/** Used by [CliRunner] to populate a [BufferingOutputConsumer] instance. */ +class CliRunnerOutputStream : OutputStream() { + + val results = BufferingOutputConsumer(ClockFactory().fixed()) + private val lineStream = ByteArrayOutputStream() + private val printStream = PrintStream(this, true, Charsets.UTF_8) + + val beanDefinition: RuntimeBeanDefinition = + RuntimeBeanDefinition.builder(PrintStream::class.java) { printStream } + .singleton(true) + .build() + + override fun write(b: Int) { + if (b == '\n'.code) { + readLine() + } else { + lineStream.write(b) + } + } + + override fun close() { + readLine() + lineStream.close() + results.close() + super.close() + } + + private fun readLine() { + val line: String = lineStream.toString(Charsets.UTF_8).trim() + lineStream.reset() + if (line.isNotBlank()) { + results.accept(Jsons.readValue(line, AirbyteMessage::class.java)) + } + } +} diff --git a/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt index 4ec1779c1c0b..26a93fe18047 100644 --- a/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt +++ b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt @@ -20,7 +20,6 @@ import java.time.Instant /** [OutputConsumer] implementation for unit tests. Collects everything into thread-safe buffers. */ @Singleton @Requires(notEnv = [Environment.CLI]) -@Requires(missingProperty = CONNECTOR_OUTPUT_FILE) @Replaces(OutputConsumer::class) class BufferingOutputConsumer( clock: Clock, @@ -36,6 +35,11 @@ class BufferingOutputConsumer( private val traces = mutableListOf() private val messages = mutableListOf() + var callback: (AirbyteMessage) -> Unit = {} + set(value) { + synchronized(this) { field = value } + } + override fun accept(input: AirbyteMessage) { // Deep copy the input, which may be reused and mutated later on. val m: AirbyteMessage = @@ -52,6 +56,7 @@ class BufferingOutputConsumer( AirbyteMessage.Type.TRACE -> traces.add(m.trace) else -> TODO("${m.type} not supported") } + callback(m) } } diff --git a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/command/SyncsTestFixture.kt b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/command/SyncsTestFixture.kt index 3bf5015426b4..671c9777c8ff 100644 --- a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/command/SyncsTestFixture.kt +++ b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/command/SyncsTestFixture.kt @@ -25,7 +25,7 @@ import org.junit.jupiter.api.Assertions data object SyncsTestFixture { fun testSpec(expectedSpec: ConnectorSpecification) { val expected: String = Jsons.writeValueAsString(expectedSpec) - val output: BufferingOutputConsumer = CliRunner.runSource("spec") + val output: BufferingOutputConsumer = CliRunner.source("spec").run() val actual: String = Jsons.writeValueAsString(output.specs().last()) val jsonMatcher: JsonMatcher = @@ -52,7 +52,7 @@ data object SyncsTestFixture { configPojo: ConfigurationJsonObjectBase, expectedFailure: String? = null, ) { - val checkOutput: BufferingOutputConsumer = CliRunner.runSource("check", configPojo) + val checkOutput: BufferingOutputConsumer = CliRunner.source("check", configPojo).run() Assertions.assertEquals(1, checkOutput.statuses().size, checkOutput.statuses().toString()) if (expectedFailure == null) { Assertions.assertEquals( @@ -73,7 +73,7 @@ data object SyncsTestFixture { configPojo: ConfigurationJsonObjectBase, expectedCatalog: AirbyteCatalog, ) { - val discoverOutput: BufferingOutputConsumer = CliRunner.runSource("discover", configPojo) + val discoverOutput: BufferingOutputConsumer = CliRunner.source("discover", configPojo).run() Assertions.assertEquals(listOf(expectedCatalog), discoverOutput.catalogs()) } @@ -102,7 +102,7 @@ data object SyncsTestFixture { var state: List = initialState for (step in afterRead) { val readOutput: BufferingOutputConsumer = - CliRunner.runSource("read", configPojo, configuredCatalog, state) + CliRunner.source("read", configPojo, configuredCatalog, state).run() step.validate(readOutput) connectionSupplier.get().use(step::update) state = readOutput.states() @@ -141,7 +141,7 @@ data object SyncsTestFixture { var state: List = listOf() for (step in afterRead) { val readOutput: BufferingOutputConsumer = - CliRunner.runSource("read", configPojo, configuredCatalog, state) + CliRunner.source("read", configPojo, configuredCatalog, state).run() step.validate(readOutput) connectionSupplier.get().use(step::update) state = readOutput.states() diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceDatatypeIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceDatatypeIntegrationTest.kt index afde5a8dee7d..1cc28d67a4e8 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceDatatypeIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceDatatypeIntegrationTest.kt @@ -30,7 +30,6 @@ import org.testcontainers.containers.MySQLContainer private val log = KotlinLogging.logger {} -/** Reference: https://docs.mysql.com/en/database/mysql/mysql-database/23/sqlrf/Data-Types.html */ class MysqlSourceDatatypeIntegrationTest { @TestFactory @Timeout(300) @@ -58,7 +57,7 @@ class MysqlSourceDatatypeIntegrationTest { object LazyValues { val actualStreams: Map by lazy { - val output: BufferingOutputConsumer = CliRunner.runSource("discover", config()) + val output: BufferingOutputConsumer = CliRunner.source("discover", config()).run() output.catalogs().firstOrNull()?.streams?.filterNotNull()?.associateBy { it.name } ?: mapOf() } @@ -77,7 +76,7 @@ class MysqlSourceDatatypeIntegrationTest { } val allReadMessages: List by lazy { - CliRunner.runSource("read", config(), configuredCatalog).messages() + CliRunner.source("read", config(), configuredCatalog).run().messages() } val actualReads: Map by lazy {