-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add count in state message for incremental syncs #33005
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
...yte-cdk/airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteProtocolVersion.java
Outdated
Show resolved
Hide resolved
...ain/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java
Show resolved
Hide resolved
...-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.java
Outdated
Show resolved
Hide resolved
cursorInfo.map(CursorInfo::getCursor).orElse(null), | ||
cursorInfo.map(CursorInfo::getCursorRecordCount).orElse(null)); | ||
} | ||
LOGGER.info("State report for stream {} - original: {} = {} (count {}) -> latest: {} = {} (count {})", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably want to keep the reduced verbosity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we still keep the reduced verbosity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry missed this one before. fixed.
...s/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.java
Outdated
Show resolved
Hide resolved
if (trackSchemaHistory && schemaHistoryManager == null) { | ||
throw new RuntimeException("Schema History Tracking is true but manager is not initialised"); | ||
} | ||
if (offsetManager == null) { | ||
throw new RuntimeException("Offset can not be null"); | ||
} | ||
|
||
return cdcStateHandler.saveState(offset, schemaHistoryManager != null ? schemaHistoryManager.read() : null); | ||
final AirbyteMessage message = cdcStateHandler.saveState(offset, schemaHistoryManager != null ? schemaHistoryManager.read() : null); | ||
message.getState().withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious why is recordCount a double? seems like it can only be an integer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protocol defined this as a "number" in Json, which translated to double value in java thus here comes casting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small nits
/publish-java-cdk |
/publish-java-cdk
|
Co-authored-by: xiaohansong <xiaohansong@users.noreply.github.com>
Co-authored-by: xiaohansong <xiaohansong@users.noreply.github.com>
What
Previous merged PR #33312 will only send counts in initial syncs. To send counts in follow up incremental syncs we need this PR to be merged in
How
Base class will have counts stat added into state message; and those base classes will be inherited by MySQL incremental iterator.
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user?
For connector PRs, use this section to explain which type of semantic versioning bump occurs as a result of the changes. Refer to our Semantic Versioning for Connectors guidelines for more information. Breaking changes to connectors must be documented by an Airbyte engineer (PR author, or reviewer for community PRs) by using the Breaking Change Release Playbook.
If there are breaking changes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Actions
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.0.0.1
Dockerfile
has version0.0.1
README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog with an entry for the initial version. See changelog exampledocs/integrations/README.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
Updating a connector
Community member or Airbyter
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
Connector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:generateScaffolds
then checking in your changesUpdating the Python CDK
Airbyter
Before merging:
--use-local-cdk --name=source-<connector>
as optionsairbyte-ci connectors --use-local-cdk --name=source-<connector> test
After merging: