-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Populate and read from SyncStats table #16476
Conversation
This reverts commit 64d024d.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall.
.set(SYNC_STATS.ATTEMPT_ID, attemptId) | ||
.set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted()) | ||
.set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted()) | ||
.set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsEmitted()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getRecordsCommitted
?
@@ -249,14 +252,34 @@ void testWriteOutput() throws IOException { | |||
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); | |||
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); | |||
final Job created = jobPersistence.getJob(jobId); | |||
final JobOutput jobOutput = new JobOutput().withOutputType(JobOutput.OutputType.DISCOVER_CATALOG); | |||
final SyncStats syncStats = | |||
new SyncStats().withBytesEmitted(100L).withRecordsEmitted(10L).withRecordsCommitted(10L).withDestinationStateMessagesEmitted(1L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Different values for recordsEmitted
and recordsCommitted
would have caught the typo.
- Populate sync stats table when job is complete - Method to read from sync stats table
- Populate sync stats table when job is complete - Method to read from sync stats table
What
This PR does a few things around the SyncStats table:
writeOutput
method in DefaultJobPersistence.java. Because we are continuing to store sync stats in the JSON blob, I put the write to sync_stats inside the transaction that writes to the sync output to the attempts table. Once we move SyncStats completely out of that JSON blob, we should have a separate method for writing to the sync stats table.Recommended reading order