Skip to content

Commit

Permalink
Merge pull request #686 from opensafely-core/aggregate-job-metrics
Browse files Browse the repository at this point in the history
aggregate job metrics
  • Loading branch information
bloodearnest authored Dec 13, 2023
2 parents e67bb96 + c70cbc6 commit 95de4e6
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 35 deletions.
1 change: 1 addition & 0 deletions jobrunner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def _is_valid_backend_name(name):

WORKDIR = Path(os.environ.get("WORKDIR", default_work_dir)).resolve()
DATABASE_FILE = WORKDIR / "db.sqlite"
METRICS_FILE = WORKDIR / "metrics.sqlite"
GIT_REPO_DIR = WORKDIR / "repos"

# valid archive formats
Expand Down
29 changes: 22 additions & 7 deletions jobrunner/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from pipeline.legacy import get_all_output_patterns_from_project_file

from jobrunner import config
from jobrunner import config, record_stats
from jobrunner.executors import volumes
from jobrunner.job_executor import (
ExecutorAPI,
Expand Down Expand Up @@ -241,16 +241,24 @@ def get_status(self, job_definition, timeout=15):
f"docker timed out after {timeout}s inspecting container {name}"
)

metrics = record_stats.read_job_metrics(job_definition.id)

if container is None: # container doesn't exist
if job_definition.cancelled:
if volumes.get_volume_api(job_definition).volume_exists(job_definition):
# jobs prepared but not running do not need to finalize, so we
# proceed directly to the FINALIZED state here
return JobStatus(
ExecutorState.FINALIZED, "Prepared job was cancelled"
ExecutorState.FINALIZED,
"Prepared job was cancelled",
metrics=metrics,
)
else:
return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")
return JobStatus(
ExecutorState.UNKNOWN,
"Pending job was cancelled",
metrics=metrics,
)

# timestamp file presence means we have finished preparing
timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
Expand All @@ -261,24 +269,31 @@ def get_status(self, job_definition, timeout=15):
# re-prepare it anyway.
if timestamp_ns is None:
# we are Jon Snow
return JobStatus(ExecutorState.UNKNOWN)
return JobStatus(ExecutorState.UNKNOWN, metrics={})
else:
# we've finished preparing
return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)
return JobStatus(
ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
)

if container["State"]["Running"]:
timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
return JobStatus(
ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
)
elif job_definition.id in RESULTS:
return JobStatus(
ExecutorState.FINALIZED,
timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
metrics=metrics,
)
else:
# container present but not running, i.e. finished
# Nb. this does not include prepared jobs, as they have a volume but not a container
timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)
return JobStatus(
ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
)

def get_results(self, job_definition):
if job_definition.id not in RESULTS:
Expand Down
1 change: 1 addition & 0 deletions jobrunner/job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class JobStatus:
timestamp_ns: int = (
None # timestamp this JobStatus occurred, in integer nanoseconds
)
metrics: dict = field(default_factory=dict)


@dataclass
Expand Down
7 changes: 7 additions & 0 deletions jobrunner/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import secrets
import shlex
from enum import Enum
from functools import total_ordering

from jobrunner.lib.commands import requires_db_access
from jobrunner.lib.database import databaseclass, migration
Expand All @@ -36,6 +37,7 @@ class State(Enum):
# affordances in the web, cli and telemetry.


@total_ordering
class StatusCode(Enum):
# PENDING states
#
Expand Down Expand Up @@ -77,6 +79,10 @@ class StatusCode(Enum):
def is_final_code(self):
return self in StatusCode._FINAL_STATUS_CODES

def __lt__(self, other):
    """Order members by their position in the enum's definition.

    A member declared earlier in the class compares as less than one
    declared later. Operands that are not members of the same enum yield
    NotImplemented, so Python raises its usual TypeError for an unsupported
    comparison instead of leaking a ValueError from list.index().
    """
    if other.__class__ is not self.__class__:
        return NotImplemented
    # iterating an Enum class yields its members in definition order
    order = list(self.__class__)
    return order.index(self) < order.index(other)


# used for tracing to know if a state is final or not
StatusCode._FINAL_STATUS_CODES = [
Expand Down Expand Up @@ -245,6 +251,7 @@ class Job:
# used to track the OTel trace context for this job
trace_context: dict = None

# map of file -> error
level4_excluded_files: dict = None

# used to cache the job_request json by the tracing code
Expand Down
104 changes: 96 additions & 8 deletions jobrunner/record_stats.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""
Super crude docker/system stats logger
"""
import json
import logging
import subprocess
import sys
import time
from collections import defaultdict

from opentelemetry import trace

Expand All @@ -17,12 +19,62 @@
log = logging.getLogger(__name__)
tracer = trace.get_tracer("ticks")

# Simplest possible table. We're only storing aggregate data
# One row per job id; the `metrics` column holds the JSON text that
# write_job_metrics() produces with json.dumps and read_job_metrics()
# decodes with json.loads.
DDL = """
CREATE TABLE IF NOT EXISTS jobs (
id TEXT,
metrics TEXT,
PRIMARY KEY (id)
)
"""

# Module-level connection to the metrics db. Stays None until
# ensure_metrics_db() is called, which the read/write helpers do on demand.
_conn = None


def ensure_metrics_db():
    """Open the metrics database and make sure its schema is in place.

    Stores the connection for config.METRICS_FILE in the module-level
    ``_conn``, then switches it to WAL journalling and runs the DDL that
    creates the ``jobs`` table if it does not exist yet.
    """
    global _conn
    connection = database.get_connection(config.METRICS_FILE)
    _conn = connection
    for statement in ("PRAGMA journal_mode = WAL", DDL):
        connection.execute(statement)


def read_job_metrics(job_id, **metrics):
    """Return the stored aggregate metrics for a job.

    The result is a ``defaultdict(float)``, so metrics that have not been
    recorded yet read as ``0.0``. A job with no row yields an empty mapping.

    NOTE: extra keyword arguments are accepted but ignored; the stored
    value (or an empty dict) always replaces them.
    """
    if _conn is None:
        ensure_metrics_db()

    cursor = _conn.execute(
        "SELECT metrics FROM jobs WHERE id = ?",
        (job_id,),
    )
    row = cursor.fetchone()
    # the metrics column holds a JSON document; no row means nothing recorded
    metrics = {} if row is None else json.loads(row["metrics"])
    return defaultdict(float, metrics)


def write_job_metrics(job_id, metrics):
    """Persist the aggregate metrics for a job.

    The metrics mapping is JSON-encoded and stored against the job id; if a
    row for that id already exists its metrics are overwritten.
    """
    if _conn is None:
        ensure_metrics_db()

    encoded = json.dumps(metrics)
    # the payload is bound twice: once for the INSERT and once for the
    # UPDATE that runs when the id already exists
    params = (job_id, encoded, encoded)
    _conn.execute(
        """
        INSERT INTO jobs (id, metrics) VALUES (?, ?)
        ON CONFLICT(id) DO UPDATE set metrics = ?
        """,
        params,
    )


def main():
last_run = None
while True:
before = time.time()
last_run = record_tick_trace(last_run)
active_jobs = database.find_where(
models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
)
last_run = record_tick_trace(last_run, active_jobs)

# record_tick_trace might have taken a while, so sleep the remaining interval
# enforce a minimum time of 3s to ensure we don't hammer honeycomb or
Expand All @@ -31,7 +83,7 @@ def main():
time.sleep(max(2, config.STATS_POLL_INTERVAL - elapsed))


def record_tick_trace(last_run):
def record_tick_trace(last_run, active_jobs):
"""Record a period tick trace of current jobs.
This will give us more realtime information than the job traces, which only
Expand Down Expand Up @@ -69,10 +121,7 @@ def record_tick_trace(last_run):
# every span has the same timings
start_time = last_run
end_time = now

active_jobs = database.find_where(
models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
)
duration_s = int((end_time - start_time) / 1e9)

with tracer.start_as_current_span(
"TICK", start_time=start_time, attributes=trace_attrs
Expand All @@ -82,22 +131,61 @@ def record_tick_trace(last_run):
root.add_event("stats_error", attributes=error_attrs, timestamp=start_time)

for job in active_jobs:
span = tracer.start_span(job.status_code.name, start_time=start_time)
# we are using seconds for our metric calculations

metrics = stats.get(job.id, {})

# set up attributes
job_span_attrs = {}
job_span_attrs.update(trace_attrs)
metrics = stats.get(job.id, {})
job_span_attrs["has_metrics"] = metrics != {}
job_span_attrs.update(metrics)

# this means the job is running
if metrics:
runtime_s = int(now / 1e9) - job.started_at
# protect against unexpected runtimes
if runtime_s > 0:
job_metrics = update_job_metrics(
job,
metrics,
duration_s,
runtime_s,
)
job_span_attrs.update(job_metrics)
else:
# flag ticks with a non-positive runtime on the span; job_span_attrs is a
# plain dict, so the value is set with item assignment
job_span_attrs["bad_tick_runtime"] = runtime_s

# record span
span = tracer.start_span(job.status_code.name, start_time=start_time)
tracing.set_span_metadata(span, job, **job_span_attrs)
span.end(end_time)

return end_time


def update_job_metrics(job, raw_metrics, duration_s, runtime_s):
    """Fold the latest sample into a job's aggregate metrics and persist them.

    For both cpu and memory this records the latest sample, adds
    ``duration_s * sample`` to the cumulative sum, recomputes the mean as
    the cumulative sum divided by ``runtime_s``, and keeps the highest
    sample seen as the peak. The updated mapping is written back to the
    metrics db and returned.
    """
    aggregates = read_job_metrics(job.id)

    # read both samples up front, before any aggregate is touched
    cpu_sample = raw_metrics["cpu_percentage"]
    mem_sample = raw_metrics["memory_used"]

    for sample_key, cumsum_key, mean_key, peak_key, sample in (
        ("cpu_sample", "cpu_cumsum", "cpu_mean", "cpu_peak", cpu_sample),
        ("mem_sample", "mem_cumsum", "mem_mean", "mem_peak", mem_sample),
    ):
        aggregates[sample_key] = sample
        aggregates[cumsum_key] += duration_s * sample
        aggregates[mean_key] = aggregates[cumsum_key] / runtime_s
        aggregates[peak_key] = max(aggregates[peak_key], sample)

    write_job_metrics(job.id, aggregates)

    return aggregates


if __name__ == "__main__":
configure_logging()

Expand Down
6 changes: 4 additions & 2 deletions jobrunner/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import requests

from jobrunner import config, queries
from jobrunner import config, queries, record_stats
from jobrunner.create_or_update_jobs import create_or_update_jobs
from jobrunner.lib.database import find_where, select_values
from jobrunner.lib.log_utils import configure_logging, set_log_context
Expand Down Expand Up @@ -143,19 +143,21 @@ def job_to_remote_format(job):
Convert our internal representation of a Job into whatever format the
job-server expects
"""

return {
"identifier": job.id,
"job_request_id": job.job_request_id,
"action": job.action,
"run_command": job.run_command,
"status": job.state.value,
"status_code": job.status_code.value if job.status_code else "",
"status_code": job.status_code.value,
"status_message": job.status_message or "",
"created_at": job.created_at_isoformat,
"updated_at": job.updated_at_isoformat,
"started_at": job.started_at_isoformat,
"completed_at": job.completed_at_isoformat,
"trace_context": job.trace_context,
"metrics": record_stats.read_job_metrics(job.id),
}


Expand Down
13 changes: 12 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

import jobrunner.executors.local
from jobrunner import config, tracing
from jobrunner import config, record_stats, tracing
from jobrunner.executors import volumes
from jobrunner.job_executor import Study
from jobrunner.lib import database
Expand Down Expand Up @@ -195,6 +195,17 @@ def db(monkeypatch, request):
del database.CONNECTION_CACHE.__dict__[database_file]


@pytest.fixture(autouse=True)
def metrics_db(monkeypatch, request):
    """Point the metrics db at a throwaway in-memory database for the test.

    The database URI is named after the requesting test node.
    """
    # forget any existing connection so the next access reconnects
    record_stats._conn = None
    metrics_uri = f"file:metrics-{request.node.name}?mode=memory&cache=shared"
    monkeypatch.setattr(config, "METRICS_FILE", metrics_uri)

    yield

    # teardown: drop the cached connection (if one was made) and reset state
    database.CONNECTION_CACHE.__dict__.pop(metrics_uri, None)
    record_stats._conn = None


@dataclass
class SubprocessStub:
calls: deque = field(default_factory=deque)
Expand Down
17 changes: 16 additions & 1 deletion tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections import defaultdict
from copy import deepcopy

from jobrunner import config, tracing
from jobrunner import config, record_stats, tracing
from jobrunner.job_executor import ExecutorState, JobResults, JobStatus
from jobrunner.lib import docker
from jobrunner.lib.database import insert
Expand Down Expand Up @@ -78,6 +78,12 @@ def job_factory(job_request=None, **kwargs):
values["created_at"] = int(timestamp)
if "updated_at" not in kwargs:
values["updated_at"] = int(timestamp)

if "started_at" not in kwargs:
status_code = kwargs.get("status_code", values["status_code"])
if status_code and status_code >= StatusCode.EXECUTING:
values["started_at"] = int(timestamp)

if "status_code_updated_at" not in kwargs:
values["status_code_updated_at"] = int(timestamp * 1e9)
values.update(kwargs)
Expand All @@ -103,6 +109,15 @@ def job_results_factory(timestamp_ns=None, **kwargs):
return JobResults(timestamp_ns=timestamp_ns, **values)


def metrics_factory(job=None, metrics=None):
    """Store metrics for a job in the metrics db.

    A new job is created with job_factory() when none is given, and an
    empty metrics dict is written when no metrics are supplied.
    """
    target_job = job_factory() if job is None else job
    payload = {} if metrics is None else metrics
    record_stats.write_job_metrics(target_job.id, payload)


class StubExecutorAPI:
"""Dummy implementation of the ExecutorAPI, for use in tests.
Expand Down
Loading

0 comments on commit 95de4e6

Please sign in to comment.