Skip to content

Commit

Permalink
bulk-cdk: add more exception classifier implementations, add extra ch…
Browse files Browse the repository at this point in the history
…ecks (#44824)
  • Loading branch information
postamar authored Aug 27, 2024
1 parent 67e82e1 commit 70732ca
Show file tree
Hide file tree
Showing 20 changed files with 690 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
package io.airbyte.cdk

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.output.ExceptionClassifier
import io.airbyte.cdk.output.ExceptionHandler
import io.airbyte.cdk.output.OutputConsumer
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Value
Expand All @@ -20,7 +20,7 @@ class AirbyteConnectorRunnable : Runnable {

@Inject lateinit var outputConsumer: OutputConsumer

@Inject lateinit var exceptionClassifier: ExceptionClassifier
@Inject lateinit var exceptionHandler: ExceptionHandler

override fun run() {
var operation: Operation? = null
Expand All @@ -40,7 +40,7 @@ class AirbyteConnectorRunnable : Runnable {
"Failed ${operation::class} operation execution."
}
}
outputConsumer.accept(exceptionClassifier.handle(e))
outputConsumer.accept(exceptionHandler.handle(e))
throw e
} finally {
log.info { "Flushing output consumer prior to shutdown." }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ class TransientErrorException(
) : ConnectorErrorException(displayMessage, exception)

/** See [io.airbyte.cdk.output.SystemError]. */
class SystemErrorException
private constructor(
class SystemErrorException(
displayMessage: String?,
exception: Throwable? = null,
) : ConnectorErrorException(displayMessage, exception)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.output

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.ConnectorErrorException
import io.airbyte.cdk.SystemErrorException
import io.airbyte.cdk.TransientErrorException
import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton

const val DEFAULT_CLASSIFIER_PREFIX = "${EXCEPTION_CLASSIFIER_PREFIX}.default"

/** Default implementation of [ExceptionClassifier]. */
@Singleton
@ConfigurationProperties(DEFAULT_CLASSIFIER_PREFIX)
class DefaultExceptionClassifier(
@Value("\${$DEFAULT_CLASSIFIER_PREFIX.order:1}") override val orderValue: Int
) : ExceptionClassifier {

override fun classify(e: Throwable): ConnectorError? {
return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) {
is ConfigErrorException -> ConfigError(connectorErrorException.message!!)
is TransientErrorException -> TransientError(connectorErrorException.message!!)
is SystemErrorException -> SystemError(connectorErrorException.message)
null -> null
}
}

/** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */
fun unwind(e: Throwable): ConnectorErrorException? {
var connectorErrorException: ConnectorErrorException? = null
var unwound: Throwable? = e
while (unwound != null) {
if (unwound is ConnectorErrorException) {
connectorErrorException = unwound
}
unwound = unwound.cause
}
return connectorErrorException
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,18 @@
*/
package io.airbyte.cdk.output

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.ConnectorErrorException
import io.airbyte.cdk.SystemErrorException
import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.util.ApmTraceUtils
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
import io.micronaut.context.annotation.DefaultImplementation
import jakarta.inject.Singleton
import org.apache.commons.lang3.exception.ExceptionUtils
import io.micronaut.core.order.Ordered

@Singleton
@DefaultImplementation(DefaultExceptionClassifier::class)
fun interface ExceptionClassifier {
interface ExceptionClassifier : Ordered {

/** Classifies [e] into a [ConnectorError] if possible, null otherwise. */
fun classify(e: Throwable): ConnectorError?

/** [SystemError] display message for [e] in case it can't be classified. */
fun fallbackDisplayMessage(e: Throwable): String? = e.message

/** Maps [e] to a [AirbyteErrorTraceMessage] to be passed to the [OutputConsumer]. */
fun handle(e: Throwable): AirbyteErrorTraceMessage {
ApmTraceUtils.addExceptionToTrace(e)
val connectorError: ConnectorError =
DefaultExceptionClassifier().classify(e)
?: classify(e) ?: SystemError(fallbackDisplayMessage(e) ?: e.message)
val errorTraceMessage =
AirbyteErrorTraceMessage()
.withInternalMessage(e.toString())
.withStackTrace(ExceptionUtils.getStackTrace(e))
return when (connectorError) {
is ConfigError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
.withMessage(connectorError.displayMessage)
is TransientError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
.withMessage(connectorError.displayMessage)
is SystemError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
.withMessage(connectorError.displayMessage ?: e.message)
}
}
/** Convenience val for [getOrder]. */
val orderValue: Int

override fun getOrder(): Int = orderValue
}

/** Each [ConnectorError] subtype corresponds to a [AirbyteErrorTraceMessage.FailureType]. */
Expand Down Expand Up @@ -75,29 +42,63 @@ data class TransientError(val displayMessage: String) : ConnectorError
*/
data class SystemError(val displayMessage: String?) : ConnectorError

/** Default implementation of [ExceptionClassifier]. */
@Singleton
class DefaultExceptionClassifier : ExceptionClassifier {
/** Common Micronaut property prefix for all exception classifiers. */
const val EXCEPTION_CLASSIFIER_PREFIX = "airbyte.connector.exception-classifiers"

/** Convenience interface for rules-based [ExceptionClassifier] implementations. */
interface RuleBasedExceptionClassifier<T : RuleBasedExceptionClassifier.Rule> :
ExceptionClassifier {

/** List of rules to match for. */
val rules: List<T>

override fun classify(e: Throwable): ConnectorError? {
return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) {
is ConfigErrorException -> ConfigError(connectorErrorException.message!!)
is TransientErrorException -> TransientError(connectorErrorException.message!!)
is SystemErrorException -> SystemError(connectorErrorException.message)
null -> null
for (rule in rules) {
if (!rule.matches(e)) {
continue
}
val message: String = rule.output ?: e.message ?: e.toString()
val firstLine: String = if (rule.group == null) message else "${rule.group}: $message"
val lines: List<String> = listOf(firstLine) + rule.referenceLinks
val displayMessage: String = lines.joinToString(separator = "\n")
return when (rule.error) {
ErrorKind.CONFIG -> ConfigError(displayMessage)
ErrorKind.TRANSIENT -> TransientError(displayMessage)
ErrorKind.SYSTEM -> SystemError(displayMessage)
}
}
return null
}

/** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */
fun unwind(e: Throwable): ConnectorErrorException? {
var connectorErrorException: ConnectorErrorException? = null
var unwound: Throwable? = e
while (unwound != null) {
if (unwound is ConnectorErrorException) {
connectorErrorException = unwound
}
unwound = unwound.cause
}
return connectorErrorException
interface Rule : Ordered {

/** Rule ordinal in the rule set. */
val ordinal: Int

/** If the rule matches, the kind of [ConnectorError] to produce. */
val error: ErrorKind

/** Optional display message prefix. */
val group: String?

/** Optional display message. */
val output: String?

/** Optional list of reference links to display. */
val referenceLinks: List<String>

/** Rule predicate. */
fun matches(e: Throwable): Boolean

override fun getOrder(): Int = ordinal

/** Validates rule definition correctness. */
fun validate()
}

enum class ErrorKind {
CONFIG,
TRANSIENT,
SYSTEM,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.output

import io.airbyte.cdk.util.ApmTraceUtils
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
import jakarta.inject.Singleton
import org.apache.commons.lang3.exception.ExceptionUtils

/** [ExceptionHandler] applies all available [ExceptionClassifier] implementations in sequence. */
@Singleton
class ExceptionHandler(val classifiers: List<ExceptionClassifier>) {

fun classify(e: Throwable): ConnectorError {
for (classifier in classifiers) {
val classified: ConnectorError? = classifier.classify(e)
if (classified != null) {
return classified
}
}
return SystemError(e.message)
}

/** Maps [e] to a [AirbyteErrorTraceMessage] to be passed to the [OutputConsumer]. */
fun handle(e: Throwable): AirbyteErrorTraceMessage {
ApmTraceUtils.addExceptionToTrace(e)
val errorTraceMessage =
AirbyteErrorTraceMessage()
.withInternalMessage(e.toString())
.withStackTrace(ExceptionUtils.getStackTrace(e))
return when (val classified: ConnectorError = classify(e)) {
is ConfigError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
.withMessage(classified.displayMessage)
is TransientError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
.withMessage(classified.displayMessage)
is SystemError ->
errorTraceMessage
.withFailureType(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
.withMessage(classified.displayMessage ?: e.message)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.output

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.micronaut.context.annotation.EachProperty
import io.micronaut.context.annotation.Parameter
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton

const val REGEX_CLASSIFIER_PREFIX = "${EXCEPTION_CLASSIFIER_PREFIX}.regex"

/** [ExceptionClassifier] implementation based on regexes applied to the exception message. */
@Singleton
@Requires(property = "${REGEX_CLASSIFIER_PREFIX}.rules")
class RegexExceptionClassifier(
@Value("\${${REGEX_CLASSIFIER_PREFIX}.order:10}") override val orderValue: Int,
override val rules: List<RegexExceptionClassifierRule>,
) : RuleBasedExceptionClassifier<RegexExceptionClassifierRule> {

init {
for (rule in rules) {
rule.validate()
}
}
}

/** Micronaut configuration object for [RuleBasedExceptionClassifier] rules. */
@EachProperty("${REGEX_CLASSIFIER_PREFIX}.rules", list = true)
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
class RegexExceptionClassifierRule(
@param:Parameter override val ordinal: Int,
) : RuleBasedExceptionClassifier.Rule {

// Micronaut configuration objects work better with mutable properties.
override lateinit var error: RuleBasedExceptionClassifier.ErrorKind
lateinit var pattern: String
lateinit var inputExample: String
override var group: String? = null
override var output: String? = null
override var referenceLinks: List<String> = emptyList()

val regex: Regex by lazy {
pattern.toRegex(setOf(RegexOption.MULTILINE, RegexOption.IGNORE_CASE))
}

override fun matches(e: Throwable): Boolean =
e.message?.let { regex.containsMatchIn(it) } ?: false

override fun validate() {
require(runCatching { error }.isSuccess) { "error kind must be set" }
require(runCatching { pattern }.isSuccess) { "regex pattern must be set" }
require(runCatching { inputExample }.isSuccess) {
"input exception message example must be set"
}
val compileResult: Result<Regex> = runCatching { regex }
require(compileResult.isSuccess) {
"regex pattern error: ${compileResult.exceptionOrNull()?.message}"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.output

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.SystemErrorException
import io.airbyte.cdk.TransientErrorException
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

@MicronautTest
class DefaultExceptionClassifierTest {

@Inject lateinit var classifier: DefaultExceptionClassifier

@Test
fun testConfigError() {
Assertions.assertEquals(
ConfigError("foo"),
classifier.classify(ConfigErrorException("foo")),
)
}

@Test
fun testTransientError() {
Assertions.assertEquals(
TransientError("bar"),
classifier.classify(TransientErrorException("bar")),
)
}

@Test
fun testSystemError() {
Assertions.assertEquals(
SystemError("baz"),
classifier.classify(SystemErrorException("baz")),
)
}

@Test
fun testUnclassified() {
Assertions.assertNull(classifier.classify(RuntimeException("quux")))
}
}
Loading

0 comments on commit 70732ca

Please sign in to comment.