From 70732cac5c232e5800ec17e0325d055938b293aa Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Tue, 27 Aug 2024 11:13:20 -0700 Subject: [PATCH] bulk-cdk: add more exception classifier implementations, add extra checks (#44824) --- .../airbyte/cdk/AirbyteConnectorRunnable.kt | 6 +- .../io/airbyte/cdk/ConnectorErrorException.kt | 3 +- .../cdk/output/DefaultExceptionClassifier.kt | 45 +++++++ .../airbyte/cdk/output/ExceptionClassifier.kt | 117 +++++++++--------- .../io/airbyte/cdk/output/ExceptionHandler.kt | 48 +++++++ .../cdk/output/RegexExceptionClassifier.kt | 64 ++++++++++ .../output/DefaultExceptionClassifierTest.kt | 48 +++++++ .../cdk/output/ExceptionHandlerTest.kt | 44 +++++++ .../output/RegexExceptionClassifierTest.kt | 83 +++++++++++++ .../io/airbyte/cdk/check/CheckOperation.kt | 12 +- .../airbyte/cdk/discover/MetadataQuerier.kt | 3 + .../ResourceDrivenMetadataQuerierFactory.kt | 2 + .../io/airbyte/cdk/check/JdbcCheckQueries.kt | 85 +++++++++++++ .../cdk/discover/JdbcMetadataQuerier.kt | 8 ++ .../cdk/output/JdbcExceptionClassifier.kt | 77 ++++++++---- .../airbyte/cdk/check/JdbcCheckQueriesTest.kt | 45 +++++++ .../cdk/discover/JdbcMetadataQuerierTest.kt | 2 + .../cdk/h2source/H2SourceIntegrationTest.kt | 13 +- .../cdk/output/JdbcExceptionClassifierTest.kt | 65 ++++++++++ .../testFixtures/resources/application.yml | 14 +++ 20 files changed, 690 insertions(+), 94 deletions(-) create mode 100644 airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifier.kt create mode 100644 airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionHandler.kt create mode 100644 airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/RegexExceptionClassifier.kt create mode 100644 airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifierTest.kt create mode 100644 airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/ExceptionHandlerTest.kt create mode 100644 airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/RegexExceptionClassifierTest.kt create mode 100644 airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/check/JdbcCheckQueries.kt create mode 100644 airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/check/JdbcCheckQueriesTest.kt create mode 100644 airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifierTest.kt diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunnable.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunnable.kt index 0f8c26e920a3..9e8e08eabf6e 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunnable.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunnable.kt @@ -2,7 +2,7 @@ package io.airbyte.cdk import edu.umd.cs.findbugs.annotations.SuppressFBWarnings -import io.airbyte.cdk.output.ExceptionClassifier +import io.airbyte.cdk.output.ExceptionHandler import io.airbyte.cdk.output.OutputConsumer import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Value @@ -20,7 +20,7 @@ class AirbyteConnectorRunnable : Runnable { @Inject lateinit var outputConsumer: OutputConsumer - @Inject lateinit var exceptionClassifier: ExceptionClassifier + @Inject lateinit var exceptionHandler: ExceptionHandler override fun run() { var operation: Operation? = null @@ -40,7 +40,7 @@ class AirbyteConnectorRunnable : Runnable { "Failed ${operation::class} operation execution." } } - outputConsumer.accept(exceptionClassifier.handle(e)) + outputConsumer.accept(exceptionHandler.handle(e)) throw e } finally { log.info { "Flushing output consumer prior to shutdown." } diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConnectorErrorException.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConnectorErrorException.kt index 161717e2054e..9463f484e439 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConnectorErrorException.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConnectorErrorException.kt @@ -25,8 +25,7 @@ class TransientErrorException( ) : ConnectorErrorException(displayMessage, exception) /** See [io.airbyte.cdk.output.SystemError]. */ -class SystemErrorException -private constructor( +class SystemErrorException( displayMessage: String?, exception: Throwable? = null, ) : ConnectorErrorException(displayMessage, exception) diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifier.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifier.kt new file mode 100644 index 000000000000..71da76a5859f --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifier.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.output + +import io.airbyte.cdk.ConfigErrorException +import io.airbyte.cdk.ConnectorErrorException +import io.airbyte.cdk.SystemErrorException +import io.airbyte.cdk.TransientErrorException +import io.micronaut.context.annotation.ConfigurationProperties +import io.micronaut.context.annotation.Value +import jakarta.inject.Singleton + +const val DEFAULT_CLASSIFIER_PREFIX = "${EXCEPTION_CLASSIFIER_PREFIX}.default" + +/** Default implementation of [ExceptionClassifier]. */ +@Singleton +@ConfigurationProperties(DEFAULT_CLASSIFIER_PREFIX) +class DefaultExceptionClassifier( + @Value("\${$DEFAULT_CLASSIFIER_PREFIX.order:1}") override val orderValue: Int +) : ExceptionClassifier { + + override fun classify(e: Throwable): ConnectorError? { + return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) { + is ConfigErrorException -> ConfigError(connectorErrorException.message!!) + is TransientErrorException -> TransientError(connectorErrorException.message!!) + is SystemErrorException -> SystemError(connectorErrorException.message) + null -> null + } + } + + /** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */ + fun unwind(e: Throwable): ConnectorErrorException? { + var connectorErrorException: ConnectorErrorException? = null + var unwound: Throwable? = e + while (unwound != null) { + if (unwound is ConnectorErrorException) { + connectorErrorException = unwound + } + unwound = unwound.cause + } + return connectorErrorException + } +} diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt index b1e9434db9f3..0b7e19b44aec 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt @@ -3,51 +3,18 @@ */ package io.airbyte.cdk.output -import io.airbyte.cdk.ConfigErrorException -import io.airbyte.cdk.ConnectorErrorException -import io.airbyte.cdk.SystemErrorException -import io.airbyte.cdk.TransientErrorException -import io.airbyte.cdk.util.ApmTraceUtils import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage -import io.micronaut.context.annotation.DefaultImplementation -import jakarta.inject.Singleton -import org.apache.commons.lang3.exception.ExceptionUtils +import io.micronaut.core.order.Ordered -@Singleton -@DefaultImplementation(DefaultExceptionClassifier::class) -fun interface ExceptionClassifier { +interface ExceptionClassifier : Ordered { /** Classifies [e] into a [ConnectorError] if possible, null otherwise. */ fun classify(e: Throwable): ConnectorError? - /** [SystemError] display message for [e] in case it can't be classified. */ - fun fallbackDisplayMessage(e: Throwable): String? = e.message - - /** Maps [e] to a [AirbyteErrorTraceMessage] to be passed to the [OutputConsumer]. */ - fun handle(e: Throwable): AirbyteErrorTraceMessage { - ApmTraceUtils.addExceptionToTrace(e) - val connectorError: ConnectorError = - DefaultExceptionClassifier().classify(e) - ?: classify(e) ?: SystemError(fallbackDisplayMessage(e) ?: e.message) - val errorTraceMessage = - AirbyteErrorTraceMessage() - .withInternalMessage(e.toString()) - .withStackTrace(ExceptionUtils.getStackTrace(e)) - return when (connectorError) { - is ConfigError -> - errorTraceMessage - .withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR) - .withMessage(connectorError.displayMessage) - is TransientError -> - errorTraceMessage - .withFailureType(AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR) - .withMessage(connectorError.displayMessage) - is SystemError -> - errorTraceMessage - .withFailureType(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR) - .withMessage(connectorError.displayMessage ?: e.message) - } - } + /** Convenience val for [getOrder]. */ + val orderValue: Int + + override fun getOrder(): Int = orderValue } /** Each [ConnectorError] subtype corresponds to a [AirbyteErrorTraceMessage.FailureType]. */ @@ -75,29 +42,63 @@ data class TransientError(val displayMessage: String) : ConnectorError */ data class SystemError(val displayMessage: String?) : ConnectorError -/** Default implementation of [ExceptionClassifier]. */ -@Singleton -class DefaultExceptionClassifier : ExceptionClassifier { +/** Common Micronaut property prefix for all exception classifiers. */ +const val EXCEPTION_CLASSIFIER_PREFIX = "airbyte.connector.exception-classifiers" + +/** Convenience interface for rules-based [ExceptionClassifier] implementations. */ +interface RuleBasedExceptionClassifier : + ExceptionClassifier { + + /** List of rules to match for. */ + val rules: List override fun classify(e: Throwable): ConnectorError? { - return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) { - is ConfigErrorException -> ConfigError(connectorErrorException.message!!) - is TransientErrorException -> TransientError(connectorErrorException.message!!) - is SystemErrorException -> SystemError(connectorErrorException.message) - null -> null + for (rule in rules) { + if (!rule.matches(e)) { + continue + } + val message: String = rule.output ?: e.message ?: e.toString() + val firstLine: String = if (rule.group == null) message else "${rule.group}: $message" + val lines: List = listOf(firstLine) + rule.referenceLinks + val displayMessage: String = lines.joinToString(separator = "\n") + return when (rule.error) { + ErrorKind.CONFIG -> ConfigError(displayMessage) + ErrorKind.TRANSIENT -> TransientError(displayMessage) + ErrorKind.SYSTEM -> SystemError(displayMessage) + } } + return null } - /** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */ - fun unwind(e: Throwable): ConnectorErrorException? { - var connectorErrorException: ConnectorErrorException? = null - var unwound: Throwable? = e - while (unwound != null) { - if (unwound is ConnectorErrorException) { - connectorErrorException = unwound - } - unwound = unwound.cause - } - return connectorErrorException + interface Rule : Ordered { + + /** Rule ordinal in the rule set. */ + val ordinal: Int + + /** If the rule matches, the kind of [ConnectorError] to produce. */ + val error: ErrorKind + + /** Optional display message prefix. */ + val group: String? + + /** Optional display message. */ + val output: String? + + /** Optional list of reference links to display. */ + val referenceLinks: List + + /** Rule predicate. */ + fun matches(e: Throwable): Boolean + + override fun getOrder(): Int = ordinal + + /** Validates rule definition correctness. */ + fun validate() + } + + enum class ErrorKind { + CONFIG, + TRANSIENT, + SYSTEM, } } diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionHandler.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionHandler.kt new file mode 100644 index 000000000000..58697a608cfd --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionHandler.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.output + +import io.airbyte.cdk.util.ApmTraceUtils +import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage +import jakarta.inject.Singleton +import org.apache.commons.lang3.exception.ExceptionUtils + +/** [ExceptionHandler] applies all available [ExceptionClassifier] implementations in sequence. */ +@Singleton +class ExceptionHandler(val classifiers: List) { + + fun classify(e: Throwable): ConnectorError { + for (classifier in classifiers) { + val classified: ConnectorError? = classifier.classify(e) + if (classified != null) { + return classified + } + } + return SystemError(e.message) + } + + /** Maps [e] to a [AirbyteErrorTraceMessage] to be passed to the [OutputConsumer]. */ + fun handle(e: Throwable): AirbyteErrorTraceMessage { + ApmTraceUtils.addExceptionToTrace(e) + val errorTraceMessage = + AirbyteErrorTraceMessage() + .withInternalMessage(e.toString()) + .withStackTrace(ExceptionUtils.getStackTrace(e)) + return when (val classified: ConnectorError = classify(e)) { + is ConfigError -> + errorTraceMessage + .withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR) + .withMessage(classified.displayMessage) + is TransientError -> + errorTraceMessage + .withFailureType(AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR) + .withMessage(classified.displayMessage) + is SystemError -> + errorTraceMessage + .withFailureType(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR) + .withMessage(classified.displayMessage ?: e.message) + } + } +} diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/RegexExceptionClassifier.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/RegexExceptionClassifier.kt new file mode 100644 index 000000000000..ab0ffcb91b91 --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/RegexExceptionClassifier.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.output + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.micronaut.context.annotation.EachProperty +import io.micronaut.context.annotation.Parameter +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Value +import jakarta.inject.Singleton + +const val REGEX_CLASSIFIER_PREFIX = "${EXCEPTION_CLASSIFIER_PREFIX}.regex" + +/** [ExceptionClassifier] implementation based on regexes applied to the exception message. */ +@Singleton +@Requires(property = "${REGEX_CLASSIFIER_PREFIX}.rules") +class RegexExceptionClassifier( + @Value("\${${REGEX_CLASSIFIER_PREFIX}.order:10}") override val orderValue: Int, + override val rules: List, +) : RuleBasedExceptionClassifier { + + init { + for (rule in rules) { + rule.validate() + } + } +} + +/** Micronaut configuration object for [RuleBasedExceptionClassifier] rules. */ +@EachProperty("${REGEX_CLASSIFIER_PREFIX}.rules", list = true) +@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI") +class RegexExceptionClassifierRule( + @param:Parameter override val ordinal: Int, +) : RuleBasedExceptionClassifier.Rule { + + // Micronaut configuration objects work better with mutable properties. + override lateinit var error: RuleBasedExceptionClassifier.ErrorKind + lateinit var pattern: String + lateinit var inputExample: String + override var group: String? = null + override var output: String? = null + override var referenceLinks: List = emptyList() + + val regex: Regex by lazy { + pattern.toRegex(setOf(RegexOption.MULTILINE, RegexOption.IGNORE_CASE)) + } + + override fun matches(e: Throwable): Boolean = + e.message?.let { regex.containsMatchIn(it) } ?: false + + override fun validate() { + require(runCatching { error }.isSuccess) { "error kind must be set" } + require(runCatching { pattern }.isSuccess) { "regex pattern must be set" } + require(runCatching { inputExample }.isSuccess) { + "input exception message example must be set" + } + val compileResult: Result = runCatching { regex } + require(compileResult.isSuccess) { + "regex pattern error: ${compileResult.exceptionOrNull()?.message}" + } + } +} diff --git a/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifierTest.kt b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifierTest.kt new file mode 100644 index 000000000000..4c7bfebd1d38 --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifierTest.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.output + +import io.airbyte.cdk.ConfigErrorException +import io.airbyte.cdk.SystemErrorException +import io.airbyte.cdk.TransientErrorException +import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import jakarta.inject.Inject +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +@MicronautTest +class DefaultExceptionClassifierTest { + + @Inject lateinit var classifier: DefaultExceptionClassifier + + @Test + fun testConfigError() { + Assertions.assertEquals( + ConfigError("foo"), + classifier.classify(ConfigErrorException("foo")), + ) + } + + @Test + fun testTransientError() { + Assertions.assertEquals( + TransientError("bar"), + classifier.classify(TransientErrorException("bar")), + ) + } + + @Test + fun testSystemError() { + Assertions.assertEquals( + SystemError("baz"), + classifier.classify(SystemErrorException("baz")), + ) + } + + @Test + fun testUnclassified() { + Assertions.assertNull(classifier.classify(RuntimeException("quux"))) + } +} diff --git a/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/ExceptionHandlerTest.kt b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/ExceptionHandlerTest.kt new file mode 100644 index 000000000000..649573f127b4 --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/ExceptionHandlerTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.output + +import io.airbyte.cdk.TransientErrorException +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.PropertySource +import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import jakarta.inject.Inject +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +private const val RULE0 = "${REGEX_CLASSIFIER_PREFIX}.rules[0]" + +@MicronautTest +@PropertySource( + value = + [ + Property(name = "$RULE0.error", value = "config"), + Property(name = "$RULE0.pattern", value = "foo"), + Property(name = "$RULE0.input-example", value = "foobarbaz"), + ] +) +class ExceptionHandlerTest { + + @Inject lateinit var handler: ExceptionHandler + + @Test + fun testClassified() { + Assertions.assertEquals( + TransientError("foo"), + handler.classify(TransientErrorException("foo")) + ) + Assertions.assertEquals(ConfigError("foo"), handler.classify(RuntimeException("foo"))) + } + + @Test + fun testUnclassified() { + Assertions.assertEquals(SystemError("quux"), handler.classify(RuntimeException("quux"))) + Assertions.assertEquals(SystemError(null), handler.classify(RuntimeException())) + } +} diff --git a/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/RegexExceptionClassifierTest.kt b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/RegexExceptionClassifierTest.kt new file mode 100644 index 000000000000..e511d46488ec --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/RegexExceptionClassifierTest.kt @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.output + +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.PropertySource +import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import jakarta.inject.Inject +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +private const val RULE0 = "${REGEX_CLASSIFIER_PREFIX}.rules[0]" +private const val RULE1 = "${REGEX_CLASSIFIER_PREFIX}.rules[1]" +private const val RULE2 = "${REGEX_CLASSIFIER_PREFIX}.rules[2]" + +@MicronautTest +@PropertySource( + value = + [ + Property(name = "$RULE0.error", value = "config"), + Property(name = "$RULE0.pattern", value = "foo"), + Property(name = "$RULE0.input-example", value = "foobarbaz"), + Property(name = "$RULE0.group", value = "grouped"), + Property(name = "$RULE0.output", value = "has foo"), + Property( + name = "$RULE0.reference-links", + value = "https://www.youtube.com/watch?v=xvFZjo5PgG0", + ), + Property(name = "$RULE1.error", value = "transient"), + Property(name = "$RULE1.pattern", value = "bar"), + Property(name = "$RULE1.input-example", value = "foobarbaz"), + Property(name = "$RULE2.error", value = "system"), + Property(name = "$RULE2.pattern", value = "baz"), + Property(name = "$RULE2.input-example", value = "foobarbaz"), + ] +) +class RegexExceptionClassifierTest { + + @Inject lateinit var classifier: RegexExceptionClassifier + + @Test + fun testConfigError() { + Assertions.assertEquals( + ConfigError("grouped: has foo\nhttps://www.youtube.com/watch?v=xvFZjo5PgG0"), + classifier.classify(RuntimeException("foo")), + ) + } + + @Test + fun testTransientError() { + Assertions.assertEquals( + TransientError("?bar!"), + classifier.classify(RuntimeException("?bar!")), + ) + } + + @Test + fun testSystemError() { + Assertions.assertEquals( + SystemError("?baz!"), + classifier.classify(RuntimeException("?baz!")), + ) + } + + @Test + fun testUnclassified() { + Assertions.assertNull(classifier.classify(RuntimeException("quux"))) + } + + @Test + fun testRuleOrdering() { + Assertions.assertEquals( + ConfigError("grouped: has foo\nhttps://www.youtube.com/watch?v=xvFZjo5PgG0"), + classifier.classify(RuntimeException("foobarbaz")), + ) + Assertions.assertEquals( + TransientError("barbaz"), + classifier.classify(RuntimeException("barbaz")), + ) + } +} diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt index d9faea2a5eab..d6389c585e02 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt @@ -8,7 +8,7 @@ import io.airbyte.cdk.command.ConfigurationJsonObjectSupplier import io.airbyte.cdk.command.SourceConfiguration import io.airbyte.cdk.command.SourceConfigurationFactory import io.airbyte.cdk.discover.MetadataQuerier -import io.airbyte.cdk.output.ExceptionClassifier +import io.airbyte.cdk.output.ExceptionHandler import io.airbyte.cdk.output.OutputConsumer import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage @@ -24,7 +24,7 @@ class CheckOperation( val configFactory: SourceConfigurationFactory, val metadataQuerierFactory: MetadataQuerier.Factory, val outputConsumer: OutputConsumer, - val exceptionClassifier: ExceptionClassifier, + val exceptionHandler: ExceptionHandler, ) : Operation { private val log = KotlinLogging.logger {} @@ -36,10 +36,14 @@ class CheckOperation( log.info { "Building internal connector configuration object." } val config: SourceConfiguration = configFactory.make(pojo) log.info { "Connecting for config check." } - metadataQuerierFactory.session(config).use { connectionCheck(it) } + metadataQuerierFactory.session(config).use { + connectionCheck(it) + it.extraChecks() + } } catch (e: Exception) { log.debug(e) { "Exception while checking config." } - val errorTraceMessage: AirbyteErrorTraceMessage = exceptionClassifier.handle(e) + val errorTraceMessage: AirbyteErrorTraceMessage = exceptionHandler.handle(e) + errorTraceMessage.failureType = AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR outputConsumer.accept(errorTraceMessage) val connectionStatusMessage: String = String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, errorTraceMessage.message) diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetadataQuerier.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetadataQuerier.kt index d671576cc76f..3fea5fddef38 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetadataQuerier.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetadataQuerier.kt @@ -24,6 +24,9 @@ interface MetadataQuerier : AutoCloseable { streamNamespace: String?, ): List> + /** Executes extra checks which throw a [io.airbyte.cdk.ConfigErrorException] on failure. */ + fun extraChecks() + /** Factory for [MetadataQuerier] instances. */ fun interface Factory { /** An implementation might open a connection to build a [MetadataQuerier] instance. */ diff --git a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt index e67a7617441f..35556e67f9b9 100644 --- a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt +++ b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt @@ -72,6 +72,8 @@ class ResourceDrivenMetadataQuerierFactory( ?: throw SQLException("query failed", "tbl") } + override fun extraChecks() {} + override fun close() { isClosed = true } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/check/JdbcCheckQueries.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/check/JdbcCheckQueries.kt new file mode 100644 index 000000000000..f90cd191b508 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/check/JdbcCheckQueries.kt @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.check + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.airbyte.cdk.ConfigErrorException +import io.github.oshai.kotlinlogging.KotlinLogging +import io.micronaut.context.annotation.ConfigurationProperties +import jakarta.inject.Singleton +import java.sql.Connection +import java.sql.ResultSet +import java.sql.Statement + +const val CHECK_QUERIES_PREFIX = "airbyte.connector.check.jdbc" + +/** + * Micronaut configuration object which implements + * [io.airbyte.cdk.discover.MetadataQuerier.extraChecks] in + * [io.airbyte.cdk.discover.JdbcMetadataQuerier]. + * + * The configuration values are a list of SQL queries which are executed in sequence. Any query + * which returns a non-empty result set containing anything other than NULL values or blank strings + * will throw a [io.airbyte.cdk.ConfigErrorException]. + */ +@Singleton +@ConfigurationProperties(CHECK_QUERIES_PREFIX) +@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI") +class JdbcCheckQueries { + + // Micronaut configuration objects work better with mutable properties. + lateinit var queries: List + + private val log = KotlinLogging.logger {} + + /** Delegated to by [io.airbyte.cdk.discover.JdbcMetadataQuerier.extraChecks]. */ + fun executeAll(conn: Connection) { + if (queries.isEmpty()) return + log.info { "Executing ${queries.size} check queries." } + queries.forEachIndexed { index: Int, sql: String -> + conn.createStatement().use { stmt: Statement -> + log.info { "Executing check query ${index+1} / ${queries.size}: '$sql'." } + val error: String = stmt.executeQuery(sql).use { rs: ResultSet -> stringify(rs) } + if (error.isNotBlank()) { + log.warn { "Check query ${index+1} / ${queries.size} failed with: '$error'." } + throw ConfigErrorException(error) + } + log.info { "Check query ${index+1} / ${queries.size} succeeded." } + } + } + } + + companion object { + + /** Turns a [ResultSet] into a pretty and meaningful one-line string; empty otherwise. */ + fun stringify(rs: ResultSet): String { + val sb = StringBuilder() + var firstRow = true + while (rs.next()) { + var firstColumn = true + for (i in 1..rs.metaData.columnCount) { + // Get column value, ignore NULLs or blank strings + val value: String = + rs.getString(i)?.takeUnless { rs.wasNull() }?.takeUnless { it.isBlank() } + ?: continue + // Print column or row separator, if required + if (firstColumn) { + firstColumn = false + if (firstRow) { + firstRow = false + } else { + sb.append("; ") + } + } else { + sb.append(", ") + } + // Print key-value pair + sb.append(rs.metaData.getColumnName(i)).append(": ").append(value) + } + } + return sb.toString().trim() + } + } +} diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt index a5741a44a33d..1ca85509a81e 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt @@ -1,6 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.discover +import io.airbyte.cdk.check.JdbcCheckQueries import io.airbyte.cdk.command.JdbcSourceConfiguration import io.airbyte.cdk.jdbc.DefaultJdbcConstants import io.airbyte.cdk.jdbc.DefaultJdbcConstants.NamespaceKind @@ -28,6 +29,7 @@ class JdbcMetadataQuerier( val config: JdbcSourceConfiguration, val selectQueryGenerator: SelectQueryGenerator, val fieldTypeMapper: FieldTypeMapper, + val checkQueries: JdbcCheckQueries, jdbcConnectionFactory: JdbcConnectionFactory, ) : MetadataQuerier { val conn: Connection = jdbcConnectionFactory.get() @@ -321,6 +323,10 @@ class JdbcMetadataQuerier( val columnName: String, ) + override fun extraChecks() { + checkQueries.executeAll(conn) + } + override fun close() { log.info { "Closing JDBC connection." } conn.close() @@ -331,6 +337,7 @@ class JdbcMetadataQuerier( class Factory( val selectQueryGenerator: SelectQueryGenerator, val fieldTypeMapper: FieldTypeMapper, + val checkQueries: JdbcCheckQueries, val constants: DefaultJdbcConstants, ) : MetadataQuerier.Factory { /** The [JdbcSourceConfiguration] is deliberately not injected in order to support tests. */ @@ -341,6 +348,7 @@ class JdbcMetadataQuerier( config, selectQueryGenerator, fieldTypeMapper, + checkQueries, jdbcConnectionFactory, ) } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifier.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifier.kt index bd1676ce2912..806f0c1d10c6 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifier.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifier.kt @@ -4,35 +4,70 @@ package io.airbyte.cdk.output -import io.airbyte.cdk.Operation -import io.micronaut.context.annotation.Replaces +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.micronaut.context.annotation.EachProperty +import io.micronaut.context.annotation.Parameter +import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Value +import jakarta.inject.Singleton import java.sql.SQLException -@Replaces(ExceptionClassifier::class) -class JdbcExceptionClassifier(@Value("\${${Operation.PROPERTY}}") operationName: String) : - ExceptionClassifier { +const val JDBC_CLASSIFIER_PREFIX = "${EXCEPTION_CLASSIFIER_PREFIX}.jdbc" - val isCheck: Boolean = operationName == "check" +/** [ExceptionClassifier] implementation based on [SQLException] vendor error codes. */ +@Singleton +@Requires(property = "${JDBC_CLASSIFIER_PREFIX}.rules") +class JdbcExceptionClassifier( + @Value("\${$JDBC_CLASSIFIER_PREFIX.order:100}") override val orderValue: Int, + override val rules: List, +) : RuleBasedExceptionClassifier { - override fun classify(e: Throwable): ConnectorError? = - if (isCheck && e is SQLException) { - ConfigError(sqlExceptionDisplayMessage(e)) - } else { - null + init { + for (rule in rules) { + rule.validate() } + } - override fun fallbackDisplayMessage(e: Throwable): String? = + override fun classify(e: Throwable): ConnectorError? { + if (e !is SQLException) return null + val decoratedMessage: String = + listOfNotNull( + e.sqlState?.let { "State code: $it" }, + e.errorCode.takeIf { it != 0 }?.let { "Error code: $it" }, + e.message?.let { "Message: $it" }, + ) + .joinToString(separator = "; ") + val decoratedException = SQLException(decoratedMessage, e.sqlState, e.errorCode) + val ruleBasedMatch: ConnectorError? = super.classify(decoratedException) + if (ruleBasedMatch != null) { + return ruleBasedMatch + } + return SystemError(decoratedMessage) + } +} + +/** Micronaut configuration object for [JdbcExceptionClassifier] rules. */ +@EachProperty("${JDBC_CLASSIFIER_PREFIX}.rules", list = true) +@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI") +class JdbcExceptionClassifierRule( + @param:Parameter override val ordinal: Int, +) : RuleBasedExceptionClassifier.Rule { + + // Micronaut configuration objects work better with mutable properties. + override lateinit var error: RuleBasedExceptionClassifier.ErrorKind + var code: Int = 0 + override var group: String? = null + override var output: String? = null + override var referenceLinks: List = emptyList() + + override fun matches(e: Throwable): Boolean = when (e) { - is SQLException -> sqlExceptionDisplayMessage(e) - else -> null + is SQLException -> e.errorCode == code + else -> false } - fun sqlExceptionDisplayMessage(e: SQLException): String = - listOfNotNull( - e.sqlState?.let { "State code: $it" }, - e.errorCode.takeIf { it != 0 }?.let { "Error code: $it" }, - e.message?.let { "Message: $it" }, - ) - .joinToString(separator = "; ") + override fun validate() { + require(runCatching { error }.isSuccess) { "error kind must be set" } + require(code != 0) { "vendor code must be non-zero" } + } } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/check/JdbcCheckQueriesTest.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/check/JdbcCheckQueriesTest.kt new file mode 100644 index 000000000000..ca8f20a6e9be --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/check/JdbcCheckQueriesTest.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.check + +import io.airbyte.cdk.ConfigErrorException +import io.airbyte.cdk.h2.H2TestFixture +import io.micronaut.context.annotation.Property +import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import jakarta.inject.Inject +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +const val Q = "${CHECK_QUERIES_PREFIX}.queries" + +@MicronautTest(rebuildContext = true) +class JdbcCheckQueriesTest { + + val h2 = H2TestFixture() + + @Inject lateinit var checkQueries: JdbcCheckQueries + + @Test + @Property(name = "$Q[0]", value = "SELECT DATABASE_PATH() FROM DUAL") + fun testPass() { + Assertions.assertDoesNotThrow { h2.createConnection().use { checkQueries.executeAll(it) } } + } + + @Test + @Property(name = "$Q[0]", value = "SELECT DATABASE_PATH() FROM DUAL") + @Property(name = "$Q[1]", value = "SELECT H2VERSION() FROM DUAL") + fun testFail() { + lateinit var message: String + Assertions.assertThrows(ConfigErrorException::class.java) { + try { + h2.createConnection().use { checkQueries.executeAll(it) } + } catch (e: Exception) { + e.message?.let { message = it } + throw e + } + } + Assertions.assertEquals("H2VERSION(): 2.2.224", message) + } +} diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerierTest.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerierTest.kt index 9def9c73b6f5..e3d46ce89192 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerierTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerierTest.kt @@ -1,6 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.cdk.discover +import io.airbyte.cdk.check.JdbcCheckQueries import io.airbyte.cdk.h2.H2TestFixture import io.airbyte.cdk.h2source.H2SourceConfiguration import io.airbyte.cdk.h2source.H2SourceConfigurationFactory @@ -22,6 +23,7 @@ class JdbcMetadataQuerierTest { JdbcMetadataQuerier.Factory( selectQueryGenerator = H2SourceOperations(), fieldTypeMapper = H2SourceOperations(), + checkQueries = JdbcCheckQueries().apply { queries = listOf() }, constants = DefaultJdbcConstants(), ) diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/h2source/H2SourceIntegrationTest.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/h2source/H2SourceIntegrationTest.kt index fb59b9072cef..f3ff0680dcd7 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/h2source/H2SourceIntegrationTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/h2source/H2SourceIntegrationTest.kt @@ -19,11 +19,12 @@ class H2SourceIntegrationTest { @Test fun testCheckFailBadConfig() { SyncsTestFixture.testCheck( - H2SourceConfigurationJsonObject().apply { - port = -1 - database = "" - }, - "Could not connect with provided configuration", + configPojo = + H2SourceConfigurationJsonObject().apply { + port = -1 + database = "" + }, + expectedFailure = "Could not connect with provided configuration", ) } @@ -35,7 +36,7 @@ class H2SourceIntegrationTest { port = h2.port database = h2.database + "_garbage" } - SyncsTestFixture.testCheck(configPojo, "Database \"mem:.*_garbage\" not found") + SyncsTestFixture.testCheck(configPojo, "Connection failure: Database does not exist") } } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifierTest.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifierTest.kt new file mode 100644 index 000000000000..92d12361d862 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifierTest.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.output + +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.PropertySource +import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import jakarta.inject.Inject +import java.sql.SQLException +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +private const val RULE1 = "${JDBC_CLASSIFIER_PREFIX}.rules[1]" +private const val RULE2 = "${JDBC_CLASSIFIER_PREFIX}.rules[2]" + +@MicronautTest +@PropertySource( + value = + [ + // RULE0 is already defined in application.yml + Property(name = "$RULE1.error", value = "transient"), + Property(name = "$RULE1.code", value = "123"), + Property(name = "$RULE1.group", value = "grouped"), + Property(name = "$RULE2.error", value = "system"), + Property(name = "$RULE2.code", value = "456"), + ] +) +class JdbcExceptionClassifierTest { + + @Inject lateinit var classifier: JdbcExceptionClassifier + + @Test + fun testConfigError() { + Assertions.assertEquals( + ConfigError("Connection failure: Database does not exist"), + classifier.classify(SQLException("foo", "bar", 90149)), + ) + } + + @Test + fun testTransientError() { + Assertions.assertEquals( + TransientError("grouped: State code: bar; Error code: 123; Message: foo"), + classifier.classify(SQLException("foo", "bar", 123)), + ) + } + + @Test + fun testSystemError() { + Assertions.assertEquals( + SystemError("State code: bar; Error code: 456; Message: foo"), + classifier.classify(SQLException("foo", "bar", 456)), + ) + } + + @Test + fun testUnclassified() { + Assertions.assertEquals( + SystemError("State code: bar; Error code: 789; Message: foo"), + classifier.classify(SQLException("foo", "bar", 789)), + ) + } +} diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/testFixtures/resources/application.yml b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/testFixtures/resources/application.yml index 52d72019ae54..366de0d4be1f 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/testFixtures/resources/application.yml +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/testFixtures/resources/application.yml @@ -1,6 +1,20 @@ --- airbyte: connector: + check: + jdbc: + queries: + - SELECT DATABASE_PATH() FROM DUAL + extract: jdbc: mode: sequential + + exception-classifiers: + jdbc: + order: 123 + rules: + - error: config + code: 90149 + group: "Connection failure" + output: "Database does not exist"