diff --git a/backend/src/predicTCR_server/app.py b/backend/src/predicTCR_server/app.py index 0822a1e..4ad30fc 100644 --- a/backend/src/predicTCR_server/app.py +++ b/backend/src/predicTCR_server/app.py @@ -160,7 +160,7 @@ def get_settings(): def input_h5_file(): sample_id = request.json.get("sample_id", None) logger.info( - f"User {current_user.email} requesting results for sample {sample_id}" + f"User {current_user.email} requesting h5 file for sample {sample_id}" ) filters = {"id": sample_id} if not current_user.is_admin and not current_user.is_runner: @@ -178,7 +178,7 @@ def input_h5_file(): def input_csv_file(): sample_id = request.json.get("sample_id", None) logger.info( - f"User {current_user.email} requesting results for sample {sample_id}" + f"User {current_user.email} requesting csv file for sample {sample_id}" ) filters = {"id": sample_id} if not current_user.is_admin and not current_user.is_runner: @@ -321,6 +321,8 @@ def runner_request_job(): new_job = Job( id=None, sample_id=sample_id, + runner_id=current_user.id, + runner_hostname=runner_hostname, timestamp_start=timestamp_now(), timestamp_end=0, status=Status.RUNNING, @@ -376,6 +378,7 @@ def runner_result(): tumor_types="Lung;Breast;Other", sources="TIL;PMBC;Other", csv_required_columns="barcode;cdr3;chain", + runner_job_timeout_mins=60, ) ) db.session.commit() diff --git a/backend/src/predicTCR_server/model.py b/backend/src/predicTCR_server/model.py index 258868c..0147aa5 100644 --- a/backend/src/predicTCR_server/model.py +++ b/backend/src/predicTCR_server/model.py @@ -52,6 +52,7 @@ class Settings(db.Model): tumor_types: Mapped[str] = mapped_column(String, nullable=False) sources: Mapped[str] = mapped_column(String, nullable=False) csv_required_columns: Mapped[str] = mapped_column(String, nullable=False) + runner_job_timeout_mins: Mapped[int] = mapped_column(Integer, nullable=False) def as_dict(self): return {c: getattr(self, c) for c in inspect(self).attrs.keys()} @@ -61,6 +62,8 @@ def as_dict(self): class Job(db.Model): id: Mapped[int] = mapped_column(Integer, primary_key=True) sample_id: Mapped[int] = mapped_column(Integer, nullable=False) + runner_id: Mapped[int] = mapped_column(Integer, nullable=False) + runner_hostname: Mapped[str] = mapped_column(String, nullable=False) timestamp_start: Mapped[int] = mapped_column(Integer, nullable=False) timestamp_end: Mapped[int] = mapped_column(Integer, nullable=False) status: Mapped[Status] = mapped_column(Enum(Status), nullable=False) @@ -78,6 +81,7 @@ class Sample(db.Model): tumor_type: Mapped[str] = mapped_column(String(128), nullable=False) source: Mapped[str] = mapped_column(String(128), nullable=False) timestamp: Mapped[int] = mapped_column(Integer, nullable=False) + timestamp_results: Mapped[int] = mapped_column(Integer, nullable=False) status: Mapped[Status] = mapped_column(Enum(Status), nullable=False) has_results_zip: Mapped[bool] = mapped_column(Boolean, nullable=False) @@ -145,7 +149,26 @@ def get_samples(email: str | None = None) -> list[Sample]: def request_job() -> int | None: - # todo: go through running jobs and reset to queued if they have been running for more than e.g. 2 hrs + job_timeout_minutes = db.session.get(Settings, 1).runner_job_timeout_mins + sample_to_resubmit = ( + db.session.execute( + db.select(Sample).filter( + (Sample.status == Status.RUNNING) + & ( + timestamp_now() - Sample.timestamp_results + > job_timeout_minutes * 60 + ) + ) + ) + .scalars() + .first() + ) + if sample_to_resubmit is not None: + logger.info( + f"Sample {sample_to_resubmit.id} has been running for more than {job_timeout_minutes} minutes - putting back in queue" + ) + sample_to_resubmit.status = Status.QUEUED + db.session.commit() selected_samples = ( db.select(Sample) .filter(Sample.status == Status.QUEUED) @@ -157,6 +180,7 @@ def request_job() -> int | None: return None else: logger.info(f" --> sample id {sample.id}") + sample.timestamp_results = timestamp_now() sample.status = Status.RUNNING db.session.commit() return sample.id @@ -185,6 +209,11 @@ def process_result( job.error_message = error_message db.session.commit() return "Result processed", 200 + if sample.has_results_zip: + logger.warning(f" --> Sample {sample_id} already has results") + job.status = Status.COMPLETED + db.session.commit() + return f"Sample {sample_id} already has results", 400 if result_zip_file is None: logger.warning(" --> No zipfile") return "Zip file missing", 400 @@ -455,6 +484,7 @@ def add_new_sample( tumor_type=tumor_type, source=source, timestamp=timestamp_now(), + timestamp_results=0, status=Status.QUEUED, has_results_zip=False, ) diff --git a/backend/tests/helpers/flask_test_utils.py b/backend/tests/helpers/flask_test_utils.py index a79e1b9..3a6d512 100644 --- a/backend/tests/helpers/flask_test_utils.py +++ b/backend/tests/helpers/flask_test_utils.py @@ -53,6 +53,7 @@ def add_test_samples(app, data_path: pathlib.Path): tumor_type=f"tumor_type{sample_id}", source=f"source{sample_id}", timestamp=sample_id, + timestamp_results=0, status=Status.QUEUED, has_results_zip=False, ) diff --git a/backend/tests/test_app.py b/backend/tests/test_app.py index bc6edc9..ac23d73 100644 --- a/backend/tests/test_app.py +++ b/backend/tests/test_app.py @@ -141,6 +141,7 @@ def test_get_settings_valid(client): "id": 1, "sources": "TIL;PMBC;Other", "tumor_types": "Lung;Breast;Other", + "runner_job_timeout_mins": 60, } @@ -154,6 +155,7 @@ def test_update_settings_valid(client): "id": 1, "sources": "a;b;g", "tumor_types": "1;2;6", + "runner_job_timeout_mins": 12, "invalid-key": "invalid", } response = client.post("/api/admin/settings", headers=headers, json=new_settings) diff --git a/frontend/src/components/JobsTable.vue b/frontend/src/components/JobsTable.vue index 063fea7..ded9d71 100644 --- a/frontend/src/components/JobsTable.vue +++ b/frontend/src/components/JobsTable.vue @@ -28,13 +28,22 @@ function get_jobs() { } get_jobs(); + +function get_runtime_minutes(job: Job): number { + if (job.status === "running") { + return Math.ceil((Date.now() / 1000 - job.timestamp_start) / 60); + } + return Math.ceil((job.timestamp_end - job.timestamp_start) / 60); +}