Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More provenance specs + logic improvements #240

Merged
merged 16 commits into from
Jul 2, 2020
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
will not alter the lsf launch.
- 12 hidden test specs and associated cli tests, for cli tests with specs that we
do not want in `merlin examples`.
- Inside `merlin_info`, added provenance specs `orig.yaml`, `expanded.yaml`, and
`partial.yaml`; the latter is identical to the original spec, but with expanded user variables.

### Fixed
- Updated to new celery (4.4.5) syntax for signature return codes.
Expand Down
4 changes: 2 additions & 2 deletions merlin/examples/workflows/simple_chain/simple_chain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ study:
run:
cmd: |
touch STEP_ONE_RUNNING_$(X2)
sleep 5
sleep 1
rm STEP_ONE_RUNNING_$(X2)

- name: step2
Expand All @@ -24,7 +24,7 @@ study:
run:
cmd: |
ls $(step1.workspace)
sleep 5
sleep 1
depends: [step1]

global.parameters:
Expand Down
14 changes: 3 additions & 11 deletions merlin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,9 @@ def process_restart(args):
"""
print(banner_small)
restart_dir = verify_dirpath(args.restart_dir)
filepath = os.path.join(args.restart_dir, "merlin_info", "*.yaml")
possible_specs = glob.glob(filepath)
if len(possible_specs) == 0:
raise ValueError(
f"'{filepath}' does not match any provenance spec file to restart from."
)
elif len(possible_specs) > 1:
raise ValueError(
f"'{filepath}' matches more than one provenance spec file to restart from."
)
filepath = verify_filepath(possible_specs[0])
filepath = os.path.join(args.restart_dir, "merlin_info", "expanded.yaml")
if not os.path.exists(filepath):
raise ValueError(f"'{filepath}' does not exist, and is necessary for restart.")
LOG.info(f"Restarting workflow at '{restart_dir}'")
study = MerlinStudy(filepath, restart_dir=restart_dir)
router.run_task_server(study, args.run_mode)
Expand Down
2 changes: 1 addition & 1 deletion merlin/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def run_task_server(study, run_mode=None):
:param `study`: The MerlinStudy object
:param `run_mode`: The type of run mode, e.g. local, batch
"""
if study.spec.merlin["resources"]["task_server"] == "celery":
if study.expanded_spec.merlin["resources"]["task_server"] == "celery":
run_celery(study, run_mode)
else:
LOG.error("Celery is not specified as the task server!")
Expand Down
10 changes: 6 additions & 4 deletions merlin/spec/specification.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@
import json
import logging
import os
from io import StringIO
from contextlib import suppress
from io import StringIO

import jsonschema
import yaml
from maestrowf.specification.yamlspecification import YAMLSpecification

from merlin.spec import (
SCHEMA_PATH,
all_keys,
defaults,
SCHEMA_PATH
)


Expand Down Expand Up @@ -222,7 +222,7 @@ def get_worker_names(self):
for worker in self.merlin["resources"]["workers"]:
result.append(worker)
return result

def verify(self):
"""Validate the specification."""

Expand All @@ -237,7 +237,9 @@ def verify(self):
super().validate_schema("env", self.environment, schemas["ENV"])
super().validate_schema("batch", self.batch, schemas["BATCH"])
for step in self.study:
super().validate_schema(f"study step {step['name']}", step, schemas["STUDY_STEP"])
super().validate_schema(
f"study step {step['name']}", step, schemas["STUDY_STEP"]
)
for param, contents in self.globals.items():
super().validate_schema("global.params", contents, schemas["PARAM"])

Expand Down
82 changes: 54 additions & 28 deletions merlin/study/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import subprocess
import time
from contextlib import suppress
from copy import deepcopy
from fileinput import FileInput

from cached_property import cached_property
Expand Down Expand Up @@ -84,9 +85,9 @@ def __init__(
dry_run=False,
no_errors=False,
):
self.spec = MerlinSpec.load_specification(filepath)
self.original_spec = MerlinSpec.load_specification(filepath)
self.override_vars = override_vars
error_override_vars(self.override_vars, self.spec.path)
error_override_vars(self.override_vars, self.original_spec.path)

self.samples_file = samples_file
self.label_clash_error()
Expand All @@ -96,13 +97,13 @@ def __init__(
# If we load from a file, record that in the object for provenance
# downstream
if self.samples_file is not None:
self.spec.merlin["samples"]["file"] = self.samples_file
self.spec.merlin["samples"]["generate"]["cmd"] = ""
self.original_spec.merlin["samples"]["file"] = self.samples_file
self.original_spec.merlin["samples"]["generate"]["cmd"] = ""

self.restart_dir = restart_dir

self.special_vars = {
"SPECROOT": self.spec.specroot,
"SPECROOT": self.original_spec.specroot,
"MERLIN_TIMESTAMP": self.timestamp,
"MERLIN_INFO": self.info,
"MERLIN_WORKSPACE": self.workspace,
Expand All @@ -116,33 +117,44 @@ def __init__(
self.dag = None
self.load_dag()

@cached_property
def provenance_original_spec(self):
return os.path.join(self.info, "orig.yaml")
ben-bay marked this conversation as resolved.
Show resolved Hide resolved

def write_original_spec(self):
shutil.copyfile(self.original_spec.path, self.provenance_original_spec)

def label_clash_error(self):
"""
Detect any illegal clashes between merlin's
merlin -> samples -> column_labels and Maestro's
global.parameters. Raises an error if any such
clash exists.
"""
if self.spec.merlin["samples"]:
for label in self.spec.merlin["samples"]["column_labels"]:
if label in self.spec.globals:
if self.original_spec.merlin["samples"]:
for label in self.original_spec.merlin["samples"]["column_labels"]:
if label in self.original_spec.globals:
raise ValueError(
f"column_label {label} cannot also be " "in global.parameters!"
)

@property
def user_vars(self):
@staticmethod
def get_user_vars(spec):
"""
Using the spec environment, return a dictionary
of expanded user-defined variables.
"""
uvars = []
if "variables" in self.spec.environment:
uvars.append(self.spec.environment["variables"])
if "labels" in self.spec.environment:
uvars.append(self.spec.environment["labels"])
if "variables" in spec.environment:
uvars.append(spec.environment["variables"])
if "labels" in spec.environment:
uvars.append(spec.environment["labels"])
return determine_user_variables(*uvars)

@property
def user_vars(self):
return MerlinStudy.get_user_vars(self.original_spec)

def write_expand_by_line(self, filepath, keywords):
"""
Given a destination and keyword dictionary, expand each
Expand All @@ -161,16 +173,16 @@ def write_expanded_spec(self, dest):
:param `dest`: destination for fully expanded yaml file
"""
# specification text including defaults and overridden user variables
full_spec = dump_with_overrides(self.spec, self.override_vars)
full_spec = dump_with_overrides(self.original_spec, self.override_vars)

with open(dest, "w") as dumped_file:
dumped_file.write(full_spec)

# update spec so that user_vars update will be accurate
self.spec = MerlinSpec.load_specification(dest)
# load spec so that user_vars update will be accurate
new_spec = MerlinSpec.load_specification(dest)
ben-bay marked this conversation as resolved.
Show resolved Hide resolved

# expand user variables
self.write_expand_by_line(dest, self.user_vars)
self.write_expand_by_line(dest, MerlinStudy.get_user_vars(new_spec))
# expand reserved words
self.write_expand_by_line(dest, self.special_vars)

Expand Down Expand Up @@ -274,7 +286,7 @@ def output_path(self):
return os.path.abspath(output_path)

else:
output_path = str(self.spec.output_path)
output_path = str(self.original_spec.output_path)

if (self.override_vars is not None) and (
"OUTPUT_PATH" in self.override_vars
Expand Down Expand Up @@ -314,7 +326,7 @@ def workspace(self):
)
return os.path.abspath(self.restart_dir)

workspace_name = f'{self.spec.name.replace(" ", "_")}_{self.timestamp}'
workspace_name = f'{self.original_spec.name.replace(" ", "_")}_{self.timestamp}'
workspace = os.path.join(self.output_path, workspace_name)
with suppress(FileNotFoundError):
shutil.rmtree(workspace)
Expand All @@ -339,9 +351,7 @@ def expanded_spec(self):
specification.
"""
# Write expanded yaml spec
self.expanded_filepath = os.path.join(
self.info, self.spec.name.replace(" ", "_") + ".yaml"
)
self.expanded_filepath = os.path.join(self.info, "expanded.yaml")

# If we are restarting, we don't need to re-expand, just need to read
# in the previously expanded spec
Expand All @@ -353,8 +363,9 @@ def expanded_spec(self):
)

# expand provenance spec filename
if "$(" in self.spec.name:
expanded_name = result.description["name"].replace(" ", "_") + ".yaml"
if "$(" in self.original_spec.name:
# expanded_name = result.description["name"].replace(" ", "_") + ".yaml"
expanded_name = "expanded.yaml"
ben-bay marked this conversation as resolved.
Show resolved Hide resolved
expanded_workspace = os.path.join(
self.output_path,
f"{result.description['name'].replace(' ', '_')}_{self.timestamp}",
Expand All @@ -379,11 +390,27 @@ def expanded_spec(self):
self.special_vars["MERLIN_INFO"] = self.info
self.expanded_filepath = os.path.join(self.info, expanded_name)
result.path = self.expanded_filepath
self.spec.path = self.expanded_filepath
# rewrite provenance spec to correct samples.generate.cmd and samples.file
if self.restart_dir is None:
self.write_expanded_spec(self.expanded_filepath)

complete_spec = MerlinSpec.load_specification(self.expanded_filepath)

# write original spec
self.write_original_spec()

# write partially-expanded spec
partial_spec = deepcopy(self.original_spec)
if "variables" in complete_spec.environment:
partial_spec.environment["variables"] = complete_spec.environment[
"variables"
]
if "labels" in complete_spec.environment:
partial_spec.environment["labels"] = complete_spec.environment["labels"]
partial_spec_path = os.path.join(self.info, "partial.yaml")
with open(partial_spec_path, "w") as f:
f.write(partial_spec.dump())

LOG.info(f"Study workspace is '{self.workspace}'.")
return result

Expand Down Expand Up @@ -478,8 +505,7 @@ def load_dag(self):
self.dag = DAG(maestro_dag, labels)

def get_adapter_config(self, override_type=None):
spec = MerlinSpec.load_specification(self.spec.path)
adapter_config = dict(spec.batch)
adapter_config = dict(self.expanded_spec.batch)

if "type" not in adapter_config.keys():
adapter_config["type"] = "local"
Expand Down
5 changes: 4 additions & 1 deletion tests/integration/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,10 @@ def define_tests():
),
"local format 0": (
f"merlin -lvl debug run {dev_examples}/full_format.yaml --local",
[RegexCond("Spec verified. No errors found."), RegexCond("Merlin block verified. No errors found.")],
[
RegexCond("Spec verified. No errors found."),
RegexCond("Merlin block verified. No errors found."),
],
"local",
),
"local format 1": (
Expand Down