Commit

Store job information
- add Job model and /admin/jobs endpoint
- start/end times, status and error messages are now stored for each job
- admins can view the list of jobs
- resolves #20
lkeegan committed Oct 22, 2024

1 parent f73af6d commit c3c424d
Showing 9 changed files with 262 additions and 79 deletions.
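
As a quick orientation before the per-file diffs: the runner exchange now carries a job id end to end. Below is a minimal sketch of that exchange using plain requests; the base URL, token and hostname are placeholders for illustration, not part of this commit.

import requests

API_URL = "https://predictcr.example.org/api"  # placeholder backend URL
AUTH = {"Authorization": "Bearer <runner-jwt>"}  # placeholder runner token

# request a job: the backend stores a Job row with status=RUNNING and
# returns its id together with the sample id
r = requests.post(
    f"{API_URL}/runner/request_job",
    json={"runner_hostname": "my-runner"},
    headers=AUTH,
    timeout=30,
)
if r.status_code == 200:
    job = r.json()  # e.g. {"job_id": 1, "sample_id": 1}
    # report the outcome: job_id is now a required form field, and a non-empty
    # error_message is stored on the Job row when success is "false"
    requests.post(
        f"{API_URL}/runner/result",
        data={
            "job_id": job["job_id"],
            "sample_id": job["sample_id"],
            "success": "false",
            "error_message": "example failure",
            "runner_hostname": "my-runner",
        },
        headers=AUTH,
        timeout=30,
    )
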
39 changes: 34 additions & 5 deletions backend/src/predicTCR_server/app.py
@@ -13,10 +13,13 @@
from flask_jwt_extended import JWTManager
from flask_cors import cross_origin
from predicTCR_server.logger import get_logger
from predicTCR_server.utils import timestamp_now
from predicTCR_server.model import (
db,
Sample,
User,
Job,
Status,
Settings,
add_new_user,
add_new_runner_user,
@@ -281,6 +284,16 @@ def admin_users():
)
return jsonify(users=[user.as_dict() for user in users])

@app.route("/api/admin/jobs", methods=["GET"])
@jwt_required()
def admin_jobs():
if not current_user.is_admin:
return jsonify(message="Admin account required"), 400
jobs = (
db.session.execute(db.select(Job).order_by(db.desc(Job.id))).scalars().all()
)
return jsonify(jobs=[job.as_dict() for job in jobs])

@app.route("/api/admin/runner_token", methods=["GET"])
@jwt_required()
def admin_runner_token():
@@ -305,7 +318,17 @@ def runner_request_job():
sample_id = request_job()
if sample_id is None:
return jsonify(message="No job available"), 204
return {"sample_id": sample_id}
new_job = Job(
id=None,
sample_id=sample_id,
timestamp_start=timestamp_now(),
timestamp_end=0,
status=Status.RUNNING,
error_message="",
)
db.session.add(new_job)
db.session.commit()
return {"job_id": new_job.id, "sample_id": sample_id}

@app.route("/api/runner/result", methods=["POST"])
@cross_origin()
@@ -317,6 +340,9 @@ def runner_result():
sample_id = form_as_dict.get("sample_id", None)
if sample_id is None:
return jsonify(message="Missing key: sample_id"), 400
job_id = form_as_dict.get("job_id", None)
if job_id is None:
return jsonify(message="Missing key: job_id"), 400
success = form_as_dict.get("success", None)
if success is None or success.lower() not in ["true", "false"]:
logger.info(" -> missing success key")
@@ -328,19 +354,22 @@ def runner_result():
return jsonify(message="Result has success=True but no file"), 400
runner_hostname = form_as_dict.get("runner_hostname", "")
logger.info(
f"Result upload for '{sample_id}' from runner {current_user.email} / {runner_hostname}"
f"Job '{job_id}' uploaded result for '{sample_id}' from runner {current_user.email} / {runner_hostname}"
)
error_message = form_as_dict.get("error_message", None)
if error_message is not None:
error_message = form_as_dict.get("error_message", "")
if error_message != "":
logger.info(f" -> error message: {error_message}")
message, code = process_result(sample_id, success, zipfile)
message, code = process_result(
int(job_id), int(sample_id), success, error_message, zipfile
)
return jsonify(message=message), code

with app.app_context():
db.create_all()
if db.session.get(Settings, 1) is None:
db.session.add(
Settings(
id=None,
default_personal_submission_quota=10,
default_personal_submission_interval_mins=30,
global_quota=1000,
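
For the new /api/admin/jobs endpoint above, a hedged usage sketch (base URL and admin token are placeholders): each returned entry mirrors Job.as_dict(), i.e. id, sample_id, timestamp_start, timestamp_end, status and error_message, with the newest job first.

import requests

API_URL = "https://predictcr.example.org/api"  # placeholder backend URL
ADMIN_AUTH = {"Authorization": "Bearer <admin-jwt>"}  # placeholder admin token

response = requests.get(f"{API_URL}/admin/jobs", headers=ADMIN_AUTH, timeout=30)
response.raise_for_status()
for job in response.json()["jobs"]:
    # jobs are returned newest first (ordered by descending id)
    print(job["id"], job["sample_id"], job["status"], job["error_message"])
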
109 changes: 70 additions & 39 deletions backend/src/predicTCR_server/model.py
@@ -2,12 +2,14 @@

import re
import flask
from enum import Enum
import enum
import argon2
import pathlib
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import DeclarativeBase, MappedAsDataclass, Mapped, mapped_column
from werkzeug.datastructures import FileStorage
from sqlalchemy.inspection import inspect
from sqlalchemy import Integer, String, Boolean, Enum
from dataclasses import dataclass
from predicTCR_server.email import send_email
from predicTCR_server.settings import predicTCR_url
@@ -20,12 +22,17 @@
decode_password_reset_token,
)

db = SQLAlchemy()

class Base(DeclarativeBase, MappedAsDataclass):
pass


db = SQLAlchemy(model_class=Base)
ph = argon2.PasswordHasher()
logger = get_logger()


class Status(str, Enum):
class Status(str, enum.Enum):
QUEUED = "queued"
RUNNING = "running"
COMPLETED = "completed"
@@ -34,34 +41,45 @@ class Status(str, Enum):

@dataclass
class Settings(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
default_personal_submission_quota: int = db.Column(db.Integer, nullable=False)
default_personal_submission_interval_mins: int = db.Column(
db.Integer, nullable=False
id: Mapped[int] = mapped_column(Integer, primary_key=True)
default_personal_submission_quota: Mapped[int] = mapped_column(
Integer, nullable=False
)
default_personal_submission_interval_mins: Mapped[int] = mapped_column(
Integer, nullable=False
)
global_quota: int = db.Column(db.Integer, nullable=False)
tumor_types: str = db.Column(db.String, nullable=False)
sources: str = db.Column(db.String, nullable=False)
csv_required_columns: str = db.Column(db.String, nullable=False)
global_quota: Mapped[int] = mapped_column(Integer, nullable=False)
tumor_types: Mapped[str] = mapped_column(String, nullable=False)
sources: Mapped[str] = mapped_column(String, nullable=False)
csv_required_columns: Mapped[str] = mapped_column(String, nullable=False)

def as_dict(self):
return {
c: getattr(self, c)
for c in inspect(self).attrs.keys()
if c != "password_hash"
}
return {c: getattr(self, c) for c in inspect(self).attrs.keys()}


@dataclass
class Job(db.Model):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
sample_id: Mapped[int] = mapped_column(Integer, nullable=False)
timestamp_start: Mapped[int] = mapped_column(Integer, nullable=False)
timestamp_end: Mapped[int] = mapped_column(Integer, nullable=False)
status: Mapped[Status] = mapped_column(Enum(Status), nullable=False)
error_message: Mapped[str] = mapped_column(String, nullable=False)

def as_dict(self):
return {c: getattr(self, c) for c in inspect(self).attrs.keys()}


@dataclass
class Sample(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
email: str = db.Column(db.String(256), nullable=False)
name: str = db.Column(db.String(128), nullable=False)
tumor_type: str = db.Column(db.String(128), nullable=False)
source: str = db.Column(db.String(128), nullable=False)
timestamp: int = db.Column(db.Integer, nullable=False)
status: Status = db.Column(db.Enum(Status), nullable=False)
has_results_zip: bool = db.Column(db.Boolean, nullable=False)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
email: Mapped[str] = mapped_column(String(256), nullable=False)
name: Mapped[str] = mapped_column(String(128), nullable=False)
tumor_type: Mapped[str] = mapped_column(String(128), nullable=False)
source: Mapped[str] = mapped_column(String(128), nullable=False)
timestamp: Mapped[int] = mapped_column(Integer, nullable=False)
status: Mapped[Status] = mapped_column(Enum(Status), nullable=False)
has_results_zip: Mapped[bool] = mapped_column(Boolean, nullable=False)

def _base_path(self) -> pathlib.Path:
data_path = flask.current_app.config["PREDICTCR_DATA_PATH"]
@@ -79,17 +97,17 @@ def result_file_path(self) -> pathlib.Path:

@dataclass
class User(db.Model):
id: int = db.Column(db.Integer, primary_key=True)
email: str = db.Column(db.Text, nullable=False, unique=True)
password_hash: str = db.Column(db.Text, nullable=False)
activated: bool = db.Column(db.Boolean, nullable=False)
enabled: bool = db.Column(db.Boolean, nullable=False)
quota: int = db.Column(db.Integer, nullable=False)
submission_interval_minutes: int = db.Column(db.Integer, nullable=False)
last_submission_timestamp: int = db.Column(db.Integer, nullable=False)
is_admin: bool = db.Column(db.Boolean, nullable=False)
is_runner: bool = db.Column(db.Boolean, nullable=False)
full_results: bool = db.Column(db.Boolean, nullable=False)
id: int = mapped_column(Integer, primary_key=True)
email: str = mapped_column(String, nullable=False, unique=True)
password_hash: str = mapped_column(String, nullable=False)
activated: bool = mapped_column(Boolean, nullable=False)
enabled: bool = mapped_column(Boolean, nullable=False)
quota: int = mapped_column(Integer, nullable=False)
submission_interval_minutes: int = mapped_column(Integer, nullable=False)
last_submission_timestamp: int = mapped_column(Integer, nullable=False)
is_admin: bool = mapped_column(Boolean, nullable=False)
is_runner: bool = mapped_column(Boolean, nullable=False)
full_results: bool = mapped_column(Boolean, nullable=False)

def set_password_nocheck(self, new_password: str):
self.password_hash = ph.hash(new_password)
@@ -145,17 +163,26 @@ def request_job() -> int | None:


def process_result(
sample_id: str, success: bool, result_zip_file: FileStorage | None
job_id: int,
sample_id: int,
success: bool,
error_message: str,
result_zip_file: FileStorage | None,
) -> tuple[str, int]:
sample = db.session.execute(
db.select(Sample).filter_by(id=sample_id)
).scalar_one_or_none()
sample = db.session.get(Sample, sample_id)
if sample is None:
logger.warning(f" --> Unknown sample id {sample_id}")
return f"Unknown sample id {sample_id}", 400
job = db.session.get(Job, job_id)
if job is None:
logger.warning(f" --> Unknown job id {job_id}")
return f"Unknown job id {job_id}", 400
job.timestamp_end = timestamp_now()
if success is False:
sample.has_results_zip = False
sample.status = Status.FAILED
job.status = Status.FAILED
job.error_message = error_message
db.session.commit()
return "Result processed", 200
if result_zip_file is None:
@@ -165,6 +192,7 @@ def process_result(
result_zip_file.save(sample.result_file_path())
sample.has_results_zip = True
sample.status = Status.COMPLETED
job.status = Status.COMPLETED
db.session.commit()
return "Result processed", 200

@@ -244,6 +272,7 @@ def add_new_user(email: str, password: str, is_admin: bool) -> tuple[str, int]:
try:
db.session.add(
User(
id=None,
email=email,
password_hash=ph.hash(password),
activated=False,
@@ -282,6 +311,7 @@ def add_new_runner_user() -> User | None:
runner_name = f"runner{runner_number}"
db.session.add(
User(
id=None,
email=runner_name,
password_hash="",
activated=False,
@@ -419,6 +449,7 @@ def add_new_sample(
settings = db.session.get(Settings, 1)
settings.global_quota -= 1
new_sample = Sample(
id=None,
email=email,
name=name,
tumor_type=tumor_type,
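
A self-contained sketch of the Job lifecycle introduced in model.py, using plain SQLAlchemy 2.0 against an in-memory SQLite database. The real model is a flask_sqlalchemy db.Model and uses timestamp_now() from predicTCR_server.utils; here the class is mirrored standalone and the timestamp is approximated with time.time().

import enum
import time

from sqlalchemy import Enum as SAEnum, Integer, String, create_engine, inspect
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    MappedAsDataclass,
    Session,
    mapped_column,
)


class Base(DeclarativeBase, MappedAsDataclass):
    pass


class Status(str, enum.Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


class Job(Base):
    __tablename__ = "job"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    sample_id: Mapped[int] = mapped_column(Integer, nullable=False)
    timestamp_start: Mapped[int] = mapped_column(Integer, nullable=False)
    timestamp_end: Mapped[int] = mapped_column(Integer, nullable=False)
    status: Mapped[Status] = mapped_column(SAEnum(Status), nullable=False)
    error_message: Mapped[str] = mapped_column(String, nullable=False)

    def as_dict(self):
        # same serialization pattern as in the diff: dump every mapped attribute
        return {c: getattr(self, c) for c in inspect(self).attrs.keys()}


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # request_job: a RUNNING job is stored when a runner picks up a sample
    job = Job(
        id=None,
        sample_id=1,
        timestamp_start=int(time.time()),
        timestamp_end=0,
        status=Status.RUNNING,
        error_message="",
    )
    session.add(job)
    session.commit()

    # process_result: end time, final status and any error message are recorded
    job.timestamp_end = int(time.time())
    job.status = Status.FAILED
    job.error_message = "Something went wrong"
    session.commit()

    print(job.as_dict())
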
2 changes: 2 additions & 0 deletions backend/tests/helpers/flask_test_utils.py
@@ -14,6 +14,7 @@ def add_test_users(app):
email = f"{name}@abc.xy"
db.session.add(
User(
id=None,
email=email,
password_hash=ph.hash(name),
activated=True,
@@ -46,6 +47,7 @@ def add_test_samples(app, data_path: pathlib.Path):
with open(f"{ref_dir}/input.{input_file_type}", "w") as f:
f.write(input_file_type)
new_sample = Sample(
id=None,
email="user@abc.xy",
name=name,
tumor_type=f"tumor_type{sample_id}",
59 changes: 46 additions & 13 deletions backend/tests/test_app.py
@@ -210,12 +210,13 @@ def test_result_invalid(client):
assert "No results available" in response.json["message"]


def _upload_result(client, result_zipfile: pathlib.Path, sample_id: int):
def _upload_result(client, result_zipfile: pathlib.Path, job_id: int, sample_id: int):
headers = _get_auth_headers(client, "runner@abc.xy", "runner")
with open(result_zipfile, "rb") as f:
response = client.post(
"/api/runner/result",
data={
"job_id": job_id,
"sample_id": sample_id,
"success": True,
"file": (io.BytesIO(f.read()), result_zipfile.name),
@@ -225,19 +226,57 @@ def _upload_result(client, result_zipfile: pathlib.Path, sample_id: int):
return response


def test_result_valid(client, result_zipfile):
headers = _get_auth_headers(client, "user@abc.xy", "user")
sample_id = 1
assert _upload_result(client, result_zipfile, sample_id).status_code == 200
def test_runner_valid_success(client, result_zipfile):
headers = _get_auth_headers(client, "runner@abc.xy", "runner")
# request job
request_job_response = client.post(
"/api/runner/request_job",
json={"runner_hostname": "me"},
headers=headers,
)
assert request_job_response.status_code == 200
assert request_job_response.json == {"sample_id": 1, "job_id": 1}
# upload successful result
assert _upload_result(client, result_zipfile, 1, 1).status_code == 200
response = client.post(
"/api/result",
json={"sample_id": sample_id},
headers=headers,
json={"sample_id": 1},
headers=_get_auth_headers(client, "user@abc.xy", "user"),
)
assert response.status_code == 200
assert len(response.data) > 1


def test_runner_valid_failure(client, result_zipfile):
headers = _get_auth_headers(client, "runner@abc.xy", "runner")
# request job
request_job_response = client.post(
"/api/runner/request_job",
json={"runner_hostname": "me"},
headers=headers,
)
assert request_job_response.status_code == 200
assert request_job_response.json == {"sample_id": 1, "job_id": 1}
# upload failure result
result_response = client.post(
"/api/runner/result",
data={
"job_id": 1,
"sample_id": 1,
"success": False,
"error_message": "Something went wrong",
},
headers=headers,
)
assert result_response.status_code == 200
response = client.post(
"/api/result",
json={"sample_id": 1},
headers=_get_auth_headers(client, "user@abc.xy", "user"),
)
assert response.status_code == 400


def test_admin_samples_valid(client):
headers = _get_auth_headers(client, "admin@abc.xy", "admin")
response = client.get("/api/admin/samples", headers=headers)
@@ -288,12 +327,6 @@ def test_admin_users_valid(client):
assert "users" in response.json


def test_runner_result_valid(client, result_zipfile):
response = _upload_result(client, result_zipfile, 1)
assert response.status_code == 200
assert "result processed" in response.json["message"].lower()


def test_admin_update_user_valid(client):
headers = _get_auth_headers(client, "admin@abc.xy", "admin")
user = client.get("/api/admin/users", headers=headers).json["users"][0]
62 changes: 62 additions & 0 deletions frontend/src/components/JobsTable.vue
@@ -0,0 +1,62 @@
<script setup lang="ts">
import {
FwbTable,
FwbTableBody,
FwbTableCell,
FwbTableHead,
FwbTableHeadCell,
FwbTableRow,
} from "flowbite-vue";
import type { Job } from "@/utils/types";
import { apiClient, logout } from "@/utils/api-client";
import { ref } from "vue";
const jobs = ref([] as Job[]);
function get_jobs() {
apiClient
.get("admin/jobs")
.then((response) => {
jobs.value = response.data.jobs;
})
.catch((error) => {
if (error.response.status > 400) {
logout();
}
console.log(error);
});
}
get_jobs();
</script>

<template>
<fwb-table aria-label="Runner jobs">
<fwb-table-head>
<fwb-table-head-cell>Id</fwb-table-head-cell>
<fwb-table-head-cell>SampleId</fwb-table-head-cell>
<fwb-table-head-cell>Start</fwb-table-head-cell>
<fwb-table-head-cell>Runtime</fwb-table-head-cell>
<fwb-table-head-cell>Status</fwb-table-head-cell>
<fwb-table-head-cell>Error message</fwb-table-head-cell>
</fwb-table-head>
<fwb-table-body>
<fwb-table-row
v-for="job in jobs"
:key="job.id"
:class="job.status !== 'failed' ? '!bg-slate-50' : '!bg-red-200'"
>
<fwb-table-cell>{{ job.id }}</fwb-table-cell>
<fwb-table-cell>{{ job.sample_id }}</fwb-table-cell>
<fwb-table-cell>{{
new Date(job.timestamp_start * 1000).toISOString()
}}</fwb-table-cell>
<fwb-table-cell
>{{ (job.timestamp_end - job.timestamp_start) / 60 }}m</fwb-table-cell
>
<fwb-table-cell>{{ job.status }}</fwb-table-cell>
<fwb-table-cell>{{ job.error_message }}</fwb-table-cell>
</fwb-table-row>
</fwb-table-body>
</fwb-table>
</template>
9 changes: 9 additions & 0 deletions frontend/src/utils/types.ts
@@ -31,3 +31,12 @@ export type Settings = {
sources: string;
csv_required_columns: string;
};

export type Job = {
id: number;
sample_id: number;
timestamp_start: number;
timestamp_end: number;
status: string;
error_message: string;
};
4 changes: 4 additions & 0 deletions frontend/src/views/AdminView.vue
@@ -3,6 +3,7 @@ import SamplesTable from "@/components/SamplesTable.vue";
import SettingsTable from "@/components/SettingsTable.vue";
import UsersTable from "@/components/UsersTable.vue";
import ListComponent from "@/components/ListComponent.vue";
import JobsTable from "@/components/JobsTable.vue";
import ListItem from "@/components/ListItem.vue";
import { FwbButton } from "flowbite-vue";
import { ref } from "vue";
@@ -72,6 +73,9 @@ get_samples();
<ListItem title="Samples">
<SamplesTable :samples="samples" :admin="true"></SamplesTable>
</ListItem>
<ListItem title="Runner Jobs">
<JobsTable />
</ListItem>
</ListComponent>
</div>
</main>
2 changes: 2 additions & 0 deletions runner/docker-compose.yml
@@ -10,9 +10,11 @@ services:
deploy:
mode: replicated
replicas: ${PREDICTCR_RUNNER_JOBS:-1}
restart: always
networks:
- predictcr-network

networks:
predictcr-network:
name: predictcr
external: true
55 changes: 33 additions & 22 deletions runner/src/predicTCR_runner/runner.py
@@ -16,9 +16,13 @@ def __init__(self, api_url: str, jwt_token: str, poll_interval: int = 5):
self.poll_interval = poll_interval
self.runner_hostname = os.environ.get("HOSTNAME", "unknown")
self.logger = logging.getLogger(__name__)
self.job_id: int | None = None
self.sample_id: int | None = None

def _request_job(self) -> int | None:
def _request_job(self) -> bool:
self.logger.debug(f"Requesting job from {self.api_url}...")
self.job_id = None
self.sample_id = None
response = requests.post(
url=f"{self.api_url}/runner/request_job",
json={"runner_hostname": self.runner_hostname},
@@ -27,23 +31,28 @@ def _request_job(self) -> int | None:
)
if response.status_code == 204:
self.logger.debug(" -> no job available.")
return None
return False
elif response.status_code == 200:
sample_id = response.json().get("sample_id", None)
self.logger.debug(f" -> sample id {sample_id} available.")
return sample_id
self.job_id = response.json().get("job_id", None)
self.sample_id = response.json().get("sample_id", None)
self.logger.debug(
f" -> job id {self.job_id} for sample id {self.sample_id}."
)
if self.job_id is not None and self.sample_id is not None:
return True
else:
self.logger.error(
f"request_job failed with {response.status_code}: {response.content}"
)
return None
return False

def _report_job_failed(self, sample_id: int, message: str):
self.logger.info(f"...job failed for sample id {sample_id}.")
def _report_job_failed(self, message: str):
self.logger.info(f"...job {self.job_id} failed for sample id {self.sample_id}.")
response = requests.post(
url=f"{self.api_url}/runner/result",
data={
"sample_id": sample_id,
"job_id": self.job_id,
"sample_id": self.sample_id,
"runner_id": self.runner_hostname,
"success": "false",
"error_message": message,
@@ -54,16 +63,17 @@ def _report_job_failed(self, sample_id: int, message: str):
if response.status_code != 200:
self.logger.error(f"result with {response.status_code}: {response.content}")

def _upload_result(self, sample_id: int, result_file: str):
def _upload_result(self, result_file: str):
self.logger.info(
f"...job finished for sample id {sample_id}, uploading {result_file}..."
f"...job {self.job_id} finished for sample id {self.sample_id}, uploading {result_file}..."
)
with open(result_file) as result_file:
response = requests.post(
url=f"{self.api_url}/runner/result",
files={"file": result_file},
data={
"sample_id": sample_id,
"job_id": self.job_id,
"sample_id": self.sample_id,
"runner_hostname": self.runner_hostname,
"success": True,
},
@@ -73,14 +83,16 @@ def _upload_result(self, sample_id: int, result_file: str):
if response.status_code != 200:
self.logger.error(f"Failed to upload result: {response.content}")

def _run_job(self, sample_id: int):
self.logger.info(f"Starting job for sample id {sample_id}...")
def _run_job(self):
self.logger.info(
f"Starting job {self.job_id} for sample id {self.sample_id}..."
)
self.logger.debug("Downloading input files...")
with tempfile.TemporaryDirectory(delete=False) as tmpdir:
for input_file_type in ["h5", "csv"]:
response = requests.post(
url=f"{self.api_url}/input_{input_file_type}_file",
json={"sample_id": sample_id},
json={"sample_id": self.sample_id},
headers=self.auth_header,
timeout=30,
)
@@ -89,7 +101,6 @@ def _run_job(self, sample_id: int):
f"Failed to download {input_file_type}: {response.content}"
)
return self._report_job_failed(
sample_id,
f"Failed to download {input_file_type} on {self.runner_hostname}",
)
input_file_name = f"input.{input_file_type}"
@@ -104,20 +115,20 @@ def _run_job(self, sample_id: int):
self.logger.debug(f" - running {tmpdir}/scripts.sh...")
subprocess.run(["sh", "./script.sh"], cwd=tmpdir, check=True)
self.logger.debug(f" ...{tmpdir}/script.sh finished.")
self._upload_result(sample_id, f"{tmpdir}/result.zip")
self._upload_result(f"{tmpdir}/result.zip")
except Exception as e:
self.logger.exception(e)
self.logger.error(f"Failed to run job for sample {sample_id}: {e}")
self.logger.error(
f"Failed to run job {self.job_id} for sample {self.sample_id}: {e}"
)
return self._report_job_failed(
sample_id,
f"Error during job execution on {self.runner_hostname}: {e}",
)

def start(self):
self.logger.info(f"Polling {self.api_url} for jobs...")
while True:
job_id = self._request_job()
if job_id is not None:
self._run_job(job_id)
if self._request_job():
self._run_job()
else:
time.sleep(self.poll_interval)
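
A hedged usage sketch of the updated poll loop: the class name Runner and the environment variable below are assumptions for illustration, while the constructor signature (api_url, jwt_token, poll_interval) and the argument-free _run_job() call match the diff.

import os

from predicTCR_runner.runner import Runner  # class name assumed; module path from the diff

runner = Runner(
    api_url="https://predictcr.example.org/api",  # placeholder backend URL
    jwt_token=os.environ["PREDICTCR_RUNNER_JWT_TOKEN"],  # assumed env var name
    poll_interval=5,
)
# start() now calls _request_job(), which stores job_id and sample_id on the
# instance, and then _run_job() with no arguments whenever a job is available
runner.start()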
