Skip to content

Commit

Permalink
Remerge Progress Bar Read API. (#21124)
Browse files Browse the repository at this point in the history
Let's try #20937 again, this time with better test for error cases.

See original PR for description.

This PR adds testing and logic to handle empty/bad job input.
  • Loading branch information
davinchia authored and jbfbell committed Jan 13, 2023
1 parent 44ebed3 commit 836e2e5
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import io.airbyte.commons.enums.Enums;
Expand Down Expand Up @@ -72,6 +73,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.InsertValuesStepN;
Expand Down Expand Up @@ -511,6 +513,76 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t
});
}

@Override
public Map<JobAttemptPair, AttemptStats> getAttemptStats(final List<Long> jobIds) throws IOException {
if (jobIds == null || jobIds.isEmpty()) {
return Map.of();
}

final var jobIdsStr = StringUtils.join(jobIds, ',');
return jobDatabase.query(ctx -> {
// Instead of one massive join query, separate this query into two queries for better readability
// for now.
// We can combine the queries at a later date if this still proves to be not efficient enough.
final Map<JobAttemptPair, AttemptStats> attemptStats = hydrateSyncStats(jobIdsStr, ctx);
hydrateStreamStats(jobIdsStr, ctx, attemptStats);
return attemptStats;
});
}

private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) {
final var attemptStats = new HashMap<JobAttemptPair, AttemptStats>();
final var syncResults = ctx.fetch(
"SELECT atmpt.attempt_number, atmpt.job_id,"
+ "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
+ "FROM sync_stats stats "
+ "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id "
+ "WHERE job_id IN ( " + jobIdsStr + ");");
syncResults.forEach(r -> {
final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER));
final var syncStats = new SyncStats()
.withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED))
.withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED))
.withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS))
.withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES));
attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList()));
});
return attemptStats;
}

/**
* This method needed to be called after
* {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)} as it assumes hydrateSyncStats
* has prepopulated the map.
*/
private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map<JobAttemptPair, AttemptStats> attemptStats) {
final var streamResults = ctx.fetch(
"SELECT atmpt.attempt_number, atmpt.job_id, "
+ "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
+ "FROM stream_stats stats "
+ "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id "
+ "WHERE attempt_id IN "
+ "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));");

streamResults.forEach(r -> {
final var streamSyncStats = new StreamSyncStats()
.withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE))
.withStreamName(r.get(STREAM_STATS.STREAM_NAME))
.withStats(new SyncStats()
.withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED))
.withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED))
.withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS))
.withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES)));

final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER));
if (!attemptStats.containsKey(key)) {
LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", key);
return;
}
attemptStats.get(key).perStreamStats().add(streamSyncStats);
});
}

@Override
public List<NormalizationSummary> getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException {
return jobDatabase
Expand All @@ -528,6 +600,10 @@ static Long getAttemptId(final long jobId, final int attemptNumber, final DSLCon
final Optional<Record> record =
ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
attemptNumber).stream().findFirst();
if (record.isEmpty()) {
return -1L;
}

return record.get().get("id", Long.class);
}

Expand Down Expand Up @@ -849,7 +925,7 @@ private static Job getJobFromRecord(final Record record) {

private static Attempt getAttemptFromRecord(final Record record) {
return new Attempt(
record.get(ATTEMPT_NUMBER, Long.class),
record.get(ATTEMPT_NUMBER, int.class),
record.get(JOB_ID, Long.class),
Path.of(record.get("log_path", String.class)),
record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface JobPersistence {
*/
record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStats) {}

record JobAttemptPair(long id, int attemptNumber) {}

/**
* Retrieve the combined and per stream stats for a single attempt.
*
Expand All @@ -57,6 +59,19 @@ record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStat
*/
AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException;

/**
* Alternative method to retrieve combined and per stream stats per attempt for a list of jobs to
* avoid overloading the database with too many queries.
* <p>
* This implementation is intended to utilise complex joins under the hood to reduce the potential
* N+1 database pattern.
*
* @param jobIds
* @return
* @throws IOException
*/
Map<JobAttemptPair, AttemptStats> getAttemptStats(List<Long> jobIds) throws IOException;

List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException;

Job getJob(long jobId) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public class Attempt {

private final long id;
private final int attemptNumber;
private final long jobId;
private final JobOutput output;
private final AttemptStatus status;
Expand All @@ -23,7 +23,7 @@ public class Attempt {
private final long createdAtInSecond;
private final Long endedAtInSecond;

public Attempt(final long id,
public Attempt(final int attemptNumber,
final long jobId,
final Path logPath,
final @Nullable JobOutput output,
Expand All @@ -32,7 +32,7 @@ public Attempt(final long id,
final long createdAtInSecond,
final long updatedAtInSecond,
final @Nullable Long endedAtInSecond) {
this.id = id;
this.attemptNumber = attemptNumber;
this.jobId = jobId;
this.output = output;
this.status = status;
Expand All @@ -43,8 +43,8 @@ public Attempt(final long id,
this.endedAtInSecond = endedAtInSecond;
}

public long getId() {
return id;
public int getAttemptNumber() {
return attemptNumber;
}

public long getJobId() {
Expand Down Expand Up @@ -92,7 +92,7 @@ public boolean equals(final Object o) {
return false;
}
final Attempt attempt = (Attempt) o;
return id == attempt.id &&
return attemptNumber == attempt.attemptNumber &&
jobId == attempt.jobId &&
updatedAtInSecond == attempt.updatedAtInSecond &&
createdAtInSecond == attempt.createdAtInSecond &&
Expand All @@ -105,13 +105,13 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
}

@Override
public String toString() {
return "Attempt{" +
"id=" + id +
"id=" + attemptNumber +
", jobId=" + jobId +
", output=" + output +
", status=" + status +
Expand Down
Loading

0 comments on commit 836e2e5

Please sign in to comment.