Add migrations to support progress bar. #19191
Conversation
.add(
    DSL.field("estimated_records", SQLDataType.BIGINT.nullable(true)),
    DSL.field("estimated_bytes", SQLDataType.BIGINT.nullable(true)),
    DSL.field("stream_stats", SQLDataType.JSONB.nullable(true)))
What will this blob contain? Is it worth adding it in a new table?
I wrote out some detail here.
Essentially this will contain a blob that is a list of per-stream stats.
There is an argument for splitting the per-stream stats into another table. I don't think it's necessary, since all the current access patterns I know of always aggregate records over a sync key, so I don't see much value in splitting it out now. I am happy to split this out if people feel strongly.
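For concreteness, the per-stream stats blob described above could be modeled as a serialized list of per-stream counters. A minimal Java sketch of that shape, where the type and field names are illustrative assumptions rather than the actual schema:

```java
import java.util.List;

// Hypothetical shape of one entry in the per-stream stats blob;
// field names are illustrative, not the real schema.
record StreamStat(String streamName,
                  long estimatedRecords, long estimatedBytes,
                  long emittedRecords, long emittedBytes) {}

public class StreamStatsBlob {

    // The stream_stats JSONB column would hold the serialized form of a
    // list like this, one entry per stream in the sync.
    public static List<StreamStat> example() {
        return List.of(
            new StreamStat("users", 1000, 50_000, 400, 20_000),
            new StreamStat("orders", 2000, 90_000, 2000, 90_000));
    }
}
```

Because every known access pattern aggregates over the whole sync, keeping this as a single blob per sync record avoids a join without losing any current query capability.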
I am seeing it in the context of partial stats, which could be written in parallel. I think those stats will be an issue, because one writer can erase the blob of another job. WDYT?
// As the database schema changes, the generated jOOQ code can be deprecated,
// so old migrations may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
ctx.alterTable("sync_stats")
When will the estimation be updated? It feels weird to have some entries in this table created before the job runs and some after (from what I understand from the hack presentation).
The current plan is for this to be updated by the workers as the sync runs; see this branch for an example.
So this entry is created as the job runs, not before!
Thanks for the link. I still think we will need some updates to how we write the stats currently; it might erase or duplicate the stats for this job: https://github.com/airbytehq/airbyte/blob/master/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java#L369
I wonder if we will need to rate limit the updates later on, since we potentially emit a state per record and thus put a lot of load on the DB, but that's probably not a concern for this PR.
...yte/db/instance/jobs/migrations/V0_40_18_001__AddEstimatedAndStreamStatsColumnSyncStats.java
@gosusnp @benmoriceau from your comments I realised I could have added more detail in the PR description. I have updated it to include more context on how the changes here will be used. Sorry and thanks!
// old migrations may not compile if there is any generated code.
try (final DSLContext ctx = DSL.using(context.getConnection())) {
  addEstimatedColumnsToSyncStats(ctx);
  addStreamStatsTable(ctx);
heads up @gosusnp I spoke to @benmoriceau and decided creating a new table now is lower effort than trying to sidestep the potential migration when we have parallel syncs. Letting you know in case you want to re-review.
// Metadata Columns
final Field<UUID> id = field("id", SQLDataType.UUID.nullable(false));
final Field<Integer> attemptId = field("attempt_id", SQLDataType.INTEGER.nullable(false));
final Field<String> streamName = field("stream_name", SQLDataType.VARCHAR.nullable(false));
We should add stream_namespace as well, since stream_name alone could collide.
good point!
@evantahler stream_namespace is something we might need to change on the FE too.
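To illustrate the collision concern from this thread: if stats are keyed by stream name alone, two streams with the same name in different namespaces collapse into a single entry and overwrite each other. A toy Java sketch (keys and counts are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class NamespaceCollision {

    // Keyed by stream_name only: "users" from namespace "public" and
    // "users" from namespace "archive" clobber each other, leaving one entry.
    public static int countKeyedByNameOnly() {
        Map<String, Long> stats = new HashMap<>();
        stats.put("users", 100L); // from namespace "public"
        stats.put("users", 250L); // from namespace "archive" overwrites it
        return stats.size();
    }

    // Keyed by (namespace, name): both streams keep their own entry, which
    // is why the unique constraint should include stream_namespace too.
    public static int countKeyedByNamespaceAndName() {
        Map<String, Long> stats = new HashMap<>();
        stats.put("public:users", 100L);
        stats.put("archive:users", 250L);
        return stats.size();
    }
}
```

The same logic applies at the database level: a unique constraint on (attempt_id, stream_name) alone would reject or merge rows for identically named streams in different namespaces.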
…nWorker. (#19360) #19191 made me realise the DefaultReplicationWorker's metric tracking has a bug: we aren't accounting for namespace when tracking metrics, i.e. streams with the same name but different namespaces will merge metrics. While reading the code to figure out a fix, I realised we don't have a good conceptual representation of stream namespace <> name pairs within the platform today. We use a concatenated string. Though this works, it will become harder and harder to read/track as we do more operations that involve namespace, i.e. progress bars and column selection. This PR introduces the AirbyteStreamNameNamespacePair object into the platform code to make it more convenient to work with streams in the future (especially if we proceed with the project to make streams a first-class citizen!). The AirbyteStreamNameNamespacePair object was written to deal with the same issue of namespace <> name pair manipulation within the Java destination code. It implements the Comparable interface, which makes it convenient to use in Collections operations. For an example of how this is consumed, see #19361.
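A minimal sketch of such a pair type, assuming a simple lexicographic ordering; the actual AirbyteStreamNameNamespacePair in the codebase may differ in naming and details:

```java
import java.util.Comparator;

// Hypothetical sketch of a stream name/namespace pair, modeled on the
// AirbyteStreamNameNamespacePair described above; not the real class.
public record StreamPair(String namespace, String name)
        implements Comparable<StreamPair> {

    // Namespace first (nulls sort first, since some connectors have no
    // namespace), then name, so identically named streams in different
    // namespaces remain distinct keys.
    private static final Comparator<StreamPair> ORDER =
        Comparator.comparing(StreamPair::namespace,
                Comparator.nullsFirst(Comparator.naturalOrder()))
            .thenComparing(StreamPair::name);

    @Override
    public int compareTo(StreamPair other) {
        return ORDER.compare(this, other);
    }
}
```

Because it is a record implementing Comparable, equals/hashCode come for free and instances drop cleanly into sorted Collections, which is the convenience the PR description calls out.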
Follow up to #18953. Implement all the DB migrations required for a progress bar. The main change here is to support saving:
- the estimated records/bytes at the sync level
- the estimated records/bytes and emitted records/bytes at the stream level
After this, I'll put up a PR for the persistence layer changes, which will write to and read from these columns. Finally, I'll wire this into the API changes, which are currently stubs.
- Add the estimated_records and estimated_bytes columns to the SyncStats table.
- Create a stream_stats table:
  - estimated and emitted records/bytes columns
  - contains attempt_id and stream_name columns. Unique constraints on these two columns.
  - foreign key to the attempt_id table.
  - this table hopefully sets us up for the parallel sync work.
Implement the persistence layer changes following #19191. This PR handles writing and reading stats to the new stream_stats table and columns in the existing sync_stats table. At the same time, we introduce upserts of stats records, i.e. merging updates into a single record, in preparation for real-time stats updates, vs the current approach where a new stat record is always written. There will be two remaining PRs after this:
- First PR will fully wire up and test the API.
- Second PR will actually save stats while jobs are running.
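The upsert behaviour described above can be pictured as a keyed write: each incoming stat snapshot replaces the existing record for the same stream key rather than appending a new row. A toy in-memory sketch, where the class, key format, and fields are illustrative assumptions rather than the actual persistence code:

```java
import java.util.HashMap;
import java.util.Map;

public class StatsUpsert {

    // Toy stat record; the real columns live in sync_stats / stream_stats.
    record Stats(long emittedRecords, long emittedBytes) {}

    private final Map<String, Stats> byStreamKey = new HashMap<>();

    // Upsert: the latest snapshot for a stream key replaces the previous
    // one, so repeated updates keep a single record per stream instead of
    // accumulating a new row on every write.
    public void upsert(String streamKey, Stats latest) {
        byStreamKey.put(streamKey, latest);
    }

    public Stats get(String streamKey) {
        return byStreamKey.get(streamKey);
    }

    public int size() {
        return byStreamKey.size();
    }
}
```

At the database level the equivalent would be an insert-or-update keyed on the table's unique constraint, which is what makes frequent in-flight updates cheap compared to always inserting.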
What
Follow up to #18953.
Implement all the DB migrations required for a progress bar.
The main change here is to support saving:
- the estimated records/bytes at the sync level
- the estimated records/bytes and emitted records/bytes at the stream level
After this, I'll put up a PR for the persistence layer changes, which will write to and read from these columns.
Finally, I'll wire this into the API changes, which are currently stubs.
To see a version of how this will be consumed please look at
How
- Add the estimated_records and estimated_bytes columns to the SyncStats table.
- Add the stream_stats column to the SyncStats table. This will contain a JSON blob with per-stream stats.
- Create a stream_stats table with attempt_id and stream_name columns. Unique constraints on these two columns, and a foreign key to the attempt_id table.
Recommended reading order
The one file.
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
- airbyte_secret
- ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- README.md
- bootstrap.md. See description and examples
- docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
- docs/integrations/README.md
- airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- /publish command described here
Updating a connector
Community member or Airbyter
- airbyte_secret
- ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- README.md
- bootstrap.md. See description and examples
- docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- /publish command described here
Connector Generator
- Generated connectors (with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates, then checking in your changes
Tests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.