Skip to content

Commit

Permalink
Merge pull request #3931 from oliver-sanders/universal-id
Browse files Browse the repository at this point in the history
global universal identifier
  • Loading branch information
hjoliver authored Jan 17, 2022
2 parents 0b5543f + daa8f27 commit 8c81b09
Show file tree
Hide file tree
Showing 749 changed files with 11,114 additions and 7,473 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ First Release Candidate for Cylc 8.

### Enhancements

[#3931](https://github.com/cylc/cylc-flow/pull/3931) -
Convert Cylc to use the new "Universal Identifier".

[#4506](https://github.com/cylc/cylc-flow/pull/4506) -
Cylc no longer creates a `flow.cylc` symlink to a `suite.rc` file.
This only affects you if you have used a prior Cylc 8 pre-release.
Expand Down
4 changes: 0 additions & 4 deletions cylc/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@
"DEBUG": logging.DEBUG,
}

# Used widely with data element ID (internally and externally),
# scope may widen further with internal and CLI adoption.
ID_DELIM = '|'


def environ_init():
"""Initialise cylc environment."""
Expand Down
41 changes: 41 additions & 0 deletions cylc/flow/async_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,3 +417,44 @@ async def asyncqgen(queue):
"""Turn a queue into an async generator."""
while not queue.empty():
yield await queue.get()


async def unordered_map(coroutine, iterator):
"""An asynchronous map function which does not preserve order.
Use in situations where you want results as they are completed rather than
once they are all completed.
Example:
# define your async coroutine
>>> async def square(x): return x**2
# define your iterator (must yield tuples)
>>> iterator = [(num,) for num in range(5)]
# use `async for` to iterate over the results
# (sorted in this case so the test is repeatable)
>>> async def test():
... ret = []
... async for x in unordered_map(square, iterator):
... ret.append(x)
... return sorted(ret)
>>> asyncio.run(test())
[((0,), 0), ((1,), 1), ((2,), 4), ((3,), 9), ((4,), 16)]
"""
# create tasks
pending = []
for args in iterator:
task = asyncio.create_task(coroutine(*args))
task._args = args
pending.append(task)

# run tasks
while pending:
done, pending = await asyncio.wait(
pending,
return_when=asyncio.FIRST_COMPLETED
)
for task in done:
yield task._args, task.result()
14 changes: 9 additions & 5 deletions cylc/flow/broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

from cylc.flow import LOG
from cylc.flow.broadcast_report import (
CHANGE_FMT, CHANGE_PREFIX_SET,
CHANGE_FMT,
CHANGE_PREFIX_SET,
get_broadcast_change_report,
get_broadcast_bad_options_report)
get_broadcast_bad_options_report,
)
from cylc.flow.id import Tokens
from cylc.flow.cycling.loader import get_point, standardise_point_string
from cylc.flow.exceptions import PointParsingError
from cylc.flow.task_id import TaskID

ALL_CYCLE_POINTS_STRS = ["*", "all-cycle-points", "all-cycles"]

Expand Down Expand Up @@ -154,7 +156,9 @@ def get_broadcast(self, task_id=None):
# all broadcasts requested
return self.broadcasts
try:
name, point_string = TaskID.split(task_id)
tokens = Tokens(task_id, relative=True)
name = tokens['task']
point_string = tokens['cycle']
except ValueError:
raise Exception("Can't split task_id %s" % task_id)

Expand Down Expand Up @@ -206,7 +210,7 @@ def _match_ext_trigger(self, itask):
if trig != qmsg:
continue
# Matched.
point_string = TaskID.split(itask.identity)[1]
point_string = itask.tokens['cycle']
# Set trigger satisfied.
itask.state.external_triggers[trig] = True
# Broadcast the event ID to the cycle point.
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/broadcast_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
BAD_OPTIONS_TITLE = "No broadcast to cancel/clear for these options:"
BAD_OPTIONS_TITLE_SET = ("Rejected broadcast: settings are not"
" compatible with the workflow")
CHANGE_FMT = "\n%(change)s [%(namespace)s.%(point)s] %(key)s=%(value)s"
CHANGE_FMT = "\n%(change)s [%(point)s/%(namespace)s] %(key)s=%(value)s"
CHANGE_PREFIX_CANCEL = "-"
CHANGE_PREFIX_SET = "+"
CHANGE_TITLE_CANCEL = "Broadcast cancelled:"
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/command_polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ def __init__(self, condition, interval, max_polls, args):
self.n_polls = 0
self.args = args # any extra parameters needed by check()

def check(self):
async def check(self):
"""Abstract method. Test polling condition."""
raise NotImplementedError()

def poll(self):
async def poll(self):
"""Poll for the condition embodied by self.check().
Return True if condition met, or False if polling exhausted."""

Expand All @@ -81,7 +81,7 @@ def poll(self):

while self.n_polls < self.max_polls:
self.n_polls += 1
if self.check():
if await self.check():
sys.stdout.write(": satisfied\n")
return True
if self.max_polls > 1:
Expand Down
59 changes: 46 additions & 13 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
get_sequence, get_sequence_cls, init_cyclers, get_dump_format,
INTEGER_CYCLING_TYPE, ISO8601_CYCLING_TYPE
)
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval
from cylc.flow.exceptions import (
Expand Down Expand Up @@ -100,7 +101,11 @@
)
from cylc.flow.wallclock import (
get_current_time_string, set_utc_mode, get_utc_mode)
from cylc.flow.workflow_files import NO_TITLE, WorkflowFiles
from cylc.flow.workflow_files import (
NO_TITLE,
WorkflowFiles,
check_deprecation,
)
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
Expand Down Expand Up @@ -180,7 +185,7 @@ def __init__(
work_dir: Optional[str] = None,
share_dir: Optional[str] = None
) -> None:

check_deprecation(Path(fpath))
self.mem_log = mem_log_func
if self.mem_log is None:
self.mem_log = lambda x: None
Expand Down Expand Up @@ -689,10 +694,12 @@ def process_start_cycle_point(self) -> None:
# Start from designated task(s).
# Select the earliest start point for use in pre-initial ignore.
self.start_point = min(
get_point(
TaskID.split(taskid)[1]
).standardise()
for taskid in self.options.starttask
get_point(cycle).standardise()
for cycle in [
Tokens(taskid, relative=True)['cycle']
for taskid in self.options.starttask
]
if cycle
)
else:
# Start from the initial point.
Expand Down Expand Up @@ -830,7 +837,15 @@ def _check_circular(self):
for rhs, lhss in sorted(rhs2lhss.items()):
for lhs in sorted(lhss):
err_msg += ' %s => %s' % (
TaskID.get(*lhs), TaskID.get(*rhs))
Tokens(
cycle=str(lhs[1]),
task=lhs[0]
).relative_id,
Tokens(
cycle=str(rhs[1]),
task=rhs[0]
).relative_id,
)
if err_msg:
raise WorkflowConfigError(
'circular edges detected:' + err_msg)
Expand Down Expand Up @@ -1875,25 +1890,43 @@ def _close_families(l_id, r_id, clf_map):
lname, lpoint = None, None
if l_id:
lname, lpoint = l_id
lret = TaskID.get(lname, lpoint)
lret = Tokens(
cycle=str(lpoint),
task=lname,
).relative_id
rret = None
rname, rpoint = None, None
if r_id:
rname, rpoint = r_id
rret = TaskID.get(rname, rpoint)
rret = Tokens(
cycle=str(rpoint),
task=rname,
).relative_id

for fam_name, fam_members in clf_map.items():
if lname in fam_members and rname in fam_members:
# l and r are both members
lret = TaskID.get(fam_name, lpoint)
rret = TaskID.get(fam_name, rpoint)
lret = Tokens(
cycle=str(lpoint),
task=fam_name,
).relative_id
rret = Tokens(
cycle=str(rpoint),
task=fam_name,
).relative_id
break
elif lname in fam_members:
# l is a member
lret = TaskID.get(fam_name, lpoint)
lret = Tokens(
cycle=str(lpoint),
task=fam_name,
).relative_id
elif rname in fam_members:
# r is a member
rret = TaskID.get(fam_name, rpoint)
rret = Tokens(
cycle=str(rpoint),
task=fam_name,
).relative_id

return lret, rret

Expand Down
Loading

0 comments on commit 8c81b09

Please sign in to comment.