diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 602b45f2e897..6086830250f4 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -3114,6 +3114,37 @@ components: recordsSynced: type: integer format: int64 + totalStats: + $ref: "#/components/schemas/AttemptStats" + streamStats: + type: array + items: + $ref: "#/components/schemas/AttemptStreamStats" + AttemptStats: + type: object + properties: + recordsEmitted: + type: integer + format: int64 + bytesEmitted: + type: integer + format: int64 + stateMessagesEmitted: + type: integer + format: int64 + recordsCommitted: + type: integer + format: int64 + AttemptStreamStats: + type: object + required: + - streamName + - stats + properties: + streamName: + type: string + stats: + $ref: "#/components/schemas/AttemptStats" AttemptStatus: type: string enum: diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 9841a41d16eb..5c4f74b52e79 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -6,7 +6,9 @@ import io.airbyte.api.model.AttemptInfoRead; import io.airbyte.api.model.AttemptRead; +import io.airbyte.api.model.AttemptStats; import io.airbyte.api.model.AttemptStatus; +import io.airbyte.api.model.AttemptStreamStats; import io.airbyte.api.model.JobConfigType; import io.airbyte.api.model.JobInfoRead; import io.airbyte.api.model.JobRead; @@ -19,6 +21,8 @@ import io.airbyte.config.JobOutput; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.StreamSyncStats; +import io.airbyte.config.SyncStats; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.scheduler.client.SynchronousJobMetadata; @@ -27,6 +31,8 @@ import io.airbyte.scheduler.models.Job; import java.io.IOException; import java.nio.file.Path; +import java.util.Collections; +import java.util.List; import java.util.stream.Collectors; public class JobConverter { @@ -72,21 +78,59 @@ public static AttemptRead getAttemptRead(final Attempt attempt) { return new AttemptRead() .id(attempt.getId()) .status(Enums.convertTo(attempt.getStatus(), AttemptStatus.class)) - .bytesSynced(attempt.getOutput() + .bytesSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats .map(JobOutput::getSync) .map(StandardSyncOutput::getStandardSyncSummary) .map(StandardSyncSummary::getBytesSynced) .orElse(null)) - .recordsSynced(attempt.getOutput() + .recordsSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats .map(JobOutput::getSync) .map(StandardSyncOutput::getStandardSyncSummary) .map(StandardSyncSummary::getRecordsSynced) .orElse(null)) + .totalStats(getTotalAttemptStats(attempt)) + .streamStats(getAttemptStreamStats(attempt)) .createdAt(attempt.getCreatedAtInSecond()) .updatedAt(attempt.getUpdatedAtInSecond()) .endedAt(attempt.getEndedAtInSecond().orElse(null)); } + public static AttemptStats getTotalAttemptStats(final Attempt attempt) { + final SyncStats totalStats = attempt.getOutput() + .map(JobOutput::getSync) + .map(StandardSyncOutput::getStandardSyncSummary) + .map(StandardSyncSummary::getTotalStats) + .orElse(null); + + if (totalStats == null) { + return null; + } + + return new AttemptStats() + .bytesEmitted(totalStats.getBytesEmitted()) + .recordsEmitted(totalStats.getRecordsEmitted()) + .stateMessagesEmitted(totalStats.getStateMessagesEmitted()) + .recordsCommitted(totalStats.getRecordsCommitted()); + } + + public static List getAttemptStreamStats(final Attempt attempt) { + final List streamStats = attempt.getOutput() + .map(JobOutput::getSync) + .map(StandardSyncOutput::getStandardSyncSummary) + .map(StandardSyncSummary::getStreamStats) + .orElse(Collections.emptyList()); + + return streamStats.stream() + .map(streamStat -> new AttemptStreamStats() + .streamName(streamStat.getStreamName()) + .stats(new AttemptStats() + .bytesEmitted(streamStat.getStats().getBytesEmitted()) + .recordsEmitted(streamStat.getStats().getRecordsEmitted()) + .stateMessagesEmitted(streamStat.getStats().getStateMessagesEmitted()) + .recordsCommitted(streamStat.getStats().getRecordsCommitted()))) + .collect(Collectors.toList()); + } + public LogRead getLogRead(final Path logPath) { try { return new LogRead().logLines(LogClientSingleton.getInstance().getJobLogFile(workerEnvironment, logConfigs, logPath)); diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java index 7ed276081c53..37a9d8cab50b 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java @@ -12,6 +12,8 @@ import com.google.common.collect.Lists; import io.airbyte.api.model.AttemptInfoRead; import io.airbyte.api.model.AttemptRead; +import io.airbyte.api.model.AttemptStats; +import io.airbyte.api.model.AttemptStreamStats; import io.airbyte.api.model.JobConfigType; import io.airbyte.api.model.JobInfoRead; import io.airbyte.api.model.JobRead; @@ -21,6 +23,12 @@ import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobConfig; +import io.airbyte.config.JobOutput; +import io.airbyte.config.JobOutput.OutputType; +import io.airbyte.config.StandardSyncOutput; +import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.StreamSyncStats; +import io.airbyte.config.SyncStats; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.scheduler.models.Attempt; import io.airbyte.scheduler.models.AttemptStatus; @@ -46,6 +54,30 @@ class JobConverterTest { .withCheckConnection(new JobCheckConnectionConfig()); private static final Path LOG_PATH = Path.of("log_path"); private static final long CREATED_AT = System.currentTimeMillis() / 1000; + private static final long RECORDS_EMITTED = 15L; + private static final long BYTES_EMITTED = 100L; + private static final long RECORDS_COMMITTED = 10L; + private static final long STATE_MESSAGES_EMITTED = 2L; + private static final String STREAM_NAME = "stream1"; + + private static final JobOutput JOB_OUTPUT = new JobOutput() + .withOutputType(OutputType.SYNC) + .withSync(new StandardSyncOutput() + .withStandardSyncSummary(new StandardSyncSummary() + .withRecordsSynced(RECORDS_EMITTED) + .withBytesSynced(BYTES_EMITTED) + .withTotalStats(new SyncStats() + .withRecordsEmitted(RECORDS_EMITTED) + .withBytesEmitted(BYTES_EMITTED) + .withStateMessagesEmitted(STATE_MESSAGES_EMITTED) + .withRecordsCommitted(RECORDS_COMMITTED)) + .withStreamStats(Lists.newArrayList(new StreamSyncStats() + .withStreamName(STREAM_NAME) + .withStats(new SyncStats() + .withRecordsEmitted(RECORDS_EMITTED) + .withBytesEmitted(BYTES_EMITTED) + .withStateMessagesEmitted(STATE_MESSAGES_EMITTED) + .withRecordsCommitted(RECORDS_COMMITTED)))))); private JobConverter jobConverter; private Job job; @@ -63,6 +95,20 @@ class JobConverterTest { .attempt(new AttemptRead() .id(ATTEMPT_ID) .status(io.airbyte.api.model.AttemptStatus.RUNNING) + .recordsSynced(RECORDS_EMITTED) + .bytesSynced(BYTES_EMITTED) + .totalStats(new AttemptStats() + .recordsEmitted(RECORDS_EMITTED) + .bytesEmitted(BYTES_EMITTED) + .stateMessagesEmitted(STATE_MESSAGES_EMITTED) + .recordsCommitted(RECORDS_COMMITTED)) + .streamStats(Lists.newArrayList(new AttemptStreamStats() + .streamName(STREAM_NAME) + .stats(new AttemptStats() + .recordsEmitted(RECORDS_EMITTED) + .bytesEmitted(BYTES_EMITTED) + .stateMessagesEmitted(STATE_MESSAGES_EMITTED) + .recordsCommitted(RECORDS_COMMITTED)))) .updatedAt(CREATED_AT) .createdAt(CREATED_AT) .endedAt(CREATED_AT)) @@ -87,10 +133,12 @@ public void setUp() { when(job.getAttempts()).thenReturn(Lists.newArrayList(attempt)); when(attempt.getId()).thenReturn(ATTEMPT_ID); when(attempt.getStatus()).thenReturn(ATTEMPT_STATUS); + when(attempt.getOutput()).thenReturn(Optional.of(JOB_OUTPUT)); when(attempt.getLogPath()).thenReturn(LOG_PATH); when(attempt.getCreatedAtInSecond()).thenReturn(CREATED_AT); when(attempt.getUpdatedAtInSecond()).thenReturn(CREATED_AT); when(attempt.getEndedAtInSecond()).thenReturn(Optional.of(CREATED_AT)); + } @Test diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index bff6feaf4560..4e9ff250a302 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -1023,9 +1023,32 @@

Example data

}, "attempts" : [ { "attempt" : { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -1035,9 +1058,32 @@

Example data

} }, { "attempt" : { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -1262,9 +1308,32 @@

Example data

}, "attempts" : [ { "attempt" : { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -1274,9 +1343,32 @@

Example data

} }, { "attempt" : { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -2845,9 +2937,32 @@

Example data

}, "attempts" : [ { "attempt" : { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -2857,9 +2972,32 @@

Example data

} }, { "attempt" : { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -2933,9 +3071,32 @@

Example data

}, "attempts" : [ { "attempt" : { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -2945,9 +3106,32 @@

Example data

} }, { "attempt" : { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -3021,16 +3205,62 @@

Example data

"updatedAt" : 1 }, "attempts" : [ { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 }, { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -3043,16 +3273,62 @@

Example data

"updatedAt" : 1 }, "attempts" : [ { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 }, { + "totalStats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, "createdAt" : 5, "bytesSynced" : 9, "endedAt" : 7, + "streamStats" : [ { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + }, { + "stats" : { + "stateMessagesEmitted" : 7, + "recordsCommitted" : 1, + "bytesEmitted" : 4, + "recordsEmitted" : 2 + }, + "streamName" : "streamName" + } ], "id" : 5, "recordsSynced" : 3, "updatedAt" : 2 @@ -7033,7 +7309,9 @@

Table of Contents

  • AirbyteStreamConfiguration -
  • AttemptInfoRead -
  • AttemptRead -
  • +
  • AttemptStats -
  • AttemptStatus -
  • +
  • AttemptStreamStats -
  • AuthSpecification -
  • CheckConnectionRead -
  • CheckOperationRead -
  • @@ -7213,6 +7491,18 @@

    AttemptRead - endedAt (optional)
    Long format: int64
    bytesSynced (optional)
    Long format: int64
    recordsSynced (optional)
    Long format: int64
    +
    totalStats (optional)
    +
    streamStats (optional)
    + + +
    +

    AttemptStats - Up

    +
    +
    +
    recordsEmitted (optional)
    Long format: int64
    +
    bytesEmitted (optional)
    Long format: int64
    +
    stateMessagesEmitted (optional)
    Long format: int64
    +
    recordsCommitted (optional)
    Long format: int64
    @@ -7221,6 +7511,14 @@

    AttemptStatus -

    +
    +

    AttemptStreamStats - Up

    +
    +
    +
    streamName
    +
    stats
    +
    +