Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java CDK] Fix java interop compilation issue in Config/TransientErrorException. #41996

Merged
merged 13 commits into from
Jul 17, 2024
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.41.5 | 2024-07-16 | [\#41996](https://github.com/airbytehq/airbyte/pull/41996) | Add back dbz heartbeats + fix java interop compilation issue in Config/TransientErrorException. |
| 0.41.4 | 2024-07-15 | [\#41959](https://github.com/airbytehq/airbyte/pull/41959) | Allow setting `internal_message` in Config/TransientErrorException. Destinations: shorten error message for INCOMPLETE stream status. |
| 0.41.3 | 2024-07-15 | [\#41680](https://github.com/airbytehq/airbyte/pull/41680) | Fix: CompletableFutures.allOf now handles empty list and `Throwable` |
| 0.41.2 | 2024-07-12 | [\#40567](https://github.com/airbytehq/airbyte/pull/40567) | Fix BaseSqlGenerator test case (generation_id support); update minimum platform version for refreshes support. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.41.4
version=0.41.5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that due to #41647 DebeziumRecordIterator.isHeartbeatEvent() is currently broken.
Not sure if limited to a driver or a version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this PR wont help that case. Then again it won't handle or make anything worse.

Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,8 @@ class DebeziumRecordIterator<T>(
private fun heartbeatPosNotChanging(): Boolean {
if (this.tsLastHeartbeat == null) {
return false
} else if (!isTest() && receivedFirstRecord) {
akashkulk marked this conversation as resolved.
Show resolved Hide resolved
// Closing debezium due to heartbeat position not changing only exists as an escape
// hatch
// for testing setups. In production, we rely on the platform heartbeats to kill the
// sync
// ONLY if we haven't received a record from Debezium. If a record has not been received
// from Debezium and the heartbeat isn't changing, the sync should be shut down due to
// heartbeat position not changing.
return false
}

val timeElapsedSinceLastHeartbeatTs =
Duration.between(this.tsLastHeartbeat, LocalDateTime.now())
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime) > 0
Expand All @@ -288,10 +280,6 @@ class DebeziumRecordIterator<T>(
}
}

private fun isTest(): Boolean {
return config.has("is_test") && config["is_test"].asBoolean()
}

/**
* [DebeziumRecordIterator.heartbeatEventSourceField] acts as a cache so that we avoid using
* reflection to setAccessible for each event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ class ConfigErrorException : RuntimeException {
val internalMessage: String
val displayMessage: String

@JvmOverloads
constructor(displayMessage: String, internalMessage: String = "") : super(displayMessage) {
this.displayMessage = displayMessage
this.internalMessage = internalMessage
}

@JvmOverloads
constructor(
displayMessage: String,
exception: Throwable?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ package io.airbyte.commons.exceptions
class TransientErrorException : RuntimeException {
val internalMessage: String

@JvmOverloads
constructor(displayMessage: String, internalMessage: String = "") : super(displayMessage) {
this.internalMessage = internalMessage
}

@JvmOverloads
constructor(
displayMessage: String,
exception: Throwable?,
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.41.0'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.5.0
dockerImageTag: 3.5.1
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ public void close() throws Exception {
}
}

@Override
public Optional<AirbyteStreamNameNamespacePair> getAirbyteStream() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the problem/ticket resulting from the lack of this implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default implementation of getAirbyteStream() is to return Optional.empty() so each iterator that implements the AirbyteStreamAware interface needs to implement this method.

I noticed this during testing that we were throwing a NoSuchElementException instead of the relevant TransientException

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a similar change required on other connectors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That will have been reflected in the PRs associated with WASS changes in the relevant connectors

return Optional.of(pair);
}

private boolean isCdcSync(MySqlInitialLoadStateManager initialLoadStateManager) {
if (initialLoadStateManager instanceof MySqlInitialLoadGlobalStateManager) {
LOGGER.info("Running a cdc sync");
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.5.1 | 2024-07-16 | [41996](https://github.com/airbytehq/airbyte/pull/41996) | Add back dbz heartbeat timeout. |
| 3.5.0 | 2024-07-11 | [38240](https://github.com/airbytehq/airbyte/pull/38240) | Implement WASS. |
| 3.4.12 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
| 3.4.11 | 2024-06-26 | [40561](https://github.com/airbytehq/airbyte/pull/40561) | Support PlanetScale MySQL's per-query row limit. |
Expand Down
Loading