Skip to content

Option to remap data entity names #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
9 changes: 7 additions & 2 deletions src/runcrate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,20 @@ def cli():
type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path),
help="path to a README file (should be README.md in Markdown format)",
)
def convert(root, output, license, workflow_name, readme):
@click.option(
"--remap-names",
help="remap file/dir names to the original ones",
is_flag=True
)
def convert(root, output, license, workflow_name, readme, remap_names):
"""\
Convert a CWLProv RO bundle into a Workflow Run RO-Crate.

RO_DIR: top-level directory of the CWLProv RO
"""
if not output:
output = Path(f"{root.name}.crate.zip")
builder = ProvCrateBuilder(root, workflow_name, license, readme)
builder = ProvCrateBuilder(root, workflow_name, license, readme, remap_names=remap_names)
crate = builder.build()
if output.suffix == ".zip":
crate.write_zip(output)
Expand Down
60 changes: 43 additions & 17 deletions src/runcrate/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
"null": None,
}

SCATTER_JOB_PATTERN = re.compile(r"^(.+)_\d+$")
SCATTER_JOB_PATTERN = re.compile(r"^(.+)_(\d+)$")

CWLPROV_NONE = "https://w3id.org/cwl/prov#None"

Expand Down Expand Up @@ -192,7 +192,8 @@ def get_workflow(wf_path):

class ProvCrateBuilder:

def __init__(self, root, workflow_name=None, license=None, readme=None):
def __init__(self, root, workflow_name=None, license=None, readme=None,
remap_names=False):
self.root = Path(root)
self.workflow_name = workflow_name
self.license = license
Expand All @@ -213,6 +214,8 @@ def __init__(self, root, workflow_name=None, license=None, readme=None):
# map source files to destination files
self.file_map = {}
self.manifest = self._get_manifest()
self.remap_names = remap_names
self.data_root = "data"

@staticmethod
def _get_step_maps(cwl_defs):
Expand All @@ -238,11 +241,13 @@ def _get_manifest(self):
def _resolve_plan(self, activity):
job_qname = activity.plan()
plan = activity.provenance.entity(job_qname)
scatter_id = None
if not plan:
m = SCATTER_JOB_PATTERN.match(str(job_qname))
if m:
plan = activity.provenance.entity(m.groups()[0])
return plan
scatter_id = m.groups()[1]
return plan, scatter_id

def _get_hash(self, prov_param):
k = prov_param.id.localpart
Expand Down Expand Up @@ -461,9 +466,11 @@ def add_action(self, crate, activity, parent_instrument=None):
"@type": "CreateAction",
"name": activity.label,
}))
plan = self._resolve_plan(activity)
plan, scatter_id = self._resolve_plan(activity)
plan_tag = plan.id.localpart
dest_base = Path(self.data_root)
if plan_tag == "main":
dest_base = dest_base / "main"
assert str(activity.type) == "wfprov:WorkflowRun"
instrument = workflow
self.roc_engine_run["result"] = action
Expand All @@ -478,6 +485,7 @@ def to_wf_p(k):
if parts[0] == "main":
parts[0] = parent_instrument_fragment
plan_tag = "/".join(parts)
dest_base = dest_base / (f"{plan_tag}_{scatter_id}" if scatter_id else f"{plan_tag}")
tool_name = self.step_maps[parent_instrument_fragment][plan_tag]["tool"]
instrument = crate.dereference(f"{workflow.id}#{tool_name}")
control_action = self.control_actions.get(plan_tag)
Expand All @@ -501,12 +509,14 @@ def to_wf_p(k):
action["instrument"] = instrument
action["startTime"] = activity.start().time.isoformat()
action["endTime"] = activity.end().time.isoformat()
action["object"] = self.add_action_params(crate, activity, to_wf_p, "usage")
action["result"] = self.add_action_params(crate, activity, to_wf_p, "generation")
action["object"] = self.add_action_params(crate, activity, to_wf_p, "usage",
dest_base / "in" if self.remap_names else "")
action["result"] = self.add_action_params(crate, activity, to_wf_p, "generation",
dest_base / "out" if self.remap_names else "")
for job in activity.steps():
self.add_action(crate, job, parent_instrument=instrument)

def add_action_params(self, crate, activity, to_wf_p, ptype="usage"):
def add_action_params(self, crate, activity, to_wf_p, ptype="usage", dest_base=""):
action_params = []
all_roles = set()
for rel in getattr(activity, ptype)():
Expand All @@ -526,7 +536,7 @@ def add_action_params(self, crate, activity, to_wf_p, ptype="usage"):
wf_p = crate.dereference(to_wf_p(k))
k = get_fragment(k)
v = rel.entity()
value = self.convert_param(v, crate)
value = self.convert_param(v, crate, dest_base=dest_base)
if value is None:
continue # param is optional with no default and was not set
if {"ro:Folder", "wf4ever:File"} & set(str(_) for _ in v.types()):
Expand Down Expand Up @@ -563,34 +573,40 @@ def _set_alternate_name(prov_param, action_p, parent=None):
if "alternateName" in parent:
action_p["alternateName"] = (Path(parent["alternateName"]) / basename).as_posix()

def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
def convert_param(self, prov_param, crate, convert_secondary=True, parent=None, dest_base=""):
type_names = frozenset(str(_) for _ in prov_param.types())
secondary_files = [_.generated_entity() for _ in prov_param.derivations()
if str(_.type) == "cwlprov:SecondaryFile"]
if convert_secondary and secondary_files:
main_entity = self.convert_param(prov_param, crate, convert_secondary=False)
main_entity = self.convert_param(prov_param, crate, convert_secondary=False,
dest_base=dest_base)
action_p = self.collections.get(main_entity.id)
if not action_p:
action_p = crate.add(ContextEntity(crate, properties={
"@type": "Collection"
}))
action_p["mainEntity"] = main_entity
action_p["hasPart"] = [main_entity] + [
self.convert_param(_, crate) for _ in secondary_files
self.convert_param(_, crate, dest_base=dest_base) for _ in secondary_files
]
crate.root_dataset.append_to("mentions", action_p)
self.collections[main_entity.id] = action_p
return action_p
if "wf4ever:File" in type_names:
hash_ = self.hashes[prov_param.id.localpart]
dest = Path(parent.id if parent else "") / hash_
if self.remap_names:
basename = getattr(prov_param, "basename", hash_) or hash_
else:
basename = hash_
dest = Path(parent.id if parent else dest_base) / basename
action_p = crate.dereference(dest.as_posix())
if not action_p:
source = self.manifest[hash_]
action_p = crate.add_file(source, dest, properties={
"sha1": hash_,
})
self._set_alternate_name(prov_param, action_p, parent=parent)
if not self.remap_names:
self._set_alternate_name(prov_param, action_p, parent=parent)
try:
source_k = str(source.resolve(strict=False))
except RuntimeError:
Expand All @@ -599,11 +615,16 @@ def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
return action_p
if "ro:Folder" in type_names:
hash_ = self.hashes[prov_param.id.localpart]
dest = Path(parent.id if parent else "") / hash_
if self.remap_names:
basename = getattr(prov_param, "basename", hash_) or hash_
else:
basename = hash_
dest = Path(parent.id if parent else dest_base) / basename
action_p = crate.dereference(dest.as_posix())
if not action_p:
action_p = crate.add_directory(dest_path=dest)
self._set_alternate_name(prov_param, action_p, parent=parent)
if not self.remap_names:
self._set_alternate_name(prov_param, action_p, parent=parent)
for child in self.get_dict(prov_param).values():
part = self.convert_param(child, crate, parent=action_p)
action_p.append_to("hasPart", part)
Expand All @@ -612,12 +633,13 @@ def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
return str(prov_param.value)
if "prov:Dictionary" in type_names:
return dict(
(k, self.convert_param(v, crate))
(k, self.convert_param(v, crate, dest_base=dest_base))
for k, v in self.get_dict(prov_param).items()
if k != "@id"
)
if "prov:Collection" in type_names:
return [self.convert_param(_, crate) for _ in self.get_members(prov_param)]
return [self.convert_param(_, crate, dest_base=dest_base)
for _ in self.get_members(prov_param)]
if prov_param.id.uri == CWLPROV_NONE:
return None
raise RuntimeError(f"No value to convert for {prov_param}")
Expand Down Expand Up @@ -680,6 +702,10 @@ def patch_workflow_input_collection(self, crate, wf=None):
entity of the collection alone (a File). This method fixes the mapping
by retrieving the correct Collection entity from the relevant tool
execution.

Note that this trick does not lead to a correct result with
remap_names on: the workflow-level parameter should be mapped to a
separate collection where files have different paths.
"""
if wf is None:
wf = crate.mainEntity
Expand Down
11 changes: 11 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ def test_cli_convert_readme(data_dir, tmpdir):
assert crate.get(readme.name)


def test_cli_convert_remap_names(data_dir, tmpdir):
root = data_dir / "grepucase-run-1"
crate_dir = tmpdir / "grepucase-run-1-crate"
runner = CliRunner()
args = ["convert", str(root), "-o", str(crate_dir), "--remap-names"]
assert runner.invoke(cli, args).exit_code == 0
crate = ROCrate(crate_dir)
assert crate.get("data/main/in/grepucase_in/")
assert (crate_dir / "data" / "main" / "in" / "grepucase_in").is_dir()


def test_cli_report_provenance_minimal(data_dir, caplog):
crate_dir = data_dir / "revsort-provenance-crate-minimal"
runner = CliRunner()
Expand Down
Loading