Release 0.1.7
pagrubel committed Mar 29, 2024
2 parents e3be834 + 88fe045 commit 8c435ef
Showing 74 changed files with 2,058 additions and 1,208 deletions.
19 changes: 10 additions & 9 deletions .github/workflows/integration.yml
@@ -26,13 +26,14 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- name: Dependency Install
run: ./ci/deps_install.sh
- name: Batch Scheduler Install and Start
run: ./ci/batch_scheduler.sh
- name: BEE Install
run: ./ci/bee_install.sh
- name: BEE Config
run: ./ci/bee_config.sh
- name: Install and Configure
run: |
. ./ci/env.sh
./ci/deps_install.sh
./ci/batch_scheduler.sh
./ci/bee_install.sh
./ci/bee_config.sh
- name: Integration Test
run: ./ci/integration_test.sh
run: |
. ./ci/env.sh
./ci/integration_test.sh
19 changes: 10 additions & 9 deletions .github/workflows/unit-tests.yml
@@ -24,13 +24,14 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- name: Dependency Install
run: ./ci/deps_install.sh
- name: Slurm Setup and Install
run: ./ci/slurm_start.sh
- name: BEE Install
run: ./ci/bee_install.sh
- name: BEE Config
run: ./ci/bee_config.sh
- name: Install and Configure
run: |
. ./ci/env.sh
./ci/deps_install.sh
./ci/batch_scheduler.sh
./ci/bee_install.sh
./ci/bee_config.sh
- name: Unit tests
run: ./ci/unit_tests.sh
run: |
. ./ci/env.sh
./ci/unit_tests.sh
3 changes: 3 additions & 0 deletions .gitignore
@@ -5,6 +5,9 @@ poetry.lock
*.pyc
*.egg-info
*.out
*.tgz
*.tar.gz
*.log
.python-version
.DS_Store
.idea
57 changes: 57 additions & 0 deletions HISTORY.md
@@ -0,0 +1,57 @@
0.1.0
Initial release of hpc-beeflow, published on PyPI

0.1.3
BEE now accepts stdout and stderr CWL specifications to direct those outputs for each task.

0.1.4
What's Changed
- Scheduler options added for time limit, account, and partition as CWL extensions
- Fixes for MPI
- Jinja file no longer required
- Merge submit and start commands
- Improved usability of 'beecfg new'
- Combined gdbs
- Add restart code to beeflow
- Checkpoint restart fix
- Allow Absolute/Relative Paths for Main CWL and YAML Files
- Minimum version of Charliecloud required is now 0.32

0.1.5
- Combined beeflow, beeclient, and beecfg commands. All commands are now invoked via beeflow.
- Fixed an obscure dependency issue between tasks
- Simplified config file, deleted duplications of bee_workdir
- CWL Parser was moved to the client
- CwlParser is now instantiated in bee_client.py
- CwlParser no longer invokes Workflow Interface, now returns Workflow and Task objects
- Allows verification of CWL specification without running the workflow
- Added support for Flux scheduler

0.1.6
Clean up of processes, logs, and directory space
- Eliminates extraneous Neo4j instances from cancelled/failed tasks
- Cleans up log entries for query
- Improves start time for celery
- Makes start time configurable
- Decreases the number of celery processes
- Fixes capability to specify a main cwl file and/or yaml file not in the CWL directory
- Parses CWL after packaging the directory
- Moves temporary files for unit tests out of $HOME

0.1.7

Major features: adds the capability to include pre- and post-processing scripts for tasks, fixes the Checkpoint/Restart capability, increases logging, and adds some features to the client.
- Initial task manager resiliency and error handling (#789)
- Add pre/post script support (#788)
- Fix LOCALE error for systems where redis container failed to start
- Add logging to workflow interface (#764)
- Enable logging in neo4j_cypher.py, neo4j_driver.py, and gdb_driver.py
- Add ``beeflow remove`` command to client
- Enables removal of archived or cancelled workflows and associated artifacts
- Update minimum Charliecloud version to 0.36
- CI refactor to allow running jobs on runners other than GitHub
- Add submit command options to the workflow artifact for archival purposes
- Increase maximum supported Python version to 3.12
- Fix Checkpoint/Restart capability
- Add testing for Checkpoint/Restart
- Adds the capability to reset beeflow files (deletes all artifacts), which is especially useful for developers
4 changes: 2 additions & 2 deletions README.rst
@@ -42,6 +42,7 @@ Contributors:
* Paul Bryant - `paulbry <https://github.com/paulbry>`_
* Rusty Davis - `rstyd <https://github.com/rstyd>`_
* Jieyang Chen - `JieyangChen7 <https://github.com/JieyangChen7>`_
* Krishna Chilleri - `Krishna Chilleri <https://github.com/kchilleri>`_
* Patricia Grubel - `pagrubel <https://github.com/pagrubel>`_
* Qiang Guan - `guanxyz <https://github.com/guanxyz>`_
* Ragini Gupta - `raginigupta6 <https://github.com/raginigupta6>`_
@@ -85,9 +86,8 @@ License can be found `here <https://github.com/lanl/BEE/blob/master/LICENSE>`_
Publications
==========================

- An HPC-Container Based Continuous Integration Tool for Detecting Scaling and Performance Issues in HPC Applications, IEEE Transactions on Services Computing, 2024, `DOI: 10.1109/TSC.2023.3337662 <https://doi.ieeecomputersociety.org/10.1109/TSC.2023.3337662>`_
- BEE Orchestrator: Running Complex Scientific Workflows on Multiple Systems, HiPC, 2021, `DOI: 10.1109/HiPC53243.2021.00052 <https://doi.org/10.1109/HiPC53243.2021.00052>`_
- "BeeSwarm: Enabling Parallel Scaling Performance Measurement in Continuous Integration for HPC Applications", ASE, 2021, `DOI: 10.1109/ASE51524.2021.9678805 <https://www.computer.org/csdl/proceedings-article/ase/2021/033700b136/1AjTjgnW2pa#:~:text=10.1109/ASE51524.2021.9678805>`_
- "BeeFlow: A Workflow Management System for In Situ Processing across HPC and Cloud Systems", ICDCS, 2018, `DOI: 10.1109/ICDCS.2018.00103 <https://ieeexplore.ieee.org/abstract/document/8416366>`_
- "Build and execution environment (BEE): an encapsulated environment enabling HPC applications running everywhere", IEEE BigData, 2018, `DOI: 10.1109/BigData.2018.8622572 <https://ieeexplore.ieee.org/document/8622572>`_


9 changes: 6 additions & 3 deletions RELEASE.rst
@@ -1,13 +1,16 @@
Publishing a new release
************************

1. Change the version in pyproject.toml and verify docs build;
and get this change merged into develop. (You may want to set the bypass as in step 2)
Verify all current changes in develop run correctly on nightly tests.

1. Start a branch named Release-0.x.x Change the version in pyproject.toml and verify docs build, add to HISTORY.md for this release,
and get this change merged into develop. (You may want to set the bypass as in step 2 on develop).

2. On github site go to Settings; on the left under Code and Automation
click on Branches; under Branch protection rules edit main;
check Allow specified actors to bypass required pull requests; add yourself
and don't forget to save the setting
3 Make sure documentation will be published upon push to main.
3. Make sure documentation will be published upon push to main.
See: .github/workflows/docs.yml
4. Checkout develop and pull for latest version then
checkout main and merge develop into main. Verify documentation was published.
142 changes: 97 additions & 45 deletions beeflow/client/bee_client.py
@@ -26,6 +26,7 @@
from beeflow.common.parser import CwlParser
from beeflow.common.wf_data import generate_workflow_id
from beeflow.client import core
from beeflow.wf_manager.resources import wf_utils

# Length of a shortened workflow ID
short_id_len = 6 #noqa: Not a constant
@@ -105,15 +106,25 @@ def _resource(tag=""):
return _url() + str(tag)


def get_wf_list():
"""Get the list of all workflows."""
try:
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit('WF Manager did not return workflow list')

    logging.info(f'List Jobs: {resp.text}')
return jsonpickle.decode(resp.json()['workflow_list'])
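An illustrative sketch (not the shipped code) of why this refactor helps: once a single `get_wf_list()` helper owns the HTTP request, callers such as `match_short_id` reduce to pure list manipulation. Here the helper is stubbed with fixed data so the matching logic runs standalone; the `(name, id, status)` tuple shape is an assumption based on the `Name ID Status` listing the client prints.

```python
def get_wf_list():
    """Stub: the real helper fetches the workflow list from the WF Manager."""
    return [("build", "abc123def", "Running"), ("sim", "abd999fff", "Archived")]


def matching_ids(short_id):
    """Return full workflow IDs whose prefix matches a short ID (job[1] is the ID)."""
    return [job[1] for job in get_wf_list() if job[1].startswith(short_id)]


print(matching_ids("ab"))   # ambiguous: both IDs share this prefix
print(matching_ids("abc"))  # unique match
```

With the request logic centralized, connection errors and bad status codes are handled in exactly one place.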


def check_short_id_collision():
"""Check short workflow IDs for colliions; increase short ID length if detected."""
global short_id_len #noqa: Not a constant
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit(f"Checking for ID collision failed: {resp.status_code}")

workflow_list = jsonpickle.decode(resp.json()['workflow_list'])
workflow_list = get_wf_list()
if workflow_list:
while short_id_len < MAX_ID_LEN:
id_list = [_short_id(job[1]) for job in workflow_list]
@@ -132,18 +143,7 @@ def check_short_id_collision():
def match_short_id(wf_id):
"""Match user-provided short workflow ID to full workflow IDs."""
matched_ids = []

try:
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit(f'Could not match ID: {wf_id}. Code {resp.status_code}')
# raise ApiError("GET /jobs".format(resp.status_code))

workflow_list = jsonpickle.decode(resp.json()['workflow_list'])
workflow_list = get_wf_list()
if workflow_list:
for job in workflow_list:
if job[1].startswith(wf_id):
@@ -162,11 +162,25 @@ def match_short_id(wf_id):
long_wf_id = matched_ids[0]
return long_wf_id
else:
print("There are currently no workflows.")
sys.exit("There are currently no workflows.")

return None


def get_wf_status(wf_id):
"""Get workflow status."""
try:
conn = _wfm_conn()
resp = conn.get(_resource(wf_id), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit('Could not successfully query workflow manager')

return resp.json()['wf_status']
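A sketch of the status gating that `remove` and `cancel` now share: fetch the status once via `get_wf_status()`, then decide whether the DELETE request is worth sending. `get_wf_status` is stubbed here with fixed data; the status strings follow the diff, but the predicate names are ours.

```python
REMOVABLE = ("Cancelled", "Archived", "Paused")
CANCELLABLE = ("Running", "Paused")


def get_wf_status(wf_id):
    """Stub: the real helper queries the WF Manager for the workflow status."""
    return {"abc123": "Archived", "def456": "Running"}[wf_id]


def can_remove(wf_id):
    """Only finished or paused workflows may be removed."""
    return get_wf_status(wf_id) in REMOVABLE


def can_cancel(wf_id):
    """Only active workflows may be cancelled."""
    return get_wf_status(wf_id) in CANCELLABLE


print(can_remove("abc123"), can_cancel("abc123"))  # True False
print(can_remove("def456"), can_cancel("def456"))  # False True
```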


app = typer.Typer(no_args_is_help=True, add_completion=False, cls=NaturalOrderGroup)
app.add_typer(core.app, name='core')
app.add_typer(config_driver.app, name='config')
@@ -281,6 +295,19 @@ def is_parent(parent, path):
if tarball_path:
os.remove(tarball_path)

# Store provided arguments in text file for future reference
wf_dir = wf_utils.get_workflow_dir(wf_id)
sub_wf_dir = wf_dir + "/submit_command_args.txt"

f_name = open(sub_wf_dir, "w", encoding="utf-8")
f_name.write(f"wf_name: {wf_name}\n")
f_name.write(f"wf_path: {wf_path}\n")
f_name.write(f"main_cwl: {main_cwl}\n")
f_name.write(f"yaml: {yaml}\n")
f_name.write(f"workdir: {workdir}\n")
f_name.write(f"wf_id: {wf_id}")
f_name.close()

return wf_id
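The submit-arguments record above can also be written with a context manager, so the file handle is closed even if a write fails mid-way; a sketch with placeholder values (the real values come from the submit command's arguments).

```python
# Placeholder values standing in for the submit command's arguments.
wf_name, wf_path = "my-workflow", "/tmp/wf.tgz"
main_cwl, yaml, workdir, wf_id = "main.cwl", "inputs.yml", "/tmp/work", "abc123"

# One dict keeps the key order and makes the write loop uniform.
args = {"wf_name": wf_name, "wf_path": wf_path, "main_cwl": main_cwl,
        "yaml": yaml, "workdir": workdir, "wf_id": wf_id}

# The `with` block guarantees the handle is closed on any exit path.
with open("submit_command_args.txt", "w", encoding="utf-8") as fp:
    fp.writelines(f"{key}: {value}\n" for key, value in args.items())
```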


@@ -340,6 +367,36 @@ def package(wf_path: pathlib.Path = typer.Argument(...,
return package_path


@app.command()
def remove(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Remove cancelled, paused, or archived workflow with a workflow ID."""
long_wf_id = wf_id

wf_status = get_wf_status(wf_id)
print(f"Workflow Status is {wf_status}")
if wf_status in ('Cancelled', 'Archived', 'Paused'):
verify = f"All stored information for workflow {_short_id(wf_id)} will be removed."
verify += "\nContinue to remove? yes(y)/no(n): """
response = input(verify)
if response in ("n", "no"):
sys.exit("Workflow not removed.")
elif response in ("y", "yes"):
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), json={'option': 'remove'}, timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if resp.status_code != requests.codes.accepted: # pylint: disable=no-member
error_exit('WF Manager could not remove workflow.')
typer.secho("Workflow removed!", fg=typer.colors.GREEN)
logging.info(f'Remove workflow: {resp.text}')
else:
print(f"{_short_id(wf_id)} may still be running.")
print("The workflow must be cancelled before attempting removal.")

sys.exit()
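The inline yes/no prompt in `remove` can be factored into a small helper that is easy to unit-test; the helper name is ours, not BEE's. Unlike the inline version, it also normalizes case and surrounding whitespace.

```python
def confirmed(response):
    """Return True for yes, False for no, None for anything unrecognized."""
    answer = response.strip().lower()
    if answer in ("y", "yes"):
        return True
    if answer in ("n", "no"):
        return False
    return None


print(confirmed(" Y "))  # True
print(confirmed("no"))   # False
```

Returning `None` for unrecognized input lets the caller re-prompt instead of silently treating a typo as a refusal.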


def unpackage(package_path, dest_path):
"""Unpackage a workflow tarball for parsing."""
package_str = str(package_path)
@@ -363,17 +420,7 @@ def unpackage(package_path, dest_path):
@app.command('list')
def list_workflows():
"""List all workflows."""
try:
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit('WF Manager did not return workflow list')

logging.info('List Jobs: {resp.text}')
workflow_list = jsonpickle.decode(resp.json()['workflow_list'])
workflow_list = get_wf_list()
if workflow_list:
typer.secho("Name\tID\tStatus", fg=typer.colors.GREEN)

@@ -401,11 +448,9 @@ def query(wf_id: str = typer.Argument(..., callback=match_short_id)):

tasks_status = resp.json()['tasks_status']
wf_status = resp.json()['wf_status']
if tasks_status == 'Unavailable':
typer.echo(wf_status)
else:
typer.echo(wf_status)
typer.echo(tasks_status)
typer.echo(wf_status)
for _task_id, task_name, task_state in tasks_status:
typer.echo(f'{task_name}--{task_state}')

    logging.info(f'Query workflow: {resp.text}')
return wf_status, tasks_status
@@ -463,17 +508,24 @@ def resume(wf_id: str = typer.Argument(..., callback=match_short_id)):

@app.command()
def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Cancel a workflow."""
"""Cancel a paused or running workflow."""
long_wf_id = wf_id
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if resp.status_code != requests.codes.accepted: # pylint: disable=no-member
error_exit('WF Manager could not cancel workflow.')
typer.secho("Workflow cancelled!", fg=typer.colors.GREEN)
logging.info(f'Cancel workflow: {resp.text}')
wf_status = get_wf_status(wf_id)
if wf_status in ('Running', 'Paused'):
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60)

except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if resp.status_code != requests.codes.accepted: # pylint: disable=no-member
error_exit('WF Manager could not cancel workflow.')
typer.secho("Workflow cancelled!", fg=typer.colors.GREEN)
logging.info(f'Cancel workflow: {resp.text}')
elif wf_status == "Intializing":
print(f"Workflow is {wf_status}, try cancel later.")
else:
print(f"Workflow is {wf_status} cannot cancel.")


@app.command()