Add migrations to support progress bar. (#19191)
Follow up to #18953.

Implement all the DB migrations required for a progress bar.

The main change here is to support saving:

- the estimated records/bytes at the sync level
- the estimated and emitted records/bytes at the stream level

After this, I'll put up a PR for the persistence layer changes, which will write to and read from these columns (a hypothetical sketch of that write path follows the list below).

Finally, I'll wire this into the API changes, which are currently stubs.

- add the estimated_records and estimated_bytes columns to the SyncStats table.
- create a stream_stats table
  - estimated and emitted records/bytes column
  - contains attempt_id and stream_name columns, with a unique constraint across the two.
  - foreign key from attempt_id to the attempts table.
  - this table hopefully sets us up for the parallel sync work.
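
The persistence layer isn't part of this commit, but to make the intent concrete, here is a minimal sketch of what the write path could look like: a plain-jOOQ upsert keyed on the new (attempt_id, stream_name) unique constraint. The class, method, and parameter names are hypothetical, not the actual Airbyte persistence API.

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

import java.util.UUID;
import org.jooq.DSLContext;

public class StreamStatsWriterSketch {

  // Upsert one stream's stats for an attempt; a conflict on the
  // (attempt_id, stream_name) unique constraint updates the row in place.
  public static void upsertStreamStats(final DSLContext ctx, final int attemptId, final String streamNamespace,
                                       final String streamName, final long recordsEmitted, final long bytesEmitted,
                                       final long estimatedRecords, final long estimatedBytes) {
    ctx.insertInto(table("stream_stats"))
        .set(field("id", UUID.class), UUID.randomUUID())
        .set(field("attempt_id", Integer.class), attemptId)
        .set(field("stream_namespace", String.class), streamNamespace)
        .set(field("stream_name", String.class), streamName)
        .set(field("records_emitted", Long.class), recordsEmitted)
        .set(field("bytes_emitted", Long.class), bytesEmitted)
        .set(field("estimated_records", Long.class), estimatedRecords)
        .set(field("estimated_bytes", Long.class), estimatedBytes)
        .onConflict(field("attempt_id"), field("stream_name"))
        .doUpdate()
        .set(field("records_emitted", Long.class), recordsEmitted)
        .set(field("bytes_emitted", Long.class), bytesEmitted)
        .set(field("estimated_records", Long.class), estimatedRecords)
        .set(field("estimated_bytes", Long.class), estimatedBytes)
        .execute();
  }
}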
davinchia authored and akashkulk committed Nov 17, 2022
1 parent ad40d0e commit 02ac7ea
Showing 3 changed files with 117 additions and 1 deletion.
@@ -136,7 +136,7 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.18.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.18.002", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
90 changes: 90 additions & 0 deletions airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_18_002__AddProgressBarStats.java
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.jobs.migrations;

import static org.jooq.impl.DSL.currentOffsetDateTime;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.foreignKey;
import static org.jooq.impl.DSL.primaryKey;
import static org.jooq.impl.DSL.unique;

import java.time.OffsetDateTime;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The estimated columns contain the overall estimated records and bytes for an attempt.
* <p>
* The new stream_stats table contains the estimated and emitted records/bytes for an attempt at the
* per-stream level. This lets us track per-stream stats as an attempt is in progress.
*/
public class V0_40_18_002__AddProgressBarStats extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_18_002__AddProgressBarStats.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As the database schema changes, the generated jOOQ code can become outdated,
// so old migrations may not compile if they reference any generated code.
try (final DSLContext ctx = DSL.using(context.getConnection())) {
addEstimatedColumnsToSyncStats(ctx);
addStreamStatsTable(ctx);
}
}

private static void addEstimatedColumnsToSyncStats(final DSLContext ctx) {
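// On Postgres this renders roughly as:
//   alter table sync_stats add estimated_records bigint, add estimated_bytes bigint;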
ctx.alterTable("sync_stats")
.add(
field("estimated_records", SQLDataType.BIGINT.nullable(true)),
field("estimated_bytes", SQLDataType.BIGINT.nullable(true)))
.execute();
}

private static void addStreamStatsTable(final DSLContext ctx) {
// Metadata Columns
final Field<UUID> id = field("id", SQLDataType.UUID.nullable(false));
final Field<Integer> attemptId = field("attempt_id", SQLDataType.INTEGER.nullable(false));
final Field<String> streamNamespace = field("stream_namespace", SQLDataType.VARCHAR.nullable(false));
final Field<String> streamName = field("stream_name", SQLDataType.VARCHAR.nullable(false));

// Stats Columns
final Field<Long> recordsEmitted = field("records_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> bytesEmitted = field("bytes_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> estimatedRecords = field("estimated_records", SQLDataType.BIGINT.nullable(true));
final Field<Long> estimatedBytes = field("estimated_bytes", SQLDataType.BIGINT.nullable(true));

// Time Columns
final Field<OffsetDateTime> createdAt =
field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));
final Field<OffsetDateTime> updatedAt =
field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));

ctx.createTableIfNotExists("stream_stats")
.columns(
id, attemptId, streamNamespace, streamName, recordsEmitted, bytesEmitted, estimatedRecords, estimatedBytes, createdAt, updatedAt)
.constraints(
primaryKey(id),
foreignKey(attemptId).references("attempts", "id").onDeleteCascade(),
// Prevent duplicate stat records of the same stream and attempt.
unique("attempt_id", "stream_name"))
.execute();

// Create an index on attempt_id, since, as of this migration, all read queries
// on this table filter on attempt_id in their WHERE clause.
ctx.createIndex("index").on("stream_stats", "attempt_id").execute();
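// The statement above renders as: create index "index" on "stream_stats" ("attempt_id");
// note the index really is named "index", as the schema dump below confirms.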

}

}
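
For the read side, a per-stream progress query against the new table might look like the following sketch (again hypothetical names, not the actual persistence API):

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

import org.jooq.DSLContext;
import org.jooq.Record3;
import org.jooq.Result;

public class StreamStatsReaderSketch {

  // Fetch emitted vs. estimated record counts for every stream in an attempt.
  // This hits the index on attempt_id created by the migration above.
  public static Result<Record3<String, Long, Long>> fetchStreamProgress(final DSLContext ctx, final int attemptId) {
    return ctx.select(
            field("stream_name", String.class),
            field("records_emitted", Long.class),
            field("estimated_records", Long.class))
        .from(table("stream_stats"))
        .where(field("attempt_id", Integer.class).eq(attemptId))
        .fetch();
  }
}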
26 changes: 26 additions & 0 deletions airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt
@@ -61,6 +61,20 @@ create table "public"."normalization_summaries"(
constraint "normalization_summaries_pkey"
primary key ("id")
);
create table "public"."stream_stats"(
"id" uuid not null,
"attempt_id" int4 not null,
"stream_namespace" varchar(2147483647) not null,
"stream_name" varchar(2147483647) not null,
"records_emitted" int8 null,
"bytes_emitted" int8 null,
"estimated_records" int8 null,
"estimated_bytes" int8 null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
constraint "stream_stats_pkey"
primary key ("id")
);
create table "public"."sync_stats"(
"id" uuid not null,
"attempt_id" int8 not null,
@@ -75,13 +89,19 @@ create table "public"."sync_stats"(
"max_seconds_between_state_message_emitted_and_committed" int8 null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
"estimated_records" int8 null,
"estimated_bytes" int8 null,
constraint "sync_stats_pkey"
primary key ("id")
);
alter table "public"."normalization_summaries"
add constraint "normalization_summaries_attempt_id_fkey"
foreign key ("attempt_id")
references "public"."attempts" ("id");
alter table "public"."stream_stats"
add constraint "stream_stats_attempt_id_fkey"
foreign key ("attempt_id")
references "public"."attempts" ("id");
alter table "public"."sync_stats"
add constraint "sync_stats_attempt_id_fkey"
foreign key ("attempt_id")
Expand All @@ -101,5 +121,11 @@ create index "jobs_scope_idx" on "public"."jobs"("scope" asc);
create index "jobs_status_idx" on "public"."jobs"("status" asc);
create unique index "normalization_summaries_pkey" on "public"."normalization_summaries"("id" asc);
create index "normalization_summary_attempt_id_idx" on "public"."normalization_summaries"("attempt_id" asc);
create index "index" on "public"."stream_stats"("attempt_id" asc);
create unique index "stream_stats_attempt_id_stream_name_key" on "public"."stream_stats"(
"attempt_id" asc,
"stream_name" asc
);
create unique index "stream_stats_pkey" on "public"."stream_stats"("id" asc);
create index "attempt_id_idx" on "public"."sync_stats"("attempt_id" asc);
create unique index "sync_stats_pkey" on "public"."sync_stats"("id" asc);
