Merge pull request #245 from opensafely-core/jobrunner-v2.73.0

Update to jobrunner v2.73.0, including pipeline

bloodearnest authored Mar 25, 2024
2 parents 0c5ac2d + a6ef85d commit 4ac33e1
Showing 44 changed files with 399 additions and 184 deletions.
3 changes: 3 additions & 0 deletions opensafely/_vendor/jobrunner/actions.py
@@ -4,6 +4,7 @@
from typing import Dict, List

from opensafely._vendor.pipeline.exceptions import ProjectValidationError
from opensafely._vendor.pipeline.models import Action
from opensafely._vendor.pipeline.outputs import get_output_dirs

from opensafely._vendor.jobrunner.lib.path_utils import ensure_unix_path
@@ -19,6 +20,7 @@ class ActionSpecification:
run: str
needs: List[str]
outputs: Dict[str, Dict[str, str]]
action: Action


def get_action_specification(config, action_id, using_dummy_data_backend=False):
@@ -80,6 +82,7 @@ def get_action_specification(config, action_id, using_dummy_data_backend=False):
run=run_command,
needs=action_spec.needs,
outputs=action_spec.outputs.dict(exclude_unset=True),
action=action_spec,
)


13 changes: 10 additions & 3 deletions opensafely/_vendor/jobrunner/cli/add_job.py
@@ -5,6 +5,7 @@
import argparse
import dataclasses
import pprint
import sys
import textwrap
from pathlib import Path
from urllib.parse import urlparse
@@ -36,6 +37,7 @@ def main(
requested_actions=actions,
force_run_dependencies=force_run_dependencies,
cancelled_actions=[],
codelists_ok=True,
)
)
print("Submitting JobRequest:\n")
@@ -46,6 +48,8 @@
for job in jobs:
display_obj(job)

return job_request, jobs


def display_obj(obj):
if hasattr(obj, "asdict"):
@@ -57,7 +61,10 @@ def display_obj(obj):
print()


def run():
def run(argv=None):
if argv is None:
argv = sys.argv[1:]

configure_logging()
parser = argparse.ArgumentParser(description=__doc__.partition("\n\n")[0])
parser.add_argument("repo_url", help="URL (or local path) of git repository")
@@ -82,8 +89,8 @@
)
parser.add_argument("-f", "--force-run-dependencies", action="store_true")

args = parser.parse_args()
main(**vars(args))
args = parser.parse_args(argv)
return main(**vars(args))


if __name__ == "__main__":
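Note: taken together, the new `argv` parameter and the added `return` statements make this entry point scriptable — callers can pass arguments directly instead of patching `sys.argv`, and inspect the created objects. A minimal usage sketch (hypothetical repo URL and action name; assumes the parser's elided positional arguments include the action names, as `main(actions=...)` suggests):

from opensafely._vendor.jobrunner.cli import add_job

# Drive the CLI in-process: run() parses the given argv and returns
# main()'s result, i.e. (job_request, jobs).
job_request, jobs = add_job.run(
    ["https://github.com/example/repo", "generate_dataset"]
)
print(job_request.requested_actions, len(jobs))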
12 changes: 11 additions & 1 deletion opensafely/_vendor/jobrunner/cli/kill_job.py
@@ -4,6 +4,7 @@
import argparse

from opensafely._vendor.jobrunner.executors import local
from opensafely._vendor.jobrunner.job_executor import JobResults
from opensafely._vendor.jobrunner.lib import database, docker
from opensafely._vendor.jobrunner.models import Job, State, StatusCode
from opensafely._vendor.jobrunner.run import job_to_job_definition, mark_job_as_failed
@@ -32,7 +33,16 @@ def main(partial_job_ids, cleanup=False):
)
if container_metadata:
job = job_to_job_definition(job)
metadata = local.get_job_metadata(job, {}, container_metadata)
# create a dummy JobResults with just the message we want
results = JobResults(
outputs=None,
unmatched_patterns=None,
unmatched_outputs=None,
exit_code=container_metadata["State"]["ExitCode"],
image_id=container_metadata["Image"],
message="job killed by OpenSAFELY administrator",
)
metadata = local.get_job_metadata(job, {}, container_metadata, results)
local.write_job_logs(job, metadata, copy_log_to_workspace=False)

if cleanup:
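Note: the dummy result is needed because `get_job_metadata()` now requires a `results` argument (see the `local.py` changes below) and copies `results.message` into the job log's `status_message`, so a killed job's log records why it ended. A self-contained sketch using a simplified stand-in class (field names taken from the diff above; the real `JobResults` lives in `jobrunner.job_executor`):

from dataclasses import dataclass
from typing import Optional

@dataclass
class FakeJobResults:
    # simplified stand-in for jobrunner.job_executor.JobResults
    outputs: Optional[dict]
    unmatched_patterns: Optional[list]
    unmatched_outputs: Optional[list]
    exit_code: int
    image_id: str
    message: str

results = FakeJobResults(
    outputs=None,
    unmatched_patterns=None,
    unmatched_outputs=None,
    exit_code=137,  # illustrative: typical exit code of a killed container
    image_id="sha256:0123abcd",  # illustrative value
    message="job killed by OpenSAFELY administrator",
)
print(results.message)  # what get_job_metadata() records as status_message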
23 changes: 4 additions & 19 deletions opensafely/_vendor/jobrunner/config.py
@@ -6,6 +6,8 @@
from multiprocessing import cpu_count
from pathlib import Path

from opensafely._vendor import pipeline


class ConfigException(Exception):
pass
@@ -46,6 +48,7 @@ def _is_valid_backend_name(name):

WORKDIR = Path(os.environ.get("WORKDIR", default_work_dir)).resolve()
DATABASE_FILE = WORKDIR / "db.sqlite"
METRICS_FILE = WORKDIR / "metrics.sqlite"
GIT_REPO_DIR = WORKDIR / "repos"

# valid archive formats
@@ -143,25 +146,7 @@ def database_urls_from_env(env):
) # 16mb


# TODO: we might want to take this list from pipeline if we implement it there.
LEVEL4_FILE_TYPES = [
# tables
".csv",
".tsv",
# images
".jpg",
".jpeg",
".png",
".svg",
".svgz",
# reports
".html",
".pdf",
".txt",
".log",
".json",
".md",
]
LEVEL4_FILE_TYPES = pipeline.constants.LEVEL4_FILE_TYPES

STATA_LICENSE = os.environ.get("STATA_LICENSE")
STATA_LICENSE_REPO = os.environ.get(
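Note: the hard-coded extension list is gone — `LEVEL4_FILE_TYPES` is now an alias for the vendored pipeline package's constant, so the two packages cannot drift apart (resolving the old `TODO`). A quick sketch of what that implies (assuming the pipeline constant retains the same entries, e.g. `.csv`):

from opensafely._vendor import pipeline
from opensafely._vendor.jobrunner import config

# The jobrunner name now points at the very same object as the
# pipeline constant, not a copy of it.
assert config.LEVEL4_FILE_TYPES is pipeline.constants.LEVEL4_FILE_TYPES
assert ".csv" in config.LEVEL4_FILE_TYPES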
35 changes: 25 additions & 10 deletions opensafely/_vendor/jobrunner/create_or_update_jobs.py
@@ -36,6 +36,10 @@ class JobRequestError(Exception):
pass


class StaleCodelistError(JobRequestError):
pass


class NothingToDoError(JobRequestError):
pass

@@ -62,10 +66,10 @@ def create_or_update_jobs(job_request):
JobRequestError,
) as e:
log.info(f"JobRequest failed:\n{e}")
create_failed_job(job_request, e)
create_job_from_exception(job_request, e)
except Exception:
log.exception("Uncaught error while creating jobs")
create_failed_job(job_request, JobRequestError("Internal error"))
create_job_from_exception(job_request, JobRequestError("Internal error"))
else:
if job_request.cancelled_actions:
log.debug("Cancelling actions: %s", job_request.cancelled_actions)
@@ -114,7 +118,8 @@ def create_jobs(job_request):


def validate_job_request(job_request):
if config.ALLOWED_GITHUB_ORGS:
# http prefix allows local git repos, useful for tests
if job_request.repo_url.startswith("http") and config.ALLOWED_GITHUB_ORGS:
validate_repo_url(job_request.repo_url, config.ALLOWED_GITHUB_ORGS)
if not job_request.requested_actions:
raise JobRequestError("At least one action must be supplied")
@@ -238,6 +243,7 @@ def recursively_build_jobs(jobs_by_action, job_request, pipeline_config, action)
commit=job_request.commit,
workspace=job_request.workspace,
database_name=job_request.database_name,
requires_db=action_spec.action.is_database_action,
action=action,
wait_for_job_ids=wait_for_job_ids,
requires_outputs_from=action_spec.needs,
@@ -311,12 +317,12 @@ def assert_codelists_ok(job_request, new_jobs):
# Codelists are out of date; fail the entire job request if any job
# requires database access
if job.requires_db:
raise JobRequestError(
raise StaleCodelistError(
f"Codelists are out of date (required by action {job.action})"
)


def create_failed_job(job_request, exception):
def create_job_from_exception(job_request, exception):
"""
Sometimes we want to say to the job-server (and the user): your JobRequest
was broken so we weren't able to create any jobs for it. But the only way
@@ -327,19 +333,25 @@ def create_failed_job(job_request, exception):
This is a bit of a hack, but it keeps the sync protocol simple.
"""
action = "__error__"
error = exception
state = State.FAILED
status_message = str(exception)

# Special case for the NothingToDoError which we treat as a success
if isinstance(exception, NothingToDoError):
state = State.SUCCEEDED
code = StatusCode.SUCCEEDED
status_message = "All actions have already run"
action = job_request.requested_actions[0]
error = None
# StaleCodelistError is a failure but not an INTERNAL_ERROR
elif isinstance(exception, StaleCodelistError):
code = StatusCode.STALE_CODELISTS
else:
state = State.FAILED
code = StatusCode.INTERNAL_ERROR
# include exception name in message to aid debugging
status_message = f"{type(exception).__name__}: {exception}"
action = "__error__"
error = exception

now = time.time()
job = Job(
job_request_id=job_request.id,
@@ -379,7 +391,10 @@ def set_cancelled_flag_for_actions(job_request_id, actions):
# working.
update_where(
Job,
{"cancelled": True},
{
"cancelled": True,
"completed_at": int(time.time()),
},
job_request_id=job_request_id,
action__in=actions,
)
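Note: because `StaleCodelistError` subclasses `JobRequestError`, the existing `except` clause in `create_or_update_jobs()` still catches it; subclassing only changes which `StatusCode` the synthetic `__error__` job records. A stripped-down sketch of the new dispatch (string stand-ins for the real `StatusCode` enum):

class JobRequestError(Exception): ...
class NothingToDoError(JobRequestError): ...
class StaleCodelistError(JobRequestError): ...

def status_code_for(exception):
    # mirrors create_job_from_exception(): most specific cases first
    if isinstance(exception, NothingToDoError):
        return "SUCCEEDED"        # treated as success, not an error
    if isinstance(exception, StaleCodelistError):
        return "STALE_CODELISTS"  # a failure, but not an internal error
    return "INTERNAL_ERROR"

assert status_code_for(StaleCodelistError("stale")) == "STALE_CODELISTS"
assert status_code_for(JobRequestError("broken")) == "INTERNAL_ERROR"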
37 changes: 28 additions & 9 deletions opensafely/_vendor/jobrunner/executors/local.py
@@ -11,7 +11,7 @@

from opensafely._vendor.pipeline.legacy import get_all_output_patterns_from_project_file

from opensafely._vendor.jobrunner import config
from opensafely._vendor.jobrunner import config, record_stats
from opensafely._vendor.jobrunner.executors import volumes
from opensafely._vendor.jobrunner.job_executor import (
ExecutorAPI,
@@ -241,16 +249,24 @@ def get_status(self, job_definition, timeout=15):
f"docker timed out after {timeout}s inspecting container {name}"
)

metrics = record_stats.read_job_metrics(job_definition.id)

if container is None: # container doesn't exist
if job_definition.cancelled:
if volumes.get_volume_api(job_definition).volume_exists(job_definition):
# jobs prepared but not running do not need to finalize, so we
# proceed directly to the FINALIZED state here
return JobStatus(
ExecutorState.FINALIZED, "Prepared job was cancelled"
ExecutorState.FINALIZED,
"Prepared job was cancelled",
metrics=metrics,
)
else:
return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")
return JobStatus(
ExecutorState.UNKNOWN,
"Pending job was cancelled",
metrics=metrics,
)

# timestamp file presence means we have finished preparing
timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
@@ -261,24 +269,31 @@
# re-prepare it anyway.
if timestamp_ns is None:
# we are Jon Snow
return JobStatus(ExecutorState.UNKNOWN)
return JobStatus(ExecutorState.UNKNOWN, metrics={})
else:
# we've finished preparing
return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)
return JobStatus(
ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
)

if container["State"]["Running"]:
timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
return JobStatus(
ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
)
elif job_definition.id in RESULTS:
return JobStatus(
ExecutorState.FINALIZED,
timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
metrics=metrics,
)
else:
# container present but not running, i.e. finished
# Nb. this does not include prepared jobs, as they have a volume but not a container
timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)
return JobStatus(
ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
)

def get_results(self, job_definition):
if job_definition.id not in RESULTS:
@@ -409,7 +424,9 @@ def finalize_job(job_definition):
base_revision=labels.get("org.opensafely.base.vcs-ref", "unknown"),
base_created=labels.get("org.opencontainers.base.build-date", "unknown"),
)
job_metadata = get_job_metadata(job_definition, outputs, container_metadata)
job_metadata = get_job_metadata(
job_definition, outputs, container_metadata, results
)

if job_definition.cancelled:
write_job_logs(job_definition, job_metadata, copy_log_to_workspace=False)
@@ -426,7 +443,7 @@
return results


def get_job_metadata(job_definition, outputs, container_metadata):
def get_job_metadata(job_definition, outputs, container_metadata, results):
# job_metadata is a big dict capturing everything we know about the state
# of the job
job_metadata = dict()
@@ -437,6 +454,7 @@
job_metadata["docker_image_id"] = container_metadata["Image"]
# convert exit code to str so 0 exit codes get logged
job_metadata["exit_code"] = str(container_metadata["State"]["ExitCode"])
job_metadata["status_message"] = results.message
job_metadata["container_metadata"] = container_metadata
job_metadata["outputs"] = outputs
job_metadata["commit"] = job_definition.study.commit
@@ -679,6 +697,7 @@ def write_log_file(job_definition, job_metadata, filename, excluded):
"commit",
"docker_image_id",
"exit_code",
"status_message",
"created_at",
"completed_at",
"database_name",
1 change: 1 addition & 0 deletions opensafely/_vendor/jobrunner/job_executor.py
@@ -67,6 +67,7 @@ class JobStatus:
timestamp_ns: int = (
None # timestamp this JobStatus occurred, in integer nanoseconds
)
metrics: dict = field(default_factory=dict)


@dataclass
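Note: `field(default_factory=dict)` is the standard way to give a dataclass field a mutable default — a plain `metrics: dict = {}` raises `ValueError` at class-definition time, and sharing one dict across instances would be wrong anyway. Minimal standalone demonstration (not jobrunner code):

from dataclasses import dataclass, field

@dataclass
class Example:
    metrics: dict = field(default_factory=dict)  # fresh dict per instance

a, b = Example(), Example()
a.metrics["cpu"] = 1.0
assert b.metrics == {}  # no state shared between instances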
19 changes: 0 additions & 19 deletions opensafely/_vendor/jobrunner/lib/commands.py

This file was deleted.

1 change: 1 addition & 0 deletions opensafely/_vendor/jobrunner/lib/docker_stats.py
@@ -30,6 +30,7 @@ def get_container_stats(timeout=DEFAULT_TIMEOUT):
removeprefix(row["Name"], "os-job-"): {
"cpu_percentage": float(row["CPUPerc"].rstrip("%")),
"memory_used": _parse_size(row["MemUsage"].split()[0]),
"container_id": row["Container"],
}
for row in data
if row["Name"].startswith("os-job-")
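Note: with the extra key, each entry returned by `get_container_stats()` now carries the container id alongside the existing CPU and memory figures, keyed by job id. Illustrative shape only (values invented; field names come from `docker stats --format` output):

stats = {
    "abcd1234efgh5678": {               # job id, i.e. the "os-job-" prefix stripped
        "cpu_percentage": 12.5,         # parsed from docker's "CPUPerc", e.g. "12.50%"
        "memory_used": 268435456,       # parsed from "MemUsage" by _parse_size (assumed bytes)
        "container_id": "f3a9c0de11b2", # new: docker's "Container" field
    },
}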
[Diffs for the remaining 35 changed files are not shown.]
