Skip to content

Commit

Permalink
Revert metrics reporter migration to micronaut (airbytehq#17927)
Browse files Browse the repository at this point in the history
* Revert "improve query performance (airbytehq#17862)"

This reverts commit e0db09b.

* Revert "fix metric reporter not producing metrics (airbytehq#17863)"

This reverts commit 63e39e6.

* Revert "convert airbyte-metrics-reporter to micronaut (airbytehq#17365)"

This reverts commit d30de1b.
  • Loading branch information
malikdiarra authored and jhammarstedt committed Oct 31, 2022
1 parent 87aee6a commit 85a08a4
Show file tree
Hide file tree
Showing 15 changed files with 867 additions and 1,204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,19 @@

import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static org.jooq.impl.SQLDataType.VARCHAR;

import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage;
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.DSLContext;

/**
Expand Down Expand Up @@ -49,4 +57,173 @@ public static List<ReleaseStage> srcIdAndDestIdToReleaseStages(final DSLContext
.or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE);
}

/**
 * Counts the jobs currently in the {@code pending} state.
 *
 * @param ctx database context
 * @return number of pending jobs
 */
public static int numberOfPendingJobs(final DSLContext ctx) {
  final var pendingJobs = ctx
      .selectCount()
      .from(JOBS)
      .where(JOBS.STATUS.eq(JobStatus.pending));
  return pendingJobs.fetchOne(0, int.class);
}

/**
 * Counts the jobs in the {@code running} state whose connection is still active.
 *
 * @param ctx database context
 * @return number of running jobs on active connections
 */
public static int numberOfRunningJobs(final DSLContext ctx) {
  // jobs.scope stores the connection id as text, hence the cast on the join.
  final var runningOnActiveConnection =
      JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.eq(StatusType.active));
  return ctx.selectCount()
      .from(JOBS)
      .join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
      .where(runningOnActiveConnection)
      .fetchOne(0, int.class);
}

/**
 * Counts "orphan" running jobs: jobs in the {@code running} state whose connection is
 * no longer active (deleted, deprecated, ...).
 *
 * @param ctx database context
 * @return number of running jobs attached to a non-active connection
 */
public static int numberOfOrphanRunningJobs(final DSLContext ctx) {
  // Same join as numberOfRunningJobs, but the connection status filter is inverted.
  final var runningOnInactiveConnection =
      JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.ne(StatusType.active));
  return ctx.selectCount()
      .from(JOBS)
      .join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
      .where(runningOnInactiveConnection)
      .fetchOne(0, int.class);
}

/**
 * Age in seconds of the oldest job still in the {@code pending} state.
 *
 * @param ctx database context
 * @return age in seconds, or 0 when there is no pending job
 */
public static Long oldestPendingJobAgeSecs(final DSLContext ctx) {
  return oldestJobAgeSecs(ctx, JobStatus.pending);
}

/**
 * Age in seconds of the oldest job still in the {@code running} state.
 *
 * @param ctx database context
 * @return age in seconds, or 0 when there is no running job
 */
public static Long oldestRunningJobAgeSecs(final DSLContext ctx) {
  return oldestJobAgeSecs(ctx, JobStatus.running);
}

/**
 * Computes the age, in whole seconds, of the oldest job currently in the given status,
 * and logs that job's id and human-readable age for debugging.
 *
 * @param ctx database context
 * @param status job status to filter on
 * @return age in seconds of the oldest matching job, or 0 when no job has that status
 */
private static Long oldestJobAgeSecs(final DSLContext ctx, final JobStatus status) {
  final var readableTimeField = "run_duration";
  final var durationSecField = "run_duration_secs";
  // status.getLiteral() is a jOOQ-generated enum literal, not user input, so direct
  // interpolation into the SQL string is safe here.
  final var query = String.format("""
      WITH
      oldest_job AS (
      SELECT id,
      age(current_timestamp, created_at) AS %s
      FROM jobs
      WHERE status = '%s'
      ORDER BY run_duration DESC
      LIMIT 1)
      SELECT id,
      run_duration,
      extract(epoch from run_duration) as %s
      FROM oldest_job""", readableTimeField, status.getLiteral(), durationSecField);
  final var res = ctx.fetch(query);
  // unfortunately there are no good jOOQ methods for retrieving a single record of a single
  // column, forcing the List cast.
  final var duration = res.getValues(durationSecField, Double.class);

  if (duration.isEmpty()) {
    return 0L;
  }
  // .get(0) is safe because the CTE's LIMIT 1 guarantees at most one row.
  final var id = res.getValues("id", String.class).get(0);
  final var readableTime = res.getValues(readableTimeField, String.class).get(0);
  log.info("oldest job information - id: {}, readable time: {}", id, readableTime);

  // as double can have rounding errors, round down to remove noise.
  return duration.get(0).longValue();
}

/**
 * Active-connection counts, one entry per workspace. Only non-tombstoned workspaces and
 * sources, and connections in the 'active' status, are counted.
 *
 * @param ctx database context
 * @return per-workspace counts of active connections
 */
public static List<Long> numberOfActiveConnPerWorkspace(final DSLContext ctx) {
  final var countAlias = "num_conn";
  final var sql = String.format("""
      SELECT workspace_id, count(c.id) as %s
      FROM actor
      INNER JOIN workspace ws ON actor.workspace_id = ws.id
      INNER JOIN connection c ON actor.id = c.source_id
      WHERE ws.tombstone = false
      AND actor.tombstone = false AND actor.actor_type = 'source'
      AND c.status = 'active'
      GROUP BY workspace_id;""", countAlias);
  final var result = ctx.fetch(sql);
  return result.getValues(countAlias, long.class);
}

/**
 * Returns (status, runtime-in-seconds) pairs for every job that reached a terminal state
 * (failed, succeeded or cancelled) within the last hour.
 *
 * @param ctx database context
 * @return one (terminal status, runtime seconds) pair per terminal job of the last hour
 */
public static List<Pair<JobStatus, Double>> overallJobRuntimeForTerminalJobsInLastHour(final DSLContext ctx) {
  final var statusField = "status";
  final var timeField = "sec";
  final var query =
      String.format("""
          SELECT %s, extract(epoch from age(updated_at, created_at)) AS %s FROM jobs
          WHERE updated_at >= NOW() - INTERVAL '1 HOUR'
          AND (jobs.status = 'failed' OR jobs.status = 'succeeded' OR jobs.status = 'cancelled');""", statusField, timeField);
  // Fetch exactly once: the previous version executed the same query twice (once per column),
  // doubling database load and risking mismatched status/time rows if data changed in between.
  final var result = ctx.fetch(query);
  final var statuses = result.getValues(statusField, JobStatus.class);
  final var times = result.getValues(timeField, double.class);

  // Both lists come from the same result set, so they are index-aligned.
  final var pairedRes = new ArrayList<Pair<JobStatus, Double>>();
  for (int i = 0; i < statuses.size(); i++) {
    pairedRes.add(new ImmutablePair<>(statuses.get(i), times.get(i)));
  }

  return pairedRes;
}

/*
 * A connection is considered "not running on schedule" when, over the last 24 hours, its number
 * of sync runs is lower than the number of runs its schedule settings call for. Refer to the
 * runbook for a detailed discussion.
 */
public static Long numberOfJobsNotRunningOnScheduleInLastDay(final DSLContext ctx) {
  final var countField = "cnt";
  // Expected runs per day: 24 when the schedule's time unit is hours, 1440 (= 24 * 60) when it
  // is minutes. For example, a connection configured to run every 6 hours that only ran 3 times
  // in the last 24 hours counts as 1 abnormal instance.
  final var queryForAbnormalSyncInHoursInLastDay = abnormalSyncCountQuery(countField, "hours", 24);
  final var queryForAbnormalSyncInMinutesInLastDay = abnormalSyncCountQuery(countField, "minutes", 1440);
  return ctx.fetch(queryForAbnormalSyncInHoursInLastDay).getValues(countField, long.class).get(0)
      + ctx.fetch(queryForAbnormalSyncInMinutesInLastDay).getValues(countField, long.class).get(0);
}

/**
 * Builds the query counting connections whose number of sync runs in the last 24 hours (plus a
 * one-minute grace period) is below the expected run count for the given schedule time unit.
 * Previously this SQL was duplicated once per time unit; it is now parameterized.
 *
 * @param countField column alias for the resulting count
 * @param timeUnit schedule time unit as stored in the schedule JSON ("hours" or "minutes")
 * @param expectedRunsPerDay total scheduling slots per day for that unit (24 or 1440)
 * @return the SQL text of the abnormal-sync count query
 */
private static String abnormalSyncCountQuery(final String countField, final String timeUnit, final int expectedRunsPerDay) {
  // The JSON text comparison needs the embedded double quotes, e.g. '"hours"'.
  return String.format("""
      select count(1) as %s from (
      select
      c.id,
      count(*) as cnt
      from
      connection c
      left join jobs j on
      j.scope::uuid = c.id
      where
      c.schedule is not null
      and c.schedule != 'null'
      and j.created_at > now() - interval '24 hours 1 minutes'
      and c.status = 'active'
      and j.config_type = 'sync'
      and c.updated_at < now() - interval '24 hours 1 minutes'
      and cast(c.schedule::jsonb->'timeUnit' as text) = '"%s"'
      group by 1
      having count(*) < %d / cast(c.schedule::jsonb->'units' as integer)) as abnormal_jobs
      """, countField, timeUnit, expectedRunsPerDay);
}

/**
 * Counts the active connections on an hourly or minute-based schedule that were last updated
 * more than 24 hours (and 1 minute) ago.
 *
 * @param ctx database context
 * @return number of such connections
 */
public static Long numScheduledActiveConnectionsInLastDay(final DSLContext ctx) {
  final var countAlias = "cnt";
  final var totalScheduledConnectionsSql = String.format("""
      select count(1) as %s
      from connection c
      where
      c.updated_at < now() - interval '24 hours 1 minutes'
      and cast(c.schedule::jsonb->'timeUnit' as text) IN ('"hours"', '"minutes"')
      and c.status = 'active'
      """, countAlias);
  final var counts = ctx.fetch(totalScheduledConnectionsSql).getValues(countAlias, long.class);
  return counts.get(0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
/**
* Enum source of truth of all Airbyte metrics. Each enum value represent a metric and is linked to
* an application and contains a description to make it easier to understand.
* <p>
*
* Each object of the enum actually represent a metric, so the Registry name is misleading. The
* reason 'Registry' is in the name is to emphasize this enum's purpose as a source of truth for all
* metrics. This also helps code readability i.e. AirbyteMetricsRegistry.metricA.
* <p>
*
* Metric Name Convention (adapted from
* https://docs.datadoghq.com/developers/guide/what-best-practices-are-recommended-for-naming-metrics-and-tags/):
* <p>
* - Use lowercase. Metric names are case-sensitive.
- Use lowercase. Metric names are case-sensitive.
* <p>
* - Use underscore to delimit names with multiple words.
* <p>
Expand Down
Loading

0 comments on commit 85a08a4

Please sign in to comment.