Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of store RDF export of the workflow in CWL Prov RO-Bundle #1709

Draft
wants to merge 55 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
d4851f6
Implementation of store RDF export of the workflow in CWL Prov RO-Bun…
jjkoehorst Aug 16, 2022
00ce190
formatting changes according to make format
jjkoehorst Aug 16, 2022
562d13c
formatting corrections
jjkoehorst Aug 16, 2022
e063735
remove need for type ignore
mr-c Aug 16, 2022
e0bb0e0
hard change to checksum_only
jjkoehorst Aug 16, 2022
16335b7
Added sha checksum to file_entity, need to look into what predicate s…
jjkoehorst Aug 16, 2022
59703ac
Merge branch 'cwlprov-cwl-rdf' of github.com:jjkoehorst/cwltool into …
jjkoehorst Aug 16, 2022
40c6705
formatting cleanup
jjkoehorst Aug 17, 2022
87c304b
--no-data argument added
jjkoehorst Aug 17, 2022
049dcd7
added no_data variable to some functions as i was unable to access th…
jjkoehorst Aug 17, 2022
21ecba9
test provenance --no-data added and a TODO check for check_bagit if w…
jjkoehorst Aug 17, 2022
879d5ce
Global no-data option for now to test the same environment with or wi…
jjkoehorst Aug 17, 2022
723c643
NO_DATA global variable added to know if there should be no data for …
jjkoehorst Aug 18, 2022
540a5a8
formatting
jjkoehorst Aug 18, 2022
211348a
cleaning logger and no_data access implementation
jjkoehorst Aug 18, 2022
bd61e43
Merge branch 'cwlprov-cwl-rdf'
jjkoehorst Aug 24, 2022
ad90be6
cleaning up imports
jjkoehorst Aug 25, 2022
76abff0
make remove_unused_imports, cleaning up all kinds of imports
jjkoehorst Sep 5, 2022
33d1551
some empty line formatting
jjkoehorst Sep 5, 2022
81b48de
Merge branch 'main' into cwlprov-cwl-rdf
jjkoehorst Sep 5, 2022
bc56733
if not none instead of !=
jjkoehorst Sep 5, 2022
e5b498d
make cleanup sync
jjkoehorst Sep 6, 2022
3666b65
docstrings added
jjkoehorst Sep 6, 2022
f58e90e
Default NO_DATA set to false
jjkoehorst Sep 6, 2022
4a6906b
move NO_DATA to utils
jjkoehorst Sep 6, 2022
08e18b0
remove global NO_DATA
mr-c Sep 6, 2022
33f706b
missed two NO_DATA's
jjkoehorst Sep 6, 2022
406ae69
Merge branch 'cwlprov-cwl-rdf' of github.com:jjkoehorst/cwltool into …
mr-c Sep 6, 2022
b288fb4
added return type str: to the checksum content processor
jjkoehorst Sep 6, 2022
cb28e1a
Merge branch 'cwlprov-cwl-rdf' of github.com:jjkoehorst/cwltool into …
mr-c Sep 6, 2022
1dbcdad
fix type
mr-c Sep 6, 2022
ab71278
restore regular prov tests
mr-c Sep 6, 2022
112f4f0
Duplicated a test case and the cwltool function to allow for --no-dat…
jjkoehorst Sep 6, 2022
cd0a4af
formatting
jjkoehorst Sep 6, 2022
50dac83
nolisting workflow and test added
jjkoehorst Sep 7, 2022
d3048af
with copy files but excluding a specific folder test
jjkoehorst Sep 7, 2022
ac532d4
working on load listing recognition for files and provenance
jjkoehorst Sep 8, 2022
fb5a65a
expanded the test case, server testing showed a loadListing option no…
jjkoehorst Sep 8, 2022
373b600
issue with load listing field
jjkoehorst Sep 8, 2022
eb93204
unused import removal
jjkoehorst Sep 8, 2022
a4b26af
show file name with debugger
jjkoehorst Sep 9, 2022
95c2c63
from_fp does not always carry name
jjkoehorst Sep 9, 2022
401918e
testing to print stacktrace to identify path to print file
jjkoehorst Sep 20, 2022
6fe74f3
check listing value
jjkoehorst Sep 20, 2022
7f370bb
change default to invalid_listing
jjkoehorst Sep 20, 2022
26fec21
debugging in progress
jjkoehorst Sep 20, 2022
d01a0df
trace in debug
jjkoehorst Oct 12, 2022
c15156b
stack trace only at debug level
jjkoehorst Oct 12, 2022
315e78f
stacktrace disabled
jjkoehorst Oct 12, 2022
8158340
Merge branch 'main' into cwlprov-cwl-rdf
jjkoehorst Aug 16, 2023
cad4896
formatting
jjkoehorst Aug 16, 2023
b930842
sort imports
jjkoehorst Aug 16, 2023
aa0054e
No warnings test
jjkoehorst Aug 17, 2023
87946a3
missed one attribute
jjkoehorst Aug 17, 2023
420dd1c
work in progress to fix the main merge
jjkoehorst Aug 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cwltool/argparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ def arg_parser() -> argparse.ArgumentParser:
help="Record user account info as part of provenance.",
dest="user_provenance",
)
provgroup.add_argument(
"--no-data",
default=False,
action="store_true",
help="Disables the storage of input and output data files of the workflow in the provenance data folder",
dest="no_data",
)
provgroup.add_argument(
"--disable-user-provenance",
default=False,
Expand Down Expand Up @@ -378,6 +385,7 @@ def arg_parser() -> argparse.ArgumentParser:

volumegroup = parser.add_mutually_exclusive_group()
volumegroup.add_argument("--verbose", action="store_true", help="Default logging")
volumegroup.add_argument("--no-warnings", action="store_true", help="Only print errors.")
volumegroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
volumegroup.add_argument("--debug", action="store_true", help="Print even more logging")

Expand Down
159 changes: 129 additions & 30 deletions cwltool/cwlprov/provenance_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
cast,
)

from prov.identifier import Identifier, QualifiedName
from prov.identifier import Identifier, Namespace, QualifiedName
from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity
from schema_salad.sourceline import SourceLine

import cwltool.workflow

from ..errors import WorkflowException
from ..job import CommandLineJob, JobBase
from ..loghandler import _logger
Expand Down Expand Up @@ -55,20 +57,49 @@
from .ro import ResearchObject


def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType:
def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType, process) -> CWLObjectType:
"""Create copy of job object for provenance."""
if not isinstance(job, WorkflowJob):
# direct command line tool execution
return job_order_object
customised_job: CWLObjectType = {}
# new job object for RO
debug = _logger.isEnabledFor(logging.DEBUG)
# Process the process object first
load_listing = {}

# Implementation to capture the loadlisting from cwl to skip the inclusion of for example files of big database
# folders
for index, entry in enumerate(process.inputs_record_schema["fields"]):
if (
entry["type"] == "org.w3id.cwl.cwl.Directory"
and "loadListing" in entry
and entry["loadListing"]
):
load_listing[entry["name"]] = entry["loadListing"]

# print("LOAD LISTING: ", load_listing)
# PROCESS:Workflow: file:///Users/jasperk/gitlab/cwltool/tests/wf/directory_no_listing.cwl
# print("PROCESS:" + str(process))

for each, i in enumerate(job.tool["inputs"]):
with SourceLine(job.tool["inputs"], each, WorkflowException, debug):
iid = shortname(i["id"])
# if iid in the load listing object and no_listing then....
if iid in job_order_object:
customised_job[iid] = copy.deepcopy(job_order_object[iid])
# add the input element in dictionary for provenance
if iid in load_listing:
if load_listing[iid] == "no_listing":
_logger.warning("Skip listing of " + iid)
job_order_object[iid]["loadListing"] = "no_listing"
job_order_object[iid]["listing"] = []
customised_job[iid] = job_order_object[iid]
else:
# Normal deep copy
customised_job[iid] = copy.deepcopy(job_order_object[iid])
# TODO Other listing options here?
else:
# add the input element in dictionary for provenance
customised_job[iid] = copy.deepcopy(job_order_object[iid])
elif "default" in i:
customised_job[iid] = copy.deepcopy(i["default"])
# add the default elements in the dictionary for provenance
Expand Down Expand Up @@ -236,31 +267,42 @@ def evaluate(
if not hasattr(process, "steps"):
# record provenance of independent commandline tool executions
self.prospective_prov(job)
customised_job = copy_job_order(job, job_order_object)
customised_job = copy_job_order(job, job_order_object, process)
self.used_artefacts(customised_job, self.workflow_run_uri)
create_job(research_obj, customised_job)
elif hasattr(job, "workflow"):
# record provenance of workflow executions
self.prospective_prov(job)
customised_job = copy_job_order(job, job_order_object)
self.used_artefacts(customised_job, self.workflow_run_uri)
customised_job = copy_job_order(job, job_order_object, process)
self.used_artefacts(
customised_job, self.workflow_run_uri, schema=process.inputs_record_schema
)

def record_process_start(
self, process: Process, job: JobsType, process_run_id: Optional[str] = None
) -> Optional[str]:
if not hasattr(process, "steps"):
process_run_id = self.workflow_run_uri
elif not hasattr(job, "workflow"):
elif not hasattr(job, "workflow") and isinstance(process, cwltool.workflow.Workflow):
# commandline tool execution as part of workflow
name = ""
if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)):
name = job.name
process_name = urllib.parse.quote(name, safe=":/,#")
process_run_id = self.start_process(process_name, datetime.datetime.now())
# Iterator as step is not always 1, check with process_name to find the correct step.id
step = None
for step in process.steps:
if step.id.endswith("#" + process_name):
break
if step is None:
raise Exception("No / wrong step detected...!")

process_run_id = self.start_process(step.id, process_name, datetime.datetime.now())
return process_run_id

def start_process(
self,
step_id: str, # The ID of the step involved
process_name: str,
when: datetime.datetime,
process_run_id: Optional[str] = None,
Expand All @@ -269,12 +311,32 @@ def start_process(
if process_run_id is None:
process_run_id = uuid.uuid4().urn
prov_label = "Run of workflow/packed.cwl#main/" + process_name
self.document.activity(
process_run_id,
None,
None,
{PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label},
)
# TESTING to include the Steps URI so linking to --print-rdf becomes possible
FILE_PATH = None
WORKFLOW_STEP = None
# Not sure if steps is always 1 element so a step name check including the # is performed
if step_id.endswith("#" + process_name):
# Temp import maybe there is another way to create the URI's ?
# Looked at --print-rdf for a possible URI
WORKFLOW = Namespace("Workflow", "https://w3id.org/cwl/cwl#Workflow/")
WORKFLOW_STEP = WORKFLOW["steps"]
# Was not sure how to create a URI without a namespace
FILE = Namespace("", "")
# The entire file://....#step path
FILE_PATH = FILE[step_id]

# Added the WORKFLOW_STEP and FILE_PATH to the object
self.document.activity(
process_run_id,
None,
None,
{
PROV_TYPE: WFPROV["ProcessRun"],
PROV_LABEL: prov_label,
WORKFLOW_STEP: FILE_PATH,
},
)

self.document.wasAssociatedWith(
process_run_id, self.engine_uuid, str("wf:main/" + process_name)
)
Expand All @@ -287,11 +349,15 @@ def record_process_end(
process_run_id: str,
outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
when: datetime.datetime,
load_listing: str = "invalid_listing",
) -> None:
self.generate_output_prov(outputs, process_run_id, process_name)
self.generate_output_prov(outputs, process_run_id, process_name, load_listing)
self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)

def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
def declare_file(
self, value: CWLObjectType, load_listing: str = "invalid_listing"
) -> Tuple[ProvEntity, ProvEntity, str]:
_logger.debug("What listing? " + load_listing)
if value["class"] != "File":
raise ValueError("Must have class:File: %s" % value)
# Need to determine file hash aka RO filename
Expand Down Expand Up @@ -345,9 +411,11 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
for sec in cast(MutableSequence[CWLObjectType], value.get("secondaryFiles", [])):
# TODO: Record these in a specializationOf entity with UUID?
if sec["class"] == "File":
(sec_entity, _, _) = self.declare_file(sec)
_logger.debug("447: " + load_listing)
(sec_entity, _, _) = self.declare_file(sec, load_listing)
elif sec["class"] == "Directory":
sec_entity = self.declare_directory(sec)
_logger.debug("450: " + load_listing)
sec_entity = self.declare_directory(sec, load_listing)
else:
raise ValueError(f"Got unexpected secondaryFiles value: {sec}")
# We don't know how/when/where the secondary file was generated,
Expand All @@ -362,7 +430,9 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st

return file_entity, entity, checksum

def declare_directory(self, value: CWLObjectType) -> ProvEntity:
def declare_directory(
self, value: CWLObjectType, load_listing: str = "invalid_listing"
) -> ProvEntity:
"""Register any nested files/directories."""
# FIXME: Calculate a hash-like identifier for directory
# so we get same value if it's the same filenames/hashes
Expand Down Expand Up @@ -408,12 +478,23 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
# a later call to this method will sort that
is_empty = True

# if value['basename'] == "dirIgnore":
# pass
if "listing" not in value:
get_listing(self.fsaccess, value)
if load_listing == "no_listing":
pass
elif load_listing == "deep_listing":
get_listing(self.fsaccess, value)
elif load_listing == "shallow_listing":
get_listing(self.fsaccess, value, False)
else:
raise ValueError("Invalid listing value: %s", load_listing)

for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])):
is_empty = False
# Declare child-artifacts
entity = self.declare_artefact(entry)
_logger.debug("523: " + load_listing)
entity = self.declare_artefact(entry, load_listing)
self.document.membership(coll, entity)
# Membership relation aka our ORE Proxy
m_id = uuid.uuid4().urn
Expand Down Expand Up @@ -481,7 +562,7 @@ def declare_string(self, value: str) -> Tuple[ProvEntity, str]:
)
return entity, checksum

def declare_artefact(self, value: Any) -> ProvEntity:
def declare_artefact(self, value: Any, load_listing: str = "invalid_listing") -> ProvEntity:
"""Create data artefact entities for all file objects."""
if value is None:
# FIXME: If this can happen in CWL, we'll
Expand Down Expand Up @@ -523,12 +604,13 @@ def declare_artefact(self, value: Any) -> ProvEntity:

# Base case - we found a File we need to update
if value.get("class") == "File":
(entity, _, _) = self.declare_file(value)
_logger.debug("635: " + load_listing)
(entity, _, _) = self.declare_file(value, load_listing)
value["@id"] = entity.identifier.uri
return entity

if value.get("class") == "Directory":
entity = self.declare_directory(value)
entity = self.declare_directory(value, load_listing)
value["@id"] = entity.identifier.uri
return entity
coll_id = value.setdefault("@id", uuid.uuid4().urn)
Expand Down Expand Up @@ -570,7 +652,7 @@ def declare_artefact(self, value: Any) -> ProvEntity:
members = []
for each_input_obj in iter(value):
# Recurse and register any nested objects
e = self.declare_artefact(each_input_obj)
e = self.declare_artefact(each_input_obj, load_listing)
members.append(e)

# If we reached this, then we were allowed to iterate
Expand Down Expand Up @@ -604,20 +686,36 @@ def used_artefacts(
job_order: Union[CWLObjectType, List[CWLObjectType]],
process_run_id: str,
name: Optional[str] = None,
schema: Any = None,
load_listing: Optional[str] = None,
) -> None:
"""Add used() for each data artefact."""
if isinstance(job_order, list):
for entry in job_order:
self.used_artefacts(entry, process_run_id, name)
# for field in schema.fields:
# if field['name'] == entry.
# load_listing = schema.fields
self.used_artefacts(entry, process_run_id, name, load_listing)
else:
# FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
base = "main"
if name is not None:
base += "/" + name
for key, value in job_order.items():
prov_role = self.wf_ns[f"{base}/{key}"]
if not load_listing:
load_listing = "deep_listing"
for field in schema["fields"]:
if field["name"] == key:
if "loadListing" in field:
load_listing = field["loadListing"]
break
else:
# Need to find a way to reproduce this
_logger.warning("No loadListing info in object")
load_listing = "no_listing"
try:
entity = self.declare_artefact(value)
entity = self.declare_artefact(value, load_listing)
self.document.used(
process_run_id,
entity,
Expand All @@ -633,11 +731,12 @@ def generate_output_prov(
final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
process_run_id: Optional[str],
name: Optional[str],
load_listing: str = "invalid_listing",
) -> None:
"""Call wasGeneratedBy() for each output,copy the files into the RO."""
if isinstance(final_output, MutableSequence):
for entry in final_output:
self.generate_output_prov(entry, process_run_id, name)
self.generate_output_prov(entry, process_run_id, name, load_listing)
elif final_output is not None:
# Timestamp should be created at the earliest
timestamp = datetime.datetime.now()
Expand All @@ -646,7 +745,7 @@ def generate_output_prov(
# entity (UUID) and document it as generated in
# a role corresponding to the output
for output, value in final_output.items():
entity = self.declare_artefact(value)
entity = self.declare_artefact(value, load_listing)
if name is not None:
name = urllib.parse.quote(str(name), safe=":/,#")
# FIXME: Probably not "main" in nested workflows
Expand Down
Loading