Skip to content

Commit

Permalink
null safety in debezium to airbyte message generation (#41622)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Jul 23, 2024
1 parent 9daa5ce commit 250c02d
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 85 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.43.1 | 2024-07-22 | [\#41622](https://github.com/airbytehq/airbyte/pull/41622) | Fix null safety bug in debezium event processing |
| 0.43.0 | 2024-07-17 | [\#41954](https://github.com/airbytehq/airbyte/pull/41954) | fix refreshes for connectors using the old SqlOperations |
| 0.43.0 | 2024-07-17 | [\#42017](https://github.com/airbytehq/airbyte/pull/42017) | bump postgres-jdbc version |
| 0.43.0 | 2024-07-17 | [\#42015](https://github.com/airbytehq/airbyte/pull/42015) | wait until migration before creating the Writeconfig objects |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.43.0
version=0.43.1
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,33 @@ class RelationalDbDebeziumEventConverter(
) : DebeziumEventConverter {
override fun toAirbyteMessage(event: ChangeEventWithMetadata): AirbyteMessage {
val debeziumEvent = event.eventValueAsJson()
val before: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val source: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)
val before: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val source: JsonNode =
checkNotNull(debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) {
"ChangeEvent contains no source record $debeziumEvent"
}

if (listOf(before, after).all { it == null }) {
throw IllegalStateException(
"ChangeEvent contains no before or after records $debeziumEvent"
)
}
// Value of before and after may be a null or a NullNode object,
// representing a "null" in json
val baseNode =
when (after?.isNull == true) {
true -> before
false -> after
}
as ObjectNode

val baseNode = (if (after.isNull) before else after) as ObjectNode
val data: JsonNode =
DebeziumEventConverter.Companion.addCdcMetadata(
baseNode,
source,
cdcMetadataInjector,
after.isNull
after?.isNull == true
)
return DebeziumEventConverter.Companion.buildAirbyteMessage(
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.42.4'
cdkVersionRequired = '0.43.1'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.6.5
dockerImageTag: 3.6.6
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Loading

0 comments on commit 250c02d

Please sign in to comment.