Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add totalStats and streamStats in Attempts API response #9583

Merged
merged 2 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3114,6 +3114,37 @@ components:
recordsSynced:
type: integer
format: int64
totalStats:
$ref: "#/components/schemas/AttemptStats"
streamStats:
type: array
items:
$ref: "#/components/schemas/AttemptStreamStats"
AttemptStats:
type: object
properties:
recordsEmitted:
type: integer
format: int64
bytesEmitted:
type: integer
format: int64
stateMessagesEmitted:
type: integer
format: int64
recordsCommitted:
type: integer
format: int64
AttemptStreamStats:
type: object
required:
- streamName
- stats
properties:
streamName:
type: string
stats:
$ref: "#/components/schemas/AttemptStats"
AttemptStatus:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import io.airbyte.api.model.AttemptInfoRead;
import io.airbyte.api.model.AttemptRead;
import io.airbyte.api.model.AttemptStats;
import io.airbyte.api.model.AttemptStatus;
import io.airbyte.api.model.AttemptStreamStats;
import io.airbyte.api.model.JobConfigType;
import io.airbyte.api.model.JobInfoRead;
import io.airbyte.api.model.JobRead;
Expand All @@ -19,6 +21,8 @@
import io.airbyte.config.JobOutput;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.scheduler.client.SynchronousJobMetadata;
Expand All @@ -27,6 +31,8 @@
import io.airbyte.scheduler.models.Job;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class JobConverter {
Expand Down Expand Up @@ -72,21 +78,59 @@ public static AttemptRead getAttemptRead(final Attempt attempt) {
return new AttemptRead()
.id(attempt.getId())
.status(Enums.convertTo(attempt.getStatus(), AttemptStatus.class))
.bytesSynced(attempt.getOutput()
.bytesSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
.map(StandardSyncSummary::getBytesSynced)
.orElse(null))
.recordsSynced(attempt.getOutput()
.recordsSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
.map(StandardSyncSummary::getRecordsSynced)
.orElse(null))
.totalStats(getTotalAttemptStats(attempt))
.streamStats(getAttemptStreamStats(attempt))
.createdAt(attempt.getCreatedAtInSecond())
.updatedAt(attempt.getUpdatedAtInSecond())
.endedAt(attempt.getEndedAtInSecond().orElse(null));
}

public static AttemptStats getTotalAttemptStats(final Attempt attempt) {
final SyncStats totalStats = attempt.getOutput()
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
.map(StandardSyncSummary::getTotalStats)
.orElse(null);

if (totalStats == null) {
return null;
}

return new AttemptStats()
.bytesEmitted(totalStats.getBytesEmitted())
.recordsEmitted(totalStats.getRecordsEmitted())
.stateMessagesEmitted(totalStats.getStateMessagesEmitted())
.recordsCommitted(totalStats.getRecordsCommitted());
}

public static List<AttemptStreamStats> getAttemptStreamStats(final Attempt attempt) {
final List<StreamSyncStats> streamStats = attempt.getOutput()
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
.map(StandardSyncSummary::getStreamStats)
.orElse(Collections.emptyList());

return streamStats.stream()
.map(streamStat -> new AttemptStreamStats()
.streamName(streamStat.getStreamName())
.stats(new AttemptStats()
.bytesEmitted(streamStat.getStats().getBytesEmitted())
.recordsEmitted(streamStat.getStats().getRecordsEmitted())
.stateMessagesEmitted(streamStat.getStats().getStateMessagesEmitted())
.recordsCommitted(streamStat.getStats().getRecordsCommitted())))
.collect(Collectors.toList());
}

public LogRead getLogRead(final Path logPath) {
try {
return new LogRead().logLines(LogClientSingleton.getInstance().getJobLogFile(workerEnvironment, logConfigs, logPath));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.google.common.collect.Lists;
import io.airbyte.api.model.AttemptInfoRead;
import io.airbyte.api.model.AttemptRead;
import io.airbyte.api.model.AttemptStats;
import io.airbyte.api.model.AttemptStreamStats;
import io.airbyte.api.model.JobConfigType;
import io.airbyte.api.model.JobInfoRead;
import io.airbyte.api.model.JobRead;
Expand All @@ -21,6 +23,12 @@
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobOutput.OutputType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
Expand All @@ -46,6 +54,30 @@ class JobConverterTest {
.withCheckConnection(new JobCheckConnectionConfig());
private static final Path LOG_PATH = Path.of("log_path");
private static final long CREATED_AT = System.currentTimeMillis() / 1000;
private static final long RECORDS_EMITTED = 15L;
private static final long BYTES_EMITTED = 100L;
private static final long RECORDS_COMMITTED = 10L;
private static final long STATE_MESSAGES_EMITTED = 2L;
private static final String STREAM_NAME = "stream1";

private static final JobOutput JOB_OUTPUT = new JobOutput()
.withOutputType(OutputType.SYNC)
.withSync(new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary()
.withRecordsSynced(RECORDS_EMITTED)
.withBytesSynced(BYTES_EMITTED)
.withTotalStats(new SyncStats()
.withRecordsEmitted(RECORDS_EMITTED)
.withBytesEmitted(BYTES_EMITTED)
.withStateMessagesEmitted(STATE_MESSAGES_EMITTED)
.withRecordsCommitted(RECORDS_COMMITTED))
.withStreamStats(Lists.newArrayList(new StreamSyncStats()
.withStreamName(STREAM_NAME)
.withStats(new SyncStats()
.withRecordsEmitted(RECORDS_EMITTED)
.withBytesEmitted(BYTES_EMITTED)
.withStateMessagesEmitted(STATE_MESSAGES_EMITTED)
.withRecordsCommitted(RECORDS_COMMITTED))))));

private JobConverter jobConverter;
private Job job;
Expand All @@ -63,6 +95,20 @@ class JobConverterTest {
.attempt(new AttemptRead()
.id(ATTEMPT_ID)
.status(io.airbyte.api.model.AttemptStatus.RUNNING)
.recordsSynced(RECORDS_EMITTED)
.bytesSynced(BYTES_EMITTED)
.totalStats(new AttemptStats()
.recordsEmitted(RECORDS_EMITTED)
.bytesEmitted(BYTES_EMITTED)
.stateMessagesEmitted(STATE_MESSAGES_EMITTED)
.recordsCommitted(RECORDS_COMMITTED))
.streamStats(Lists.newArrayList(new AttemptStreamStats()
.streamName(STREAM_NAME)
.stats(new AttemptStats()
.recordsEmitted(RECORDS_EMITTED)
.bytesEmitted(BYTES_EMITTED)
.stateMessagesEmitted(STATE_MESSAGES_EMITTED)
.recordsCommitted(RECORDS_COMMITTED))))
.updatedAt(CREATED_AT)
.createdAt(CREATED_AT)
.endedAt(CREATED_AT))
Expand All @@ -87,10 +133,12 @@ public void setUp() {
when(job.getAttempts()).thenReturn(Lists.newArrayList(attempt));
when(attempt.getId()).thenReturn(ATTEMPT_ID);
when(attempt.getStatus()).thenReturn(ATTEMPT_STATUS);
when(attempt.getOutput()).thenReturn(Optional.of(JOB_OUTPUT));
when(attempt.getLogPath()).thenReturn(LOG_PATH);
when(attempt.getCreatedAtInSecond()).thenReturn(CREATED_AT);
when(attempt.getUpdatedAtInSecond()).thenReturn(CREATED_AT);
when(attempt.getEndedAtInSecond()).thenReturn(Optional.of(CREATED_AT));

}

@Test
Expand Down
Loading