Skip to content

Commit

Permalink
Add totalStats and streamStats in Attempts API response (#9583)
Browse files Browse the repository at this point in the history
* expose new stat fields in Attempts api response

* remove extranneous import
  • Loading branch information
pmossman authored Jan 20, 2022
1 parent d2d0335 commit a007953
Show file tree
Hide file tree
Showing 4 changed files with 423 additions and 2 deletions.
31 changes: 31 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3114,6 +3114,37 @@ components:
recordsSynced:
type: integer
format: int64
totalStats:
$ref: "#/components/schemas/AttemptStats"
streamStats:
type: array
items:
$ref: "#/components/schemas/AttemptStreamStats"
AttemptStats:
type: object
properties:
recordsEmitted:
type: integer
format: int64
bytesEmitted:
type: integer
format: int64
stateMessagesEmitted:
type: integer
format: int64
recordsCommitted:
type: integer
format: int64
AttemptStreamStats:
type: object
required:
- streamName
- stats
properties:
streamName:
type: string
stats:
$ref: "#/components/schemas/AttemptStats"
AttemptStatus:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import io.airbyte.api.model.AttemptInfoRead;
import io.airbyte.api.model.AttemptRead;
import io.airbyte.api.model.AttemptStats;
import io.airbyte.api.model.AttemptStatus;
import io.airbyte.api.model.AttemptStreamStats;
import io.airbyte.api.model.JobConfigType;
import io.airbyte.api.model.JobInfoRead;
import io.airbyte.api.model.JobRead;
Expand All @@ -19,6 +21,8 @@
import io.airbyte.config.JobOutput;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.scheduler.client.SynchronousJobMetadata;
Expand All @@ -27,6 +31,8 @@
import io.airbyte.scheduler.models.Job;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class JobConverter {
Expand Down Expand Up @@ -72,21 +78,59 @@ public static AttemptRead getAttemptRead(final Attempt attempt) {
return new AttemptRead()
.id(attempt.getId())
.status(Enums.convertTo(attempt.getStatus(), AttemptStatus.class))
.bytesSynced(attempt.getOutput()
.bytesSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
.map(StandardSyncSummary::getBytesSynced)
.orElse(null))
.recordsSynced(attempt.getOutput()
.recordsSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
.map(StandardSyncSummary::getRecordsSynced)
.orElse(null))
.totalStats(getTotalAttemptStats(attempt))
.streamStats(getAttemptStreamStats(attempt))
.createdAt(attempt.getCreatedAtInSecond())
.updatedAt(attempt.getUpdatedAtInSecond())
.endedAt(attempt.getEndedAtInSecond().orElse(null));
}

public static AttemptStats getTotalAttemptStats(final Attempt attempt) {
final SyncStats totalStats = attempt.getOutput()
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
.map(StandardSyncSummary::getTotalStats)
.orElse(null);

if (totalStats == null) {
return null;
}

return new AttemptStats()
.bytesEmitted(totalStats.getBytesEmitted())
.recordsEmitted(totalStats.getRecordsEmitted())
.stateMessagesEmitted(totalStats.getStateMessagesEmitted())
.recordsCommitted(totalStats.getRecordsCommitted());
}

public static List<AttemptStreamStats> getAttemptStreamStats(final Attempt attempt) {
final List<StreamSyncStats> streamStats = attempt.getOutput()
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
.map(StandardSyncSummary::getStreamStats)
.orElse(Collections.emptyList());

return streamStats.stream()
.map(streamStat -> new AttemptStreamStats()
.streamName(streamStat.getStreamName())
.stats(new AttemptStats()
.bytesEmitted(streamStat.getStats().getBytesEmitted())
.recordsEmitted(streamStat.getStats().getRecordsEmitted())
.stateMessagesEmitted(streamStat.getStats().getStateMessagesEmitted())
.recordsCommitted(streamStat.getStats().getRecordsCommitted())))
.collect(Collectors.toList());
}

public LogRead getLogRead(final Path logPath) {
try {
return new LogRead().logLines(LogClientSingleton.getInstance().getJobLogFile(workerEnvironment, logConfigs, logPath));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.google.common.collect.Lists;
import io.airbyte.api.model.AttemptInfoRead;
import io.airbyte.api.model.AttemptRead;
import io.airbyte.api.model.AttemptStats;
import io.airbyte.api.model.AttemptStreamStats;
import io.airbyte.api.model.JobConfigType;
import io.airbyte.api.model.JobInfoRead;
import io.airbyte.api.model.JobRead;
Expand All @@ -21,6 +23,12 @@
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobOutput.OutputType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
Expand All @@ -46,6 +54,30 @@ class JobConverterTest {
.withCheckConnection(new JobCheckConnectionConfig());
private static final Path LOG_PATH = Path.of("log_path");
private static final long CREATED_AT = System.currentTimeMillis() / 1000;
private static final long RECORDS_EMITTED = 15L;
private static final long BYTES_EMITTED = 100L;
private static final long RECORDS_COMMITTED = 10L;
private static final long STATE_MESSAGES_EMITTED = 2L;
private static final String STREAM_NAME = "stream1";

private static final JobOutput JOB_OUTPUT = new JobOutput()
.withOutputType(OutputType.SYNC)
.withSync(new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary()
.withRecordsSynced(RECORDS_EMITTED)
.withBytesSynced(BYTES_EMITTED)
.withTotalStats(new SyncStats()
.withRecordsEmitted(RECORDS_EMITTED)
.withBytesEmitted(BYTES_EMITTED)
.withStateMessagesEmitted(STATE_MESSAGES_EMITTED)
.withRecordsCommitted(RECORDS_COMMITTED))
.withStreamStats(Lists.newArrayList(new StreamSyncStats()
.withStreamName(STREAM_NAME)
.withStats(new SyncStats()
.withRecordsEmitted(RECORDS_EMITTED)
.withBytesEmitted(BYTES_EMITTED)
.withStateMessagesEmitted(STATE_MESSAGES_EMITTED)
.withRecordsCommitted(RECORDS_COMMITTED))))));

private JobConverter jobConverter;
private Job job;
Expand All @@ -63,6 +95,20 @@ class JobConverterTest {
.attempt(new AttemptRead()
.id(ATTEMPT_ID)
.status(io.airbyte.api.model.AttemptStatus.RUNNING)
.recordsSynced(RECORDS_EMITTED)
.bytesSynced(BYTES_EMITTED)
.totalStats(new AttemptStats()
.recordsEmitted(RECORDS_EMITTED)
.bytesEmitted(BYTES_EMITTED)
.stateMessagesEmitted(STATE_MESSAGES_EMITTED)
.recordsCommitted(RECORDS_COMMITTED))
.streamStats(Lists.newArrayList(new AttemptStreamStats()
.streamName(STREAM_NAME)
.stats(new AttemptStats()
.recordsEmitted(RECORDS_EMITTED)
.bytesEmitted(BYTES_EMITTED)
.stateMessagesEmitted(STATE_MESSAGES_EMITTED)
.recordsCommitted(RECORDS_COMMITTED))))
.updatedAt(CREATED_AT)
.createdAt(CREATED_AT)
.endedAt(CREATED_AT))
Expand All @@ -87,10 +133,12 @@ public void setUp() {
when(job.getAttempts()).thenReturn(Lists.newArrayList(attempt));
when(attempt.getId()).thenReturn(ATTEMPT_ID);
when(attempt.getStatus()).thenReturn(ATTEMPT_STATUS);
when(attempt.getOutput()).thenReturn(Optional.of(JOB_OUTPUT));
when(attempt.getLogPath()).thenReturn(LOG_PATH);
when(attempt.getCreatedAtInSecond()).thenReturn(CREATED_AT);
when(attempt.getUpdatedAtInSecond()).thenReturn(CREATED_AT);
when(attempt.getEndedAtInSecond()).thenReturn(Optional.of(CREATED_AT));

}

@Test
Expand Down
Loading

0 comments on commit a007953

Please sign in to comment.