Resolve abnormal workflow #541

Merged · 4 commits · Jan 8, 2025
83 changes: 20 additions & 63 deletions studio/app/common/core/rules/runner.py
@@ -2,13 +2,12 @@
import gc
import json
import os
import signal
import time
import traceback
from dataclasses import asdict
from datetime import datetime
from pathlib import Path

from fastapi import HTTPException
from filelock import FileLock

from studio.app.common.core.experiment.experiment import ExptOutputPathIds
@@ -19,6 +18,7 @@
from studio.app.common.core.utils.file_reader import JsonReader
from studio.app.common.core.utils.filepath_creater import join_filepath
from studio.app.common.core.utils.pickle_handler import PickleReader, PickleWriter
from studio.app.common.schemas.workflow import WorkflowPIDFileData
from studio.app.const import DATE_FORMAT
from studio.app.dir_path import DIRPATH
from studio.app.optinist.core.nwb.nwb_creater import (
@@ -40,9 +40,10 @@ def run(cls, __rule: Rule, last_output, run_script_path: str):
logger.info("start rule runner")

# write pid file
cls.__write_pid_file(__rule, run_script_path)
workflow_dirpath = str(Path(__rule.output).parent.parent)
cls.write_pid_file(workflow_dirpath, run_script_path)

input_info = cls.__read_input_info(__rule.input)
input_info = cls.read_input_info(__rule.input)
cls.__change_dict_key_exist(input_info, __rule)
nwbfile = input_info["nwbfile"]

@@ -106,23 +107,31 @@ def __get_pid_file_path(cls, workspace_id: str, unique_id: str) -> str:
return pid_file_path

@classmethod
def __write_pid_file(cls, __rule: Rule, run_script_path: str) -> None:
def write_pid_file(cls, workflow_dirpath: str, run_script_path: str) -> None:
"""
save snakemake script file path and PID of current running algo function
"""
pid_data = {"last_pid": os.getpid(), "last_script_file": run_script_path}
pid_data = WorkflowPIDFileData(
last_pid=os.getpid(),
last_script_file=run_script_path,
create_time=time.time(),
)

workflow_dirpath = str(Path(__rule.output).parent.parent)
ids = ExptOutputPathIds(workflow_dirpath)
pid_file_path = cls.__get_pid_file_path(ids.workspace_id, ids.unique_id)

with open(pid_file_path, "w") as f:
json.dump(pid_data, f)
json.dump(asdict(pid_data), f)

@classmethod
def read_pid_file(cls, workspace_id: str, unique_id: str) -> dict:
def read_pid_file(cls, workspace_id: str, unique_id: str) -> WorkflowPIDFileData:
pid_file_path = cls.__get_pid_file_path(workspace_id, unique_id)
pid_data = JsonReader.read(pid_file_path)
if not os.path.exists(pid_file_path):
return None

pid_data_json = JsonReader.read(pid_file_path)
pid_data = WorkflowPIDFileData(**pid_data_json)

return pid_data

@classmethod
@@ -186,7 +195,7 @@ def __change_dict_key_exist(cls, input_info, rule_config: Rule):
input_info[arg_name] = input_info.pop(return_name)

@classmethod
def __read_input_info(cls, input_files):
def read_input_info(cls, input_files):
input_info = {}
for filepath in input_files:
load_data = PickleReader.read(filepath)
@@ -222,55 +231,3 @@ def __dict2leaf(cls, root_dict: dict, path_list):
return cls.__dict2leaf(root_dict[path], path_list)
else:
return root_dict[path]

@classmethod
def cancel_run(cls, workspace_id: str, unique_id: str):
"""
The algorithm function of this workflow is being executed at the line:
https://github.com/snakemake/snakemake/blob/27b224ed12448df8aebc7d1ff8f25e3bf7622232/snakemake/shell.py#L258
```
proc = sp.Popen(
cmd,
bufsize=-1,
shell=use_shell,
stdout=stdout,
universal_newlines=iterable or read or None,
close_fds=close_fds,
**cls._process_args,
env=envvars,
)
```
The `cmd` argument has the following format:
```
source ~/miniconda3/bin/activate
'/app/.snakemake/conda/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx_';
set -euo pipefail;
python /app/.snakemake/scripts/tmpxxxxxxxx.func.py
```
Interrupt the conda activate at the beginning of the process is impossible
because it is only called when each algorithm function executes.
This workflow is cancelled by killing process via PID of algorithm function
saved in `RUN_PROCESS_PID_FILE` file
Raises:
HTTPException: if pid_filepath or last_script_file does not exist
"""

pid_filepath = cls.__get_pid_file_path(workspace_id, unique_id)

if not os.path.exists(pid_filepath):
raise HTTPException(status_code=404)

pid_data = cls.read_pid_file(workspace_id, unique_id)
last_pid = pid_data["last_pid"]
last_script_file = pid_data["last_script_file"]

if not os.path.exists(last_script_file):
logger.warning(
f"The run script has not yet started. script: {last_script_file}"
)
raise HTTPException(status_code=404)

os.remove(last_script_file)
os.kill(last_pid, signal.SIGTERM)

return True
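
Note on the new schema: `WorkflowPIDFileData` is imported from `studio.app.common.schemas.workflow`, which is not part of this diff. Judging from the call site in `write_pid_file()` and the `asdict()` serialization, it is presumably a small dataclass roughly like the sketch below (field names come from the diff; types are assumptions):

```python
# Hypothetical sketch of WorkflowPIDFileData (the real definition lives in
# studio/app/common/schemas/workflow.py and is not shown in this diff).
# Field names are taken from the write_pid_file() call site; types are assumptions.
from dataclasses import dataclass


@dataclass
class WorkflowPIDFileData:
    last_pid: int  # PID of the process running the current algorithm function
    last_script_file: str  # path to the generated snakemake run script
    create_time: float  # epoch seconds, as produced by time.time()
```

With this change, `read_pid_file()` also returns `None` when the PID file is missing instead of attempting to read a non-existent file, so callers (such as the log handler updated below) must guard against `None` before accessing attributes.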
10 changes: 7 additions & 3 deletions studio/app/common/core/snakemake/smk_status_logger.py
@@ -8,6 +8,7 @@
create_directory,
join_filepath,
)
from studio.app.common.schemas.workflow import WorkflowErrorInfo
from studio.app.dir_path import DIRPATH


@@ -67,7 +68,7 @@ def __init_error_log_file(cls, workspace_id: str, unique_id: str) -> str:
return log_file_path

@classmethod
def get_error_content(cls, workspace_id: str, unique_id: str) -> dict:
def get_error_content(cls, workspace_id: str, unique_id: str) -> WorkflowErrorInfo:
log_file_path = cls.__get_error_log_file_path(workspace_id, unique_id)

if os.path.exists(log_file_path):
@@ -77,7 +78,7 @@ def get_error_content(cls, workspace_id: str, unique_id: str) -> dict:
error_log = None
has_error = False

return {"has_error": has_error, "error_log": error_log}
return WorkflowErrorInfo(has_error=has_error, error_log=error_log)

def __init__(self, workspace_id, unique_id):
self.workspace_id = workspace_id
@@ -113,7 +114,10 @@ def log_handler(self, msg: Dict[str, str] = None):

# since multiple running workflow share log data,
# check if message really belongs to the current workflow
if pid_data["last_script_file"] in msg["msg"]:
if (
pid_data is not None
and pid_data.last_script_file in msg["msg"]
):
self.logger.error("Workflow cancelled")
else:
self.logger.error(msg)
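
Likewise, `WorkflowErrorInfo` is not defined in this diff; from the keyword arguments passed in `get_error_content()` it is presumably another small dataclass, sketched below under that assumption:

```python
# Hypothetical sketch of WorkflowErrorInfo (actual definition in
# studio/app/common/schemas/workflow.py, not shown here).
# Constructed as WorkflowErrorInfo(has_error=..., error_log=...) above;
# Optional[str] for error_log is inferred from the `error_log = None` branch.
from dataclasses import dataclass
from typing import Optional


@dataclass
class WorkflowErrorInfo:
    has_error: bool  # True when an error log file exists for this workflow run
    error_log: Optional[str]  # contents of the error log, or None if there was no error
```

Callers that previously indexed the returned dict (e.g. `result["has_error"]`) would now read attributes (`result.has_error`) instead.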