Skip to content

Commit

Permalink
make sure AirbyteExceptionHandler always terminates
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed May 10, 2024
1 parent 1188221 commit 29554d9
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 44 deletions.
3 changes: 2 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,51 +26,58 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
// the sync."
// from the spec:
// https://docs.google.com/document/d/1ctrj3Yh_GjtQ93aND-WH3ocqGxsmxyC3jfiarrF6NY0/edit#
LOGGER.error(logMessage, throwable)
// Attempt to deinterpolate the error message before emitting a trace message
val mangledMessage: String?
// If any exception in the chain is of a deinterpolatable type, find it and deinterpolate
// its
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
val deinterpolatableException =
ExceptionUtils.getThrowableList(throwable)
.stream()
.filter { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
try {
LOGGER.error(logMessage, throwable)
// Attempt to deinterpolate the error message before emitting a trace message
val mangledMessage: String?
// If any exception in the chain is of a deinterpolatable type, find it and
// deinterpolate
// its
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
val deinterpolatableException =
ExceptionUtils.getThrowableList(throwable)
.stream()
.filter { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
}
}
}
.findFirst()
val messageWasMangled: Boolean
if (deinterpolatableException.isPresent) {
val originalMessage = deinterpolatableException.get().message
mangledMessage =
STRINGS_TO_DEINTERPOLATE
.stream() // Sort the strings longest to shortest, in case any target string is
// a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
.reduce(originalMessage) { message: String?, targetString: String? ->
deinterpolate(message, targetString)
}
messageWasMangled = mangledMessage != originalMessage
} else {
mangledMessage = throwable.message
messageWasMangled = false
}
.findFirst()
val messageWasMangled: Boolean
if (deinterpolatableException.isPresent) {
val originalMessage = deinterpolatableException.get().message
mangledMessage =
STRINGS_TO_DEINTERPOLATE
.stream() // Sort the strings longest to shortest, in case any target string
// is
// a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
.reduce(originalMessage) { message: String?, targetString: String? ->
deinterpolate(message, targetString)
}
messageWasMangled = mangledMessage != originalMessage
} else {
mangledMessage = throwable.message
messageWasMangled = false
}

if (!messageWasMangled) {
// If we did not modify the message (either not a deinterpolatable class, or we tried to
// deinterpolate but made no changes) then emit our default trace message
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
} else {
// If we did modify the message, then emit a custom trace message
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
if (!messageWasMangled) {
// If we did not modify the message (either not a deinterpolatable class, or we
// tried to
// deinterpolate but made no changes) then emit our default trace message
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
} else {
// If we did modify the message, then emit a custom trace message
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
}
} catch (t: Throwable) {
LOGGER.error("exception in the exception handler", t)
} finally {
terminate()
}

terminate()
}

// by doing this in a separate method we can mock it to avoid closing the jvm and therefore test
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.34.3
version=0.34.4

0 comments on commit 29554d9

Please sign in to comment.