-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Progress Bar Estimate #19814
Progress Bar Estimate #19814
Conversation
.withType(io.airbyte.protocol.models.AirbyteTraceMessage.Type.ERROR) | ||
.withEmittedAt(emittedAt) | ||
.withError(new AirbyteErrorTraceMessage().withMessage(message)); | ||
return createErrorTraceMessage(message, emittedAt, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consolidate the various trace message creation into one function to avoid duplication.
airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java
Outdated
Show resolved
Hide resolved
I have updated the logic and tests to account for the difference between Stream and Sync estimates! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good in principle, left some questions.
private final StateDeltaTracker stateDeltaTracker; | ||
private final StateMetricsTracker stateMetricsTracker; | ||
private final List<AirbyteTraceMessage> destinationErrorTraceMessages; | ||
private final List<AirbyteTraceMessage> sourceErrorTraceMessages; | ||
private final StateAggregator stateAggregator; | ||
private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages(); | ||
|
||
// These variables support SYNC level estimates and are meant for sources where stream level | ||
// estimates are not possible e.g. CDC sources. | ||
private Long totalRecordsEstimatedSync; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we are adding a new concept of Sync vs Stream, instead, would it make sense to use STREAM vs GLOBAL?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm following from the AirbyteTraceEstimateMessage type. Slight preference to keep the names consistent though happy to change the name within this file to Global.
With the AirbyteTraceEstiamteMessage, I do think STREAM vs SYNC are decent names for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking about consistency for the near future, Sync feels very coupled to how we current run the replication. For STREAM, I think we want to modify our current replication model to be able to replicate data at stream level rather than connection. If we are running per stream replication in parallel, Sync mode for progress looks outdated.
It probably feels more like future-proofing, I don't have reasons to feel strongly either way at this moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with your thoughts. Honestly, not 100% how the stream-as-first-class citizen refactor will play with CDC w.r.t the stats here.
In that case, I'm guessing we will be able to do away with the SYNC field. I'm inclined to keep this as is for now and revisit when we cross that. How does that sound?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me, conceptually, I think we are tracking the right way, we can address the naming later on.
airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java
Outdated
Show resolved
Hide resolved
airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java
Show resolved
Hide resolved
airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java
Show resolved
Hide resolved
Follow up from ##19814, where we introduced the StreamStats object to consolidate/simplify some of the stats memory objects. In this PR, we extend the StreamStats object to also include the emitted records and bytes. - Make StreamStats into a proper object. We cannot use a record as record fields are immutable. We need mutable fields to count. - Consolidate the emitted records into StreamStats. - Take the chance to move all the stats/metrics related classes into a book_keeping package to keep things clean.
What
Implement estimate message processing allowing the platform to hold on to estimate message counts in memory.
The estimate message is protocol message connectors can choose to emit to provide support for progress bar calculations. There are two kinds of estimates, per-Sync or per-Stream. Sources cannot emit both types in a single sync.
Per-stream estimates are what we usually expect. Per-sync estimates are for sources that cannot provide more granular estimates for whatever reasons e.g. CDC sources.
In a follow up PR, the platform will periodically save these messages through the save stats api.
How
getStreamToEstimatedRecords
,getStreamToEstimatedBytes
,getTotalRecordsEstimated
andgetTotalBytesEstimated
to theMessageTracker
interface.Also took the chance to clean up some of the testing code.
Recommended reading order
MessageTracker.java
andAirbyteMessageTracker.java
for the interface changes.AirbyteMessageTrackerTest.java
for tests.🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampledocs/integrations/README.md
airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereUpdating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereConnector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates
then checking in your changesTests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.