Data store mem leak fixes & nomenclature
dwsutherland committed Jan 24, 2020
1 parent 8af55bc commit f522dc7
Showing 9 changed files with 107 additions and 94 deletions.
45 changes: 29 additions & 16 deletions cylc/flow/data_store_mgr.py
@@ -99,6 +99,21 @@
WORKFLOW: PbWorkflow,
}

# Protobuf message merging appends repeated field results on merge,
# unlike singular fields which are overwritten. This behaviour is
# desirable in many cases, but there are exceptions.
# The following is used to flag which fields require clearing before
# merging from respective deltas messages.
CLEAR_FIELD_MAP = {
EDGES: set(),
FAMILIES: set(),
FAMILY_PROXIES: set(),
JOBS: set(),
TASKS: set(),
TASK_PROXIES: {'prerequisites', 'outputs'},
WORKFLOW: {'state_totals', 'states'},
}


def generate_checksum(in_strings):
"""Generate cross platform & python checksum from strings."""
@@ -119,24 +134,29 @@ def apply_delta(key, delta, data):
if key == WORKFLOW:
new_data = PbWorkflow()
new_data.CopyFrom(data[key])
new_data.ClearField('state_totals')
new_data.ClearField('states')
# Clear fields that require overwrite with delta
for field in CLEAR_FIELD_MAP[key]:
new_data.ClearField(field)
new_data.MergeFrom(delta)
# fields that are set to empty kinds aren't carried
if not delta.is_held_total:
new_data.is_held_total = 0
if not delta.reloaded:
data[key].reloaded = False
# For thread safe update
data[key] = new_data
return
for element in delta.deltas:
if key in (TASK_PROXIES, FAMILY_PROXIES) and element.id in data[key]:
# fields cannot be directly assigned, so is cleared first.
if hasattr(element, 'prerequisites') and element.prerequisites:
del data[key][element.id].prerequisites[:]
# fields that are set to empty kinds aren't carried
data_ele = data[key].setdefault(element.id, MESSAGE_MAP[key]())
# Clear fields that require overwrite with delta
for field, _ in element.ListFields():
if field.name in CLEAR_FIELD_MAP[key]:
data_ele.ClearField(field.name)
# fields that are set to empty kinds aren't carried
if key in (TASK_PROXIES, FAMILY_PROXIES):
if not element.is_held:
data[key][element.id].is_held = False
data[key].setdefault(element.id, MESSAGE_MAP[key]()).MergeFrom(element)
data_ele.MergeFrom(element)
# Prune data elements by id
for del_id in delta.pruned:
if del_id not in data[key]:
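The rewritten element loop keys off `ListFields()`, which yields only the fields actually populated in a delta, so a sparse delta can never clear stored fields it does not carry. A minimal illustration with a stock protobuf type (not cylc code):

```python
# Minimal sketch, not cylc code: only populated fields come back from
# ListFields(), mirroring the clearing loop in apply_delta above.
from google.protobuf.struct_pb2 import Value

delta = Value()
delta.string_value = 'running'

# An unset field never appears in ListFields(), so it can never
# trigger a ClearField on the stored element:
assert [f.name for f, _ in delta.ListFields()] == ['string_value']
```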
@@ -618,8 +638,6 @@ def generate_graph_elements(self, start_point=None, stop_point=None):
stamp='f{s_task_id}@{update_time}',
id=s_task_id,
)).proxies.append(source_id)
getattr(self.deltas[WORKFLOW], TASKS).append(
s_task_id)
# Add valid source before checking for no target,
# as source may be an isolate (hence no edges).
# At present targets can't be xtriggers.
@@ -646,7 +664,6 @@ def generate_graph_elements(self, start_point=None, stop_point=None):
stamp='f{t_task_id}@{update_time}',
id=t_task_id,
)).proxies.append(target_id)
getattr(self.deltas[WORKFLOW], TASKS).append(t_task_id)

# Initiate edge element.
e_id = (
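A note on the two `getattr(self.deltas[WORKFLOW], TASKS).append(...)` deletions above: each pass through graph generation pushed task IDs onto the workflow delta's repeated `tasks` field, and since WORKFLOW merges only clear `state_totals` and `states` (see CLEAR_FIELD_MAP), those IDs would accumulate across merges; presumably part of the memory leak this commit fixes.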
@@ -762,11 +779,7 @@ def prune_points(self, point_strings):
if tproxy.cycle_point in point_strings:
node_ids.add(tp_id)
self.deltas[TASK_PROXIES].pruned.append(tp_id)
self.deltas[JOBS].pruned.extend(tproxy.jobs)
try:
del self.schd.job_pool.task_jobs[tp_id]
except KeyError:
pass
self.schd.job_pool.remove_task_jobs(tp_id)

for fp_id, fproxy in list(flow_data[FAMILY_PROXIES].items()):
if fproxy.cycle_point in point_strings:
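The old try/except KeyError block is collapsed into a single `job_pool.remove_task_jobs(tp_id)` call, which also takes over pruning the task's jobs (the `self.deltas[JOBS].pruned.extend(...)` line removed above). The method body is not shown in this diff; a plausible sketch, with every name other than `task_jobs` assumed rather than taken from the source:

```python
# Hypothetical sketch only: the real JobPool.remove_task_jobs body is
# not part of this diff. Something along these lines would both prune
# the job elements and drop the task's entry from the pool's index.
def remove_task_jobs(self, task_id):
    """Prune all jobs for the given task proxy (sketch)."""
    for job_id in self.task_jobs.get(task_id, []):  # assumed index shape
        self.deltas.pruned.append(job_id)           # assumed deltas field
    self.task_jobs.pop(task_id, None)               # no KeyError handling needed
```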
6 changes: 3 additions & 3 deletions cylc/flow/network/server.py
@@ -121,7 +121,7 @@ def __init__(self, schd, context=None, barrier=None,
self.endpoints = None
self.queue = None
self.resolvers = Resolvers(
self.schd.ws_data_mgr.data,
self.schd.data_store_mgr.data,
schd=self.schd)

def _socket_options(self):
@@ -1223,7 +1223,7 @@ def pb_entire_workflow(self):
Serialised Protobuf message
"""
pb_msg = self.schd.ws_data_mgr.get_entire_workflow()
pb_msg = self.schd.data_store_mgr.get_entire_workflow()
return pb_msg.SerializeToString()

@authorise(Priv.READ)
@@ -1240,5 +1240,5 @@ def pb_data_elements(self, element_type):
Serialised Protobuf message
"""
pb_msg = self.schd.ws_data_mgr.get_data_elements(element_type)
pb_msg = self.schd.data_store_mgr.get_data_elements(element_type)
return pb_msg.SerializeToString()
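Both endpoints hand back a serialised Protobuf payload rather than a Python structure. A hedged sketch of the round trip; the generated module's import path is an assumption (it has varied across cylc-flow versions):

```python
# Sketch of the round trip; the import path below is an assumption.
from cylc.flow.data_messages_pb2 import PbEntireWorkflow

payload = PbEntireWorkflow().SerializeToString()  # server side
pb_msg = PbEntireWorkflow()
pb_msg.ParseFromString(payload)                   # client side
```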
16 changes: 8 additions & 8 deletions cylc/flow/scheduler.py
@@ -206,7 +206,7 @@ def __init__(self, is_restart, options, args):
self.command_queue = None
self.message_queue = None
self.ext_trigger_queue = None
self.ws_data_mgr = None
self.data_store_mgr = None
self.job_pool = None

self._profile_amounts = {}
@@ -257,7 +257,7 @@ def start(self):
if not self.options.no_detach:
daemonize(self)
self._setup_suite_logger()
self.ws_data_mgr = DataStoreMgr(self)
self.data_store_mgr = DataStoreMgr(self)

# *** Network Related ***
# TODO: this in zmq asyncio context?
@@ -1563,8 +1563,8 @@ def update_profiler_logs(self, tinit):
def run(self):
"""Main loop."""
self.initialise_scheduler()
self.ws_data_mgr.initiate_data_model()
self.publisher.publish(self.ws_data_mgr.get_publish_deltas())
self.data_store_mgr.initiate_data_model()
self.publisher.publish(self.data_store_mgr.get_publish_deltas())
while True: # MAIN LOOP
tinit = time()
has_reloaded = False
@@ -1593,9 +1593,9 @@

# Re-initialise data model on reload
if has_reloaded:
self.ws_data_mgr.initiate_data_model(reloaded=True)
self.data_store_mgr.initiate_data_model(reloaded=True)
self.publisher.publish(
self.ws_data_mgr.get_publish_deltas())
self.data_store_mgr.get_publish_deltas())
# Update state summary, database, and uifeed
self.suite_db_mgr.put_task_event_timers(self.task_events_mgr)
has_updated = self.update_data_structure()
@@ -1646,10 +1646,10 @@ def update_data_structure(self):
self.pool.get_pool_change_tasks())
if has_updated:
# WServer incremental data store update
self.ws_data_mgr.update_data_structure(updated_nodes)
self.data_store_mgr.update_data_structure(updated_nodes)
# Publish updates:
self.publisher.publish(
self.ws_data_mgr.get_publish_deltas())
self.data_store_mgr.get_publish_deltas())
# TODO: deprecate after CLI GraphQL migration
self.state_summary_mgr.update(self)
# Database update
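The scheduler changes here, like the test-module changes that follow, are the nomenclature half of the commit: `self.ws_data_mgr` becomes `self.data_store_mgr` throughout, with no change in behaviour.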
6 changes: 3 additions & 3 deletions cylc/flow/tests/network/test_client.py
@@ -64,7 +64,7 @@ class TestSuiteRuntimeClient(CylcWorkflowTestCase):

def setUp(self) -> None:
super(TestSuiteRuntimeClient, self).setUp()
self.scheduler.ws_data_mgr = DataStoreMgr(self.scheduler)
self.scheduler.data_store_mgr = DataStoreMgr(self.scheduler)
for name in self.scheduler.config.taskdefs:
task_proxy = create_task_proxy(
task_name=name,
@@ -78,8 +78,8 @@ def setUp(self) -> None:
)
assert warnings == 0
self.task_pool.release_runahead_tasks()
self.scheduler.ws_data_mgr.initiate_data_model()
self.workflow_id = self.scheduler.ws_data_mgr.workflow_id
self.scheduler.data_store_mgr.initiate_data_model()
self.workflow_id = self.scheduler.data_store_mgr.workflow_id
create_auth_files(self.suite_name) # auth keys are required for comms
barrier = Barrier(2, timeout=20)
self.server = SuiteRuntimeServer(
8 changes: 4 additions & 4 deletions cylc/flow/tests/network/test_publisher.py
@@ -66,7 +66,7 @@ class TestWorkflowPublisher(CylcWorkflowTestCase):

def setUp(self) -> None:
super(TestWorkflowPublisher, self).setUp()
self.scheduler.ws_data_mgr = DataStoreMgr(self.scheduler)
self.scheduler.data_store_mgr = DataStoreMgr(self.scheduler)
for name in self.scheduler.config.taskdefs:
task_proxy = create_task_proxy(
task_name=name,
@@ -80,11 +80,11 @@ def setUp(self) -> None:
)
assert 0 == warnings
self.task_pool.release_runahead_tasks()
self.scheduler.ws_data_mgr.initiate_data_model()
self.workflow_id = self.scheduler.ws_data_mgr.workflow_id
self.scheduler.data_store_mgr.initiate_data_model()
self.workflow_id = self.scheduler.data_store_mgr.workflow_id
self.publisher = WorkflowPublisher(
self.suite_name, threaded=False, daemon=True)
self.pub_data = self.scheduler.ws_data_mgr.get_publish_deltas()
self.pub_data = self.scheduler.data_store_mgr.get_publish_deltas()

def tearDown(self):
self.publisher.stop()
10 changes: 5 additions & 5 deletions cylc/flow/tests/network/test_resolvers.py
@@ -109,7 +109,7 @@ class TestResolvers(CylcWorkflowTestCase):

def setUp(self) -> None:
super(TestResolvers, self).setUp()
self.scheduler.ws_data_mgr = DataStoreMgr(self.scheduler)
self.scheduler.data_store_mgr = DataStoreMgr(self.scheduler)
for name in self.scheduler.config.taskdefs:
task_proxy = create_task_proxy(
task_name=name,
@@ -123,17 +123,17 @@ def setUp(self) -> None:
)
assert 0 == warnings
self.task_pool.release_runahead_tasks()
self.scheduler.ws_data_mgr.initiate_data_model()
self.workflow_id = self.scheduler.ws_data_mgr.workflow_id
self.data = self.scheduler.ws_data_mgr.data[self.workflow_id]
self.scheduler.data_store_mgr.initiate_data_model()
self.workflow_id = self.scheduler.data_store_mgr.workflow_id
self.data = self.scheduler.data_store_mgr.data[self.workflow_id]
self.node_ids = [
node.id
for node in self.data[TASK_PROXIES].values()]
self.edge_ids = [
edge.id
for edge in self.data[EDGES].values()]
self.resolvers = Resolvers(
self.scheduler.ws_data_mgr.data,
self.scheduler.data_store_mgr.data,
schd=self.scheduler)

def test_constructor(self):
6 changes: 3 additions & 3 deletions cylc/flow/tests/network/test_server.py
@@ -65,7 +65,7 @@ class TestSuiteRuntimeServer(CylcWorkflowTestCase):

def setUp(self) -> None:
super(TestSuiteRuntimeServer, self).setUp()
self.scheduler.ws_data_mgr = DataStoreMgr(self.scheduler)
self.scheduler.data_store_mgr = DataStoreMgr(self.scheduler)
for name in self.scheduler.config.taskdefs:
task_proxy = create_task_proxy(
task_name=name,
@@ -79,8 +79,8 @@ def setUp(self) -> None:
)
assert 0 == warnings
self.task_pool.release_runahead_tasks()
self.scheduler.ws_data_mgr.initiate_data_model()
self.workflow_id = self.scheduler.ws_data_mgr.workflow_id
self.scheduler.data_store_mgr.initiate_data_model()
self.workflow_id = self.scheduler.data_store_mgr.workflow_id
create_auth_files(self.suite_name) # auth keys are required for comms
barrier = Barrier(2, timeout=10)
self.server = SuiteRuntimeServer(
8 changes: 4 additions & 4 deletions cylc/flow/tests/network/test_subscriber.py
@@ -71,7 +71,7 @@ class TestWorkflowSubscriber(CylcWorkflowTestCase):

def setUp(self) -> None:
super(TestWorkflowSubscriber, self).setUp()
self.scheduler.ws_data_mgr = DataStoreMgr(self.scheduler)
self.scheduler.data_store_mgr = DataStoreMgr(self.scheduler)
for name in self.scheduler.config.taskdefs:
task_proxy = create_task_proxy(
task_name=name,
@@ -85,8 +85,8 @@ def setUp(self) -> None:
)
assert warnings == 0
self.task_pool.release_runahead_tasks()
self.scheduler.ws_data_mgr.initiate_data_model()
self.workflow_id = self.scheduler.ws_data_mgr.workflow_id
self.scheduler.data_store_mgr.initiate_data_model()
self.workflow_id = self.scheduler.data_store_mgr.workflow_id
self.publisher = WorkflowPublisher(
self.suite_name, threaded=False, daemon=True)
self.publisher.start(*PORT_RANGE)
@@ -112,7 +112,7 @@ def test_constructor(self):

def test_subscribe(self):
"""Test publishing data."""
pub_data = self.scheduler.ws_data_mgr.get_publish_deltas()
pub_data = self.scheduler.data_store_mgr.get_publish_deltas()
self.publisher.publish(pub_data)

def msg_process(btopic, msg):