From d98ddbbae7ee55cc1b7298802f100d0940bcb692 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 28 Dec 2022 13:47:42 -0800 Subject: [PATCH] Implement Progress Bar Persistence Read/Write (#20787) Implement the persistence layer changes following #19191. This PR handles writing and reading stats to the new stream_stats table and the new columns in the existing sync_stats table. At the same time, we introduce upserts of stats records, i.e. updates are merged into a single record, in preparation for real-time stats updates, replacing the current approach where a new stats record is always written. There will be two remaining PRs after this: - The first PR will fully wire up and test the API. - The second PR will actually save stats while jobs are running. --- airbyte-api/src/main/openapi/config.yaml | 2 + .../airbyte/bootloader/BootloaderAppTest.java | 2 +- airbyte-commons/src/main/resources/log4j2.xml | 5 + .../src/main/resources/types/SyncStats.yaml | 36 +++-- ...V0_40_26_001__CorrectStreamStatsTable.java | 42 ++++++ .../resources/jobs_database/schema_dump.txt | 11 +- .../job/DefaultJobPersistence.java | 138 +++++++++++++++-- .../persistence/job/JobPersistence.java | 30 +++- .../job/DefaultJobPersistenceTest.java | 139 +++++++++++++++++- .../server/apis/AttemptApiController.java | 4 +- .../server/handlers/AttemptHandler.java | 29 ++++ .../api/generated-api-html/index.html | 29 ++++ 12 files changed, 428 insertions(+), 39 deletions(-) create mode 100644 airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_26_001__CorrectStreamStatsTable.java diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 4883fb4acc39..fbaa8c6a0688 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -3970,6 +3970,8 @@ components: properties: streamName: type: string + streamNamespace: + type: string stats: $ref: "#/components/schemas/AttemptStats" AttemptFailureSummary: diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index 65566dd091a5..8f184141bd2e 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -137,7 +137,7 @@ void testBootloaderAppBlankDb() throws Exception { bootloader.load(); val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); - assertEquals("0.40.18.002", jobsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion()); val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); // this line should change with every new migration diff --git a/airbyte-commons/src/main/resources/log4j2.xml b/airbyte-commons/src/main/resources/log4j2.xml index c1427f510fe7..22d52667d0ee 100644 --- a/airbyte-commons/src/main/resources/log4j2.xml +++ b/airbyte-commons/src/main/resources/log4j2.xml @@ -209,6 +209,11 @@ If a database has tons of DDL queries, the logs would be filled with such messages--> + + + + + diff --git a/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml b/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml index 6410a3695292..5dc9e1a0feae 100644 --- a/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml +++ b/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml @@
-9,18 +9,23 @@ required: - bytesEmitted additionalProperties: true properties: - recordsEmitted: - type: integer bytesEmitted: type: integer - sourceStateMessagesEmitted: - description: Number of State messages emitted by the Source Connector - type: integer destinationStateMessagesEmitted: description: Number of State messages emitted by the Destination Connector type: integer - recordsCommitted: - type: integer # if unset, committed records could not be computed + destinationWriteEndTime: + type: integer + description: The exit time of the destination container/pod + destinationWriteStartTime: + type: integer + description: The boot time of the destination container/pod + estimatedBytes: + type: integer + description: The total estimated number of bytes for the sync + estimatedRecords: + type: integer + description: The total estimated number of records for the sync meanSecondsBeforeSourceStateMessageEmitted: type: integer maxSecondsBeforeSourceStateMessageEmitted: @@ -29,21 +34,22 @@ properties: type: integer meanSecondsBetweenStateMessageEmittedandCommitted: type: integer - replicationStartTime: + recordsEmitted: type: integer - description: The start of the replication activity + recordsCommitted: + type: integer # if unset, committed records could not be computed replicationEndTime: type: integer description: The end of the replication activity - sourceReadStartTime: + replicationStartTime: type: integer - description: The boot time of the source container/pod + description: The start of the replication activity sourceReadEndTime: type: integer description: The exit time of the source container/pod - destinationWriteStartTime: + sourceReadStartTime: type: integer - description: The boot time of the destination container/pod - destinationWriteEndTime: + description: The boot time of the source container/pod + sourceStateMessagesEmitted: + description: Number of State messages emitted by the Source Connector type: integer - description: The exit time of the destination container/pod diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_26_001__CorrectStreamStatsTable.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_26_001__CorrectStreamStatsTable.java new file mode 100644 index 000000000000..f3087889d24d --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_26_001__CorrectStreamStatsTable.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.jobs.migrations; + +import static org.jooq.impl.DSL.constraint; + +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V0_40_26_001__CorrectStreamStatsTable extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_26_001__CorrectStreamStatsTable.class); + +  @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + // Warning: please do not use any jOOQ generated code to write a migration. + // As the database schema changes, the generated jOOQ code can become outdated, so + // older migrations may not compile if they contain generated code.
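+ // For reference, the jOOQ statements below translate roughly to the following Postgres DDL
+ // (an approximate, hand-written sketch; the SQL jOOQ actually emits may differ slightly):
+ //   ALTER TABLE stream_stats ALTER COLUMN attempt_id TYPE bigint, ALTER COLUMN attempt_id SET NOT NULL;
+ //   ALTER TABLE stream_stats ALTER COLUMN stream_namespace DROP NOT NULL;
+ //   ALTER TABLE stream_stats DROP CONSTRAINT stream_stats_attempt_id_stream_name_key;
+ //   ALTER TABLE stream_stats ADD CONSTRAINT uniq_stream_attempt UNIQUE (attempt_id, stream_name, stream_namespace);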
+ try (final DSLContext ctx = DSL.using(context.getConnection())) { + // This actually needs to be bigint to match the id column on the attempts table. + final String streamStats = "stream_stats"; + ctx.alterTable(streamStats).alter("attempt_id").set(SQLDataType.BIGINT.nullable(false)).execute(); + // Not all streams provide a namespace. + ctx.alterTable(streamStats).alter("stream_namespace").set(SQLDataType.VARCHAR.nullable(true)).execute(); + + // The constraint should also take into account the stream namespace. Drop the constraint and + // recreate it. + ctx.alterTable(streamStats).dropUnique("stream_stats_attempt_id_stream_name_key").execute(); + ctx.alterTable(streamStats).add(constraint("uniq_stream_attempt").unique("attempt_id", "stream_name", "stream_namespace")).execute(); + } + } + +} diff --git a/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt b/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt index 628285ccbd3b..100af44d0893 100644 --- a/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt +++ b/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt @@ -63,8 +63,8 @@ create table "public"."normalization_summaries"( ); create table "public"."stream_stats"( "id" uuid not null, - "attempt_id" int4 not null, - "stream_namespace" varchar(2147483647) not null, + "attempt_id" int8 not null, + "stream_namespace" varchar(2147483647) null, "stream_name" varchar(2147483647) not null, "records_emitted" int8 null, "bytes_emitted" int8 null, @@ -122,10 +122,11 @@ create index "jobs_status_idx" on "public"."jobs"("status" asc); create unique index "normalization_summaries_pkey" on "public"."normalization_summaries"("id" asc); create index "normalization_summary_attempt_id_idx" on "public"."normalization_summaries"("attempt_id" asc); create index "index" on "public"."stream_stats"("attempt_id" asc); -create unique index "stream_stats_attempt_id_stream_name_key" on "public"."stream_stats"( +create unique index "stream_stats_pkey" on "public"."stream_stats"("id" asc); +create unique index "uniq_stream_attempt" on "public"."stream_stats"( "attempt_id" asc, - "stream_name" asc + "stream_name" asc, + "stream_namespace" asc ); -create unique index "stream_stats_pkey" on "public"."stream_stats"("id" asc); create index "attempt_id_idx" on "public"."sync_stats"("attempt_id" asc); create unique index "sync_stats_pkey" on "public"."sync_stats"("id" asc); diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index cd39b41e59f1..1c5dd8253cdb 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -7,6 +7,7 @@ import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.NORMALIZATION_SUMMARIES; +import static io.airbyte.db.instance.jobs.jooq.generated.Tables.STREAM_STATS; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS; import com.fasterxml.jackson.core.JsonProcessingException; @@ -33,7 +34,9 @@ import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.NormalizationSummary; +import
io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; +import io.airbyte.config.persistence.PersistenceHelpers; import io.airbyte.db.Database; import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; @@ -337,8 +340,6 @@ public Optional<String> getAttemptTemporalWorkflowId(final long jobId, final int public void writeOutput(final long jobId, final int attemptNumber, final JobOutput output) throws IOException { final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); - final SyncStats syncStats = output.getSync().getStandardSyncSummary().getTotalStats(); - final NormalizationSummary normalizationSummary = output.getSync().getNormalizationSummary(); jobDatabase.transaction(ctx -> { ctx.update(ATTEMPTS) @@ -348,10 +349,12 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp .execute(); final Long attemptId = getAttemptId(jobId, attemptNumber, ctx); + final SyncStats syncStats = output.getSync().getStandardSyncSummary().getTotalStats(); if (syncStats != null) { - writeSyncStats(now, syncStats, attemptId, ctx); + saveToSyncStatsTable(now, syncStats, attemptId, ctx); } + final NormalizationSummary normalizationSummary = output.getSync().getNormalizationSummary(); if (normalizationSummary != null) { ctx.insertInto(NORMALIZATION_SUMMARIES) .set(NORMALIZATION_SUMMARIES.ID, UUID.randomUUID()) @@ -369,14 +372,65 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp } - private static void writeSyncStats(final OffsetDateTime now, final SyncStats syncStats, final Long attemptId, final DSLContext ctx) { + @Override + public void writeStats(final long jobId, + final int attemptNumber, + final long estimatedRecords, + final long estimatedBytes, + final long recordsEmitted, + final long bytesEmitted, + final List<StreamSyncStats> streamStats) + throws IOException { + final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); + jobDatabase.transaction(ctx -> { + final var attemptId = getAttemptId(jobId, attemptNumber, ctx); + + final var syncStats = new SyncStats() + .withEstimatedRecords(estimatedRecords) + .withEstimatedBytes(estimatedBytes) + .withRecordsEmitted(recordsEmitted) + .withBytesEmitted(bytesEmitted); + saveToSyncStatsTable(now, syncStats, attemptId, ctx); + + saveToStreamStatsTable(now, streamStats, attemptId, ctx); + return null; + }); + + } + + private static void saveToSyncStatsTable(final OffsetDateTime now, final SyncStats syncStats, final Long attemptId, final DSLContext ctx) { + // Although JOOQ supports upsert using the onConflict statement, we cannot use it as the table + // currently has duplicate records and also doesn't contain the unique constraint on the attempt_id + // column JOOQ requires. We are forced to check for existence.
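+ // If those preconditions ever hold (duplicates cleaned up, unique constraint on attempt_id),
+ // this method could collapse into a single upsert. A hypothetical jOOQ sketch, not used here
+ // for the reasons above:
+ //   ctx.insertInto(SYNC_STATS)
+ //       .set(SYNC_STATS.ID, UUID.randomUUID())
+ //       .set(SYNC_STATS.CREATED_AT, now)
+ //       .set(SYNC_STATS.UPDATED_AT, now)
+ //       .set(SYNC_STATS.ATTEMPT_ID, attemptId)
+ //       .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
+ //       .onConflict(SYNC_STATS.ATTEMPT_ID)
+ //       .doUpdate()
+ //       .set(SYNC_STATS.UPDATED_AT, now)
+ //       .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
+ //       .execute();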
+ final var isExisting = ctx.fetchExists(SYNC_STATS, SYNC_STATS.ATTEMPT_ID.eq(attemptId)); + if (isExisting) { + ctx.update(SYNC_STATS) + .set(SYNC_STATS.UPDATED_AT, now) + .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted()) + .set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted()) + .set(SYNC_STATS.ESTIMATED_RECORDS, syncStats.getEstimatedRecords()) + .set(SYNC_STATS.ESTIMATED_BYTES, syncStats.getEstimatedBytes()) + .set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted()) + .set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted()) + .set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted()) + .set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()) + .set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()) + .set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()) + .set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()) + .where(SYNC_STATS.ATTEMPT_ID.eq(attemptId)) + .execute(); + return; + } + ctx.insertInto(SYNC_STATS) .set(SYNC_STATS.ID, UUID.randomUUID()) - .set(SYNC_STATS.UPDATED_AT, now) .set(SYNC_STATS.CREATED_AT, now) .set(SYNC_STATS.ATTEMPT_ID, attemptId) + .set(SYNC_STATS.UPDATED_AT, now) .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted()) .set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted()) + .set(SYNC_STATS.ESTIMATED_RECORDS, syncStats.getEstimatedRecords()) + .set(SYNC_STATS.ESTIMATED_BYTES, syncStats.getEstimatedBytes()) .set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted()) .set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted()) .set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted()) @@ -387,6 +441,50 @@ private static void writeSyncStats(final OffsetDateTime now, final SyncStats syn .execute(); } + private static void saveToStreamStatsTable(final OffsetDateTime now, + final List<StreamSyncStats> perStreamStats, + final Long attemptId, + final DSLContext ctx) { + Optional.ofNullable(perStreamStats).orElse(Collections.emptyList()).forEach( + streamStats -> { + // We cannot entirely rely on JOOQ's generated SQL for upserts as it does not support null fields + // for conflict detection. We are forced to separately check for existence.
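+          // In particular, Postgres unique indexes treat NULLs as distinct, so an ON CONFLICT
+          // target of (attempt_id, stream_name, stream_namespace) would never match rows with a
+          // NULL namespace; the PersistenceHelpers.isNullOrEquals check below covers that case.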
+ final var stats = streamStats.getStats(); + final var isExisting = + ctx.fetchExists(STREAM_STATS, STREAM_STATS.ATTEMPT_ID.eq(attemptId).and(STREAM_STATS.STREAM_NAME.eq(streamStats.getStreamName())) + .and(PersistenceHelpers.isNullOrEquals(STREAM_STATS.STREAM_NAMESPACE, streamStats.getStreamNamespace()))); + if (isExisting) { + ctx.update(STREAM_STATS) + .set(STREAM_STATS.UPDATED_AT, now) + .set(STREAM_STATS.BYTES_EMITTED, stats.getBytesEmitted()) + .set(STREAM_STATS.RECORDS_EMITTED, stats.getRecordsEmitted()) + .set(STREAM_STATS.ESTIMATED_RECORDS, stats.getEstimatedRecords()) + .set(STREAM_STATS.ESTIMATED_BYTES, stats.getEstimatedBytes()) + .where(STREAM_STATS.ATTEMPT_ID.eq(attemptId) + .and(STREAM_STATS.STREAM_NAME.eq(streamStats.getStreamName())) + .and(PersistenceHelpers.isNullOrEquals(STREAM_STATS.STREAM_NAMESPACE, streamStats.getStreamNamespace()))) + .execute(); + return; + } + + ctx.insertInto(STREAM_STATS) + .set(STREAM_STATS.ID, UUID.randomUUID()) + .set(STREAM_STATS.ATTEMPT_ID, attemptId) + .set(STREAM_STATS.STREAM_NAME, streamStats.getStreamName()) + .set(STREAM_STATS.STREAM_NAMESPACE, streamStats.getStreamNamespace()) + .set(STREAM_STATS.CREATED_AT, now) + .set(STREAM_STATS.UPDATED_AT, now) + .set(STREAM_STATS.BYTES_EMITTED, stats.getBytesEmitted()) + .set(STREAM_STATS.RECORDS_EMITTED, stats.getRecordsEmitted()) + .set(STREAM_STATS.ESTIMATED_BYTES, stats.getEstimatedBytes()) + .set(STREAM_STATS.ESTIMATED_RECORDS, stats.getEstimatedRecords()) + .execute(); + }); + } + @Override public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws IOException { final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); @@ -400,14 +498,16 @@ public void writeAttemptFailureSummary(final long jobId, final int attemptNumber } @Override - public List<SyncStats> getSyncStats(final long jobId, final int attemptNumber) throws IOException { + public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) throws IOException { return jobDatabase .query(ctx -> { final Long attemptId = getAttemptId(jobId, attemptNumber, ctx); - return ctx.select(DSL.asterisk()).from(DSL.table("sync_stats")).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId)) - .fetch(getSyncStatsRecordMapper()) - .stream() - .toList(); + final var syncStats = ctx.select(DSL.asterisk()).from(SYNC_STATS).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId)) + .orderBy(SYNC_STATS.UPDATED_AT.desc()) + .limit(1) + .fetchOne(getSyncStatsRecordMapper()); + final var perStreamStats = ctx.select(DSL.asterisk()).from(STREAM_STATS).where(STREAM_STATS.ATTEMPT_ID.eq(attemptId)) + .fetch(getStreamStatsRecordsMapper()); + return new AttemptStats(syncStats, perStreamStats); }); } @@ -423,7 +523,8 @@ public List<NormalizationSummary> getNormalizationSummary(final long jobId, fina }); } - private static Long getAttemptId(final long jobId, final int attemptNumber, final DSLContext ctx) { + @VisibleForTesting + static Long getAttemptId(final long jobId, final int attemptNumber, final DSLContext ctx) { final Optional<Record> record = ctx.fetch("SELECT id from attempts where job_id = ?
AND attempt_number = ?", jobId, attemptNumber).stream().findFirst(); @@ -432,6 +533,7 @@ private static Long getAttemptId(final long jobId, final int attemptNumber, fina private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() { return record -> new SyncStats().withBytesEmitted(record.get(SYNC_STATS.BYTES_EMITTED)).withRecordsEmitted(record.get(SYNC_STATS.RECORDS_EMITTED)) + .withEstimatedBytes(record.get(SYNC_STATS.ESTIMATED_BYTES)).withEstimatedRecords(record.get(SYNC_STATS.ESTIMATED_RECORDS)) .withSourceStateMessagesEmitted(record.get(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED)) .withDestinationStateMessagesEmitted(record.get(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED)) .withRecordsCommitted(record.get(SYNC_STATS.RECORDS_COMMITTED)) @@ -441,8 +543,19 @@ private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() { .withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED)); } + private static RecordMapper<Record, StreamSyncStats> getStreamStatsRecordsMapper() { + return record -> { + final var stats = new SyncStats() + .withEstimatedRecords(record.get(STREAM_STATS.ESTIMATED_RECORDS)).withEstimatedBytes(record.get(STREAM_STATS.ESTIMATED_BYTES)) + .withRecordsEmitted(record.get(STREAM_STATS.RECORDS_EMITTED)).withBytesEmitted(record.get(STREAM_STATS.BYTES_EMITTED)); + return new StreamSyncStats() + .withStreamName(record.get(STREAM_STATS.STREAM_NAME)).withStreamNamespace(record.get(STREAM_STATS.STREAM_NAMESPACE)) + .withStats(stats); + }; + } + private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper() { - final RecordMapper<Record, NormalizationSummary> recordMapper = record -> { + return record -> { try { return new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli()) .withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli()) @@ -451,7 +564,6 @@ private static RecordMapper<Record, NormalizationSummary> getNormalizationSummar throw new RuntimeException(e); } }; - return recordMapper; } private static List<FailureReason> deserializeFailureReasons(final Record record) throws JsonProcessingException { diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index 96a526249af6..895382c40b20 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -12,6 +12,7 @@ import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.NormalizationSummary; +import io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.persistence.job.models.AttemptNormalizationStatus; @@ -36,7 +37,25 @@ */ public interface JobPersistence { - List<SyncStats> getSyncStats(long jobId, int attemptNumber) throws IOException; + // + // SIMPLE GETTERS + // + + /** + * Convenience POJO for various stats data structures. + * + * @param combinedStats total stats for the attempt, across all streams + * @param perStreamStats stats broken down by stream + */ + record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStats) {} + + /** + * Retrieve the combined and per stream stats for a single attempt.
+ * + * @return {@link AttemptStats} + * @throws IOException + */ + AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException; List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException; @@ -138,6 +157,15 @@ public interface JobPersistence { */ void writeOutput(long jobId, int attemptNumber, JobOutput output) throws IOException; + /** + * Writes in-progress stats for an attempt, upserting the combined record and one record per stream. + */ + void writeStats(long jobId, + int attemptNumber, + long estimatedRecords, + long estimatedBytes, + long recordsEmitted, + long bytesEmitted, + List<StreamSyncStats> streamStats) + throws IOException; + /** * Writes a summary of all failures that occurred during the attempt. * diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index 34adf975fcf3..0b48aaf036c4 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -7,6 +7,7 @@ import static io.airbyte.db.instance.jobs.jooq.generated.Tables.AIRBYTE_METADATA; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS; +import static io.airbyte.db.instance.jobs.jooq.generated.Tables.STREAM_STATS; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -39,12 +40,14 @@ import io.airbyte.config.NormalizationSummary; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; import io.airbyte.db.Database; import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.db.instance.test.TestDatabaseProviders; +import io.airbyte.persistence.job.JobPersistence.AttemptStats; import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.AttemptStatus; import io.airbyte.persistence.job.models.AttemptWithJobInfo; @@ -88,7 +91,7 @@ import org.junit.jupiter.params.provider.CsvSource; import org.testcontainers.containers.PostgreSQLContainer; -@SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert") +@SuppressWarnings({"PMD.JUnitTestsShouldIncludeAssert", "PMD.AvoidDuplicateLiterals"}) @DisplayName("DefaultJobPersistance") class DefaultJobPersistenceTest { @@ -288,7 +291,7 @@ void testWriteOutput() throws IOException { assertEquals(Optional.of(jobOutput), updated.getAttempts().get(0).getOutput()); assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond()); - final SyncStats storedSyncStats = jobPersistence.getSyncStats(jobId, attemptNumber).stream().findFirst().get(); + final SyncStats storedSyncStats = jobPersistence.getAttemptStats(jobId, attemptNumber).combinedStats(); assertEquals(100L, storedSyncStats.getBytesEmitted()); assertEquals(9L, storedSyncStats.getRecordsEmitted()); assertEquals(10L, storedSyncStats.getRecordsCommitted()); @@ -322,6 +325,138 @@ void testWriteAttemptFailureSummary() throws IOException { assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(),
updated.getAttempts().get(0).getUpdatedAtInSecond()); } + @Nested + @DisplayName("Test writing in progress stats") + class WriteStats { + + @Test + @DisplayName("Writing stats the first time should only write record and bytes information correctly") + void testWriteStatsFirst() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); + final var streamStats = List.of( + new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns") + .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L)), + new StreamSyncStats().withStreamName("name2").withStreamNamespace("ns") + .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, streamStats); + + final AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber); + final var combined = stats.combinedStats(); + assertEquals(1000, combined.getBytesEmitted()); + assertEquals(1000, combined.getRecordsEmitted()); + assertEquals(1000, combined.getEstimatedBytes()); + assertEquals(1000, combined.getEstimatedRecords()); + + // As of this writing, committed and state messages are not expected. + assertEquals(null, combined.getRecordsCommitted()); + assertEquals(null, combined.getDestinationStateMessagesEmitted()); + + final var actStreamStats = stats.perStreamStats(); + assertEquals(2, actStreamStats.size()); + assertEquals(streamStats, actStreamStats); + } + + @Test + @DisplayName("Writing stats multiple times should write record and bytes information correctly without exceptions") + void testWriteStatsRepeated() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); + + // First write. + var streamStats = List.of( + new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns") + .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, streamStats); + + // Second write. 
+ when(timeSupplier.get()).thenReturn(Instant.now()); + streamStats = List.of( + new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns") + .withStats(new SyncStats().withBytesEmitted(1000L).withRecordsEmitted(1000L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, streamStats); + + final AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber); + final var combined = stats.combinedStats(); + assertEquals(2000, combined.getBytesEmitted()); + assertEquals(2000, combined.getRecordsEmitted()); + assertEquals(2000, combined.getEstimatedBytes()); + assertEquals(2000, combined.getEstimatedRecords()); + + final var actStreamStats = stats.perStreamStats(); + assertEquals(1, actStreamStats.size()); + assertEquals(streamStats, actStreamStats); + + } + + @Test + @DisplayName("Writing multiple stats for the same attempt id, stream name and namespace should update the previous record") + void testWriteStatsUpsert() throws IOException, SQLException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); + + // First write. + var streamStats = List.of( + new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns") + .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, streamStats); + + // Second write. + when(timeSupplier.get()).thenReturn(Instant.now()); + streamStats = List.of( + new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns") + .withStats(new SyncStats().withBytesEmitted(1000L).withRecordsEmitted(1000L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, streamStats); + + final var syncStatsRec = jobDatabase.query(ctx -> { + final var attemptId = DefaultJobPersistence.getAttemptId(jobId, attemptNumber, ctx); + return ctx.fetch("SELECT * from sync_stats where attempt_id = ?", attemptId).stream().findFirst().get(); + }); + + // Check time stamps to confirm upsert. + assertNotEquals(syncStatsRec.get(SYNC_STATS.CREATED_AT), syncStatsRec.get(SYNC_STATS.UPDATED_AT)); + + final var streamStatsRec = jobDatabase.query(ctx -> { + final var attemptId = DefaultJobPersistence.getAttemptId(jobId, attemptNumber, ctx); + return ctx.fetch("SELECT * from stream_stats where attempt_id = ?", attemptId).stream().findFirst().get(); + }); + // Check time stamps to confirm upsert. + assertNotEquals(streamStatsRec.get(STREAM_STATS.CREATED_AT), streamStatsRec.get(STREAM_STATS.UPDATED_AT)); + } + + @Test + @DisplayName("Writing multiple stats for a stream with null namespace should write correctly without exceptions") + void testWriteNullNamespace() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); + + // First write. + var streamStats = List.of( + new StreamSyncStats().withStreamName("name1") + .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, streamStats); + + // Second write.
+ when(timeSupplier.get()).thenReturn(Instant.now()); + streamStats = List.of( + new StreamSyncStats().withStreamName("name1") + .withStats(new SyncStats().withBytesEmitted(1000L).withRecordsEmitted(1000L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, streamStats); + + final AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber); + final var combined = stats.combinedStats(); + assertEquals(2000, combined.getBytesEmitted()); + assertEquals(2000, combined.getRecordsEmitted()); + assertEquals(2000, combined.getEstimatedBytes()); + assertEquals(2000, combined.getEstimatedRecords()); + + final var actStreamStats = stats.perStreamStats(); + assertEquals(1, actStreamStats.size()); + assertEquals(streamStats, actStreamStats); + } + + } + @Test @DisplayName("When getting the last replication job should return the most recently created job") void testGetLastSyncJobWithMultipleAttempts() throws IOException { diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java index e5856dc8848d..66dd9cded057 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java @@ -21,8 +21,8 @@ public AttemptApiController(final AttemptHandler attemptHandler) { } @Override - public InternalOperationResult saveStats(final SaveStatsRequestBody saveStatsRequestBody) { - throw new UnsupportedOperationException(); + public InternalOperationResult saveStats(final SaveStatsRequestBody requestBody) { + return ApiHelper.execute(() -> attemptHandler.saveStats(requestBody)); } @Override diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/AttemptHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/AttemptHandler.java index 7c7470218818..95579074ed08 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/AttemptHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/AttemptHandler.java @@ -5,9 +5,13 @@ package io.airbyte.server.handlers; import io.airbyte.api.model.generated.InternalOperationResult; +import io.airbyte.api.model.generated.SaveStatsRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; +import io.airbyte.config.StreamSyncStats; +import io.airbyte.config.SyncStats; import io.airbyte.persistence.job.JobPersistence; import java.io.IOException; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,4 +36,29 @@ public InternalOperationResult setWorkflowInAttempt(SetWorkflowInAttemptRequestB return new InternalOperationResult().succeeded(true); } + public InternalOperationResult saveStats(final SaveStatsRequestBody requestBody) { + try { + final var stats = requestBody.getStats(); + final var streamStats = requestBody.getStreamStats().stream() + .map(s -> new StreamSyncStats() + .withStreamName(s.getStreamName()) + .withStreamNamespace(s.getStreamNamespace()) + .withStats(new SyncStats() + .withBytesEmitted(s.getStats().getBytesEmitted()) + .withRecordsEmitted(s.getStats().getRecordsEmitted()) + .withEstimatedBytes(s.getStats().getEstimatedBytes()) + .withEstimatedRecords(s.getStats().getEstimatedRecords()))) + .collect(Collectors.toList()); + + jobPersistence.writeStats(requestBody.getJobId(), requestBody.getAttemptNumber(), + stats.getEstimatedRecords(), 
stats.getEstimatedBytes(), stats.getRecordsEmitted(), stats.getBytesEmitted(), streamStats); + + } catch (final IOException ioe) { + LOGGER.error("IOException when saving stats for attempt;", ioe); + return new InternalOperationResult().succeeded(false); + } + + return new InternalOperationResult().succeeded(true); + } + } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 0f5b3cfee5bb..1dec3f64555b 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -1301,6 +1301,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -1311,6 +1312,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -1358,6 +1360,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -1368,6 +1371,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -1695,6 +1699,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -1705,6 +1710,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -1752,6 +1758,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -1762,6 +1769,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -4138,6 +4146,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -4148,6 +4157,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -4195,6 +4205,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -4205,6 +4216,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -4447,6 +4459,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -4457,6 +4470,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -4504,6 +4518,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -4514,6 +4529,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -4634,6 +4650,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -4644,6 +4661,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -4691,6 +4709,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -4701,6 +4720,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -4894,6 +4914,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -4904,6 +4925,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -4946,6 +4968,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -4956,6 +4979,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -5015,6 +5039,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -5025,6 +5050,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -5067,6 +5093,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" }, { "stats" : { @@ -5077,6 +5104,7 @@

Example data

"estimatedRecords" : 1, "recordsEmitted" : 2 }, + "streamNamespace" : "streamNamespace", "streamName" : "streamName" } ], "id" : 5, @@ -10344,6 +10372,7 @@

AttemptStreamStats -
streamName
+
streamNamespace (optional)
stats
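
For reviewers, a minimal sketch of how the new persistence methods are intended to be exercised once the follow-up PRs wire everything up. The jobId and attemptNumber values and the surrounding wiring are hypothetical; writeStats and getAttemptStats are the two methods this PR introduces:

final List<StreamSyncStats> streamStats = List.of(
    new StreamSyncStats().withStreamName("users").withStreamNamespace("public")
        .withStats(new SyncStats()
            .withEstimatedRecords(2000L).withEstimatedBytes(16384L)
            .withRecordsEmitted(500L).withBytesEmitted(4096L)));
// Upserts one sync_stats row per attempt and one stream_stats row per (attempt, stream, namespace).
jobPersistence.writeStats(jobId, attemptNumber, 2000L, 16384L, 500L, 4096L, streamStats);
// Reads back the combined and per-stream stats, e.g. to drive the progress bar.
final JobPersistence.AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber);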