Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulk-cdk: add more exception classifier implementations, add extra checks #44824

Merged
merged 5 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
package io.airbyte.cdk

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.output.ExceptionClassifier
import io.airbyte.cdk.output.ExceptionHandler
import io.airbyte.cdk.output.OutputConsumer
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Value
Expand All @@ -20,7 +20,7 @@ class AirbyteConnectorRunnable : Runnable {

@Inject lateinit var outputConsumer: OutputConsumer

@Inject lateinit var exceptionClassifier: ExceptionClassifier
@Inject lateinit var exceptionHandler: ExceptionHandler

override fun run() {
var operation: Operation? = null
Expand All @@ -40,7 +40,7 @@ class AirbyteConnectorRunnable : Runnable {
"Failed ${operation::class} operation execution."
}
}
outputConsumer.accept(exceptionClassifier.handle(e))
outputConsumer.accept(exceptionHandler.handle(e))
throw e
} finally {
log.info { "Flushing output consumer prior to shutdown." }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ class TransientErrorException(
) : ConnectorErrorException(displayMessage, exception)

/** See [io.airbyte.cdk.output.SystemError]. */
class SystemErrorException
private constructor(
class SystemErrorException(
displayMessage: String?,
exception: Throwable? = null,
) : ConnectorErrorException(displayMessage, exception)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.output

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.ConnectorErrorException
import io.airbyte.cdk.SystemErrorException
import io.airbyte.cdk.TransientErrorException
import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton

const val DEFAULT_CLASSIFIER_PREFIX = "${EXCEPTION_CLASSIFIER_PREFIX}.default"

/** Default implementation of [ExceptionClassifier]. */
@Singleton
@ConfigurationProperties(DEFAULT_CLASSIFIER_PREFIX)
class DefaultExceptionClassifier(
@Value("\${$DEFAULT_CLASSIFIER_PREFIX.order:1}") override val orderValue: Int
) : ExceptionClassifier {

override fun classify(e: Throwable): ConnectorError? {
return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) {
is ConfigErrorException -> ConfigError(connectorErrorException.message!!)
is TransientErrorException -> TransientError(connectorErrorException.message!!)
is SystemErrorException -> SystemError(connectorErrorException.message)
null -> null
}
}

/** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */
fun unwind(e: Throwable): ConnectorErrorException? {
var connectorErrorException: ConnectorErrorException? = null
var unwound: Throwable? = e
while (unwound != null) {
if (unwound is ConnectorErrorException) {
connectorErrorException = unwound
}
unwound = unwound.cause
}
return connectorErrorException
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,18 @@
*/
package io.airbyte.cdk.output

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.ConnectorErrorException
import io.airbyte.cdk.SystemErrorException
import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.util.ApmTraceUtils
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
import io.micronaut.context.annotation.DefaultImplementation
import jakarta.inject.Singleton
import org.apache.commons.lang3.exception.ExceptionUtils
import io.micronaut.core.order.Ordered

@Singleton
@DefaultImplementation(DefaultExceptionClassifier::class)
fun interface ExceptionClassifier {
interface ExceptionClassifier : Ordered {

/** Classifies [e] into a [ConnectorError] if possible, null otherwise. */
fun classify(e: Throwable): ConnectorError?

/** [SystemError] display message for [e] in case it can't be classified. */
fun fallbackDisplayMessage(e: Throwable): String? = e.message

/** Maps [e] to a [AirbyteErrorTraceMessage] to be passed to the [OutputConsumer]. */
fun handle(e: Throwable): AirbyteErrorTraceMessage {
ApmTraceUtils.addExceptionToTrace(e)
val connectorError: ConnectorError =
DefaultExceptionClassifier().classify(e)
?: classify(e) ?: SystemError(fallbackDisplayMessage(e) ?: e.message)
val errorTraceMessage =
AirbyteErrorTraceMessage()
.withInternalMessage(e.toString())
.withStackTrace(ExceptionUtils.getStackTrace(e))
return when (connectorError) {
is ConfigError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
.withMessage(connectorError.displayMessage)
is TransientError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
.withMessage(connectorError.displayMessage)
is SystemError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
.withMessage(connectorError.displayMessage ?: e.message)
}
}
/** Convenience val for [getOrder]. */
val orderValue: Int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice I like this - enables us to override some specific errors.


override fun getOrder(): Int = orderValue
}

/** Each [ConnectorError] subtype corresponds to a [AirbyteErrorTraceMessage.FailureType]. */
Expand Down Expand Up @@ -75,29 +42,63 @@ data class TransientError(val displayMessage: String) : ConnectorError
*/
data class SystemError(val displayMessage: String?) : ConnectorError

/** Default implementation of [ExceptionClassifier]. */
@Singleton
class DefaultExceptionClassifier : ExceptionClassifier {
/** Common Micronaut property prefix for all exception classifiers. */
const val EXCEPTION_CLASSIFIER_PREFIX = "airbyte.connector.exception-classifiers"

/** Convenience interface for rules-based [ExceptionClassifier] implementations. */
interface RuleBasedExceptionClassifier<T : RuleBasedExceptionClassifier.Rule> :
ExceptionClassifier {

/** List of rules to match for. */
val rules: List<T>

override fun classify(e: Throwable): ConnectorError? {
return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) {
is ConfigErrorException -> ConfigError(connectorErrorException.message!!)
is TransientErrorException -> TransientError(connectorErrorException.message!!)
is SystemErrorException -> SystemError(connectorErrorException.message)
null -> null
for (rule in rules) {
if (!rule.matches(e)) {
continue
}
val message: String = rule.output ?: e.message ?: e.toString()
val firstLine: String = if (rule.group == null) message else "${rule.group}: $message"
val lines: List<String> = listOf(firstLine) + rule.referenceLinks
val displayMessage: String = lines.joinToString(separator = "\n")
return when (rule.error) {
ErrorKind.CONFIG -> ConfigError(displayMessage)
ErrorKind.TRANSIENT -> TransientError(displayMessage)
ErrorKind.SYSTEM -> SystemError(displayMessage)
}
}
return null
}

/** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */
fun unwind(e: Throwable): ConnectorErrorException? {
var connectorErrorException: ConnectorErrorException? = null
var unwound: Throwable? = e
while (unwound != null) {
if (unwound is ConnectorErrorException) {
connectorErrorException = unwound
}
unwound = unwound.cause
}
return connectorErrorException
interface Rule : Ordered {

/** Rule ordinal in the rule set. */
val ordinal: Int

/** If the rule matches, the kind of [ConnectorError] to produce. */
val error: ErrorKind

/** Optional display message prefix. */
val group: String?

/** Optional display message. */
val output: String?

/** Optional list of reference links to display. */
val referenceLinks: List<String>

/** Rule predicate. */
fun matches(e: Throwable): Boolean

override fun getOrder(): Int = ordinal

/** Validates rule definition correctness. */
fun validate()
}

enum class ErrorKind {
CONFIG,
TRANSIENT,
SYSTEM,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.output

import io.airbyte.cdk.util.ApmTraceUtils
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
import jakarta.inject.Singleton
import org.apache.commons.lang3.exception.ExceptionUtils

/** [ExceptionHandler] applies all available [ExceptionClassifier] implementations in sequence. */
@Singleton
class ExceptionHandler(val classifiers: List<ExceptionClassifier>) {

fun classify(e: Throwable): ConnectorError {
for (classifier in classifiers) {
val classified: ConnectorError? = classifier.classify(e)
if (classified != null) {
return classified
}
}
return SystemError(e.message)
}

/** Maps [e] to a [AirbyteErrorTraceMessage] to be passed to the [OutputConsumer]. */
fun handle(e: Throwable): AirbyteErrorTraceMessage {
ApmTraceUtils.addExceptionToTrace(e)
val errorTraceMessage =
AirbyteErrorTraceMessage()
.withInternalMessage(e.toString())
.withStackTrace(ExceptionUtils.getStackTrace(e))
return when (val classified: ConnectorError = classify(e)) {
is ConfigError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
.withMessage(classified.displayMessage)
is TransientError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
.withMessage(classified.displayMessage)
is SystemError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
.withMessage(classified.displayMessage ?: e.message)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.output

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.micronaut.context.annotation.EachProperty
import io.micronaut.context.annotation.Parameter
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton

const val REGEX_CLASSIFIER_PREFIX = "${EXCEPTION_CLASSIFIER_PREFIX}.regex"

/** [ExceptionClassifier] implementation based on regexes applied to the exception message. */
@Singleton
@Requires(property = "${REGEX_CLASSIFIER_PREFIX}.rules")
class RegexExceptionClassifier(
@Value("\${${REGEX_CLASSIFIER_PREFIX}.order:10}") override val orderValue: Int,
override val rules: List<RegexExceptionClassifierRule>,
) : RuleBasedExceptionClassifier<RegexExceptionClassifierRule> {

init {
for (rule in rules) {
rule.validate()
}
}
}

/** Micronaut configuration object for [RuleBasedExceptionClassifier] rules. */
@EachProperty("${REGEX_CLASSIFIER_PREFIX}.rules", list = true)
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
class RegexExceptionClassifierRule(
@param:Parameter override val ordinal: Int,
) : RuleBasedExceptionClassifier.Rule {

// Micronaut configuration objects work better with mutable properties.
override lateinit var error: RuleBasedExceptionClassifier.ErrorKind
lateinit var pattern: String
lateinit var inputExample: String
override var group: String? = null
override var output: String? = null
override var referenceLinks: List<String> = emptyList()

val regex: Regex by lazy {
pattern.toRegex(setOf(RegexOption.MULTILINE, RegexOption.IGNORE_CASE))
}

override fun matches(e: Throwable): Boolean =
e.message?.let { regex.containsMatchIn(it) } ?: false

override fun validate() {
require(runCatching { error }.isSuccess) { "error kind must be set" }
require(runCatching { pattern }.isSuccess) { "regex pattern must be set" }
require(runCatching { inputExample }.isSuccess) {
"input exception message example must be set"
}
val compileResult: Result<Regex> = runCatching { regex }
require(compileResult.isSuccess) {
"regex pattern error: ${compileResult.exceptionOrNull()?.message}"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.output

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.SystemErrorException
import io.airbyte.cdk.TransientErrorException
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

@MicronautTest
class DefaultExceptionClassifierTest {

@Inject lateinit var classifier: DefaultExceptionClassifier

@Test
fun testConfigError() {
Assertions.assertEquals(
ConfigError("foo"),
classifier.classify(ConfigErrorException("foo")),
)
}

@Test
fun testTransientError() {
Assertions.assertEquals(
TransientError("bar"),
classifier.classify(TransientErrorException("bar")),
)
}

@Test
fun testSystemError() {
Assertions.assertEquals(
SystemError("baz"),
classifier.classify(SystemErrorException("baz")),
)
}

@Test
fun testUnclassified() {
Assertions.assertNull(classifier.classify(RuntimeException("quux")))
}
}
Loading
Loading