Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query to find out jobs running unusally long #17978

Merged
merged 6 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ public enum OssMetricsRegistry implements MetricsRegistry {
MetricEmittingApps.WORKER,
"job_succeeded_by_release_stage",
"increments when a job succeeds. jobs are double counted as this is tagged by release stage."),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
"time taken to create a new kube pod process"),
JSON_STRING_LENGTH(
MetricEmittingApps.WORKER,
"json_string_length",
Expand All @@ -79,30 +75,43 @@ public enum OssMetricsRegistry implements MetricsRegistry {
MetricEmittingApps.WORKER,
"json_size",
"size of the json object"),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
"time taken to create a new kube pod process"),
NUM_ABNORMAL_SCHEDULED_SYNCS_IN_LAST_DAY(
MetricEmittingApps.METRICS_REPORTER,
"num_abnormal_scheduled_syncs_last_day",
"number of abnormal syncs that have skipped at least 1 scheduled run in last day."),
NUM_ACTIVE_CONN_PER_WORKSPACE(
MetricEmittingApps.METRICS_REPORTER,
"num_active_conn_per_workspace",
"number of active connections per workspace"),
NUM_PENDING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_pending_jobs",
"number of pending jobs"),
NUM_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_running_jobs",
"number of running jobs"),
NUM_ORPHAN_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_orphan_running_jobs",
"number of jobs reported as running that as associated to connection inactive or deprecated"),
NUM_ACTIVE_CONN_PER_WORKSPACE(
MetricEmittingApps.METRICS_REPORTER,
"num_active_conn_per_workspace",
"number of active connections per workspace"),
NUM_ABNORMAL_SCHEDULED_SYNCS_IN_LAST_DAY(
NUM_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_abnormal_scheduled_syncs_last_day",
"number of abnormal syncs that have skipped at least 1 scheduled run in last day."),
"num_running_jobs",
"number of running jobs"),
NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS(MetricEmittingApps.WORKER,
"record_schema_validation_error",
"number of record schema validation errors"),
NUM_TOTAL_SCHEDULED_SYNCS_IN_LAST_DAY(
MetricEmittingApps.METRICS_REPORTER,
"num_total_scheduled_syncs_last_day",
"number of total syncs runs in last day."),

NUM_UNUSUALLY_LONG_SYNCS(
MetricEmittingApps.METRICS_REPORTER,
"num_unusually_long_syncs",
"number of unusual long syncs compared to their historic performance."),

OLDEST_PENDING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
"oldest_pending_job_age_secs",
"oldest pending job in seconds"),
Expand All @@ -112,6 +121,9 @@ public enum OssMetricsRegistry implements MetricsRegistry {
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."),
STATE_METRIC_TRACKER_ERROR(MetricEmittingApps.WORKER,
"state_timestamp_metric_tracker_error",
"number of syncs where the state timestamp metric tracker ran out of memory or was unable to match destination state message to source state message"),
TEMPORAL_WORKFLOW_ATTEMPT(MetricEmittingApps.WORKER,
"temporal_workflow_attempt",
"count of the number of workflow attempts"),
Expand All @@ -120,13 +132,7 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"count of the number of successful workflow syncs."),
TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER,
"temporal_workflow_failure",
"count of the number of workflow failures"),
NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS(MetricEmittingApps.WORKER,
"record_schema_validation_error",
"number of record schema validation errors"),
STATE_METRIC_TRACKER_ERROR(MetricEmittingApps.WORKER,
"state_timestamp_metric_tracker_error",
"number of syncs where the state timestamp metric tracker ran out of memory or was unable to match destination state message to source state message");
"count of the number of workflow failures");

private final MetricEmittingApp application;
private final String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,24 @@ public Duration getDuration() {

}

@Singleton
final class NumUnusuallyLongSyncs extends Emitter {

NumUnusuallyLongSyncs(final MetricClient client, final MetricRepository db) {
super(client, () -> {
final var count = db.numberOfJobsRunningUnusuallyLong();
client.gauge(OssMetricsRegistry.NUM_UNUSUALLY_LONG_SYNCS, count);
return null;
});
}

@Override
public Duration getDuration() {
return Duration.ofMinutes(15);
}

}

@Singleton
final class TotalScheduledSyncs extends Emitter {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,66 @@ having count(*) < 1440 / cast(c.schedule::jsonb->'units' as integer)
+ ctx.fetchOne(queryForAbnormalSyncInMinutesInLastDay).get("cnt", long.class);
}

long numberOfJobsRunningUnusuallyLong() {
// Definition of unusually long means runtime is more than 2x historic avg run time or 15
// minutes more than avg run time, whichever is greater.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update comment to specify that we ignore jobs with less than 4 runs to:

  1. not count starting job.
  2. give jobs time to build up run history.

final var query =
"""
-- pick average running time and last sync running time in attempts table.
select
current_running_attempts.connection_id,
current_running_attempts.running_time,
historic_avg_running_attempts.avg_run_sec
from
(
-- Sub-query-1: query the currently running attempt's running time.
(
select
jobs.scope as connection_id,
extract(epoch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this extract should be collapsed into one row similar to line 176

from
age(NOW(), attempts.created_at)) as running_time
Copy link
Contributor

@davinchia davinchia Oct 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when I started working on this, I remember the attempts table not being in a great state. Unfortunately, I never found the time to come back to this.

Want to confirm this pulls the latest running attempt for the connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the problem still exists, but coupling attempts status with jobs status could eliminate the problem:

select count(*) from attempts inner join jobs on jobs.id = attempts.job_id where attempts.status = 'running' and jobs.status = 'running'; yields 62 result which is much more reasonable

from
jobs
join attempts on
jobs.id = attempts.job_id
where
jobs.status = 'running'
and attempts.status = 'running'
and jobs.config_type = 'sync' )
as current_running_attempts
join
-- Sub-query-2: query historic attempts' average running time within last week.
(
select
jobs.scope as connection_id,
avg(extract(epoch from age(attempts.updated_at, attempts.created_at))) as avg_run_sec
from
jobs
join attempts on
jobs.id = attempts.job_id
where
-- 168 hours is 1 week: we look for all attempts in last week to calculate its average running time.
attempts.updated_at >= NOW() - interval '168 HOUR'
and jobs.status = 'succeeded'
and attempts.status = 'succeeded'
and jobs.config_type = 'sync'
group by
connection_id
having
count(1) > 4
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
) as historic_avg_running_attempts
on
current_running_attempts.connection_id = historic_avg_running_attempts.connection_id)
where
-- Find if currently running time takes 2x more time than average running time,
-- and it's 15 minutes (900 seconds) more than average running time so it won't alert on noises for quick sync jobs.
current_running_attempts.running_time > greatest(historic_avg_running_attempts.avg_run_sec * 2, historic_avg_running_attempts.avg_run_sec + 900)
""";
final var queryResults = ctx.fetch(query);
return queryResults.getValues("connection_id").size();
}

Map<JobStatus, Double> overallJobRuntimeForTerminalJobsInLastHour() {
final var query = """
SELECT status, extract(epoch from age(updated_at, created_at)) AS sec FROM jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -23,6 +24,7 @@
import io.airbyte.db.instance.configs.jooq.generated.enums.NamespaceDefinitionType;
import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage;
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.AttemptStatus;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobConfigType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
import io.airbyte.db.instance.test.TestDatabaseProviders;
Expand Down Expand Up @@ -92,6 +94,7 @@ void setUp() {
ctx.truncate(ACTOR).execute();
ctx.truncate(CONNECTION).cascade().execute();
ctx.truncate(JOBS).cascade().execute();
ctx.truncate(ATTEMPTS).cascade().execute();
ctx.truncate(WORKSPACE).cascade().execute();
}

Expand Down Expand Up @@ -503,4 +506,122 @@ void shouldNotCountNormalJobsInAbnormalMetric() {

}

@Nested
class UnusuallyLongJobs {

@Test
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
void shouldCountInJobsWithUnusuallyLongTime() throws SQLException {
final var connectionId = UUID.randomUUID();
final var syncConfigType = JobConfigType.sync;

// Current job has been running for 12 hours while the previous 5 jobs runs 2 hours. Avg will be 2
// hours.
// Thus latest job will be counted as an unusually long-running job.
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT, JOBS.UPDATED_AT, JOBS.CONFIG_TYPE)
.values(100L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS), syncConfigType)
.values(1L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS), syncConfigType)
.values(2L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(16, ChronoUnit.HOURS), syncConfigType)
.values(3L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(16, ChronoUnit.HOURS),
OffsetDateTime.now().minus(14, ChronoUnit.HOURS), syncConfigType)
.values(4L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(14, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS), syncConfigType)
.values(5L, connectionId.toString(), JobStatus.running, OffsetDateTime.now().minus(12, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS), syncConfigType)
.execute();

ctx.insertInto(ATTEMPTS, ATTEMPTS.ID, ATTEMPTS.JOB_ID, ATTEMPTS.STATUS, ATTEMPTS.CREATED_AT, ATTEMPTS.UPDATED_AT)
.values(100L, 100L, AttemptStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS))
.values(1L, 1L, AttemptStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS))
.values(2L, 2L, AttemptStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(16, ChronoUnit.HOURS))
.values(3L, 3L, AttemptStatus.succeeded, OffsetDateTime.now().minus(16, ChronoUnit.HOURS),
OffsetDateTime.now().minus(14, ChronoUnit.HOURS))
.values(4L, 4L, AttemptStatus.succeeded, OffsetDateTime.now().minus(14, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS))
.values(5L, 5L, AttemptStatus.running, OffsetDateTime.now().minus(12, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS))
.execute();

final var numOfJubsRunningUnusallyLong = db.numberOfJobsRunningUnusuallyLong();
assertEquals(1, numOfJubsRunningUnusallyLong);
}

@Test
void shouldNotCountInJobsWithinFifteenMinutes() throws SQLException {
final var connectionId = UUID.randomUUID();
final var syncConfigType = JobConfigType.sync;

// Latest job runs 14 minutes while the previous 5 jobs runs average about 3 minutes.
// Despite it has been more than 2x than avg it's still within 15 minutes threshold, thus this
// shouldn't be
// counted in.
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT, JOBS.UPDATED_AT, JOBS.CONFIG_TYPE)
.values(100L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(26, ChronoUnit.MINUTES), syncConfigType)
.values(1L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(18, ChronoUnit.MINUTES), syncConfigType)
.values(2L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(16, ChronoUnit.MINUTES), syncConfigType)
.values(3L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(16, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(14, ChronoUnit.MINUTES), syncConfigType)
.values(4L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(14, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(2, ChronoUnit.MINUTES), syncConfigType)
.values(5L, connectionId.toString(), JobStatus.running, OffsetDateTime.now().minus(14, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(2, ChronoUnit.MINUTES), syncConfigType)
.execute();

ctx.insertInto(ATTEMPTS, ATTEMPTS.ID, ATTEMPTS.JOB_ID, ATTEMPTS.STATUS, ATTEMPTS.CREATED_AT, ATTEMPTS.UPDATED_AT)
.values(100L, 100L, AttemptStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(26, ChronoUnit.MINUTES))
.values(1L, 1L, AttemptStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(18, ChronoUnit.MINUTES))
.values(2L, 2L, AttemptStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(16, ChronoUnit.MINUTES))
.values(3L, 3L, AttemptStatus.succeeded, OffsetDateTime.now().minus(26, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(14, ChronoUnit.MINUTES))
.values(4L, 4L, AttemptStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(17, ChronoUnit.MINUTES))
.values(5L, 5L, AttemptStatus.running, OffsetDateTime.now().minus(14, ChronoUnit.MINUTES),
OffsetDateTime.now().minus(14, ChronoUnit.MINUTES))
.execute();

final var numOfJubsRunningUnusallyLong = db.numberOfJobsRunningUnusuallyLong();
assertEquals(0, numOfJubsRunningUnusallyLong);
}

@Test
void shouldSkipInsufficientJobRuns() throws SQLException {
final var connectionId = UUID.randomUUID();
final var syncConfigType = JobConfigType.sync;

// Require at least 5 runs in last week to get meaningful average runtime.
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT, JOBS.UPDATED_AT, JOBS.CONFIG_TYPE)
.values(100L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS), syncConfigType)
.values(1L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS), syncConfigType)
.values(2L, connectionId.toString(), JobStatus.running, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(1, ChronoUnit.HOURS), syncConfigType)
.execute();

ctx.insertInto(ATTEMPTS, ATTEMPTS.ID, ATTEMPTS.JOB_ID, ATTEMPTS.STATUS, ATTEMPTS.CREATED_AT, ATTEMPTS.UPDATED_AT)
.values(100L, 100L, AttemptStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS))
.values(1L, 1L, AttemptStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS))
.values(2L, 2L, AttemptStatus.running, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(1, ChronoUnit.HOURS))
.execute();

final var numOfJubsRunningUnusallyLong = db.numberOfJobsRunningUnusuallyLong();
assertEquals(0, numOfJubsRunningUnusallyLong);
}

}

}