Skip to content

Commit

Permalink
test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Oct 16, 2020
1 parent 73768a0 commit 161cd41
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 28 deletions.
28 changes: 24 additions & 4 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,10 @@ def increment_graph_window(
graph_children = generate_graph_children(
self.schd.config.get_taskdef(name), point)

if edge_distance > self.n_edge_distance or not graph_children:
if (
edge_distance > self.n_edge_distance or
not any(graph_children.values())
):
if descendant or edge_distance == 0:
self.n_window_boundary_nodes[
active_id].setdefault(edge_distance, set()).add(s_id)
Expand Down Expand Up @@ -607,8 +610,12 @@ def increment_graph_window(
False, True)

if edge_distance == 1:
for tp_id in self.n_window_boundary_nodes[active_id][
max(self.n_window_boundary_nodes[active_id].keys())]:
levels = self.n_window_boundary_nodes[active_id].keys()
# Could be self-reference node foo:failed => foo
if not levels:
self.n_window_boundary_nodes[active_id][0] = {active_id}
levels = (0,)
for tp_id in self.n_window_boundary_nodes[active_id][max(levels)]:
self.prune_trigger_nodes.setdefault(
tp_id, set()).add(active_id)
del self.n_window_boundary_nodes[active_id]
Expand Down Expand Up @@ -660,7 +667,8 @@ def _expand_graph_window(
def remove_active_node(self, name, point):
"""Flag removal of this active graph window."""
tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
self.all_task_pool.remove(tp_id)
if tp_id in self.all_task_pool:
self.all_task_pool.remove(tp_id)
# flagged isolates/end-of-branch nodes for pruning on removal
if (
tp_id in self.prune_trigger_nodes and
Expand Down Expand Up @@ -1147,6 +1155,18 @@ def set_graph_window_extent(self, n_edge_distance):
self.next_n_edge_distance = n_edge_distance
self.updates_pending = True

def set_workflow_ports(self):
    """Publish the scheduler's server and publisher ports via the data store.

    Stamps the workflow delta with a fresh timestamp so clients pick up
    the change, then flags that deltas are pending.
    """
    w_delta = self.updated[WORKFLOW]
    w_delta.id = self.workflow_id
    w_delta.last_updated = time()
    w_delta.stamp = f'{w_delta.id}@{w_delta.last_updated}'

    # Ports come from the running scheduler (set once servers start).
    w_delta.port = self.schd.port
    w_delta.pub_port = self.schd.pub_port

    self.updates_pending = True

def update_workflow(self):
"""Update workflow element status and state totals."""
# Create new message and copy existing message content
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,8 @@ async def resolve_broadcasts(root, info, **args):


def resolve_json_dump(root, info, **args):
    """Resolve a JSON-string field on *root* into a Python object.

    Falls back to an empty mapping when the attribute is missing or its
    value is falsy — protobuf string fields default to '' rather than
    None, and ``json.loads('')`` would raise, so the ``or '{}'`` guard
    is required.
    """
    field = getattr(root, to_snake_case(info.field_name), '{}') or '{}'
    return json.loads(field)


# Types:
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ async def start_servers(self):
self.barrier.wait()
self.port = self.server.port
self.pub_port = self.publisher.port
self.data_store_mgr.set_workflow_ports()

async def log_start(self):
if self.is_restart:
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/graphql/01-workflow.t
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ cat > expected << __HERE__
"title": "foo",
"description": "bar"
},
"newestRunaheadCyclePoint": "1",
"newestRunaheadCyclePoint": "",
"newestCyclePoint": "1",
"oldestCyclePoint": "1",
"reloaded": false,
Expand Down
39 changes: 33 additions & 6 deletions tests/functional/graphql/02-root-queries.t
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,32 @@ cat > expected << __HERE__
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}baa"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}bar"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}foo"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}qar"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}qaz"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}qux"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}baa"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}foo"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}qar"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}qux"
}
],
"family": {
Expand Down Expand Up @@ -184,22 +202,28 @@ cat > expected << __HERE__
],
"edges": [
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}@wall_clock.20190101T00${ID_DELIM}foo.20190101T00"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}@wall_clock.20190101T00${ID_DELIM}qux.20190101T00"
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}baa.20190101T00${ID_DELIM}baa.20190201T00"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}baa.20190101T00${ID_DELIM}qaz.20190101T00"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}bar.20190101T00"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}foo.20190201T00"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}qar.20190101T00${ID_DELIM}qar.20190201T00"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}qux.20190101T00${ID_DELIM}bar.20190101T00"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}qux.20190101T00${ID_DELIM}qaz.20190101T00"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}qux.20190101T00${ID_DELIM}qux.20190201T00"
}
],
"nodesEdges": {
Expand All @@ -209,14 +233,17 @@ cat > expected << __HERE__
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}foo"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}foo"
}
],
"edges": [
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}@wall_clock.20190101T00${ID_DELIM}foo.20190101T00"
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}bar.20190101T00"
},
{
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}bar.20190101T00"
"id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}foo.20190201T00"
}
]
}
Expand Down
3 changes: 3 additions & 0 deletions tests/functional/runahead/06-release-update.t
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ CYLC_RUN_PID="$!"
poll_suite_running
YYYY="$(date +%Y)"
NEXT1=$(( YYYY + 1 ))
NEXT2=$(( YYYY + 2 ))
poll_grep_suite_log -F "[bar.${NEXT1}] -released to the task pool"

cylc dump -t "${SUITE_NAME}" | awk '{print $1 $2 $3}' >'log'
cmp_ok 'log' - <<__END__
bar,$YYYY,running,
bar,$NEXT1,waiting,
bar,$NEXT2,waiting,
foo,$NEXT1,waiting,
__END__

Expand Down
33 changes: 17 additions & 16 deletions tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ def test_get_entire_workflow(harness):
assert len(flow_msg.task_proxies) == len(data[TASK_PROXIES])


def test_increment_graph_window(harness):
    """Test method that adds and removes elements window boundary."""
    schd, data = harness
    # Window generation should have registered prune-trigger boundary nodes.
    assert schd.data_store_mgr.prune_trigger_nodes
    assert len(data[TASK_PROXIES]) == 1


Expand All @@ -90,23 +90,24 @@ def test_initiate_data_model(harness):
assert len(data[WORKFLOW].task_proxies) == 1


def test_prune_data_store(harness):
    """Test method that removes data elements by cycle point."""
    schd, data = harness
    # Populate the n-window for every task in the pool so there is
    # something to prune.
    for itask in schd.pool.get_all_tasks():
        schd.data_store_mgr.increment_graph_window(
            itask.tdef.name, itask.point, itask.flow_label
        )
    schd.data_store_mgr.apply_deltas()
    schd.data_store_mgr.clear_deltas()
    before_count = len(schd.data_store_mgr.data[
        schd.data_store_mgr.workflow_id][TASK_PROXIES])
    assert before_count > 0
    # Flag every current task proxy for pruning, then prune.
    schd.data_store_mgr.prune_flagged_nodes.update(set(data[TASK_PROXIES]))
    schd.data_store_mgr.prune_data_store()
    schd.data_store_mgr.apply_deltas()
    after_count = len(schd.data_store_mgr.data[
        schd.data_store_mgr.workflow_id][TASK_PROXIES])
    assert after_count < before_count


def test_update_data_structure(harness):
Expand Down

0 comments on commit 161cd41

Please sign in to comment.