Skip to content

Commit

Permalink
delta apply & publish
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Oct 13, 2019
1 parent ac2fcbf commit 7dbb396
Show file tree
Hide file tree
Showing 12 changed files with 855 additions and 240 deletions.
12 changes: 8 additions & 4 deletions bin/cylc-subscribe
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.network.scan import get_scan_items_from_fs, re_compile_filters
from cylc.flow.network.subscriber import WorkflowSubscriber
from cylc.flow.terminal import cli_function
from cylc.flow.ws_messages_pb2 import PbWorkflow
from cylc.flow.ws_data_mgr import DELTAS_MAP


def print_workflow(topic, msg):
data = PbWorkflow()
msg_type = topic.decode('utf-8')
try:
data = DELTAS_MAP[msg_type]()
except KeyError:
return
data.ParseFromString(msg)
print('Received: ', topic.decode())
print('Received: ', msg_type)
sys.stdout.write(
json.dumps(MessageToDict(data), indent=4) + '\n')

Expand Down Expand Up @@ -77,7 +81,7 @@ def main(_, options, suite, topic=None):
if topic is None:
topic = b'workflow'
else:
topic = topic.encode()
topic = topic.encode('utf-8')
subscriber = WorkflowSubscriber(host, port, [topic])

asyncio.ensure_future(subscriber.subscribe([topic], print_workflow))
Expand Down
67 changes: 42 additions & 25 deletions cylc/flow/job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
TASK_STATUS_READY, TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED)
from cylc.flow.ws_messages_pb2 import PbJob
from cylc.flow.ws_messages_pb2 import PbJob, JDeltas
from cylc.flow.ws_data_mgr import ID_DELIM

JOB_STATUSES_ALL = [
Expand All @@ -39,7 +39,7 @@
]


class JobPool(object):
class JobPool:
"""Pool of protobuf job messages."""
# TODO: description, args, and types

Expand All @@ -52,6 +52,9 @@ def __init__(self, suite, owner):
self.workflow_id = f'{self.owner}{ID_DELIM}{self.suite}'
self.pool = {}
self.task_jobs = {}
self.deltas = JDeltas()
self.updates = {}
self.updates_pending = False

def insert_job(self, job_conf):
"""Insert job into pool."""
Expand Down Expand Up @@ -84,23 +87,24 @@ def insert_job(self, job_conf):
cycle_point=point_string,
)
j_buf.batch_sys_conf.extend(
[f'{key}={val}' for key, val in
job_conf['batch_system_conf'].items()])
[f'{key}={val}'
for key, val in job_conf['batch_system_conf'].items()])
j_buf.directives.extend(
[f'{key}={val}' for key, val in
job_conf['directives'].items()])
[f'{key}={val}'
for key, val in job_conf['directives'].items()])
j_buf.environment.extend(
[f'{key}={val}' for key, val in
job_conf['environment'].items()])
[f'{key}={val}'
for key, val in job_conf['environment'].items()])
j_buf.param_env_tmpl.extend(
[f'{key}={val}' for key, val in
job_conf['param_env_tmpl'].items()])
[f'{key}={val}'
for key, val in job_conf['param_env_tmpl'].items()])
j_buf.param_var.extend(
[f'{key}={val}' for key, val in
job_conf['param_var'].items()])
[f'{key}={val}'
for key, val in job_conf['param_var'].items()])
j_buf.extra_logs.extend(job_conf['logfiles'])
self.pool[j_id] = j_buf
self.task_jobs.setdefault(t_id, []).append(j_id)
self.updates[j_id] = j_buf
self.task_jobs.setdefault(t_id, set()).add(j_id)
self.updates_pending = True

def add_job_msg(self, job_d, msg):
"""Add message to job."""
Expand All @@ -110,8 +114,10 @@ def add_job_msg(self, job_d, msg):
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
self.pool[j_id].messages.append(msg)
self.pool[j_id].stamp = f'{j_id}@{update_time}'
j_delta = PbJob(stamp=f'{j_id}@{update_time}')
j_delta.messages.append(msg)
self.updates.setdefault(j_id, PbJob(id=j_id)).MergeFrom(j_delta)
self.updates_pending = True
except (KeyError, TypeError):
pass

Expand All @@ -121,17 +127,19 @@ def remove_job(self, job_d):
t_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
j_id = f'{t_id}{ID_DELIM}{sub_num}'
try:
del self.pool[j_id]
self.task_jobs[t_id].remove(j_id)
self.task_jobs[t_id].discard(j_id)
self.deltas.pruned.append(j_id)
self.updates_pending = True
except KeyError:
pass

def remove_task_jobs(self, task_id):
"""Removed a task's jobs from the pool via task ID."""
try:
for j_id in self.task_jobs[task_id]:
del self.pool[j_id]
self.deltas.pruned.append(j_id)
del self.task_jobs[task_id]
self.updates_pending = True
except KeyError:
pass

Expand All @@ -143,8 +151,10 @@ def set_job_attr(self, job_d, attr_key, attr_val):
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
setattr(self.pool[j_id], attr_key, attr_val)
self.pool[j_id].stamp = f'{j_id}@{update_time}'
j_delta = PbJob(stamp=f'{j_id}@{update_time}')
setattr(j_delta, attr_key, attr_val)
self.updates.setdefault(j_id, PbJob(id=j_id)).MergeFrom(j_delta)
self.updates_pending = True
except (KeyError, TypeError, AttributeError):
pass

Expand All @@ -157,8 +167,13 @@ def set_job_state(self, job_d, status):
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
if status in JOB_STATUSES_ALL:
try:
self.pool[j_id].state = status
self.pool[j_id].stamp = f'{j_id}@{update_time}'
j_delta = PbJob(
stamp=f'{j_id}@{update_time}',
state=status
)
self.updates.setdefault(
j_id, PbJob(id=j_id)).MergeFrom(j_delta)
self.updates_pending = True
except KeyError:
pass

Expand All @@ -173,8 +188,10 @@ def set_job_time(self, job_d, event_key, time_str=None):
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
setattr(self.pool[j_id], event_key + '_time', time_str)
self.pool[j_id].stamp = f'{j_id}@{update_time}'
j_delta = PbJob(stamp=f'{j_id}@{update_time}')
setattr(j_delta, event_key + '_time', time_str)
self.updates.setdefault(j_id, PbJob(id=j_id)).MergeFrom(j_delta)
self.updates_pending = True
except (KeyError, TypeError, AttributeError):
pass

Expand Down
2 changes: 0 additions & 2 deletions cylc/flow/network/publisher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#!/usr/bin/env python3

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
Expand Down
2 changes: 0 additions & 2 deletions cylc/flow/network/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#!/usr/bin/env python3

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
Expand Down
7 changes: 2 additions & 5 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1594,13 +1594,10 @@ def update_data_structure(self):
self.pool.get_pool_change_tasks())
if has_updated:
# WServer incremental data store update
self.ws_data_mgr.increment_graph_elements()
self.ws_data_mgr.update_dynamic_elements(updated_nodes)
self.ws_data_mgr.update_data_structure(updated_nodes)
# Publish updates:
flow_data = self.ws_data_mgr.data[f'{self.owner}|{self.suite}']
self.publisher.publish(
[(b'workflow', flow_data['workflow'], 'SerializeToString')]
)
self.ws_data_mgr.get_publish_deltas())
# TODO: deprecate after CLI GraphQL migration
self.state_summary_mgr.update(self)
# Database update
Expand Down
32 changes: 16 additions & 16 deletions cylc/flow/tests/test_job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ def setUp(self) -> None:

def test_insert_job(self):
"""Test method that adds a new job to the pool."""
self.assertEqual(0, len(self.job_pool.pool))
self.assertEqual(0, len(self.job_pool.updates))
self.job_pool.insert_job(JOB_CONFIG)
self.assertEqual(1, len(self.job_pool.pool))
self.assertTrue(self.ext_id in self.job_pool.pool)
self.assertEqual(1, len(self.job_pool.updates))
self.assertTrue(self.ext_id in self.job_pool.updates)

def test_add_job_msg(self):
"""Test method adding messages to job element."""
self.job_pool.insert_job(JOB_CONFIG)
job = self.job_pool.pool[self.ext_id]
job = self.job_pool.updates[self.ext_id]
old_stamp = copy(job.stamp)
self.assertEqual(0, len(job.messages))
self.job_pool.add_job_msg('NotJobID', 'The Atomic Age')
Expand All @@ -80,28 +80,28 @@ def test_add_job_msg(self):
def test_remove_job(self):
"""Test method removing a job from the pool via internal job id."""
self.job_pool.insert_job(JOB_CONFIG)
jobs = self.job_pool.pool
self.assertEqual(1, len(jobs))
pruned = self.job_pool.deltas.pruned
self.assertEqual(0, len(pruned))
self.job_pool.remove_job('NotJobID')
self.assertEqual(1, len(jobs))
self.assertEqual(0, len(pruned))
self.job_pool.remove_job(self.int_id)
self.assertEqual(0, len(jobs))
self.assertEqual(1, len(pruned))

def test_remove_task_jobs(self):
"""Test method removing jobs from the pool via internal task ID."""
self.job_pool.insert_job(JOB_CONFIG)
jobs = self.job_pool.pool
self.assertEqual(1, len(jobs))
pruned = self.job_pool.deltas.pruned
self.assertEqual(0, len(pruned))
self.job_pool.remove_task_jobs('NotTaskID')
self.assertEqual(1, len(jobs))
task_id = self.job_pool.pool[self.ext_id].task_proxy
self.assertEqual(0, len(pruned))
task_id = self.job_pool.updates[self.ext_id].task_proxy
self.job_pool.remove_task_jobs(task_id)
self.assertEqual(0, len(jobs))
self.assertEqual(1, len(pruned))

def test_set_job_attr(self):
"""Test method setting job attribute value."""
self.job_pool.insert_job(JOB_CONFIG)
job = self.job_pool.pool[self.ext_id]
job = self.job_pool.updates[self.ext_id]
old_exit_script = copy(job.exit_script)
self.job_pool.set_job_attr(self.int_id, 'leave_scripting', 'rm -v *')
self.assertEqual(old_exit_script, job.exit_script)
Expand All @@ -113,7 +113,7 @@ def test_set_job_attr(self):
def test_set_job_state(self):
"""Test method setting the job state."""
self.job_pool.insert_job(JOB_CONFIG)
job = self.job_pool.pool[self.ext_id]
job = self.job_pool.updates[self.ext_id]
old_state = copy(job.state)
self.job_pool.set_job_state(self.int_id, 'waiting')
self.assertEqual(old_state, job.state)
Expand All @@ -126,7 +126,7 @@ def test_set_job_time(self):
"""Test method setting event time."""
event_time = get_current_time_string()
self.job_pool.insert_job(JOB_CONFIG)
job = self.job_pool.pool[self.ext_id]
job = self.job_pool.updates[self.ext_id]
old_time = copy(job.submitted_time)
self.job_pool.set_job_time(self.int_id, 'jumped', event_time)
self.assertEqual(old_time, job.submitted_time)
Expand Down
40 changes: 29 additions & 11 deletions cylc/flow/tests/test_ws_data_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ def test_generate_definition_elements(self):
task_defs = self.scheduler.config.taskdefs.keys()
self.assertEqual(0, len(self.data[TASKS]))
self.ws_data_mgr.generate_definition_elements()
self.ws_data_mgr.apply_deltas()
self.assertEqual(len(task_defs), len(self.data[TASKS]))

def test_generate_graph_elements(self):
"""Test method that generates edge and ghost node elements
by cycle point."""
self.ws_data_mgr.generate_definition_elements()
self.ws_data_mgr.apply_deltas()
self.ws_data_mgr.pool_points = set(list(self.scheduler.pool.pool))
tasks_proxies_generated = self.data[TASK_PROXIES]
self.assertEqual(0, len(tasks_proxies_generated))
self.ws_data_mgr.generate_graph_elements(
self.data[EDGES],
self.data[TASK_PROXIES],
self.data[FAMILY_PROXIES]
)
self.ws_data_mgr.clear_deltas()
self.ws_data_mgr.generate_graph_elements()
self.ws_data_mgr.apply_deltas()
self.assertEqual(3, len(tasks_proxies_generated))

def test_get_entire_workflow(self):
Expand All @@ -119,6 +119,7 @@ def test_increment_graph_elements(self):
self.assertEqual(0, len(self.data[TASK_PROXIES]))
self.ws_data_mgr.generate_definition_elements()
self.ws_data_mgr.increment_graph_elements()
self.ws_data_mgr.apply_deltas()
self.assertTrue(self.ws_data_mgr.pool_points)
self.assertEqual(3, len(self.data[TASK_PROXIES]))

Expand All @@ -130,24 +131,37 @@ def test_initiate_data_model(self):

def test_prune_points(self):
"""Test method that removes data elements by cycle point."""
self.ws_data_mgr.generate_definition_elements()
self.ws_data_mgr.increment_graph_elements()
self.ws_data_mgr.initiate_data_model()
points = self.ws_data_mgr.cycle_states.keys()
point = next(iter(points))
self.assertTrue(point in points)
self.ws_data_mgr.clear_deltas()
self.ws_data_mgr.prune_points([point])
self.ws_data_mgr.apply_deltas()
self.assertTrue(point not in points)

def test_update_data_structure(self):
"""Test update_data_structure. This method will generate and
apply deltas/updates given."""
self.ws_data_mgr.initiate_data_model()
self.assertEqual(0, len(self._collect_states(TASK_PROXIES)))
update_tasks = self.task_pool.get_all_tasks()
self.ws_data_mgr.update_data_structure(update_tasks)
self.assertTrue(len(update_tasks) > 0)
self.assertEqual(
len(update_tasks), len(self._collect_states(TASK_PROXIES)))

def test_update_family_proxies(self):
"""Test update_family_proxies. This method will update all
WsDataMgr task_proxies of given cycle point strings."""
self.ws_data_mgr.generate_definition_elements()
self.ws_data_mgr.increment_graph_elements()
self.ws_data_mgr.initiate_data_model()
self.assertEqual(0, len(self._collect_states(FAMILY_PROXIES)))
update_tasks = self.task_pool.get_all_tasks()
update_points = set((str(t.point) for t in update_tasks))
self.ws_data_mgr.clear_deltas()
self.ws_data_mgr.update_task_proxies(update_tasks)
self.ws_data_mgr.update_family_proxies(update_points)
self.ws_data_mgr.apply_deltas()
# Find families in updated cycle points
point_fams = [
f.id
Expand All @@ -161,20 +175,24 @@ def test_update_task_proxies(self):
"""Test update_task_proxies. This method will iterate over given
task instances (TaskProxy), and update any corresponding
WsDataMgr task_proxies."""
self.ws_data_mgr.generate_definition_elements()
self.ws_data_mgr.increment_graph_elements()
self.ws_data_mgr.initiate_data_model()
self.assertEqual(0, len(self._collect_states(TASK_PROXIES)))
update_tasks = self.task_pool.get_all_tasks()
self.ws_data_mgr.clear_deltas()
self.ws_data_mgr.update_task_proxies(update_tasks)
self.ws_data_mgr.apply_deltas()
self.assertTrue(len(update_tasks) > 0)
self.assertEqual(
len(update_tasks), len(self._collect_states(TASK_PROXIES)))

def test_update_workflow(self):
"""Test method that updates the dynamic fields of the workflow msg."""
self.ws_data_mgr.generate_definition_elements()
self.ws_data_mgr.apply_deltas()
old_time = self.data[WORKFLOW].last_updated
self.ws_data_mgr.clear_deltas()
self.ws_data_mgr.update_workflow()
self.ws_data_mgr.apply_deltas()
new_time = self.data[WORKFLOW].last_updated
self.assertTrue(new_time > old_time)

Expand Down
Loading

0 comments on commit 7dbb396

Please sign in to comment.