Skip to content

Commit

Permalink
Issue735/clear workflow (#872)
Browse files Browse the repository at this point in the history
Workflow directory now auto deletes after runs. Adds a config option to choose whether to delete or keep workflow directories upon completion.
  • Loading branch information
kabir-vats authored Jul 11, 2024
1 parent 7fd785e commit 9c8bdee
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Contributors:
* Quincy Wofford - `qwofford <https://github.com/qwofford>`_
* Tim Randles - `trandles-lanl <https://github.com/trandles-lanl>`_
* Jacob Tronge - `jtronge <https://github.com/jtronge>`_
* Kabir Vats - `kabir-vats <https://github.com/kabir-vats>`_

Concept and Design Contributors

Expand Down
2 changes: 2 additions & 0 deletions beeflow/common/config_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ def filepath_completion_input(*pargs, **kwargs):
info='backend workload scheduler to interact with ')
VALIDATOR.option('DEFAULT', 'use_archive', validator=validation.bool_, default=True,
info='use the BEE archiving functinality')
VALIDATOR.option('DEFAULT', 'delete_completed_workflow_dirs', validator=validation.bool_,
default=True, info='delete workflow directory for completed jobs')
VALIDATOR.option('DEFAULT', 'neo4j_image', validator=validation.file_,
info='neo4j container image',
input_fn=filepath_completion_input)
Expand Down
2 changes: 2 additions & 0 deletions beeflow/wf_manager/common/dep_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,5 @@ def kill_gdb(pid):
os.kill(pid, signal.SIGTERM)
except OSError:
dep_log.info('Process already killed')
return False
return True
18 changes: 12 additions & 6 deletions beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from beeflow.common.db import wfm_db
from beeflow.common.db.bdb import connect_db

from beeflow.common.config_driver import BeeConfig as bc

log = bee_logging.setup(__name__)
db_path = wf_utils.get_db_path()
Expand All @@ -39,13 +39,21 @@ def archive_workflow(db, wf_id, final_state=None):
# We use tar directly since tarfile is apparently very slow
workflows_dir = wf_utils.get_workflows_dir()
subprocess.call(['tar', '-czf', archive_path, wf_id], cwd=workflows_dir)
pid = db.workflows.get_gdb_pid(wf_id)
# Wait for Graph database to be down (max 10 seconds)
for _ in range(10):
if not dep_manager.kill_gdb(pid):
break
time.sleep(1)
remove_wf_dir = bc.get('DEFAULT', 'delete_completed_workflow_dirs')
if remove_wf_dir:
log.info('Removing Workflow Directory')
wf_utils.remove_wf_dir(wf_id)


def archive_fail_workflow(db, wf_id):
"""Archive and fail a workflow."""
archive_workflow(db, wf_id, final_state='Failed')
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)


def set_dependent_tasks_dep_fail(db, wfi, wf_id, task):
Expand Down Expand Up @@ -132,11 +140,9 @@ def handle_state_change(self, state_update, task, wfi, db):
wf_utils.schedule_submit_tasks(state_update.wf_id, tasks)

if wfi.workflow_completed():
log.info("Workflow Completed")
wf_id = wfi.workflow_id
archive_workflow(db, state_update.wf_id)
pid = db.workflows.get_gdb_pid(state_update.wf_id)
dep_manager.kill_gdb(pid)
log.info('Workflow Completed')

# If the job failed and it doesn't include a checkpoint-restart hint,
# then fail the entire workflow
Expand Down

0 comments on commit 9c8bdee

Please sign in to comment.