
automate dataflow annotation #658

Merged: 11 commits, Feb 8, 2024
83 changes: 83 additions & 0 deletions examples/misc/darshan_enabled.py
@@ -0,0 +1,83 @@
#!/usr/bin/env python3

import os

import radical.entk as re
import radical.pilot as rp

from radical.entk.utils.provenance import enable_darshan, get_provenance_graph

RESOURCE_DESCRIPTION = {
    # https://radicalpilot.readthedocs.io/en/stable/supported/polaris.html
    'resource': 'anl.polaris',
    'project' : 'RECUP',
    'queue'   : 'debug',
    'cpus'    : 32,
    'walltime': 15
}

os.environ['RADICAL_LOG_LVL'] = 'DEBUG'
os.environ['RADICAL_REPORT'] = 'TRUE'


# pylint: disable=anomalous-backslash-in-string
def get_stages():

    # hello-RP task
    task_00 = re.Task({
        'executable': 'radical-pilot-hello.sh',
        'arguments' : [10],
        'cpu_reqs'  : {'cpu_processes'  : 1,
                       'cpu_threads'    : 4,
                       'cpu_thread_type': rp.OpenMP}
    })

    # R/W data
    output_01 = 'output_01.dat'
    task_01 = re.Task({
        'executable'       : '/bin/sh',
        'arguments'        : ['-c', f'cat input.dat | wc > {output_01}'],
        'upload_input_data': ['/etc/passwd > input.dat'],
        'copy_output_data' : [f'{output_01} > $SHARED/{output_01}']
    })

    stage_0 = re.Stage()
    stage_0.add_tasks([task_00, task_01])

    # R/W data and task depends on the task from the previous stage
    task_10 = re.Task({
        'executable'     : '/bin/sh',
        'arguments'      : ['-c',
                            f"sed -r 's/\s+//g' {output_01} " +  # noqa: W605
                            '| grep -o . | sort | uniq -c > output_10.dat'],
        'copy_input_data': [f'$SHARED/{output_01} > {output_01}']
    })

    stage_1 = re.Stage()
    stage_1.add_tasks([task_10])

    return [stage_0, stage_1]


def main():
    pipeline = re.Pipeline()
    pipeline.add_stages(get_stages())
    workflow = [pipeline]

    enable_darshan(pipelines=workflow,
                   modules=['e4s/22.08/PrgEnv-gnu',
                            'darshan-runtime',
                            'darshan-util'])

    amgr = re.AppManager()
    amgr.resource_desc = RESOURCE_DESCRIPTION
    amgr.workflow      = workflow
    amgr.run()

    print(get_provenance_graph(pipelines=workflow,
                               output_file='entk_provenance.json'))


if __name__ == '__main__':
    main()
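
For reference, the graph printed above (and written to entk_provenance.json) nests the per-task I/O annotations by pipeline, stage, and task UID, as built by get_provenance_graph() further down in this diff. The sketch below is illustrative only: the UIDs and absolute paths are hypothetical and depend on the actual run and task sandboxes.

    {'pipeline.0000': {
        'stage.0000': {
            'task.0000': {'inputs' : [],
                          'outputs': []},
            'task.0001': {'inputs' : ['/path/to/task/sandbox/input.dat'],
                          'outputs': ['/path/to/task/sandbox/output_01.dat']}},
        'stage.0001': {
            'task.0002': {'inputs' : ['/path/to/task/sandbox/output_01.dat'],
                          'outputs': ['/path/to/task/sandbox/output_10.dat']}}}}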

109 changes: 96 additions & 13 deletions src/radical/entk/utils/provenance.py
@@ -1,36 +1,119 @@

-__copyright__ = 'Copyright 2023, The RADICAL-Cybertools Team'
+__copyright__ = 'Copyright 2024, The RADICAL-Cybertools Team'
__license__ = 'MIT'

-from typing import Optional, Dict, List
+import glob
+
+from typing import Optional, Dict, List, Union

import radical.utils as ru

-from .. import Pipeline
+from .. import Pipeline, Task

_darshan_env = None


# ------------------------------------------------------------------------------
#
def enable_darshan(pipelines: List[Pipeline],
                   darshan_runtime_root: Optional[str] = None,
                   modules: Optional[List[str]] = None) -> None:

    if darshan_runtime_root:
        if not darshan_runtime_root.startswith('/'):
            raise RuntimeError('Path for the darshan installation '
                               'should be an absolute path '
                               f'(provided path: {darshan_runtime_root})')
    else:
        darshan_runtime_root = '$DARSHAN_RUNTIME_ROOT'

    global _darshan_env

    darshan_activation_cmds = []
    for module in modules or []:
        darshan_activation_cmds.append(f'module load {module}')
    _darshan_env = ru.env_prep(pre_exec_cached=darshan_activation_cmds)

    for pipeline in pipelines:
        for stage in pipeline.stages:
            for task in stage.tasks:

                darshan_log_dir = '${RP_TASK_SANDBOX}/${RP_TASK_ID}_darshan'
                darshan_enable  = (f'LD_PRELOAD="{darshan_runtime_root}'
                                   '/lib/libdarshan.so" ')

                if task.cpu_reqs.cpu_processes == 1:
                    darshan_enable += 'DARSHAN_ENABLE_NONMPI=1 '

                task.executable  = darshan_enable + task.executable
                task.pre_launch += [f'mkdir -p {darshan_log_dir}']
                task.pre_exec.extend(
                    darshan_activation_cmds +
                    [f'export DARSHAN_LOG_DIR_PATH={darshan_log_dir}'])
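
For context, a rough sketch (not part of the diff) of how enable_darshan() rewrites a single-process task that started with executable='/bin/sh' and empty pre_launch/pre_exec lists, assuming the default $DARSHAN_RUNTIME_ROOT and modules=['darshan-runtime']:

    # hypothetical task state after enable_darshan()
    # task.executable == ('LD_PRELOAD="$DARSHAN_RUNTIME_ROOT/lib/libdarshan.so" '
    #                     'DARSHAN_ENABLE_NONMPI=1 /bin/sh')
    # task.pre_launch == ['mkdir -p ${RP_TASK_SANDBOX}/${RP_TASK_ID}_darshan']
    # task.pre_exec   == ['module load darshan-runtime',
    #                     'export DARSHAN_LOG_DIR_PATH='
    #                     '${RP_TASK_SANDBOX}/${RP_TASK_ID}_darshan']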


# ------------------------------------------------------------------------------
#
-def get_provenance_graph(pipelines: List[Pipeline],
def get_parsed_data(log: str, target_counters: Union[str, List[str]]) -> set:

    data = set()

    grep_patterns = '-e ' + ' -e '.join(ru.as_list(target_counters))
    parser_cmd = (f'darshan-parser {log} | grep {grep_patterns} | '
                  "awk '{print $5\":\"$6}'")
    out, err, ret = ru.sh_callout(parser_cmd, env=_darshan_env, shell=True)
    if ret:
        print(f'[ERROR] Darshan not able to parse "{log}": {err}')
    else:
        for o in out.split('\n'):
            if not o:
                continue
            value, file = o.split(':')
            try: value = int(value)
            except ValueError: value = 0
            if value > 0 and file.startswith('/'):
                data.add(file)

    return data
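
For context on the grep/awk pipeline above: darshan-parser prints one line per counter per file record, with the counter value in field 5 and the file name in field 6, so the command yields value:path pairs. The sample below is purely hypothetical (actual records and paths depend on the run):

    1048576:/lus/grand/projects/RECUP/input.dat
    0:/usr/lib64/libc.so.6

Only pairs with a positive counter value and an absolute file path are kept, so untouched records and non-file entries are filtered out.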


# ------------------------------------------------------------------------------
#
def annotate_task_with_darshan(task: Task) -> None:

    inputs  = set()
    outputs = set()

    for log in glob.glob(f'{task.path}/{task.uid}_darshan/*'):

        inputs.update(get_parsed_data(log, ['POSIX_BYTES_READ', 'STDIO_OPENS']))
        outputs.update(get_parsed_data(log, 'POSIX_BYTES_WRITTEN'))

    arguments = ' '.join(task.arguments)
    if '>' in arguments:
        outputs.add(arguments.split('>')[1].split(';')[0].strip())

    task.annotate(inputs=sorted(inputs), outputs=sorted(outputs))
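
A brief usage sketch for the function above, assuming a task whose Darshan logs already sit under <task.path>/<task.uid>_darshan/ (the file names shown are hypothetical):

    annotate_task_with_darshan(task)
    print(task.annotations.as_dict())
    # hypothetical output, depending on what Darshan recorded:
    # {'inputs' : ['/path/to/sandbox/input.dat'],
    #  'outputs': ['/path/to/sandbox/output_01.dat']}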


# ------------------------------------------------------------------------------
#
def get_provenance_graph(pipelines: Union[Pipeline, List[Pipeline]],
                         output_file: Optional[str] = None) -> Dict:
    """
    Using UIDs of all entities to build a workflow provenance graph.
    """

    graph = {}

-    pipelines = ru.as_list(pipelines)
-
-    for pipeline in pipelines:
+    for pipeline in ru.as_list(pipelines):
        graph[pipeline.uid] = {}

-        for stage in pipelines.stages:
+        for stage in pipeline.stages:
            graph[pipeline.uid][stage.uid] = {}

            for task in stage.tasks:
-                g_task = graph[pipeline.uid][stage.uid].setdefault(task.uid, {})
-                if task.annotations:
-                    g_task.update(task.annotations.as_dict())
+                annotate_task_with_darshan(task)
+                graph[pipeline.uid][stage.uid][task.uid] = \
+                    task.annotations.as_dict()

    if output_file:
        if not output_file.endswith('.json'):