From 0a71b019b52a64d58cef6795d4194f0e10cdc86a Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Thu, 12 Jan 2023 17:51:51 -0800 Subject: [PATCH] Add records committed to job stats (#21364) --- .../io/airbyte/persistence/job/DefaultJobPersistence.java | 3 ++- .../java/io/airbyte/server/handlers/JobHistoryHandler.java | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 2dabf08f5672..c5ea5d7b1cc2 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -534,7 +534,7 @@ private static Map hydrateSyncStats(final String j final var attemptStats = new HashMap(); final var syncResults = ctx.fetch( "SELECT atmpt.attempt_number, atmpt.job_id," - + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " + + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted, stats.records_committed " + "FROM sync_stats stats " + "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id " + "WHERE job_id IN ( " + jobIdsStr + ");"); @@ -543,6 +543,7 @@ private static Map hydrateSyncStats(final String j final var syncStats = new SyncStats() .withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED)) .withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED)) + .withRecordsCommitted(r.get(SYNC_STATS.RECORDS_COMMITTED)) .withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS)) .withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES)); attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList())); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index 4cc1380525e1..d12fdad8a77c 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -168,7 +168,8 @@ private void hydrateWithStats(final AttemptRead a, final JobPersistence.AttemptS .estimatedBytes(combinedStats.getEstimatedBytes()) .estimatedRecords(combinedStats.getEstimatedRecords()) .bytesEmitted(combinedStats.getBytesEmitted()) - .recordsEmitted(combinedStats.getRecordsEmitted()); + .recordsEmitted(combinedStats.getRecordsEmitted()) + .recordsCommitted(combinedStats.getRecordsCommitted()); final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats() .streamName(s.getStreamName()) @@ -176,6 +177,7 @@ private void hydrateWithStats(final AttemptRead a, final JobPersistence.AttemptS .stats(new AttemptStats() .bytesEmitted(s.getStats().getBytesEmitted()) .recordsEmitted(s.getStats().getRecordsEmitted()) + .recordsCommitted(s.getStats().getRecordsCommitted()) .estimatedBytes(s.getStats().getEstimatedBytes()) .estimatedRecords(s.getStats().getEstimatedRecords()))) .collect(Collectors.toList());