Skip to content

Commit

Permalink
Better attribution of DBZ closing reason (#32686)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Nov 29, 2023
1 parent 6fb2591 commit e11d765
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 15 deletions.
3 changes: 2 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ MavenLocal debugging steps:
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.5.3 | 2023-11-28 | [\#32686](https://github.com/airbytehq/airbyte/pull/32686) | Better attribution of debezium engine shutdown due to heartbeat. |
| 0.5.1 | 2023-11-27 | [\#32662](https://github.com/airbytehq/airbyte/pull/32662) | Debezium initialization wait time will now read from initial setup time. |
| 0.5.0 | 2023-11-22 | [\#32656](https://github.com/airbytehq/airbyte/pull/32656) | Introduce TestDatabase test fixture, refactor database source test base classes. |
| 0.4.11 | 2023-11-14 | [\#32526](https://github.com/airbytehq/airbyte/pull/32526) | Clean up memory manager logs. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.5.1
version=0.5.3
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,12 @@ protected ChangeEventWithMetadata computeNext() {
final T heartbeatPos = getHeartbeatPosition(next);
// wrap up sync if heartbeat position crossed the target OR heartbeat position hasn't changed for
// too long
if (hasSyncFinished(heartbeatPos)) {
requestClose("Closing: Heartbeat indicates sync is done");
if (targetPosition.reachedTargetPosition(heartbeatPos)) {
requestClose("Closing: Heartbeat indicates sync is done by reaching the target position");
} else if (heartbeatPos.equals(this.lastHeartbeatPosition) && heartbeatPosNotChanging()) {
requestClose("Closing: Heartbeat indicates sync is not progressing");
}

if (!heartbeatPos.equals(lastHeartbeatPosition)) {
this.tsLastHeartbeat = LocalDateTime.now();
this.lastHeartbeatPosition = heartbeatPos;
Expand Down Expand Up @@ -150,7 +153,7 @@ protected ChangeEventWithMetadata computeNext() {
final ChangeEvent<String, String> event;
try {
event = debeziumShutdownProcedure.getRecordsRemainingAfterShutdown().poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
if (event == null || isHeartbeatEvent(event)) {
Expand All @@ -164,11 +167,6 @@ protected ChangeEventWithMetadata computeNext() {
return endOfData();
}

private boolean hasSyncFinished(final T heartbeatPos) {
return targetPosition.reachedTargetPosition(heartbeatPos)
|| (heartbeatPos.equals(this.lastHeartbeatPosition) && heartbeatPosNotChanging());
}

/**
* Debezium was built as an ever running process which keeps on listening for new changes on DB and
* immediately processing them. Airbyte needs debezium to work as a start stop mechanism. In order
Expand All @@ -195,8 +193,11 @@ private boolean isHeartbeatEvent(final ChangeEvent<String, String> event) {
}

private boolean heartbeatPosNotChanging() {
if (this.tsLastHeartbeat == null) {
return false;
}
final Duration timeElapsedSinceLastHeartbeatTs = Duration.between(this.tsLastHeartbeat, LocalDateTime.now());
LOGGER.debug("Time since last hb_pos change {}s", timeElapsedSinceLastHeartbeatTs.toSeconds());
LOGGER.info("Time since last hb_pos change {}s", timeElapsedSinceLastHeartbeatTs.toSeconds());
// wait time for no change in heartbeat position is half of initial waitTime
return timeElapsedSinceLastHeartbeatTs.compareTo(this.firstRecordWaitTime.dividedBy(2)) > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWi
return true;
} else {
final PgLsn eventLsn = extractLsn(changeEventWithMetadata.eventValueAsJson());
boolean isEventLSNAfter = targetLsn.compareTo(eventLsn) <= 0;
final boolean isEventLSNAfter = targetLsn.compareTo(eventLsn) <= 0;
if (isEventLSNAfter) {
LOGGER.info("Signalling close because record's LSN : " + eventLsn + " is after target LSN : " + targetLsn);
}
Expand All @@ -72,7 +72,11 @@ public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWi

@Override
public boolean reachedTargetPosition(final Long positionFromHeartbeat) {
return positionFromHeartbeat != null && positionFromHeartbeat.compareTo(targetLsn.asLong()) >= 0;
final boolean reachedTargetPosition = positionFromHeartbeat != null && positionFromHeartbeat.compareTo(targetLsn.asLong()) >= 0;
if (reachedTargetPosition) {
LOGGER.info("Signalling close because heartbeat LSN : " + positionFromHeartbeat + " is after target LSN : " + targetLsn);
}
return reachedTargetPosition;
}

private PgLsn extractLsn(final JsonNode valueAsJson) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.5.0'
cdkVersionRequired = '0.5.3'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.2.23
dockerImageTag: 3.2.24
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.2.24 | 2023-11-28 | [32686](https://github.com/airbytehq/airbyte/pull/32686) | Better logging to understand dbz closing reason attribution. |
| 3.2.23 | 2023-11-28 | [32891](https://github.com/airbytehq/airbyte/pull/32891) | Fix CDK dependency in build. |
| 3.2.22 | 2023-11-22 | [32656](https://github.com/airbytehq/airbyte/pull/32656) | Adopt java CDK version 0.5.0. |
| 3.2.21 | 2023-11-07 | [31856](https://github.com/airbytehq/airbyte/pull/31856) | handle date/timestamp infinity values properly |
Expand Down

0 comments on commit e11d765

Please sign in to comment.