From 95336564ec431bd52d5a49186d9176f3276079e6 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Tue, 23 Aug 2022 13:26:34 +0100
Subject: [PATCH 1/3] Ensure final deltas published before
WorkflowRuntimeServer shutdown
---
cylc/flow/network/server.py | 32 +++++++++++++++++++++++---------
tests/integration/test_server.py | 12 ++++++++++++
2 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py
index 6802bf0cedb..d8bb8703cae 100644
--- a/cylc/flow/network/server.py
+++ b/cylc/flow/network/server.py
@@ -205,22 +205,30 @@ def start(self, barrier):
self.operate()
- async def stop(self, reason):
- """Stop the TCP servers, and clean up authentication."""
+ async def stop(self, reason: Union[Exception, str]) -> None:
+ """Stop the TCP servers, and clean up authentication.
+
+ This method must be called/awaited from a different thread to the
+ server's self.thread in order to interrupt the self.operate() loop
+ and wait for self.thread to terminate.
+ """
self.queue.put('STOP')
if self.thread and self.thread.is_alive():
while not self.stopping:
- # Non-async sleep - yield to other threads rather
- # than event loop.
+ # Non-async sleep - yield to other threads rather than
+ # event loop (allows self.operate() running in different
+ # thread to return)
sleep(self.STOP_SLEEP_INTERVAL)
if self.replier:
self.replier.stop(stop_loop=False)
if self.publisher:
+ await self.publish_queued_items()
await self.publisher.publish(
[(b'shutdown', str(reason).encode('utf-8'))]
)
self.publisher.stop(stop_loop=False)
+ self.publisher = None
if self.curve_auth:
self.curve_auth.stop() # stop the authentication thread
if self.loop and self.loop.is_running():
@@ -230,8 +238,11 @@ async def stop(self, reason):
self.stopped = True
- def operate(self):
+ def operate(self) -> None:
"""Orchestrate the receive, send, publish of messages."""
+ # Note: this cannot be an async method because the response part
+ # of the listener runs the event loop synchronously
+ # (in graphql AsyncioExecutor)
while True:
# process messages from the scheduler.
if self.queue.qsize():
@@ -243,15 +254,18 @@ def operate(self):
# Gather and respond to any requests.
self.replier.listener()
-
# Publish all requested/queued.
- while self.publish_queue.qsize():
- articles = self.publish_queue.get()
- self.loop.run_until_complete(self.publisher.publish(articles))
+ self.loop.run_until_complete(self.publish_queued_items())
# Yield control to other threads
sleep(self.OPERATE_SLEEP_INTERVAL)
+ async def publish_queued_items(self):
+ """Publish all queued items."""
+ while self.publish_queue.qsize():
+ articles = self.publish_queue.get()
+ await self.publisher.publish(articles)
+
def receiver(self, message):
"""Process incoming messages and coordinate response.
diff --git a/tests/integration/test_server.py b/tests/integration/test_server.py
index c6625b37c77..8cec055d763 100644
--- a/tests/integration/test_server.py
+++ b/tests/integration/test_server.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from typing import Callable
from async_timeout import timeout
import asyncio
from getpass import getuser
@@ -21,6 +22,7 @@
import pytest
from cylc.flow.network.server import PB_METHOD_MAP
+from cylc.flow.scheduler import Scheduler
@pytest.fixture(scope='module')
@@ -135,3 +137,13 @@ def _api(*args, **kwargs):
raise Exception('foo')
one.server.api = _api
assert 'error' in one.server.receiver(msg)
+
+
+async def test_publish_before_shutdown(
+ one: Scheduler, start: Callable
+):
+ """Test that the server publishes final deltas before shutting down."""
+ async with start(one):
+ one.server.publish_queue.put([(b'fake', b'blah')])
+ await one.server.stop('i said stop!')
+ assert not one.server.publish_queue.qsize()
From 70d4e0e67bb907973476c3dd4de0bc2decad51fd Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 1 Sep 2022 12:38:30 +0100
Subject: [PATCH 2/3] Improve error logging
Plus tidy + type annotations
---
cylc/flow/network/publisher.py | 5 ++---
cylc/flow/network/replier.py | 1 -
cylc/flow/network/server.py | 17 ++++++++++++-----
cylc/flow/scheduler.py | 6 +++---
tests/integration/test_zmq.py | 2 --
tests/unit/network/test_publisher.py | 4 +---
6 files changed, 18 insertions(+), 17 deletions(-)
diff --git a/cylc/flow/network/publisher.py b/cylc/flow/network/publisher.py
index 7383e7675df..35e8b50be6d 100644
--- a/cylc/flow/network/publisher.py
+++ b/cylc/flow/network/publisher.py
@@ -81,12 +81,11 @@ async def send_multi(self, topic, data, serializer=None):
"""
if self.socket:
- # don't attempt to send anything if we are in the process of
- # shutting down
self.topics.add(topic)
self.socket.send_multipart(
[topic, serialize_data(data, serializer)]
)
+ # else we are in the process of shutting down - don't send anything
async def publish(self, items):
"""Publish topics.
@@ -98,4 +97,4 @@ async def publish(self, items):
try:
await gather_coros(self.send_multi, items)
except Exception as exc:
- LOG.error('publish: %s', exc)
+ LOG.exception(f"publish: {exc}")
diff --git a/cylc/flow/network/replier.py b/cylc/flow/network/replier.py
index 68f754e862c..f272ea7d0e7 100644
--- a/cylc/flow/network/replier.py
+++ b/cylc/flow/network/replier.py
@@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Server for workflow runtime API."""
-import getpass # noqa: F401
from queue import Queue
import zmq
diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py
index d8bb8703cae..bfc34e2c3ef 100644
--- a/cylc/flow/network/server.py
+++ b/cylc/flow/network/server.py
@@ -205,7 +205,7 @@ def start(self, barrier):
self.operate()
- async def stop(self, reason: Union[Exception, str]) -> None:
+ async def stop(self, reason: Union[BaseException, str]) -> None:
"""Stop the TCP servers, and clean up authentication.
This method must be called/awaited from a different thread to the
@@ -385,12 +385,19 @@ def graphql(
if executed.errors:
errors: List[Any] = []
for error in executed.errors:
+ LOG.error(error)
if hasattr(error, '__traceback__'):
import traceback
- errors.append({'error': {
- 'message': str(error),
- 'traceback': traceback.format_exception(
- error.__class__, error, error.__traceback__)}})
+ formatted_tb = traceback.format_exception(
+ type(error), error, error.__traceback__
+ )
+ LOG.error("".join(formatted_tb))
+ errors.append({
+ 'error': {
+ 'message': str(error),
+ 'traceback': formatted_tb
+ }
+ })
continue
errors.append(getattr(error, 'message', None))
return errors
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index b3453cb95a6..55559e23ace 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -1698,7 +1698,7 @@ def check_workflow_stalled(self):
self.timers[self.EVENT_STALL_TIMEOUT].reset()
return self.is_stalled
- async def shutdown(self, reason: Exception) -> None:
+ async def shutdown(self, reason: BaseException) -> None:
"""Gracefully shut down the scheduler."""
# At the moment this method must be called from the main_loop.
# In the future it should shutdown the main_loop itself but
@@ -1719,7 +1719,7 @@ async def shutdown(self, reason: Exception) -> None:
# Re-raise exception to be caught higher up (sets the exit code)
raise exc from None
- async def _shutdown(self, reason: Exception) -> None:
+ async def _shutdown(self, reason: BaseException) -> None:
"""Shutdown the workflow."""
shutdown_msg = "Workflow shutting down"
if isinstance(reason, SchedulerStop):
@@ -1977,7 +1977,7 @@ def _check_startup_opts(self) -> None:
f"option --{opt}=reload is only valid for restart"
)
- async def handle_exception(self, exc: Exception) -> NoReturn:
+ async def handle_exception(self, exc: BaseException) -> NoReturn:
"""Gracefully shut down the scheduler given a caught exception.
Re-raises the exception to be caught higher up (sets the exit code).
diff --git a/tests/integration/test_zmq.py b/tests/integration/test_zmq.py
index 91fb454845a..24c8db6d9b0 100644
--- a/tests/integration/test_zmq.py
+++ b/tests/integration/test_zmq.py
@@ -14,8 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from time import sleep
-
import pytest
import zmq
diff --git a/tests/unit/network/test_publisher.py b/tests/unit/network/test_publisher.py
index 7dd69134f69..c43a1a64f50 100644
--- a/tests/unit/network/test_publisher.py
+++ b/tests/unit/network/test_publisher.py
@@ -14,9 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from time import sleep
-
-from cylc.flow.network.publisher import WorkflowPublisher, serialize_data
+from cylc.flow.network.publisher import serialize_data
def test_serialize_data():
From 14f00e9e9c9e8772b44c8273b04edf3d50a8efd9 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 25 Aug 2022 12:01:17 +0100
Subject: [PATCH 3/3] Update changelog
---
CHANGES.md | 3 +++
1 file changed, 3 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index 86e6f6d54c4..fdf1a5ae34e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -44,6 +44,9 @@ taskdefs removed before restart.
[#5091](https://github.com/cylc/cylc-flow/pull/5091) - Fix problems with
tutorial workflows.
+[#5098](https://github.com/cylc/cylc-flow/pull/5098) - Fix bug where final task
+status updates were not being sent to UI before shutdown.
+
-------------------------------------------------------------------------------
## __cylc-8.0.1 (Released 2022-08-16)__