Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloud Dashboard 1 #10628

Merged
merged 8 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,17 @@ public ConfigRepository(final ConfigPersistence persistence,
this.database = new ExceptionWrappingDatabase(database);
}

public ExceptionWrappingDatabase getDatabase() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

required to run a query against the database.

Copy link
Contributor

@cgardens cgardens Mar 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davinchia this is breaking the whole point of the ConfigRepository abstraction. It also sets an example for everyone else who joins the team that the db can now be accessed from anywhere in code which is a pattern we have worked really hard to avoid.

Is there a reason you can't write your query inside here instead like we do with other queries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mental context at time of writing:

  • I was under the impression we want to move away from this class to DatabaseConfigPersistence.java. Because of this, adding more code here didn't seem like a good idea.
  • The queries were complicated multi-join queries and not the typical CRUD queries present in the ConfigRepository class. It didn't feel it was the right place to put these here.
  • Chris mentioned it's useful to collect all the metric queries in one spot so he can re-use them for analytics if needed. A metric specific spot like this class seemed more sane than adding to the ConfigRepository class which is bloated and quite tough to read at this point. (I think we should starting composing down to table-specific classes.)
  • I feel that the accessing the db directly is 'okay' in special cases. In this case, the metric queries are quite specialised and unlikely to be used by another application. We don't want to use the normal CRUD operations here since it'll result in tens of queries vs 1.

I'm happy to move the queries here if you have feel otherwise.

return database;
}

public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final boolean includeTombstone)
throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardWorkspace workspace = persistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), StandardWorkspace.class);

if (!MoreBooleans.isTruthy(workspace.getTombstone()) || includeTombstone) {
return workspace;
}

throw new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString());
}

Expand Down
5 changes: 5 additions & 0 deletions airbyte-metrics/lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ plugins {
dependencies {
implementation project(':airbyte-commons')
implementation project(':airbyte-config:models')
implementation project(':airbyte-db:jooq')
implementation project(':airbyte-db:lib')

implementation 'com.datadoghq:java-dogstatsd-client:4.0.0'

testImplementation project(':airbyte-config:persistence')
testImplementation 'org.testcontainers:postgresql:1.15.3'
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@
*/
public enum AirbyteMetricsRegistry {

JOB_CANCELLED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_cancelled_by_release_stage",
"increments when a job is cancelled. jobs are double counted as this is tagged by release stage."),
JOB_CREATED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_created_by_release_stage",
"increments when a new job is created. jobs are double counted as this is tagged by release stage."),
JOB_FAILED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_failed_by_release_stage",
"increments when a job fails. jobs are double counted as this is tagged by release stage."),
JOB_SUCCEEDED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_succeeded_by_release_stage",
"increments when a job succeeds. jobs are double counted as this is tagged by release stage."),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static void count(final AirbyteMetricsRegistry metric, final double amt,
return;
}

log.info("publishing count, name: {}, value: {}", metric.metricName, amt);
log.info("publishing count, name: {}, value: {}, tags: {}", metric.metricName, amt, tags);
statsDClient.count(metric.metricName, amt, tags);
}
}
Expand All @@ -90,7 +90,7 @@ public static void gauge(final AirbyteMetricsRegistry metric, final double val,
return;
}

log.info("publishing gauge, name: {}, value: {}", metric, val);
log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.gauge(metric.metricName, val, tags);
}
}
Expand All @@ -117,7 +117,7 @@ public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final do
return;
}

log.info("recording histogram, name: {}, value: {}", metric.metricName, val);
log.info("recording histogram, name: {}, value: {}, tags: {}", metric.metricName, val, tags);
statsDClient.histogram(metric.metricName, val, tags);
}
}
Expand All @@ -138,7 +138,7 @@ public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final d
return;
}

log.info("recording distribution, name: {}, value: {}", metric.metricName, val);
log.info("recording distribution, name: {}, value: {}, tags: {}", metric.metricName, val, tags);
statsDClient.distribution(metric.metricName, val, tags);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/**
* Enum containing all applications metrics are emitted for. Used to initialize
* {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, boolean)}.
* {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, DatadogClientConfiguration)}.
*
* Application Name Conventions:
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION;

import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;
import java.util.List;
import java.util.UUID;
import org.jooq.DSLContext;

/**
* Keep track of all metric queries.
*/
public class MetricQueries {

public static List<ReleaseStage> jobIdToReleaseStages(final DSLContext ctx, final long jobId) {
final var srcRelStageCol = "src_release_stage";
final var dstRelStageCol = "dst_release_stage";

final var query = String.format("""
select src_def_data.release_stage as %s,
davinchia marked this conversation as resolved.
Show resolved Hide resolved
dest_def_data.release_stage as %s
from connection
inner join jobs on connection.id=CAST(jobs.scope AS uuid)
inner join actor as dest_data on connection.destination_id = dest_data.id
inner join actor_definition as dest_def_data on dest_data.actor_definition_id = dest_def_data.id
inner join actor as src_data on connection.source_id = src_data.id
inner join actor_definition as src_def_data on src_data.actor_definition_id = src_def_data.id
where jobs.id = '%d';""", srcRelStageCol, dstRelStageCol, jobId);

final var res = ctx.fetch(query);
final var stages = res.getValues(srcRelStageCol, ReleaseStage.class);
stages.addAll(res.getValues(dstRelStageCol, ReleaseStage.class));
return stages;
}

public static List<ReleaseStage> srcIdAndDestIdToReleaseStages(final DSLContext ctx, final UUID srcId, final UUID dstId) {
return ctx.select(ACTOR_DEFINITION.RELEASE_STAGE).from(ACTOR).join(ACTOR_DEFINITION).on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID))
.where(ACTOR.ID.eq(srcId))
.or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

/**
* Keep track of all metric tags.
*/
public class MetricTags {

public static final String RELEASE_STAGE = "release_stage:";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.llib;
davinchia marked this conversation as resolved.
Show resolved Hide resolved

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
import static io.airbyte.db.instance.jobs.jooq.Tables.*;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.jooq.enums.ActorType;
import io.airbyte.db.instance.configs.jooq.enums.NamespaceDefinitionType;
import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;
import io.airbyte.db.instance.test.TestDatabaseProviders;
import io.airbyte.metrics.lib.MetricQueries;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.UUID;
import org.jooq.JSONB;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;

public class MetrisQueriesTest {

private static final String USER = "user";
private static final String PASS = "hunter2";

private static final UUID SRC_DEF_ID = UUID.randomUUID();
private static final UUID DST_DEF_ID = UUID.randomUUID();

private static PostgreSQLContainer<?> container;
private static Database configDb;

@BeforeAll
static void setUpAll() throws IOException, SQLException {
container = new PostgreSQLContainer<>("postgres:13-alpine")
.withUsername(USER)
.withPassword(PASS);
container.start();

final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container);
configDb = databaseProviders.createNewConfigsDatabase();
databaseProviders.createNewJobsDatabase();

// create src and dst def
configDb.transaction(ctx -> ctx
.insertInto(ACTOR_DEFINITION, ACTOR_DEFINITION.ID, ACTOR_DEFINITION.NAME, ACTOR_DEFINITION.DOCKER_REPOSITORY,
ACTOR_DEFINITION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION.SPEC, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.RELEASE_STAGE)
.values(SRC_DEF_ID, "srcDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.source, ReleaseStage.beta)
.values(DST_DEF_ID, "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.generally_available)
.values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.alpha).execute());

// drop the constraint to simplify following test set up
configDb.transaction(ctx -> ctx.alterTable(ACTOR).dropForeignKey("actor_workspace_id_fkey").execute());
}

@AfterEach
void tearDown() throws SQLException {
configDb.transaction(ctx -> ctx.truncate(ACTOR));
}

@Nested
class srcIdAndDestIdToReleaseStages {

@Test
@DisplayName("should return the right release stages")
void shouldReturnReleaseStages() throws SQLException {
final var srcId = UUID.randomUUID();
final var dstId = UUID.randomUUID();

// create src and dst
configDb.transaction(
ctx -> ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE)
.values(srcId, UUID.randomUUID(), SRC_DEF_ID, "src", JSONB.valueOf("{}"), ActorType.source)
.values(dstId, UUID.randomUUID(), DST_DEF_ID, "dst", JSONB.valueOf("{}"), ActorType.destination)
.execute());
final var res = configDb.query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId));
assertEquals(List.of(ReleaseStage.beta, ReleaseStage.generally_available), res);
}

@Test
@DisplayName("should not error out or return any result if not applicable")
void shouldReturnNothingIfNotApplicable() throws SQLException {
final var res = configDb.query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, UUID.randomUUID(), UUID.randomUUID()));
assertEquals(0, res.size());
}

}

@Nested
class jobIdToReleaseStages {

@Test
@DisplayName("should return the right release stages")
void shouldReturnReleaseStages() throws SQLException {
final var srcId = UUID.randomUUID();
final var dstId = UUID.randomUUID();
// create src and dst
configDb.transaction(
ctx -> ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE)
.values(srcId, UUID.randomUUID(), SRC_DEF_ID, "src", JSONB.valueOf("{}"), ActorType.source)
.values(dstId, UUID.randomUUID(), DST_DEF_ID, "dst", JSONB.valueOf("{}"), ActorType.destination)
.execute());
final var connId = UUID.randomUUID();
// create connection
configDb.transaction(
ctx -> ctx
.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL)
.values(connId, NamespaceDefinitionType.source, srcId, dstId, "conn", JSONB.valueOf("{}"), true)
.execute());
// create job
final var jobId = 1L;
configDb.transaction(
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE).values(jobId, connId.toString()).execute());

final var res = configDb.query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, jobId));
assertEquals(List.of(ReleaseStage.beta, ReleaseStage.generally_available), res);
}

@Test
@DisplayName("should not error out or return any result if not applicable")
void shouldReturnNothingIfNotApplicable() throws SQLException {
final var missingJobId = 100000L;
final var res = configDb.query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, missingJobId));
assertEquals(0, res.size());
}

}

}
1 change: 1 addition & 0 deletions airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-config:models')
implementation project(':airbyte-config:persistence')
implementation project(':airbyte-db:jooq')
implementation project(':airbyte-db:lib')
implementation project(':airbyte-metrics:lib')
implementation project(':airbyte-json-validation')
Expand Down
Loading