Skip to content

Commit

Permalink
Add records committed to job stats (#21364)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp authored Jan 13, 2023
1 parent 02e0258 commit 0a71b01
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String j
final var attemptStats = new HashMap<JobAttemptPair, AttemptStats>();
final var syncResults = ctx.fetch(
"SELECT atmpt.attempt_number, atmpt.job_id,"
+ "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
+ "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted, stats.records_committed "
+ "FROM sync_stats stats "
+ "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id "
+ "WHERE job_id IN ( " + jobIdsStr + ");");
Expand All @@ -543,6 +543,7 @@ private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String j
final var syncStats = new SyncStats()
.withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED))
.withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED))
.withRecordsCommitted(r.get(SYNC_STATS.RECORDS_COMMITTED))
.withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS))
.withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES));
attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,16 @@ private void hydrateWithStats(final AttemptRead a, final JobPersistence.AttemptS
.estimatedBytes(combinedStats.getEstimatedBytes())
.estimatedRecords(combinedStats.getEstimatedRecords())
.bytesEmitted(combinedStats.getBytesEmitted())
.recordsEmitted(combinedStats.getRecordsEmitted());
.recordsEmitted(combinedStats.getRecordsEmitted())
.recordsCommitted(combinedStats.getRecordsCommitted());

final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats()
.streamName(s.getStreamName())
.streamNamespace(s.getStreamNamespace())
.stats(new AttemptStats()
.bytesEmitted(s.getStats().getBytesEmitted())
.recordsEmitted(s.getStats().getRecordsEmitted())
.recordsCommitted(s.getStats().getRecordsCommitted())
.estimatedBytes(s.getStats().getEstimatedBytes())
.estimatedRecords(s.getStats().getEstimatedRecords())))
.collect(Collectors.toList());
Expand Down

0 comments on commit 0a71b01

Please sign in to comment.