Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate and read from SyncStats table #16476

Merged
merged 21 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.3.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.4.001", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.jobs.migrations;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_40_4_001__ChangeSyncStatsForeignKey extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_4_001__ChangeSyncStatsForeignKey.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
changeForeignKeyType(ctx);
}

private void changeForeignKeyType(final DSLContext ctx) throws Exception {
ctx.alterTable("sync_stats").alter("attempt_id").set(SQLDataType.BIGINT.nullable(false)).execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ create table "public"."jobs"(
);
create table "public"."sync_stats"(
"id" uuid not null,
"attempt_id" int4 not null,
"attempt_id" int8 not null,
"records_emitted" int8 null,
"bytes_emitted" int8 null,
"source_state_messages_emitted" int8 null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.scheduler.persistence;

import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
Expand All @@ -23,9 +24,11 @@
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.SyncStats;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.db.instance.jobs.jooq.generated.tables.records.SyncStatsRecord;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
Expand Down Expand Up @@ -61,6 +64,7 @@
import org.jooq.JSONB;
import org.jooq.Named;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.Result;
import org.jooq.Sequence;
import org.jooq.Table;
Expand Down Expand Up @@ -305,14 +309,37 @@ public Optional<String> getAttemptTemporalWorkflowId(final long jobId, final int
}

@Override
public <T> void writeOutput(final long jobId, final int attemptNumber, final T output) throws IOException {
public <T> void writeOutput(final long jobId, final int attemptNumber, final T output, final SyncStats syncStats) throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
jobDatabase.transaction(
ctx -> ctx.update(ATTEMPTS)
.set(ATTEMPTS.OUTPUT, JSONB.valueOf(Jsons.serialize(output)))
.set(ATTEMPTS.UPDATED_AT, now)
.where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber))
.execute());
jobDatabase.transaction(ctx -> {
ctx.update(ATTEMPTS)
.set(ATTEMPTS.OUTPUT, JSONB.valueOf(Jsons.serialize(output)))
.set(ATTEMPTS.UPDATED_AT, now)
.where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber))
.execute();
final Optional<Record> record =
ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
attemptNumber).stream().findFirst();
final Long attemptId = record.get().get("id", Long.class);

ctx.insertInto(SYNC_STATS)
.set(SYNC_STATS.ID, UUID.randomUUID())
.set(SYNC_STATS.UPDATED_AT, now)
.set(SYNC_STATS.CREATED_AT, now)
.set(SYNC_STATS.ATTEMPT_ID, attemptId)
.set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
.set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted())
.set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsEmitted())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getRecordsCommitted?

.set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted())
.set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted())
.set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted())
.set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted())
.execute();
return null;
});

}

@Override
Expand All @@ -327,6 +354,38 @@ public void writeAttemptFailureSummary(final long jobId, final int attemptNumber
.execute());
}

@Override
public List<SyncStats> getSyncStats(final Long attemptId) throws IOException {
return jobDatabase
.query(ctx -> ctx.select(DSL.asterisk()).from(SYNC_STATS).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId)).fetch(getSyncStatsRecordMapper())
.stream()
.flatMap(row -> Stream.of(new SyncStats().withBytesEmitted(row.getBytesEmitted()).withRecordsEmitted(row.getRecordsEmitted())
.withSourceStateMessagesEmitted(row.getSourceStateMessagesEmitted())
.withDestinationStateMessagesEmitted(row.getDestinationStateMessagesEmitted()).withRecordsCommitted(row.getRecordsCommitted())
.withMeanSecondsBeforeSourceStateMessageEmitted(row.getMeanSecondsBeforeSourceStateMessageEmitted())
.withMaxSecondsBeforeSourceStateMessageEmitted(row.getMaxSecondsBeforeSourceStateMessageEmitted())
.withMeanSecondsBetweenStateMessageEmittedandCommitted(row.getMeanSecondsBetweenStateMessageEmittedAndCommitted())
.withMaxSecondsBetweenStateMessageEmittedandCommitted(row.getMaxSecondsBetweenStateMessageEmittedAndCommitted())))
.toList());
}

private static RecordMapper<Record, SyncStatsRecord> getSyncStatsRecordMapper() {
return record -> new SyncStatsRecord(
UUID.fromString(record.get(SYNC_STATS.ID, String.class)),
record.get(SYNC_STATS.ATTEMPT_ID, Long.class),
record.get(SYNC_STATS.RECORDS_EMITTED, Long.class),
record.get(SYNC_STATS.BYTES_EMITTED, Long.class),
record.get(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, Long.class),
record.get(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, Long.class),
record.get(SYNC_STATS.RECORDS_COMMITTED, Long.class),
record.get(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, Long.class),
record.get(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, Long.class),
record.get(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, Long.class),
record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, Long.class),
record.get(SYNC_STATS.CREATED_AT, OffsetDateTime.class),
record.get(SYNC_STATS.UPDATED_AT, OffsetDateTime.class));
}

@Override
public Job getJob(final long jobId) throws IOException {
return jobDatabase.query(ctx -> getJob(ctx, jobId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.SyncStats;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.scheduler.models.AttemptWithJobInfo;
import io.airbyte.scheduler.models.Job;
Expand All @@ -29,6 +30,8 @@
*/
public interface JobPersistence {

List<SyncStats> getSyncStats(Long attemptId) throws IOException;

Job getJob(long jobId) throws IOException;

//
Expand Down Expand Up @@ -125,7 +128,7 @@ public interface JobPersistence {
* StandardSyncOutput#state in the configs database by calling
* ConfigRepository#updateConnectionState, which takes care of persisting the connection state.
*/
<T> void writeOutput(long jobId, int attemptNumber, T output) throws IOException;
<T> void writeOutput(long jobId, int attemptNumber, T output, SyncStats syncStats) throws IOException;

/**
* Writes a summary of all failures that occurred during the attempt.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
Expand Down Expand Up @@ -249,14 +252,34 @@ void testWriteOutput() throws IOException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
final Job created = jobPersistence.getJob(jobId);
final JobOutput jobOutput = new JobOutput().withOutputType(JobOutput.OutputType.DISCOVER_CATALOG);
final SyncStats syncStats =
new SyncStats().withBytesEmitted(100L).withRecordsEmitted(10L).withRecordsCommitted(10L).withDestinationStateMessagesEmitted(1L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different values for recordsEmitted and recordsCommitted would have caught the typo.

.withSourceStateMessagesEmitted(4L).withMaxSecondsBeforeSourceStateMessageEmitted(5L).withMeanSecondsBeforeSourceStateMessageEmitted(2L)
.withMaxSecondsBetweenStateMessageEmittedandCommitted(10L).withMeanSecondsBetweenStateMessageEmittedandCommitted(3L);
final StandardSyncOutput standardSyncOutput =
new StandardSyncOutput().withStandardSyncSummary(new StandardSyncSummary().withTotalStats(syncStats));
final JobOutput jobOutput = new JobOutput().withOutputType(JobOutput.OutputType.DISCOVER_CATALOG).withSync(standardSyncOutput);

when(timeSupplier.get()).thenReturn(Instant.ofEpochMilli(4242));
jobPersistence.writeOutput(jobId, attemptNumber, jobOutput);
jobPersistence.writeOutput(jobId, attemptNumber, jobOutput,
jobOutput.getSync().getStandardSyncSummary().getTotalStats());

final Job updated = jobPersistence.getJob(jobId);
assertEquals(Optional.of(jobOutput), updated.getAttempts().get(0).getOutput());
assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond());
assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(),
updated.getAttempts().get(0).getUpdatedAtInSecond());

final SyncStats storedSyncStats = jobPersistence.getSyncStats(updated.getId()).stream().findFirst().get();
assertEquals(100L, storedSyncStats.getBytesEmitted());
assertEquals(10L, storedSyncStats.getRecordsEmitted());
assertEquals(10L, storedSyncStats.getRecordsCommitted());
assertEquals(4L, storedSyncStats.getSourceStateMessagesEmitted());
assertEquals(1L, storedSyncStats.getDestinationStateMessagesEmitted());
assertEquals(5L, storedSyncStats.getMaxSecondsBeforeSourceStateMessageEmitted());
assertEquals(2L, storedSyncStats.getMeanSecondsBeforeSourceStateMessageEmitted());
assertEquals(10L, storedSyncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted());
assertEquals(3L, storedSyncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted());

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -172,7 +173,8 @@ public void jobSuccess(final JobSuccessInput input) {

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
jobPersistence.writeOutput(jobId, attemptId, jobOutput);
final SyncStats syncStats = jobOutput.getSync().getStandardSyncSummary().getTotalStats();
jobPersistence.writeOutput(jobId, attemptId, jobOutput, syncStats);
} else {
log.warn("The job {} doesn't have any output for the attempt {}", jobId, attemptId);
}
Expand Down Expand Up @@ -230,7 +232,8 @@ public void attemptFailure(final AttemptFailureInput input) {

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
jobPersistence.writeOutput(jobId, attemptId, jobOutput);
final SyncStats syncStats = jobOutput.getSync().getStandardSyncSummary().getTotalStats();
jobPersistence.writeOutput(jobId, attemptId, jobOutput, syncStats);
}

emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class Update {
void setJobSuccess() throws IOException {
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput));

Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats());
Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobNotifier).successJob(Mockito.any());
Mockito.verify(mJobtracker).trackSync(Mockito.any(), eq(JobState.SUCCEEDED));
Expand Down Expand Up @@ -339,7 +339,7 @@ void setAttemptFailure() throws IOException {
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, standardSyncOutput, failureSummary));

Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats());
Mockito.verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, failureSummary);
}

Expand Down