From f09191f4a55844642eec979101b4a31502ea56f0 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Mon, 8 Mar 2021 21:08:40 -0500 Subject: [PATCH 1/2] stop jobs once all tasks are stopped --- src/api-service/__app__/onefuzzlib/jobs.py | 14 ++++ .../__app__/onefuzzlib/tasks/main.py | 71 ++++++++++--------- 2 files changed, 52 insertions(+), 33 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/jobs.py b/src/api-service/__app__/onefuzzlib/jobs.py index db09dbad08..2686a5af25 100644 --- a/src/api-service/__app__/onefuzzlib/jobs.py +++ b/src/api-service/__app__/onefuzzlib/jobs.py @@ -85,6 +85,20 @@ def init(self) -> None: self.state = JobState.enabled self.save() + def stop_if_all_done(self) -> None: + not_stopped = [ + task + for task in Task.search(query={"job_id": [self.job_id]}) + if task.state != TaskState.stopped + ] + if not_stopped: + return + + logging.info( + JOB_LOG_PREFIX + "stopping job as all tasks are stopped: %s", self.job_id + ) + self.stopping() + def stopping(self) -> None: self.state = JobState.stopping logging.info(JOB_LOG_PREFIX + "stopping: %s", self.job_id) diff --git a/src/api-service/__app__/onefuzzlib/tasks/main.py b/src/api-service/__app__/onefuzzlib/tasks/main.py index 753117abc1..d2161c3987 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/main.py +++ b/src/api-service/__app__/onefuzzlib/tasks/main.py @@ -126,12 +126,17 @@ def init(self) -> None: def stopping(self) -> None: # TODO: we need to 'unschedule' this task from the existing pools + from ..jobs import Job logging.info("stopping task: %s:%s", self.job_id, self.task_id) ProxyForward.remove_forward(self.task_id) delete_queue(str(self.task_id), StorageType.corpus) Node.stop_task(self.task_id) - self.set_state(TaskState.stopped, send=False) + self.set_state(TaskState.stopped) + + job = Job.get(self.job_id) + if job: + job.stop_if_all_done() @classmethod def search_states( @@ -189,15 +194,7 @@ def mark_stopping(self) -> None: ) return - self.set_state(TaskState.stopping, send=False) - send_event( - EventTaskStopped( - job_id=self.job_id, - task_id=self.task_id, - user_info=self.user_info, - config=self.config, - ) - ) + self.set_state(TaskState.stopping) def mark_failed(self, error: Error) -> None: if self.state in [TaskState.stopped, TaskState.stopping]: @@ -207,17 +204,7 @@ def mark_failed(self, error: Error) -> None: return self.error = error - self.set_state(TaskState.stopping, send=False) - - send_event( - EventTaskFailed( - job_id=self.job_id, - task_id=self.task_id, - error=error, - user_info=self.user_info, - config=self.config, - ) - ) + self.set_state(TaskState.stopping) def get_pool(self) -> Optional[Pool]: if self.config.pool: @@ -294,22 +281,40 @@ def on_start(self) -> None: def key_fields(cls) -> Tuple[str, str]: return ("job_id", "task_id") - def set_state(self, state: TaskState, send: bool = True) -> None: - if self.state == state: - return - + def set_state(self, state: TaskState) -> None: self.state = state if self.state in [TaskState.running, TaskState.setting_up]: self.on_start() self.save() - send_event( - EventTaskStateUpdated( - job_id=self.job_id, - task_id=self.task_id, - state=self.state, - end_time=self.end_time, - config=self.config, + if self.state == TaskState.stopped: + if self.error: + send_event( + EventTaskFailed( + job_id=self.job_id, + task_id=self.task_id, + error=self.error, + user_info=self.user_info, + config=self.config, + ) + ) + else: + send_event( + EventTaskStopped( + job_id=self.job_id, + task_id=self.task_id, + user_info=self.user_info, + config=self.config, + ) + ) + else: + send_event( + EventTaskStateUpdated( + job_id=self.job_id, + task_id=self.task_id, + state=self.state, + end_time=self.end_time, + config=self.config, + ) ) - ) From f8d590bdeffaa30eedd3b3be399f744998d0390b Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Tue, 9 Mar 2021 09:52:06 -0500 Subject: [PATCH 2/2] split out set_state updates into separate pr --- .../__app__/onefuzzlib/tasks/main.py | 66 +++++++++---------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/tasks/main.py b/src/api-service/__app__/onefuzzlib/tasks/main.py index d2161c3987..2a04f3da24 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/main.py +++ b/src/api-service/__app__/onefuzzlib/tasks/main.py @@ -132,7 +132,7 @@ def stopping(self) -> None: ProxyForward.remove_forward(self.task_id) delete_queue(str(self.task_id), StorageType.corpus) Node.stop_task(self.task_id) - self.set_state(TaskState.stopped) + self.set_state(TaskState.stopped, send=False) job = Job.get(self.job_id) if job: @@ -194,7 +194,15 @@ def mark_stopping(self) -> None: ) return - self.set_state(TaskState.stopping) + self.set_state(TaskState.stopping, send=False) + send_event( + EventTaskStopped( + job_id=self.job_id, + task_id=self.task_id, + user_info=self.user_info, + config=self.config, + ) + ) def mark_failed(self, error: Error) -> None: if self.state in [TaskState.stopped, TaskState.stopping]: @@ -204,7 +212,17 @@ def mark_failed(self, error: Error) -> None: return self.error = error - self.set_state(TaskState.stopping) + self.set_state(TaskState.stopping, send=False) + + send_event( + EventTaskFailed( + job_id=self.job_id, + task_id=self.task_id, + error=error, + user_info=self.user_info, + config=self.config, + ) + ) def get_pool(self) -> Optional[Pool]: if self.config.pool: @@ -281,40 +299,22 @@ def on_start(self) -> None: def key_fields(cls) -> Tuple[str, str]: return ("job_id", "task_id") - def set_state(self, state: TaskState) -> None: + def set_state(self, state: TaskState, send: bool = True) -> None: + if self.state == state: + return + self.state = state if self.state in [TaskState.running, TaskState.setting_up]: self.on_start() self.save() - if self.state == TaskState.stopped: - if self.error: - send_event( - EventTaskFailed( - job_id=self.job_id, - task_id=self.task_id, - error=self.error, - user_info=self.user_info, - config=self.config, - ) - ) - else: - send_event( - EventTaskStopped( - job_id=self.job_id, - task_id=self.task_id, - user_info=self.user_info, - config=self.config, - ) - ) - else: - send_event( - EventTaskStateUpdated( - job_id=self.job_id, - task_id=self.task_id, - state=self.state, - end_time=self.end_time, - config=self.config, - ) + send_event( + EventTaskStateUpdated( + job_id=self.job_id, + task_id=self.task_id, + state=self.state, + end_time=self.end_time, + config=self.config, ) + )