Skip to content

Commit

Permalink
Add a timeout for runner jobs
Browse files Browse the repository at this point in the history
- Add runner job timeout
  - if a job status has been "running" for too long, it is reset to "queued" to allow another runner to claim it
- Ignore repeat results
  - if a sample already has a results zip file, ignore a new result
  - this could happen if a job took longer than the timeout but eventually completed
- add runner job timeout to admin settings
- resolves #32
  • Loading branch information
lkeegan committed Oct 23, 2024
1 parent afc9d53 commit a636a50
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 11 deletions.
7 changes: 5 additions & 2 deletions backend/src/predicTCR_server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def get_settings():
def input_h5_file():
sample_id = request.json.get("sample_id", None)
logger.info(
f"User {current_user.email} requesting results for sample {sample_id}"
f"User {current_user.email} requesting h5 file for sample {sample_id}"
)
filters = {"id": sample_id}
if not current_user.is_admin and not current_user.is_runner:
Expand All @@ -178,7 +178,7 @@ def input_h5_file():
def input_csv_file():
sample_id = request.json.get("sample_id", None)
logger.info(
f"User {current_user.email} requesting results for sample {sample_id}"
f"User {current_user.email} requesting csv file for sample {sample_id}"
)
filters = {"id": sample_id}
if not current_user.is_admin and not current_user.is_runner:
Expand Down Expand Up @@ -321,6 +321,8 @@ def runner_request_job():
new_job = Job(
id=None,
sample_id=sample_id,
runner_id=current_user.id,
runner_hostname=runner_hostname,
timestamp_start=timestamp_now(),
timestamp_end=0,
status=Status.RUNNING,
Expand Down Expand Up @@ -376,6 +378,7 @@ def runner_result():
tumor_types="Lung;Breast;Other",
sources="TIL;PMBC;Other",
csv_required_columns="barcode;cdr3;chain",
runner_job_timeout_mins=60,
)
)
db.session.commit()
Expand Down
32 changes: 31 additions & 1 deletion backend/src/predicTCR_server/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Settings(db.Model):
tumor_types: Mapped[str] = mapped_column(String, nullable=False)
sources: Mapped[str] = mapped_column(String, nullable=False)
csv_required_columns: Mapped[str] = mapped_column(String, nullable=False)
runner_job_timeout_mins: Mapped[int] = mapped_column(Integer, nullable=False)

def as_dict(self):
return {c: getattr(self, c) for c in inspect(self).attrs.keys()}
Expand All @@ -61,6 +62,8 @@ def as_dict(self):
class Job(db.Model):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
sample_id: Mapped[int] = mapped_column(Integer, nullable=False)
runner_id: Mapped[int] = mapped_column(Integer, nullable=False)
runner_hostname: Mapped[str] = mapped_column(String, nullable=False)
timestamp_start: Mapped[int] = mapped_column(Integer, nullable=False)
timestamp_end: Mapped[int] = mapped_column(Integer, nullable=False)
status: Mapped[Status] = mapped_column(Enum(Status), nullable=False)
Expand All @@ -78,6 +81,7 @@ class Sample(db.Model):
tumor_type: Mapped[str] = mapped_column(String(128), nullable=False)
source: Mapped[str] = mapped_column(String(128), nullable=False)
timestamp: Mapped[int] = mapped_column(Integer, nullable=False)
timestamp_results: Mapped[int] = mapped_column(Integer, nullable=False)
status: Mapped[Status] = mapped_column(Enum(Status), nullable=False)
has_results_zip: Mapped[bool] = mapped_column(Boolean, nullable=False)

Expand Down Expand Up @@ -145,7 +149,26 @@ def get_samples(email: str | None = None) -> list[Sample]:


def request_job() -> int | None:
# todo: go through running jobs and reset to queued if they have been running for more than e.g. 2 hrs
job_timeout_minutes = db.session.get(Settings, 1).runner_job_timeout_mins
sample_to_resubmit = (
db.session.execute(
db.select(Sample).filter(
(Sample.status == Status.RUNNING)
& (
timestamp_now() - Sample.timestamp_results
> job_timeout_minutes * 60
)
)
)
.scalars()
.first()
)
if sample_to_resubmit is not None:
logger.info(
f"Sample {sample_to_resubmit.id} has been running for more than {job_timeout_minutes} minutes - putting back in queue"
)
sample_to_resubmit.status = Status.QUEUED
db.session.commit()
selected_samples = (
db.select(Sample)
.filter(Sample.status == Status.QUEUED)
Expand All @@ -157,6 +180,7 @@ def request_job() -> int | None:
return None
else:
logger.info(f" --> sample id {sample.id}")
sample.timestamp_results = timestamp_now()
sample.status = Status.RUNNING
db.session.commit()
return sample.id
Expand Down Expand Up @@ -185,6 +209,11 @@ def process_result(
job.error_message = error_message
db.session.commit()
return "Result processed", 200
if sample.has_results_zip:
logger.warning(f" --> Sample {sample_id} already has results")
job.status = Status.COMPLETED
db.session.commit()
return f"Sample {sample_id} already has results", 400
if result_zip_file is None:
logger.warning(" --> No zipfile")
return "Zip file missing", 400
Expand Down Expand Up @@ -455,6 +484,7 @@ def add_new_sample(
tumor_type=tumor_type,
source=source,
timestamp=timestamp_now(),
timestamp_results=0,
status=Status.QUEUED,
has_results_zip=False,
)
Expand Down
1 change: 1 addition & 0 deletions backend/tests/helpers/flask_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def add_test_samples(app, data_path: pathlib.Path):
tumor_type=f"tumor_type{sample_id}",
source=f"source{sample_id}",
timestamp=sample_id,
timestamp_results=0,
status=Status.QUEUED,
has_results_zip=False,
)
Expand Down
2 changes: 2 additions & 0 deletions backend/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def test_get_settings_valid(client):
"id": 1,
"sources": "TIL;PMBC;Other",
"tumor_types": "Lung;Breast;Other",
"runner_job_timeout_mins": 60,
}


Expand All @@ -154,6 +155,7 @@ def test_update_settings_valid(client):
"id": 1,
"sources": "a;b;g",
"tumor_types": "1;2;6",
"runner_job_timeout_mins": 12,
"invalid-key": "invalid",
}
response = client.post("/api/admin/settings", headers=headers, json=new_settings)
Expand Down
17 changes: 13 additions & 4 deletions frontend/src/components/JobsTable.vue
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,22 @@ function get_jobs() {
}
get_jobs();
function get_runtime_minutes(job: Job): number {
if (job.status === "running") {
return Math.ceil((Date.now() / 1000 - job.timestamp_start) / 60);
}
return Math.ceil((job.timestamp_end - job.timestamp_start) / 60);
}
</script>

<template>
<fwb-table aria-label="Runner jobs">
<fwb-table-head>
<fwb-table-head-cell>Id</fwb-table-head-cell>
<fwb-table-head-cell>JobId</fwb-table-head-cell>
<fwb-table-head-cell>SampleId</fwb-table-head-cell>
<fwb-table-head-cell>RunnerId</fwb-table-head-cell>
<fwb-table-head-cell>Hostname</fwb-table-head-cell>
<fwb-table-head-cell>Start</fwb-table-head-cell>
<fwb-table-head-cell>Runtime</fwb-table-head-cell>
<fwb-table-head-cell>Status</fwb-table-head-cell>
Expand All @@ -48,12 +57,12 @@ get_jobs();
>
<fwb-table-cell>{{ job.id }}</fwb-table-cell>
<fwb-table-cell>{{ job.sample_id }}</fwb-table-cell>
<fwb-table-cell>{{ job.runner_id }}</fwb-table-cell>
<fwb-table-cell>{{ job.runner_hostname }}</fwb-table-cell>
<fwb-table-cell>{{
new Date(job.timestamp_start * 1000).toISOString()
}}</fwb-table-cell>
<fwb-table-cell
>{{ (job.timestamp_end - job.timestamp_start) / 60 }}m</fwb-table-cell
>
<fwb-table-cell>{{ get_runtime_minutes(job) }}m</fwb-table-cell>
<fwb-table-cell>{{ job.status }}</fwb-table-cell>
<fwb-table-cell>{{ job.error_message }}</fwb-table-cell>
</fwb-table-row>
Expand Down
25 changes: 25 additions & 0 deletions frontend/src/components/RefreshButton.vue
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<script setup lang="ts">
import { FwbButton } from "flowbite-vue";
</script>

<template>
<fwb-button>
<svg
class="w-6 h-6 text-gray-800 dark:text-white"
aria-hidden="true"
xmlns="http://www.w3.org/2000/svg"
width="24"
height="24"
fill="none"
viewBox="0 0 24 24"
>
<path
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M17.651 7.65a7.131 7.131 0 0 0-12.68 3.15M18.001 4v4h-4m-7.652 8.35a7.13 7.13 0 0 0 12.68-3.15M6 20v-4h4"
/>
</svg>
</fwb-button>
</template>
8 changes: 8 additions & 0 deletions frontend/src/components/SettingsTable.vue
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ function update_settings() {
class="mb-2"
label="Required columns in CSV file (separated by ;)"
></fwb-input>
<fwb-range
v-model="settings.runner_job_timeout_mins"
:steps="1"
:min="1"
:max="360"
:label="`Timeout for runner jobs: ${settings.runner_job_timeout_mins} minutes`"
class="mb-2"
/>
<fwb-button @click="update_settings" color="green">
Save settings</fwb-button
>
Expand Down
4 changes: 4 additions & 0 deletions frontend/src/utils/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type Sample = {
tumor_type: string;
source: number;
timestamp: number;
timestamp_results: number;
status: string;
has_results_zip: boolean;
};
Expand All @@ -30,11 +31,14 @@ export type Settings = {
tumor_types: string;
sources: string;
csv_required_columns: string;
runner_job_timeout_mins: number;
};

export type Job = {
id: number;
sample_id: number;
runner_id: number;
runner_hostname: string;
timestamp_start: number;
timestamp_end: number;
status: string;
Expand Down
5 changes: 3 additions & 2 deletions frontend/src/views/SamplesView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,13 @@ update_samples();
const submit_message = ref("");
let update_submit_message_timer = setInterval(() => {
let update_data = setInterval(() => {
update_samples();
update_submit_message();
}, 30000);
onUnmounted(() => {
clearInterval(update_submit_message_timer);
clearInterval(update_data);
});
function update_submit_message() {
Expand Down
4 changes: 2 additions & 2 deletions runner/scripts/script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

echo "Starting fake analysis..."

echo "Sleeping for 1 minute..."
echo "Sleeping for 5 minutes..."

sleep 60
sleep 300

ls > result.zip

Expand Down

0 comments on commit a636a50

Please sign in to comment.