Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make sure AirbyteExceptionHandler always terminates #38122

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,51 +26,58 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
// the sync."
// from the spec:
// https://docs.google.com/document/d/1ctrj3Yh_GjtQ93aND-WH3ocqGxsmxyC3jfiarrF6NY0/edit#
LOGGER.error(logMessage, throwable)
// Attempt to deinterpolate the error message before emitting a trace message
val mangledMessage: String?
// If any exception in the chain is of a deinterpolatable type, find it and deinterpolate
// its
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
val deinterpolatableException =
ExceptionUtils.getThrowableList(throwable)
.stream()
.filter { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
try {
LOGGER.error(logMessage, throwable)
// Attempt to deinterpolate the error message before emitting a trace message
val mangledMessage: String?
// If any exception in the chain is of a deinterpolatable type, find it and
// deinterpolate
// its
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
val deinterpolatableException =
ExceptionUtils.getThrowableList(throwable)
.stream()
.filter { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
}
}
}
.findFirst()
val messageWasMangled: Boolean
if (deinterpolatableException.isPresent) {
val originalMessage = deinterpolatableException.get().message
mangledMessage =
STRINGS_TO_DEINTERPOLATE
.stream() // Sort the strings longest to shortest, in case any target string is
// a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
.reduce(originalMessage) { message: String?, targetString: String? ->
deinterpolate(message, targetString)
}
messageWasMangled = mangledMessage != originalMessage
} else {
mangledMessage = throwable.message
messageWasMangled = false
}
.findFirst()
val messageWasMangled: Boolean
if (deinterpolatableException.isPresent) {
val originalMessage = deinterpolatableException.get().message
mangledMessage =
STRINGS_TO_DEINTERPOLATE
.stream() // Sort the strings longest to shortest, in case any target string
// is
// a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
.reduce(originalMessage) { message: String?, targetString: String? ->
deinterpolate(message, targetString)
}
messageWasMangled = mangledMessage != originalMessage
} else {
mangledMessage = throwable.message
messageWasMangled = false
}

if (!messageWasMangled) {
// If we did not modify the message (either not a deinterpolatable class, or we tried to
// deinterpolate but made no changes) then emit our default trace message
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
} else {
// If we did modify the message, then emit a custom trace message
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
if (!messageWasMangled) {
// If we did not modify the message (either not a deinterpolatable class, or we
// tried to
// deinterpolate but made no changes) then emit our default trace message
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
} else {
// If we did modify the message, then emit a custom trace message
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
}
} catch (t: Throwable) {
LOGGER.error("exception in the exception handler", t)
} finally {
terminate()
}

terminate()
}

// by doing this in a separate method we can mock it to avoid closing the jvm and therefore test
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.34.3
version=0.34.4
Loading