From 836e2e5b66e6ac21432b8256356d37aad0f6f735 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 6 Jan 2023 14:19:41 -0800 Subject: [PATCH] Remerge Progress Bar Read API. (#21124) Let's try #20937 again, this time with better test for error cases. See original PR for description. This PR adds testing and logic to handle empty/bad job input. --- .../job/DefaultJobPersistence.java | 78 ++++++++- .../persistence/job/JobPersistence.java | 15 ++ .../persistence/job/models/Attempt.java | 16 +- .../job/DefaultJobPersistenceTest.java | 152 +++++++++++++----- .../persistence/job/models/AttemptTest.java | 2 +- .../persistence/job/models/JobTest.java | 2 +- .../server/converters/JobConverter.java | 2 +- .../server/handlers/JobHistoryHandler.java | 68 +++++++- .../server/converters/JobConverterTest.java | 6 +- .../handlers/JobHistoryHandlerTest.java | 82 +++++++--- ...obCreationAndStatusUpdateActivityImpl.java | 5 +- 11 files changed, 341 insertions(+), 87 deletions(-) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 1c5dd8253cdb..2dabf08f5672 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.collect.UnmodifiableIterator; import io.airbyte.commons.enums.Enums; @@ -72,6 +73,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.jooq.DSLContext; import org.jooq.Field; import org.jooq.InsertValuesStepN; @@ -511,6 +513,76 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t }); } + @Override + public Map getAttemptStats(final List jobIds) throws IOException { + if (jobIds == null || jobIds.isEmpty()) { + return Map.of(); + } + + final var jobIdsStr = StringUtils.join(jobIds, ','); + return jobDatabase.query(ctx -> { + // Instead of one massive join query, separate this query into two queries for better readability + // for now. + // We can combine the queries at a later date if this still proves to be not efficient enough. + final Map attemptStats = hydrateSyncStats(jobIdsStr, ctx); + hydrateStreamStats(jobIdsStr, ctx, attemptStats); + return attemptStats; + }); + } + + private static Map hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) { + final var attemptStats = new HashMap(); + final var syncResults = ctx.fetch( + "SELECT atmpt.attempt_number, atmpt.job_id," + + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " + + "FROM sync_stats stats " + + "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id " + + "WHERE job_id IN ( " + jobIdsStr + ");"); + syncResults.forEach(r -> { + final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); + final var syncStats = new SyncStats() + .withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED)) + .withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED)) + .withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS)) + .withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES)); + attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList())); + }); + return attemptStats; + } + + /** + * This method needed to be called after + * {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)} as it assumes hydrateSyncStats + * has prepopulated the map. + */ + private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map attemptStats) { + final var streamResults = ctx.fetch( + "SELECT atmpt.attempt_number, atmpt.job_id, " + + "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " + + "FROM stream_stats stats " + + "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id " + + "WHERE attempt_id IN " + + "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));"); + + streamResults.forEach(r -> { + final var streamSyncStats = new StreamSyncStats() + .withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE)) + .withStreamName(r.get(STREAM_STATS.STREAM_NAME)) + .withStats(new SyncStats() + .withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED)) + .withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED)) + .withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS)) + .withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES))); + + final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); + if (!attemptStats.containsKey(key)) { + LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", key); + return; + } + attemptStats.get(key).perStreamStats().add(streamSyncStats); + }); + } + @Override public List getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException { return jobDatabase @@ -528,6 +600,10 @@ static Long getAttemptId(final long jobId, final int attemptNumber, final DSLCon final Optional record = ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId, attemptNumber).stream().findFirst(); + if (record.isEmpty()) { + return -1L; + } + return record.get().get("id", Long.class); } @@ -849,7 +925,7 @@ private static Job getJobFromRecord(final Record record) { private static Attempt getAttemptFromRecord(final Record record) { return new Attempt( - record.get(ATTEMPT_NUMBER, Long.class), + record.get(ATTEMPT_NUMBER, int.class), record.get(JOB_ID, Long.class), Path.of(record.get("log_path", String.class)), record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class), diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index 895382c40b20..da7b3a98474e 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -49,6 +49,8 @@ public interface JobPersistence { */ record AttemptStats(SyncStats combinedStats, List perStreamStats) {} + record JobAttemptPair(long id, int attemptNumber) {} + /** * Retrieve the combined and per stream stats for a single attempt. * @@ -57,6 +59,19 @@ record AttemptStats(SyncStats combinedStats, List perStreamStat */ AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException; + /** + * Alternative method to retrieve combined and per stream stats per attempt for a list of jobs to + * avoid overloading the database with too many queries. + *

+ * This implementation is intended to utilise complex joins under the hood to reduce the potential + * N+1 database pattern. + * + * @param jobIds + * @return + * @throws IOException + */ + Map getAttemptStats(List jobIds) throws IOException; + List getNormalizationSummary(long jobId, int attemptNumber) throws IOException; Job getJob(long jobId) throws IOException; diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java index 110deaecab7b..a3dc08b076d2 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java @@ -13,7 +13,7 @@ public class Attempt { - private final long id; + private final int attemptNumber; private final long jobId; private final JobOutput output; private final AttemptStatus status; @@ -23,7 +23,7 @@ public class Attempt { private final long createdAtInSecond; private final Long endedAtInSecond; - public Attempt(final long id, + public Attempt(final int attemptNumber, final long jobId, final Path logPath, final @Nullable JobOutput output, @@ -32,7 +32,7 @@ public Attempt(final long id, final long createdAtInSecond, final long updatedAtInSecond, final @Nullable Long endedAtInSecond) { - this.id = id; + this.attemptNumber = attemptNumber; this.jobId = jobId; this.output = output; this.status = status; @@ -43,8 +43,8 @@ public Attempt(final long id, this.endedAtInSecond = endedAtInSecond; } - public long getId() { - return id; + public int getAttemptNumber() { + return attemptNumber; } public long getJobId() { @@ -92,7 +92,7 @@ public boolean equals(final Object o) { return false; } final Attempt attempt = (Attempt) o; - return id == attempt.id && + return attemptNumber == attempt.attemptNumber && jobId == attempt.jobId && updatedAtInSecond == attempt.updatedAtInSecond && createdAtInSecond == attempt.createdAtInSecond && @@ -105,13 +105,13 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond); + return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond); } @Override public String toString() { return "Attempt{" + - "id=" + id + + "id=" + attemptNumber + ", jobId=" + jobId + ", output=" + output + ", status=" + status + diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index 0b48aaf036c4..a06422c25bd4 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -14,6 +14,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -48,6 +49,7 @@ import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.db.instance.test.TestDatabaseProviders; import io.airbyte.persistence.job.JobPersistence.AttemptStats; +import io.airbyte.persistence.job.JobPersistence.JobAttemptPair; import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.AttemptStatus; import io.airbyte.persistence.job.models.AttemptWithJobInfo; @@ -142,7 +144,7 @@ static void dbDown() { container.close(); } - private static Attempt createAttempt(final long id, final long jobId, final AttemptStatus status, final Path logPath) { + private static Attempt createAttempt(final int id, final long jobId, final AttemptStatus status, final Path logPath) { return new Attempt( id, jobId, @@ -155,7 +157,7 @@ private static Attempt createAttempt(final long id, final long jobId, final Atte NOW.getEpochSecond()); } - private static Attempt createUnfinishedAttempt(final long id, final long jobId, final AttemptStatus status, final Path logPath) { + private static Attempt createUnfinishedAttempt(final int id, final long jobId, final AttemptStatus status, final Path logPath) { return new Attempt( id, jobId, @@ -238,7 +240,7 @@ void testCompleteAttemptFailed() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.INCOMPLETE, - Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -256,7 +258,7 @@ void testCompleteAttemptSuccess() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -326,8 +328,8 @@ void testWriteAttemptFailureSummary() throws IOException { } @Nested - @DisplayName("Test writing in progress stats") - class WriteStats { + @DisplayName("Stats Related Tests") + class Stats { @Test @DisplayName("Writing stats the first time should only write record and bytes information correctly") @@ -455,6 +457,81 @@ void testWriteNullNamespace() throws IOException { assertEquals(streamStats, actStreamStats); } + @Test + @DisplayName("Writing multiple stats a stream with null namespace should write correctly without exceptions") + void testGetStatsNoResult() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); + + final AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber); + assertNull(stats.combinedStats()); + assertEquals(0, stats.perStreamStats().size()); + + } + + @Test + @DisplayName("Retrieving all attempts stats for a job should return the right information") + void testGetMultipleStats() throws IOException { + final long jobOneId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int jobOneAttemptNumberOne = jobPersistence.createAttempt(jobOneId, LOG_PATH); + + // First write for first attempt. + var streamStats = List.of( + new StreamSyncStats().withStreamName("name1") + .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobOneId, jobOneAttemptNumberOne, 1000, 1000, 1000, 1000, streamStats); + + // Second write for first attempt. This is the record that should be returned. + when(timeSupplier.get()).thenReturn(Instant.now()); + streamStats = List.of( + new StreamSyncStats().withStreamName("name1") + .withStats(new SyncStats().withBytesEmitted(1000L).withRecordsEmitted(1000L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobOneId, jobOneAttemptNumberOne, 2000, 2000, 2000, 2000, streamStats); + jobPersistence.failAttempt(jobOneId, jobOneAttemptNumberOne); + + // Second attempt for first job. + final int jobOneAttemptNumberTwo = jobPersistence.createAttempt(jobOneId, LOG_PATH); + jobPersistence.writeStats(jobOneId, jobOneAttemptNumberTwo, 1000, 1000, 1000, 1000, streamStats); + + // First attempt for second job. + final long jobTwoId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int jobTwoAttemptNumberOne = jobPersistence.createAttempt(jobTwoId, LOG_PATH); + jobPersistence.writeStats(jobTwoId, jobTwoAttemptNumberOne, 1000, 1000, 1000, 1000, streamStats); + + final var stats = jobPersistence.getAttemptStats(List.of(jobOneId, jobTwoId)); + final var exp = Map.of( + new JobAttemptPair(jobOneId, jobOneAttemptNumberOne), + new AttemptStats( + new SyncStats().withRecordsEmitted(2000L).withBytesEmitted(2000L).withEstimatedBytes(2000L).withEstimatedRecords(2000L), + List.of(new StreamSyncStats().withStreamName("name1").withStats( + new SyncStats().withEstimatedBytes(10000L).withEstimatedRecords(2000L).withBytesEmitted(1000L).withRecordsEmitted(1000L)))), + new JobAttemptPair(jobOneId, jobOneAttemptNumberTwo), + new AttemptStats( + new SyncStats().withRecordsEmitted(1000L).withBytesEmitted(1000L).withEstimatedBytes(1000L).withEstimatedRecords(1000L), + List.of(new StreamSyncStats().withStreamName("name1").withStats( + new SyncStats().withEstimatedBytes(10000L).withEstimatedRecords(2000L).withBytesEmitted(1000L).withRecordsEmitted(1000L)))), + new JobAttemptPair(jobTwoId, jobTwoAttemptNumberOne), + new AttemptStats( + new SyncStats().withRecordsEmitted(1000L).withBytesEmitted(1000L).withEstimatedBytes(1000L).withEstimatedRecords(1000L), + List.of(new StreamSyncStats().withStreamName("name1").withStats( + new SyncStats().withEstimatedBytes(10000L).withEstimatedRecords(2000L).withBytesEmitted(1000L).withRecordsEmitted(1000L))))); + + assertEquals(exp, stats); + + } + + @Test + @DisplayName("Retrieving stats for an empty list should not cause an exception.") + void testGetStatsForEmptyJobList() throws IOException { + assertNotNull(jobPersistence.getAttemptStats(List.of())); + } + + @Test + @DisplayName("Retrieving stats for a bad job attempt input should not cause an exception.") + void testGetStatsForBadJobAttemptInput() throws IOException { + assertNotNull(jobPersistence.getAttemptStats(-1, -1)); + } + } @Test @@ -471,8 +548,8 @@ void testGetLastSyncJobWithMultipleAttempts() throws IOException { SYNC_JOB_CONFIG, JobStatus.INCOMPLETE, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1L, jobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(Optional.of(expected), actual); @@ -514,15 +591,14 @@ void testExportImport() throws IOException, SQLException { jobPersistence.importDatabase("test", outputStreams); final List actualList = jobPersistence.listJobs(SPEC_JOB_CONFIG.getConfigType(), CONNECTION_ID.toString(), 9999, 0); - final Job actual = actualList.get(0); final Job expected = createJob( jobId, SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1L, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), NOW.getEpochSecond()); assertEquals(1, actualList.size()); @@ -557,8 +633,8 @@ void testListJobsWithTimestamp() throws IOException { assertEquals(jobs.size(), 1); assertEquals(jobs.get(0).getId(), syncJobId); assertEquals(jobs.get(0).getAttempts().size(), 2); - assertEquals(jobs.get(0).getAttempts().get(0).getId(), 0); - assertEquals(jobs.get(0).getAttempts().get(1).getId(), 1); + assertEquals(jobs.get(0).getAttempts().get(0).getAttemptNumber(), 0); + assertEquals(jobs.get(0).getAttempts().get(1).getAttemptNumber(), 1); final Path syncJobThirdAttemptLogPath = LOG_PATH.resolve("3"); final int syncJobAttemptNumber2 = jobPersistence.createAttempt(syncJobId, syncJobThirdAttemptLogPath); @@ -578,12 +654,12 @@ void testListJobsWithTimestamp() throws IOException { assertEquals(secondQueryJobs.size(), 2); assertEquals(secondQueryJobs.get(0).getId(), syncJobId); assertEquals(secondQueryJobs.get(0).getAttempts().size(), 1); - assertEquals(secondQueryJobs.get(0).getAttempts().get(0).getId(), 2); + assertEquals(secondQueryJobs.get(0).getAttempts().get(0).getAttemptNumber(), 2); assertEquals(secondQueryJobs.get(1).getId(), newSyncJobId); assertEquals(secondQueryJobs.get(1).getAttempts().size(), 2); - assertEquals(secondQueryJobs.get(1).getAttempts().get(0).getId(), 0); - assertEquals(secondQueryJobs.get(1).getAttempts().get(1).getId(), 1); + assertEquals(secondQueryJobs.get(1).getAttempts().get(0).getAttemptNumber(), 0); + assertEquals(secondQueryJobs.get(1).getAttempts().get(1).getAttemptNumber(), 1); Long maxEndedAtTimestampAfterSecondQuery = -1L; for (final Job c : secondQueryJobs) { @@ -628,35 +704,35 @@ void testListAttemptsWithJobInfo() throws IOException { assertEquals(6, allAttempts.size()); assertEquals(job1, allAttempts.get(0).getJobInfo().getId()); - assertEquals(job1Attempt1, allAttempts.get(0).getAttempt().getId()); + assertEquals(job1Attempt1, allAttempts.get(0).getAttempt().getAttemptNumber()); assertEquals(job2, allAttempts.get(1).getJobInfo().getId()); - assertEquals(job2Attempt1, allAttempts.get(1).getAttempt().getId()); + assertEquals(job2Attempt1, allAttempts.get(1).getAttempt().getAttemptNumber()); assertEquals(job2, allAttempts.get(2).getJobInfo().getId()); - assertEquals(job2Attempt2, allAttempts.get(2).getAttempt().getId()); + assertEquals(job2Attempt2, allAttempts.get(2).getAttempt().getAttemptNumber()); assertEquals(job1, allAttempts.get(3).getJobInfo().getId()); - assertEquals(job1Attempt2, allAttempts.get(3).getAttempt().getId()); + assertEquals(job1Attempt2, allAttempts.get(3).getAttempt().getAttemptNumber()); assertEquals(job1, allAttempts.get(4).getJobInfo().getId()); - assertEquals(job1Attempt3, allAttempts.get(4).getAttempt().getId()); + assertEquals(job1Attempt3, allAttempts.get(4).getAttempt().getAttemptNumber()); assertEquals(job2, allAttempts.get(5).getJobInfo().getId()); - assertEquals(job2Attempt3, allAttempts.get(5).getAttempt().getId()); + assertEquals(job2Attempt3, allAttempts.get(5).getAttempt().getAttemptNumber()); final List attemptsAfterTimestamp = jobPersistence.listAttemptsWithJobInfo(ConfigType.SYNC, Instant.ofEpochSecond(allAttempts.get(2).getAttempt().getEndedAtInSecond().orElseThrow())); assertEquals(3, attemptsAfterTimestamp.size()); assertEquals(job1, attemptsAfterTimestamp.get(0).getJobInfo().getId()); - assertEquals(job1Attempt2, attemptsAfterTimestamp.get(0).getAttempt().getId()); + assertEquals(job1Attempt2, attemptsAfterTimestamp.get(0).getAttempt().getAttemptNumber()); assertEquals(job1, attemptsAfterTimestamp.get(1).getJobInfo().getId()); - assertEquals(job1Attempt3, attemptsAfterTimestamp.get(1).getAttempt().getId()); + assertEquals(job1Attempt3, attemptsAfterTimestamp.get(1).getAttempt().getAttemptNumber()); assertEquals(job2, attemptsAfterTimestamp.get(2).getJobInfo().getId()); - assertEquals(job2Attempt3, attemptsAfterTimestamp.get(2).getAttempt().getId()); + assertEquals(job2Attempt3, attemptsAfterTimestamp.get(2).getAttempt().getAttemptNumber()); } private static Supplier incrementingSecondSupplier(final Instant startTime) { @@ -887,7 +963,7 @@ void testCreateAttempt() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.RUNNING, - Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)), + Lists.newArrayList(createUnfinishedAttempt(0, jobId, AttemptStatus.RUNNING, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -901,12 +977,12 @@ void testCreateAttemptAttemptId() throws IOException { final Job jobAfterOneAttempts = jobPersistence.getJob(jobId); assertEquals(0, attemptNumber1); - assertEquals(0, jobAfterOneAttempts.getAttempts().get(0).getId()); + assertEquals(0, jobAfterOneAttempts.getAttempts().get(0).getAttemptNumber()); final int attemptNumber2 = jobPersistence.createAttempt(jobId, LOG_PATH); final Job jobAfterTwoAttempts = jobPersistence.getJob(jobId); assertEquals(1, attemptNumber2); - assertEquals(Sets.newHashSet(0L, 1L), jobAfterTwoAttempts.getAttempts().stream().map(Attempt::getId).collect(Collectors.toSet())); + assertEquals(Sets.newHashSet(0, 1), jobAfterTwoAttempts.getAttempts().stream().map(Attempt::getAttemptNumber).collect(Collectors.toSet())); } @Test @@ -922,7 +998,7 @@ void testCreateAttemptWhileAttemptAlreadyRunning() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.RUNNING, - Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)), + Lists.newArrayList(createUnfinishedAttempt(0, jobId, AttemptStatus.RUNNING, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -941,7 +1017,7 @@ void testCreateAttemptTerminal() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -1336,8 +1412,8 @@ void testGetNextJobWithMultipleAttempts() throws IOException { SPEC_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1L, jobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(Optional.of(expected), actual); @@ -1562,8 +1638,8 @@ void testListJobsWithMultipleAttempts() throws IOException { SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1L, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), NOW.getEpochSecond()); assertEquals(1, actualList.size()); @@ -1682,7 +1758,7 @@ void testListJobsWithStatus() throws IOException { SPEC_JOB_CONFIG, JobStatus.INCOMPLETE, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(1, actualList.size()); @@ -1720,7 +1796,7 @@ void testListJobsWithStatusAndConfigType() throws IOException, InterruptedExcept SPEC_JOB_CONFIG, JobStatus.INCOMPLETE, Lists.newArrayList( - createAttempt(0L, failedSpecJobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0, failedSpecJobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond(), SPEC_SCOPE); @@ -1757,12 +1833,12 @@ void testListJobsWithStatusesAndConfigTypesForConnection() throws IOException, I Set.of(ConfigType.SYNC, ConfigType.CHECK_CONNECTION_DESTINATION), Set.of(JobStatus.PENDING, JobStatus.SUCCEEDED)); final Job expectedDesiredJob1 = createJob(desiredJobId1, SYNC_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0L, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond(), desiredConnectionId.toString()); final Job expectedDesiredJob2 = createJob(desiredJobId2, SYNC_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString()); final Job expectedDesiredJob3 = createJob(desiredJobId3, CHECK_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0L, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond(), desiredConnectionId.toString()); final Job expectedDesiredJob4 = createJob(desiredJobId4, CHECK_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString()); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java index 0913a29ca734..badc1ac68d70 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java @@ -19,7 +19,7 @@ void testIsAttemptInTerminalState() { } private static Attempt attemptWithStatus(final AttemptStatus attemptStatus) { - return new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null); + return new Attempt(1, 1L, null, null, attemptStatus, null, 0L, 0L, null); } } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java index 3e10fa003d36..4cdb4f15403d 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java @@ -68,7 +68,7 @@ void testGetLastFailedAttempt() { final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED); assertTrue(job.getLastFailedAttempt().isPresent()); - assertEquals(2, job.getLastFailedAttempt().get().getId()); + assertEquals(2, job.getLastFailedAttempt().get().getAttemptNumber()); } @Test diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 52c28f3640f1..f478e85b90c0 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -137,7 +137,7 @@ public AttemptInfoRead getAttemptInfoRead(final Attempt attempt) { public static AttemptRead getAttemptRead(final Attempt attempt) { return new AttemptRead() - .id(attempt.getId()) + .id((long) attempt.getAttemptNumber()) .status(Enums.convertTo(attempt.getStatus(), AttemptStatus.class)) .bytesSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats .map(JobOutput::getSync) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index f61476a18600..c1dadf31f148 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -5,7 +5,11 @@ package io.airbyte.server.handlers; import com.google.common.base.Preconditions; +import io.airbyte.api.model.generated.AttemptInfoRead; import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; +import io.airbyte.api.model.generated.AttemptRead; +import io.airbyte.api.model.generated.AttemptStats; +import io.airbyte.api.model.generated.AttemptStreamStats; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.model.generated.DestinationDefinitionRead; @@ -33,6 +37,7 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.JobPersistence.JobAttemptPair; import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobStatus; import io.airbyte.server.converters.JobConverter; @@ -41,11 +46,14 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class JobHistoryHandler { private final ConnectionsHandler connectionsHandler; @@ -118,16 +126,60 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept (request.getPagination() != null && request.getPagination().getRowOffset() != null) ? request.getPagination().getRowOffset() : 0); } - final Long totalJobCount = jobPersistence.getJobCount(configTypes, configId); + final List jobReads = jobs.stream().map(JobConverter::getJobWithAttemptsRead).collect(Collectors.toList()); + final var jobIds = jobReads.stream().map(r -> r.getJob().getId()).toList(); + final Map stats = jobPersistence.getAttemptStats(jobIds); + for (final JobWithAttemptsRead jwar : jobReads) { + for (final AttemptRead a : jwar.getAttempts()) { + final var stat = stats.get(new JobAttemptPair(jwar.getJob().getId(), a.getId().intValue())); + if (stat == null) { + log.error("Missing stats for job {} attempt {}", jwar.getJob().getId(), a.getId().intValue()); + continue; + } - final List jobReads = jobs - .stream() - .map(JobConverter::getJobWithAttemptsRead) - .collect(Collectors.toList()); + hydrateWithStats(a, stat); + } + } + final Long totalJobCount = jobPersistence.getJobCount(configTypes, configId); return new JobReadList().jobs(jobReads).totalJobCount(totalJobCount); } + /** + * Retrieve stats for a given job id and attempt number and hydrate the api model with the retrieved + * information. + * + * @param jobId the job the attempt belongs to. Used as an index to retrieve stats. + * @param a the attempt to hydrate stats for. + */ + private void hydrateWithStats(final AttemptRead a, final JobPersistence.AttemptStats attemptStats) { + a.setTotalStats(new AttemptStats()); + + final var combinedStats = attemptStats.combinedStats(); + if (combinedStats == null) { + // If overall stats are missing, assume stream stats are also missing, since overall stats are + // easier to produce than stream stats. Exit early. + return; + } + + a.getTotalStats() + .estimatedBytes(combinedStats.getEstimatedBytes()) + .estimatedRecords(combinedStats.getEstimatedRecords()) + .bytesEmitted(combinedStats.getBytesEmitted()) + .recordsEmitted(combinedStats.getRecordsEmitted()); + + final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats() + .streamName(s.getStreamName()) + .streamNamespace(s.getStreamNamespace()) + .stats(new AttemptStats() + .bytesEmitted(s.getStats().getBytesEmitted()) + .recordsEmitted(s.getStats().getRecordsEmitted()) + .estimatedBytes(s.getStats().getEstimatedBytes()) + .estimatedRecords(s.getStats().getEstimatedRecords()))) + .collect(Collectors.toList()); + a.setStreamStats(streamStats); + } + public JobInfoRead getJobInfo(final JobIdRequestBody jobIdRequestBody) throws IOException { final Job job = jobPersistence.getJob(jobIdRequestBody.getId()); return jobConverter.getJobInfoRead(job); @@ -143,6 +195,12 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) final Job job = jobPersistence.getJob(jobIdRequestBody.getId()); final JobInfoRead jobinfoRead = jobConverter.getJobInfoRead(job); + for (final AttemptInfoRead a : jobinfoRead.getAttempts()) { + final int attemptNumber = a.getAttempt().getId().intValue(); + final var attemptStats = jobPersistence.getAttemptStats(job.getId(), attemptNumber); + hydrateWithStats(a.getAttempt(), attemptStats); + } + final JobDebugInfoRead jobDebugInfoRead = buildJobDebugInfoRead(jobinfoRead); if (temporalClient != null) { final UUID connectionId = UUID.fromString(job.getScope()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java index fe1d084d92ce..f73ec6e62aad 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java @@ -68,7 +68,7 @@ class JobConverterTest { private static final long JOB_ID = 100L; - private static final long ATTEMPT_ID = 1002L; + private static final Integer ATTEMPT_NUMBER = 0; private static final String JOB_CONFIG_ID = "123"; private static final JobStatus JOB_STATUS = JobStatus.RUNNING; private static final AttemptStatus ATTEMPT_STATUS = AttemptStatus.RUNNING; @@ -124,7 +124,7 @@ class JobConverterTest { .updatedAt(CREATED_AT)) .attempts(Lists.newArrayList(new AttemptInfoRead() .attempt(new AttemptRead() - .id(ATTEMPT_ID) + .id((long) ATTEMPT_NUMBER) .status(io.airbyte.api.model.generated.AttemptStatus.RUNNING) .recordsSynced(RECORDS_EMITTED) .bytesSynced(BYTES_EMITTED) @@ -195,7 +195,7 @@ public void setUp() { when(job.getCreatedAtInSecond()).thenReturn(CREATED_AT); when(job.getUpdatedAtInSecond()).thenReturn(CREATED_AT); when(job.getAttempts()).thenReturn(Lists.newArrayList(attempt)); - when(attempt.getId()).thenReturn(ATTEMPT_ID); + when(attempt.getAttemptNumber()).thenReturn(ATTEMPT_NUMBER); when(attempt.getStatus()).thenReturn(ATTEMPT_STATUS); when(attempt.getOutput()).thenReturn(Optional.of(JOB_OUTPUT)); when(attempt.getLogPath()).thenReturn(LOG_PATH); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index cb68ad2b8205..105e2fbb9c17 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -6,6 +6,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -14,6 +16,7 @@ import io.airbyte.api.model.generated.AttemptNormalizationStatusRead; import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; import io.airbyte.api.model.generated.AttemptRead; +import io.airbyte.api.model.generated.AttemptStreamStats; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationIdRequestBody; import io.airbyte.api.model.generated.DestinationRead; @@ -42,9 +45,13 @@ import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; +import io.airbyte.config.StreamSyncStats; +import io.airbyte.config.SyncStats; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.JobPersistence.AttemptStats; +import io.airbyte.persistence.job.JobPersistence.JobAttemptPair; import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.AttemptNormalizationStatus; import io.airbyte.persistence.job.models.AttemptStatus; @@ -63,6 +70,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -77,7 +85,7 @@ class JobHistoryHandlerTest { private static final long JOB_ID = 100L; - private static final long ATTEMPT_ID = 1002L; + private static final int ATTEMPT_NUMBER = 0; private static final String JOB_CONFIG_ID = "ef296385-6796-413f-ac1b-49c4caba3f2b"; private static final JobStatus JOB_STATUS = JobStatus.SUCCEEDED; private static final JobConfig.ConfigType CONFIG_TYPE = JobConfig.ConfigType.CHECK_CONNECTION_SOURCE; @@ -89,17 +97,24 @@ class JobHistoryHandlerTest { private static final LogRead EMPTY_LOG_READ = new LogRead().logLines(new ArrayList<>()); private static final long CREATED_AT = System.currentTimeMillis() / 1000; - private SourceRead sourceRead; - private ConnectionRead connectionRead; - private DestinationRead destinationRead; + private static final AttemptStats ATTEMPT_STATS = new AttemptStats(new SyncStats().withBytesEmitted(10L).withRecordsEmitted(10L), + List.of( + new StreamSyncStats().withStreamNamespace("ns1").withStreamName("stream1") + .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)), + new StreamSyncStats().withStreamName("stream2") + .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)))); + + private static final io.airbyte.api.model.generated.AttemptStats ATTEMPT_STATS_API = new io.airbyte.api.model.generated.AttemptStats() + .bytesEmitted(10L).recordsEmitted(10L); + + private static final List ATTEMPT_STREAM_STATS = List.of( + new AttemptStreamStats().streamNamespace("ns1").streamName("stream1") + .stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L)), + new AttemptStreamStats().streamName("stream2").stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L))); + private ConnectionsHandler connectionsHandler; private SourceHandler sourceHandler; private DestinationHandler destinationHandler; - private SourceDefinitionsHandler sourceDefinitionsHandler; - private DestinationDefinitionsHandler destinationDefinitionsHandler; - private StandardDestinationDefinition standardDestinationDefinition; - private StandardSourceDefinition standardSourceDefinition; - private AirbyteVersion airbyteVersion; private Job testJob; private Attempt testJobAttempt; private JobPersistence jobPersistence; @@ -134,7 +149,7 @@ private static List toAttemptInfoList(final List attem private static AttemptRead toAttemptRead(final Attempt a) { return new AttemptRead() - .id(a.getId()) + .id((long) a.getAttemptNumber()) .status(Enums.convertTo(a.getStatus(), io.airbyte.api.model.generated.AttemptStatus.class)) .createdAt(a.getCreatedAtInSecond()) .updatedAt(a.getUpdatedAtInSecond()) @@ -142,22 +157,22 @@ private static AttemptRead toAttemptRead(final Attempt a) { } private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) { - return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, status, null, timestamps, timestamps, timestamps); + return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, timestamps, timestamps, timestamps); } @BeforeEach - void setUp() throws IOException, JsonValidationException, ConfigNotFoundException { + void setUp() { testJobAttempt = createAttempt(JOB_ID, CREATED_AT, AttemptStatus.SUCCEEDED); testJob = new Job(JOB_ID, JOB_CONFIG.getConfigType(), JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(testJobAttempt), JOB_STATUS, null, CREATED_AT, CREATED_AT); connectionsHandler = mock(ConnectionsHandler.class); sourceHandler = mock(SourceHandler.class); - sourceDefinitionsHandler = mock(SourceDefinitionsHandler.class); destinationHandler = mock(DestinationHandler.class); - destinationDefinitionsHandler = mock(DestinationDefinitionsHandler.class); - airbyteVersion = mock(AirbyteVersion.class); jobPersistence = mock(JobPersistence.class); + final SourceDefinitionsHandler sourceDefinitionsHandler = mock(SourceDefinitionsHandler.class); + final DestinationDefinitionsHandler destinationDefinitionsHandler = mock(DestinationDefinitionsHandler.class); + final AirbyteVersion airbyteVersion = mock(AirbyteVersion.class); jobHistoryHandler = new JobHistoryHandler(jobPersistence, WorkerEnvironment.DOCKER, LogConfigs.EMPTY, connectionsHandler, sourceHandler, sourceDefinitionsHandler, destinationHandler, destinationDefinitionsHandler, airbyteVersion); } @@ -182,6 +197,9 @@ void testListJobs() throws IOException { when(jobPersistence.listJobs(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, pagesize, rowOffset)) .thenReturn(List.of(latestJobNoAttempt, successfulJob)); when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L); + when(jobPersistence.getAttemptStats(List.of(200L, 100L))).thenReturn(Map.of( + new JobAttemptPair(100, 0), ATTEMPT_STATS, + new JobAttemptPair(jobId2, 0), ATTEMPT_STATS)); final var requestBody = new JobListRequestBody() .configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API)) @@ -189,8 +207,8 @@ void testListJobs() throws IOException { .pagination(new Pagination().pageSize(pagesize).rowOffset(rowOffset)); final var jobReadList = jobHistoryHandler.listJobsFor(requestBody); - final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(toAttemptRead( - testJobAttempt))); + final var expAttemptRead = toAttemptRead(testJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS); + final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(expAttemptRead)); final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)).attempts(Collections.emptyList()); final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead)).totalJobCount(2L); @@ -223,6 +241,10 @@ void testListJobsFor() throws IOException { when(jobPersistence.listJobs(configTypes, JOB_CONFIG_ID, pagesize, rowOffset)).thenReturn(List.of(latestJob, secondJob, firstJob)); when(jobPersistence.getJobCount(configTypes, JOB_CONFIG_ID)).thenReturn(3L); + when(jobPersistence.getAttemptStats(List.of(300L, 200L, 100L))).thenReturn(Map.of( + new JobAttemptPair(100, 0), ATTEMPT_STATS, + new JobAttemptPair(secondJobId, 0), ATTEMPT_STATS, + new JobAttemptPair(latestJobId, 0), ATTEMPT_STATS)); final JobListRequestBody requestBody = new JobListRequestBody() .configTypes(List.of(CONFIG_TYPE_FOR_API, JobConfigType.SYNC, JobConfigType.DISCOVER_SCHEMA)) @@ -231,9 +253,11 @@ void testListJobsFor() throws IOException { final JobReadList jobReadList = jobHistoryHandler.listJobsFor(requestBody); final var firstJobWithAttemptRead = - new JobWithAttemptsRead().job(toJobInfo(firstJob)).attempts(ImmutableList.of(toAttemptRead(testJobAttempt))); + new JobWithAttemptsRead().job(toJobInfo(firstJob)) + .attempts(ImmutableList.of(toAttemptRead(testJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS))); final var secondJobWithAttemptRead = - new JobWithAttemptsRead().job(toJobInfo(secondJob)).attempts(ImmutableList.of(toAttemptRead(secondJobAttempt))); + new JobWithAttemptsRead().job(toJobInfo(secondJob)) + .attempts(ImmutableList.of(toAttemptRead(secondJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS))); final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJob)).attempts(Collections.emptyList()); final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, secondJobWithAttemptRead, firstJobWithAttemptRead)).totalJobCount(3L); @@ -257,6 +281,9 @@ void testListJobsIncludingJobId() throws IOException { when(jobPersistence.listJobsIncludingId(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, jobId2, pagesize)) .thenReturn(List.of(latestJobNoAttempt, successfulJob)); when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L); + when(jobPersistence.getAttemptStats(List.of(200L, 100L))).thenReturn(Map.of( + new JobAttemptPair(100, 0), ATTEMPT_STATS, + new JobAttemptPair(jobId2, 0), ATTEMPT_STATS)); final var requestBody = new JobListRequestBody() .configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API)) @@ -266,7 +293,7 @@ void testListJobsIncludingJobId() throws IOException { final var jobReadList = jobHistoryHandler.listJobsFor(requestBody); final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(toAttemptRead( - testJobAttempt))); + testJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS))); final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)).attempts(Collections.emptyList()); final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead)).totalJobCount(2L); @@ -305,16 +332,16 @@ void testGetJobInfoLight() throws IOException { @Test @DisplayName("Should return the right info to debug this job") void testGetDebugJobInfo() throws IOException, JsonValidationException, ConfigNotFoundException, URISyntaxException { - standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); + final StandardSourceDefinition standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); - sourceRead = SourceHelpers.getSourceRead(source, standardSourceDefinition); + final SourceRead sourceRead = SourceHelpers.getSourceRead(source, standardSourceDefinition); - standardDestinationDefinition = DestinationDefinitionHelpers.generateDestination(); + final StandardDestinationDefinition standardDestinationDefinition = DestinationDefinitionHelpers.generateDestination(); final DestinationConnection destination = DestinationHelpers.generateDestination(UUID.randomUUID()); - destinationRead = DestinationHelpers.getDestinationRead(destination, standardDestinationDefinition); + final DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, standardDestinationDefinition); final StandardSync standardSync = ConnectionHelpers.generateSyncWithSourceId(source.getSourceId()); - connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); + final ConnectionRead connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); when(connectionsHandler.getConnection(UUID.fromString(testJob.getScope()))).thenReturn(connectionRead); final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody(); @@ -325,10 +352,13 @@ void testGetDebugJobInfo() throws IOException, JsonValidationException, ConfigNo destinationIdRequestBody.setDestinationId(connectionRead.getDestinationId()); when(destinationHandler.getDestination(destinationIdRequestBody)).thenReturn(destinationRead); when(jobPersistence.getJob(JOB_ID)).thenReturn(testJob); + when(jobPersistence.getAttemptStats(anyLong(), anyInt())).thenReturn(ATTEMPT_STATS); final JobIdRequestBody requestBody = new JobIdRequestBody().id(JOB_ID); final JobDebugInfoRead jobDebugInfoActual = jobHistoryHandler.getJobDebugInfo(requestBody); - final JobDebugInfoRead exp = new JobDebugInfoRead().job(toDebugJobInfo(testJob)).attempts(toAttemptInfoList(ImmutableList.of(testJobAttempt))); + final List attemptInfoReads = toAttemptInfoList(ImmutableList.of(testJobAttempt)); + attemptInfoReads.forEach(read -> read.getAttempt().totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS)); + final JobDebugInfoRead exp = new JobDebugInfoRead().job(toDebugJobInfo(testJob)).attempts(attemptInfoReads); assertEquals(exp, jobDebugInfoActual); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 39b089f8fef2..37a9f7647629 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -431,7 +431,7 @@ private boolean checkActiveJobPreviousAttempt(final Job activeJob, final int att if (activeJob.getAttempts().size() > minAttemptSize) { final Optional optionalAttempt = activeJob.getAttempts().stream() - .filter(attempt -> attempt.getId() == (attemptId - 1)).findFirst(); + .filter(attempt -> attempt.getAttemptNumber() == (attemptId - 1)).findFirst(); result = optionalAttempt.isPresent() && optionalAttempt.get().getStatus().equals(FAILED); } @@ -451,8 +451,7 @@ private void failNonTerminalJobs(final UUID connectionId) { continue; } - // the Attempt object 'id' is actually the value of the attempt_number column in the db - final int attemptNumber = (int) attempt.getId(); + final int attemptNumber = attempt.getAttemptNumber(); log.info("Failing non-terminal attempt {} for non-terminal job {}", attemptNumber, jobId); jobPersistence.failAttempt(jobId, attemptNumber); jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber,