From ebe0eaad11700becc6989dead93fe6e61a939719 Mon Sep 17 00:00:00 2001 From: itutu-tienday <> Date: Fri, 27 Dec 2024 12:29:12 +0900 Subject: [PATCH 1/4] first commit for "resolive abnormal workflow" --- studio/app/common/core/rules/runner.py | 83 ++---- .../core/snakemake/smk_status_logger.py | 10 +- .../common/core/workflow/workflow_result.py | 255 +++++++++++++++++- studio/app/common/routers/run.py | 8 +- studio/app/common/schemas/workflow.py | 27 ++ .../core/workflow/test_workflow_result.py | 5 + 6 files changed, 305 insertions(+), 83 deletions(-) diff --git a/studio/app/common/core/rules/runner.py b/studio/app/common/core/rules/runner.py index 7d8ea2371..d6ce2a692 100644 --- a/studio/app/common/core/rules/runner.py +++ b/studio/app/common/core/rules/runner.py @@ -2,13 +2,12 @@ import gc import json import os -import signal +import time import traceback from dataclasses import asdict from datetime import datetime from pathlib import Path -from fastapi import HTTPException from filelock import FileLock from studio.app.common.core.experiment.experiment import ExptOutputPathIds @@ -19,6 +18,7 @@ from studio.app.common.core.utils.file_reader import JsonReader from studio.app.common.core.utils.filepath_creater import join_filepath from studio.app.common.core.utils.pickle_handler import PickleReader, PickleWriter +from studio.app.common.schemas.workflow import WorkflowPIDFileData from studio.app.const import DATE_FORMAT from studio.app.dir_path import DIRPATH from studio.app.optinist.core.nwb.nwb_creater import ( @@ -40,9 +40,10 @@ def run(cls, __rule: Rule, last_output, run_script_path: str): logger.info("start rule runner") # write pid file - cls.__write_pid_file(__rule, run_script_path) + workflow_dirpath = str(Path(__rule.output).parent.parent) + cls.write_pid_file(workflow_dirpath, run_script_path) - input_info = cls.__read_input_info(__rule.input) + input_info = cls.read_input_info(__rule.input) cls.__change_dict_key_exist(input_info, __rule) nwbfile = input_info["nwbfile"] @@ -106,23 +107,31 @@ def __get_pid_file_path(cls, workspace_id: str, unique_id: str) -> str: return pid_file_path @classmethod - def __write_pid_file(cls, __rule: Rule, run_script_path: str) -> None: + def write_pid_file(cls, workflow_dirpath: str, run_script_path: str) -> None: """ save snakemake script file path and PID of current running algo function """ - pid_data = {"last_pid": os.getpid(), "last_script_file": run_script_path} + pid_data = WorkflowPIDFileData( + last_pid=os.getpid(), + last_script_file=run_script_path, + create_time=time.time(), + ) - workflow_dirpath = str(Path(__rule.output).parent.parent) ids = ExptOutputPathIds(workflow_dirpath) pid_file_path = cls.__get_pid_file_path(ids.workspace_id, ids.unique_id) with open(pid_file_path, "w") as f: - json.dump(pid_data, f) + json.dump(asdict(pid_data), f) @classmethod - def read_pid_file(cls, workspace_id: str, unique_id: str) -> dict: + def read_pid_file(cls, workspace_id: str, unique_id: str) -> WorkflowPIDFileData: pid_file_path = cls.__get_pid_file_path(workspace_id, unique_id) - pid_data = JsonReader.read(pid_file_path) + if not os.path.exists(pid_file_path): + return None + + pid_data_json = JsonReader.read(pid_file_path) + pid_data = WorkflowPIDFileData(**pid_data_json) + return pid_data @classmethod @@ -186,7 +195,7 @@ def __change_dict_key_exist(cls, input_info, rule_config: Rule): input_info[arg_name] = input_info.pop(return_name) @classmethod - def __read_input_info(cls, input_files): + def read_input_info(cls, input_files): input_info = {} for filepath in input_files: load_data = PickleReader.read(filepath) @@ -222,55 +231,3 @@ def __dict2leaf(cls, root_dict: dict, path_list): return cls.__dict2leaf(root_dict[path], path_list) else: return root_dict[path] - - @classmethod - def cancel_run(cls, workspace_id: str, unique_id: str): - """ - The algorithm function of this workflow is being executed at the line: - https://github.com/snakemake/snakemake/blob/27b224ed12448df8aebc7d1ff8f25e3bf7622232/snakemake/shell.py#L258 - ``` - proc = sp.Popen( - cmd, - bufsize=-1, - shell=use_shell, - stdout=stdout, - universal_newlines=iterable or read or None, - close_fds=close_fds, - **cls._process_args, - env=envvars, - ) - ``` - The `cmd` argument has the following format: - ``` - source ~/miniconda3/bin/activate - '/app/.snakemake/conda/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx_'; - set -euo pipefail; - python /app/.snakemake/scripts/tmpxxxxxxxx.func.py - ``` - Interrupt the conda activate at the beginning of the process is impossible - because it is only called when each algorithm function executes. - This workflow is cancelled by killing process via PID of algorithm function - saved in `RUN_PROCESS_PID_FILE` file - Raises: - HTTPException: if pid_filepath or last_script_file does not exist - """ - - pid_filepath = cls.__get_pid_file_path(workspace_id, unique_id) - - if not os.path.exists(pid_filepath): - raise HTTPException(status_code=404) - - pid_data = cls.read_pid_file(workspace_id, unique_id) - last_pid = pid_data["last_pid"] - last_script_file = pid_data["last_script_file"] - - if not os.path.exists(last_script_file): - logger.warning( - f"The run script has not yet started. script: {last_script_file}" - ) - raise HTTPException(status_code=404) - - os.remove(last_script_file) - os.kill(last_pid, signal.SIGTERM) - - return True diff --git a/studio/app/common/core/snakemake/smk_status_logger.py b/studio/app/common/core/snakemake/smk_status_logger.py index e1ec558d8..b75067953 100644 --- a/studio/app/common/core/snakemake/smk_status_logger.py +++ b/studio/app/common/core/snakemake/smk_status_logger.py @@ -8,6 +8,7 @@ create_directory, join_filepath, ) +from studio.app.common.schemas.workflow import WorkflowErrorInfo from studio.app.dir_path import DIRPATH @@ -67,7 +68,7 @@ def __init_error_log_file(cls, workspace_id: str, unique_id: str) -> str: return log_file_path @classmethod - def get_error_content(cls, workspace_id: str, unique_id: str) -> dict: + def get_error_content(cls, workspace_id: str, unique_id: str) -> WorkflowErrorInfo: log_file_path = cls.__get_error_log_file_path(workspace_id, unique_id) if os.path.exists(log_file_path): @@ -77,7 +78,7 @@ def get_error_content(cls, workspace_id: str, unique_id: str) -> dict: error_log = None has_error = False - return {"has_error": has_error, "error_log": error_log} + return WorkflowErrorInfo(has_error=has_error, error_log=error_log) def __init__(self, workspace_id, unique_id): self.workspace_id = workspace_id @@ -113,7 +114,10 @@ def log_handler(self, msg: Dict[str, str] = None): # since multiple running workflow share log data, # check if message really belongs to the current workflow - if pid_data["last_script_file"] in msg["msg"]: + if ( + pid_data is not None + and pid_data.last_script_file in msg["msg"] + ): self.logger.error("Workflow cancelled") else: self.logger.error(msg) diff --git a/studio/app/common/core/workflow/workflow_result.py b/studio/app/common/core/workflow/workflow_result.py index 7e789fd9c..7581666cd 100644 --- a/studio/app/common/core/workflow/workflow_result.py +++ b/studio/app/common/core/workflow/workflow_result.py @@ -1,22 +1,37 @@ import os +import re +import signal +import time from dataclasses import asdict from datetime import datetime from glob import glob -from typing import Dict +from http.client import HTTPException +from typing import Dict, List + +from psutil import AccessDenied, NoSuchProcess, Process, ZombieProcess, process_iter from studio.app.common.core.experiment.experiment_reader import ExptConfigReader from studio.app.common.core.experiment.experiment_writer import ExptConfigWriter +from studio.app.common.core.logger import AppLogger +from studio.app.common.core.rules.runner import Runner from studio.app.common.core.snakemake.smk_status_logger import SmkStatusLogger from studio.app.common.core.utils.filepath_creater import join_filepath from studio.app.common.core.utils.pickle_handler import PickleReader from studio.app.common.core.workflow.workflow import Message, NodeRunStatus, OutputPath from studio.app.common.dataclass import BaseData +from studio.app.common.schemas.workflow import ( + WorkflowErrorInfo, + WorkflowPIDFileData, + WorkflowProcessInfo, +) from studio.app.const import DATE_FORMAT from studio.app.dir_path import DIRPATH +logger = AppLogger.get_logger() + class WorkflowResult: - def __init__(self, workspace_id, unique_id): + def __init__(self, workspace_id: str, unique_id: str): self.workspace_id = workspace_id self.unique_id = unique_id self.workflow_dirpath = join_filepath( @@ -29,15 +44,17 @@ def __init__(self, workspace_id, unique_id): self.expt_filepath = join_filepath( [self.workflow_dirpath, DIRPATH.EXPERIMENT_YML] ) + self.monitor = WorkflowMonitor(workspace_id, unique_id) - def observe(self, nodeIdList) -> Dict: + def observe(self, nodeIdList: List[str]) -> Dict: """ Perform the following operations for the specified workflow - Check and update the workflow execution status - Response with the confirmed workflow execution status """ - - results: Dict[str, Message] = {} + # validate args + if not nodeIdList: + return {} # check for workflow errors workflow_error = SmkStatusLogger.get_error_content( @@ -45,9 +62,38 @@ def observe(self, nodeIdList) -> Dict: ) # observe node list + results = self.__observe_node_list(nodeIdList, workflow_error) + + # check workflow status + is_workflow_status_running = self.__is_workflow_status_running( + nodeIdList, results + ) + + # If the workflow status is running (workflow is incomplete), + # check whether the actual process exists. + if is_workflow_status_running: + # check workflow process exists + current_process = self.monitor.search_process() + + # error handling for process not found + if current_process is None: + workflow_error = WorkflowErrorInfo( + has_error=True, error_log="No Snakemake process found." + ) + + # re-run observe node list (reflects workflow error) + results = self.__observe_node_list(nodeIdList, workflow_error) + + return results + + def __observe_node_list( + self, nodeIdList: List[str], workflow_error: WorkflowErrorInfo + ) -> Dict[str, Message]: + results: Dict[str, Message] = {} + for node_id in nodeIdList: # Cases with errors in workflow - if workflow_error["has_error"]: + if workflow_error.has_error: node_pickle_path = None node_result = NodeResult( self.workspace_id, @@ -84,7 +130,24 @@ def observe(self, nodeIdList) -> Dict: return results - def __check_has_nwb(self, node_id=None): + def __is_workflow_status_running( + self, nodeIdList: List[str], messages: Dict[str, Message] + ) -> bool: + """ + By comparing the number of nodeIdList waiting for processing completion + with the number of nodeIdList that has completed processing immediately before, + is_running is determined. + """ + is_running = len(nodeIdList) != len(messages.keys()) + + logger.debug( + "check wornflow running status " + f"[{self.workspace_id}/{self.unique_id}] [is_running: {is_running}]" + ) + + return is_running + + def __check_has_nwb(self, node_id=None) -> None: target_whole_nwb = node_id is None if target_whole_nwb: @@ -115,7 +178,7 @@ def __init__( unique_id: str, node_id: str, pickle_filepath: str, - workflow_error: dict = None, + workflow_error: WorkflowErrorInfo = None, ): self.workspace_id = workspace_id self.unique_id = unique_id @@ -131,12 +194,8 @@ def __init__( self.expt_filepath = join_filepath( [self.workflow_dirpath, DIRPATH.EXPERIMENT_YML] ) - self.workflow_has_error = ( - workflow_error["has_error"] if workflow_error else False - ) - self.workflow_error_log = ( - workflow_error["error_log"] if workflow_error else None - ) + self.workflow_has_error = workflow_error.has_error if workflow_error else False + self.workflow_error_log = workflow_error.error_log if workflow_error else None if not self.workflow_has_error: pickle_filepath = pickle_filepath.replace("\\", "/") @@ -211,3 +270,171 @@ def output_paths(self) -> dict: output_paths[k] = v.output_path return output_paths + + +class WorkflowMonitor: + PROCESS_SNAKEMAKE_CMDLINE = "python .*/\\.snakemake/scripts/" + PROCESS_SNAKEMAKE_WAIT_TIMEOUT = 180 # sec + PROCESS_CONDA_CMDLINE = "conda env create .*/\\.snakemake/conda/" + PROCESS_CONDA_WAIT_TIMEOUT = 3600 + + def __init__(self, workspace_id: str, unique_id: str): + self.workspace_id = workspace_id + self.unique_id = unique_id + + def search_process(self) -> WorkflowProcessInfo: + pid_data = Runner.read_pid_file(self.workspace_id, self.unique_id) + if pid_data is None: + logger.warning( + f"No workflow pid file found. [{self.workspace_id}/{self.unique_id}]" + ) + + # Set dummy value to proceed to the next step. + pid_data = WorkflowPIDFileData( + last_pid=999999, + last_script_file="__dummy_function.py", + create_time=time.time(), + ) + + process_result: WorkflowProcessInfo = None + + # Find the process corresponding to the pid in pid_data + try: + last_pid = pid_data.last_pid + + # get process + process = Process(last_pid) + logger.info(f"Found workflow process. {process}") + + # validate process name + process_cmdline = " ".join(process.cmdline()) + if not re.search(self.PROCESS_SNAKEMAKE_CMDLINE, process_cmdline): + logger.warning( + "Found another process with same PID:" + f" [{last_pid}] [{process_cmdline}]" + ) + raise NoSuchProcess(last_pid) + + process_result = WorkflowProcessInfo(process=process, pid_data=pid_data) + + # If the target process does not exist, + # check for the existence of the `conda env create` command process. + except NoSuchProcess: + logger.warning(f"No workflow process found. {pid_data}") + + # Search for the existence of a conda command process ("conda env create") + conda_process = None + for proc in process_iter(["pid", "name", "cmdline"]): + try: + cmdline = proc.info.get("cmdline") + cmdline = " ".join(cmdline) if cmdline else "" + + if re.search(self.PROCESS_CONDA_CMDLINE, cmdline): + conda_process_create_elapsed = int( + time.time() - proc.create_time() + ) + logger.info( + f"Found conda process. [{proc}] [{cmdline}] " + f"[{conda_process_create_elapsed}]", + ) + conda_process = Process(proc.pid) + + # Check elapsed time for process startup + # + # ATTENTION: + # The conda command process is a separate process from + # the snakemake process (although it is a child process), + # so it is difficult to identify the process with certainty. + # Therefore, the process start time is used here to determine + # the process by estimation. + if ( + conda_process_create_elapsed + < self.PROCESS_CONDA_WAIT_TIMEOUT + ): + process_result = WorkflowProcessInfo( + process=conda_process, pid_data=pid_data + ) + else: + logger.warning( + "This conda command is " + "probably an irrelevant process.." + f"[{conda_process}] [{conda_process_create_elapsed}]" + ) + else: + continue # skip that process + + except AccessDenied: + continue # skip that process + except ZombieProcess: + continue # skip that process + + # Rescue action when process not found + if conda_process is None: + # Check elapsed time for process startup + # *Retry for a certain period of time even if process not found + if pid_data.elapsed_time < self.PROCESS_SNAKEMAKE_WAIT_TIMEOUT: + logger.debug( + f"Set dummy workflow process tentatively. [{pid_data}]" + ) + process_result = WorkflowProcessInfo( + process=None, pid_data=pid_data + ) + else: + logger.warning(f"No workflow process found at all. [{pid_data}]") + process_result = None + + return process_result + + def cancel_run(self): + """ + The algorithm function of this workflow is being executed at the line: + https://github.com/snakemake/snakemake/blob/27b224ed12448df8aebc7d1ff8f25e3bf7622232/snakemake/shell.py#L258 + ``` + proc = sp.Popen( + cmd, + bufsize=-1, + shell=use_shell, + stdout=stdout, + universal_newlines=iterable or read or None, + close_fds=close_fds, + **cls._process_args, + env=envvars, + ) + ``` + The `cmd` argument has the following format: + ``` + source ~/miniconda3/bin/activate + '/app/.snakemake/conda/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx_'; + set -euo pipefail; + python /app/.snakemake/scripts/tmpxxxxxxxx.func.py + ``` + Interrupt the conda activate at the beginning of the process is impossible + because it is only called when each algorithm function executes. + This workflow is cancelled by killing process via PID of algorithm function + saved in `RUN_PROCESS_PID_FILE` file + Raises: + HTTPException: if pid_filepath or last_script_file does not exist + """ + + current_process = self.search_process() + if current_process is None: + raise HTTPException(status_code=404) + elif current_process.process is None: + raise HTTPException(status_code=404) + + pid_data = current_process.pid_data + + if os.path.exists(pid_data.last_script_file): + # force remove run script file + os.remove(pid_data.last_script_file) + + else: + logger.warning( + "The run script has not yet started. " + f"script: {pid_data.last_script_file}" + ) + + # send kill to process + current_process.process.send_signal(signal.SIGTERM) + + return True diff --git a/studio/app/common/routers/run.py b/studio/app/common/routers/run.py index 8b77c5487..e9a92b264 100644 --- a/studio/app/common/routers/run.py +++ b/studio/app/common/routers/run.py @@ -3,9 +3,11 @@ from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status from studio.app.common.core.logger import AppLogger -from studio.app.common.core.rules.runner import Runner from studio.app.common.core.workflow.workflow import Message, NodeItem, RunItem -from studio.app.common.core.workflow.workflow_result import WorkflowResult +from studio.app.common.core.workflow.workflow_result import ( + WorkflowMonitor, + WorkflowResult, +) from studio.app.common.core.workflow.workflow_runner import WorkflowRunner from studio.app.common.core.workspace.workspace_dependencies import ( is_workspace_available, @@ -86,7 +88,7 @@ async def run_result(workspace_id: str, uid: str, nodeDict: NodeItem): ) async def cancel_run(workspace_id: str, uid: str): try: - return Runner.cancel_run(workspace_id, uid) + return WorkflowMonitor(workspace_id, uid).cancel_run() except HTTPException as e: logger.error(e) raise e diff --git a/studio/app/common/schemas/workflow.py b/studio/app/common/schemas/workflow.py index 686001789..67f4246cc 100644 --- a/studio/app/common/schemas/workflow.py +++ b/studio/app/common/schemas/workflow.py @@ -1,6 +1,8 @@ from dataclasses import dataclass from typing import Dict, Optional +from psutil import Process + from studio.app.common.core.experiment.experiment import ExptFunction from studio.app.common.core.workflow.workflow import Edge, Node @@ -23,3 +25,28 @@ class WorkflowWithResults: function: Dict[str, ExptFunction] nodeDict: Dict[str, Node] edgeDict: Dict[str, Edge] + + +@dataclass +class WorkflowPIDFileData: + last_pid: int + last_script_file: str + create_time: float + elapsed_time: float = None + + def __post_init__(self): + import time + + self.elapsed_time = time.time() - self.create_time + + +@dataclass +class WorkflowProcessInfo: + process: Process + pid_data: WorkflowPIDFileData + + +@dataclass +class WorkflowErrorInfo: + has_error: bool + error_log: str diff --git a/studio/tests/app/common/core/workflow/test_workflow_result.py b/studio/tests/app/common/core/workflow/test_workflow_result.py index 3e128d9bc..e86463804 100644 --- a/studio/tests/app/common/core/workflow/test_workflow_result.py +++ b/studio/tests/app/common/core/workflow/test_workflow_result.py @@ -1,6 +1,7 @@ import os import shutil +from studio.app.common.core.rules.runner import Runner from studio.app.common.core.workflow.workflow import Message from studio.app.common.core.workflow.workflow_result import NodeResult, WorkflowResult from studio.app.dir_path import DIRPATH @@ -22,6 +23,10 @@ def test_WorkflowResult_get(): output_dirpath, dirs_exist_ok=True, ) + + # first, write pid_file + Runner.write_pid_file(output_dirpath, "xxxx_dummy_func_script.py") + output = WorkflowResult(workspace_id=workspace_id, unique_id=unique_id).observe( node_id_list ) From d7d619db0beb8ff3e6bf715fa6b37bb9d0ff0745 Mon Sep 17 00:00:00 2001 From: itutu-tienday <> Date: Sat, 28 Dec 2024 10:32:51 +0900 Subject: [PATCH 2/4] minor fix for WorkflowMonitor --- .../common/core/workflow/workflow_result.py | 40 +++++++------------ 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/studio/app/common/core/workflow/workflow_result.py b/studio/app/common/core/workflow/workflow_result.py index 7581666cd..a4a2e3a51 100644 --- a/studio/app/common/core/workflow/workflow_result.py +++ b/studio/app/common/core/workflow/workflow_result.py @@ -276,7 +276,7 @@ class WorkflowMonitor: PROCESS_SNAKEMAKE_CMDLINE = "python .*/\\.snakemake/scripts/" PROCESS_SNAKEMAKE_WAIT_TIMEOUT = 180 # sec PROCESS_CONDA_CMDLINE = "conda env create .*/\\.snakemake/conda/" - PROCESS_CONDA_WAIT_TIMEOUT = 3600 + PROCESS_CONDA_WAIT_TIMEOUT = 3600 # sec def __init__(self, workspace_id: str, unique_id: str): self.workspace_id = workspace_id @@ -292,7 +292,7 @@ def search_process(self) -> WorkflowProcessInfo: # Set dummy value to proceed to the next step. pid_data = WorkflowPIDFileData( last_pid=999999, - last_script_file="__dummy_function.py", + last_script_file="__dummy_wrapper_function.py", create_time=time.time(), ) @@ -330,12 +330,10 @@ def search_process(self) -> WorkflowProcessInfo: cmdline = " ".join(cmdline) if cmdline else "" if re.search(self.PROCESS_CONDA_CMDLINE, cmdline): - conda_process_create_elapsed = int( - time.time() - proc.create_time() - ) + conda_ps_create_elapsed = int(time.time() - proc.create_time()) logger.info( f"Found conda process. [{proc}] [{cmdline}] " - f"[{conda_process_create_elapsed}]", + f"[{conda_ps_create_elapsed} sec]", ) conda_process = Process(proc.pid) @@ -347,10 +345,7 @@ def search_process(self) -> WorkflowProcessInfo: # so it is difficult to identify the process with certainty. # Therefore, the process start time is used here to determine # the process by estimation. - if ( - conda_process_create_elapsed - < self.PROCESS_CONDA_WAIT_TIMEOUT - ): + if conda_ps_create_elapsed < self.PROCESS_CONDA_WAIT_TIMEOUT: process_result = WorkflowProcessInfo( process=conda_process, pid_data=pid_data ) @@ -358,7 +353,7 @@ def search_process(self) -> WorkflowProcessInfo: logger.warning( "This conda command is " "probably an irrelevant process.." - f"[{conda_process}] [{conda_process_create_elapsed}]" + f"[{conda_process}] [{conda_ps_create_elapsed} sec]" ) else: continue # skip that process @@ -368,20 +363,15 @@ def search_process(self) -> WorkflowProcessInfo: except ZombieProcess: continue # skip that process - # Rescue action when process not found - if conda_process is None: - # Check elapsed time for process startup - # *Retry for a certain period of time even if process not found - if pid_data.elapsed_time < self.PROCESS_SNAKEMAKE_WAIT_TIMEOUT: - logger.debug( - f"Set dummy workflow process tentatively. [{pid_data}]" - ) - process_result = WorkflowProcessInfo( - process=None, pid_data=pid_data - ) - else: - logger.warning(f"No workflow process found at all. [{pid_data}]") - process_result = None + # Rescue action when process not found + if process_result is None: + # Check elapsed time for process startup + # *Retry for a certain period of time even if process not found + if pid_data.elapsed_time < self.PROCESS_SNAKEMAKE_WAIT_TIMEOUT: + logger.debug(f"Set dummy workflow process tentatively. [{pid_data}]") + process_result = WorkflowProcessInfo(process=None, pid_data=pid_data) + else: + logger.warning(f"No workflow process found at all. [{pid_data}]") return process_result From 6be3049d7803eb25d8a0f5507eee3b0704d0bd11 Mon Sep 17 00:00:00 2001 From: itutu-tienday <> Date: Tue, 7 Jan 2025 17:09:58 +0900 Subject: [PATCH 3/4] resolve abnormal workflow) fix workflow timeout check bug (improved pid file check) --- .../common/core/workflow/workflow_result.py | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/studio/app/common/core/workflow/workflow_result.py b/studio/app/common/core/workflow/workflow_result.py index a4a2e3a51..bb47d308d 100644 --- a/studio/app/common/core/workflow/workflow_result.py +++ b/studio/app/common/core/workflow/workflow_result.py @@ -44,7 +44,7 @@ def __init__(self, workspace_id: str, unique_id: str): self.expt_filepath = join_filepath( [self.workflow_dirpath, DIRPATH.EXPERIMENT_YML] ) - self.monitor = WorkflowMonitor(workspace_id, unique_id) + self.monitor = WorkflowMonitor(workspace_id, unique_id, self.expt_filepath) def observe(self, nodeIdList: List[str]) -> Dict: """ @@ -278,9 +278,10 @@ class WorkflowMonitor: PROCESS_CONDA_CMDLINE = "conda env create .*/\\.snakemake/conda/" PROCESS_CONDA_WAIT_TIMEOUT = 3600 # sec - def __init__(self, workspace_id: str, unique_id: str): + def __init__(self, workspace_id: str, unique_id: str, expt_filepath: str): self.workspace_id = workspace_id self.unique_id = unique_id + self.expt_filepath = expt_filepath def search_process(self) -> WorkflowProcessInfo: pid_data = Runner.read_pid_file(self.workspace_id, self.unique_id) @@ -289,14 +290,23 @@ def search_process(self) -> WorkflowProcessInfo: f"No workflow pid file found. [{self.workspace_id}/{self.unique_id}]" ) + # Refer experiment_data instead of pid_data + expt_config = ExptConfigReader.read(self.expt_filepath) + try: + expt_started_time = datetime.strptime( + expt_config.started_at, DATE_FORMAT + ) + except ValueError: + expt_started_time = datetime.fromtimestamp(0) + # Set dummy value to proceed to the next step. pid_data = WorkflowPIDFileData( last_pid=999999, last_script_file="__dummy_wrapper_function.py", - create_time=time.time(), + create_time=expt_started_time.timestamp(), ) - process_result: WorkflowProcessInfo = None + process_data: WorkflowProcessInfo = None # Find the process corresponding to the pid in pid_data try: @@ -315,7 +325,7 @@ def search_process(self) -> WorkflowProcessInfo: ) raise NoSuchProcess(last_pid) - process_result = WorkflowProcessInfo(process=process, pid_data=pid_data) + process_data = WorkflowProcessInfo(process=process, pid_data=pid_data) # If the target process does not exist, # check for the existence of the `conda env create` command process. @@ -346,7 +356,7 @@ def search_process(self) -> WorkflowProcessInfo: # Therefore, the process start time is used here to determine # the process by estimation. if conda_ps_create_elapsed < self.PROCESS_CONDA_WAIT_TIMEOUT: - process_result = WorkflowProcessInfo( + process_data = WorkflowProcessInfo( process=conda_process, pid_data=pid_data ) else: @@ -364,16 +374,16 @@ def search_process(self) -> WorkflowProcessInfo: continue # skip that process # Rescue action when process not found - if process_result is None: + if process_data is None: # Check elapsed time for process startup # *Retry for a certain period of time even if process not found if pid_data.elapsed_time < self.PROCESS_SNAKEMAKE_WAIT_TIMEOUT: logger.debug(f"Set dummy workflow process tentatively. [{pid_data}]") - process_result = WorkflowProcessInfo(process=None, pid_data=pid_data) + process_data = WorkflowProcessInfo(process=None, pid_data=pid_data) else: logger.warning(f"No workflow process found at all. [{pid_data}]") - return process_result + return process_data def cancel_run(self): """ From 6bf19d7a449c820ae74ec3ead00ddc3ece1c3922 Mon Sep 17 00:00:00 2001 From: itutu-tienday <> Date: Wed, 8 Jan 2025 15:14:02 +0900 Subject: [PATCH 4/4] resolve abnormal workflow) - simplified WorkflowMonitor init params - fix HTTPException import error --- .../common/core/workflow/workflow_result.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/studio/app/common/core/workflow/workflow_result.py b/studio/app/common/core/workflow/workflow_result.py index bb47d308d..1b517d695 100644 --- a/studio/app/common/core/workflow/workflow_result.py +++ b/studio/app/common/core/workflow/workflow_result.py @@ -5,9 +5,9 @@ from dataclasses import asdict from datetime import datetime from glob import glob -from http.client import HTTPException from typing import Dict, List +from fastapi import HTTPException, status from psutil import AccessDenied, NoSuchProcess, Process, ZombieProcess, process_iter from studio.app.common.core.experiment.experiment_reader import ExptConfigReader @@ -44,7 +44,7 @@ def __init__(self, workspace_id: str, unique_id: str): self.expt_filepath = join_filepath( [self.workflow_dirpath, DIRPATH.EXPERIMENT_YML] ) - self.monitor = WorkflowMonitor(workspace_id, unique_id, self.expt_filepath) + self.monitor = WorkflowMonitor(workspace_id, unique_id) def observe(self, nodeIdList: List[str]) -> Dict: """ @@ -278,10 +278,17 @@ class WorkflowMonitor: PROCESS_CONDA_CMDLINE = "conda env create .*/\\.snakemake/conda/" PROCESS_CONDA_WAIT_TIMEOUT = 3600 # sec - def __init__(self, workspace_id: str, unique_id: str, expt_filepath: str): + def __init__(self, workspace_id: str, unique_id: str): self.workspace_id = workspace_id self.unique_id = unique_id - self.expt_filepath = expt_filepath + self.expt_filepath = join_filepath( + [ + DIRPATH.OUTPUT_DIR, + self.workspace_id, + self.unique_id, + DIRPATH.EXPERIMENT_YML, + ] + ) def search_process(self) -> WorkflowProcessInfo: pid_data = Runner.read_pid_file(self.workspace_id, self.unique_id) @@ -418,10 +425,15 @@ def cancel_run(self): current_process = self.search_process() if current_process is None: - raise HTTPException(status_code=404) + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Current process not found", + ) elif current_process.process is None: - raise HTTPException(status_code=404) - + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Current process not found", + ) pid_data = current_process.pid_data if os.path.exists(pid_data.last_script_file):