Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure queued deltas are published before shutdown #5098

Merged
merged 4 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ taskdefs removed before restart.
[#5091](https://github.com/cylc/cylc-flow/pull/5091) - Fix problems with
tutorial workflows.

[#5098](https://github.com/cylc/cylc-flow/pull/5098) - Fix bug where final task
status updates were not being sent to UI before shutdown.

[#5114](https://github.com/cylc/cylc-flow/pull/5114) - Fix bug where
validation errors during workflow startup were not printed to stderr before
daemonisation.
Expand Down
5 changes: 2 additions & 3 deletions cylc/flow/network/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,11 @@ async def send_multi(self, topic, data, serializer=None):

"""
if self.socket:
# don't attempt to send anything if we are in the process of
# shutting down
self.topics.add(topic)
self.socket.send_multipart(
[topic, serialize_data(data, serializer)]
)
# else we are in the process of shutting down - don't send anything

async def publish(self, items):
"""Publish topics.
Expand All @@ -98,4 +97,4 @@ async def publish(self, items):
try:
await gather_coros(self.send_multi, items)
except Exception as exc:
LOG.error('publish: %s', exc)
LOG.exception(f"publish: {exc}")
1 change: 0 additions & 1 deletion cylc/flow/network/replier.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Server for workflow runtime API."""

import getpass # noqa: F401
from queue import Queue

import zmq
Expand Down
47 changes: 34 additions & 13 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,22 +205,30 @@ def start(self, barrier):

self.operate()

async def stop(self, reason):
"""Stop the TCP servers, and clean up authentication."""
async def stop(self, reason: Union[BaseException, str]) -> None:
"""Stop the TCP servers, and clean up authentication.

This method must be called/awaited from a different thread to the
server's self.thread in order to interrupt the self.operate() loop
and wait for self.thread to terminate.
"""
self.queue.put('STOP')
if self.thread and self.thread.is_alive():
while not self.stopping:
# Non-async sleep - yield to other threads rather
# than event loop.
# Non-async sleep - yield to other threads rather than
# event loop (allows self.operate() running in different
# thread to return)
sleep(self.STOP_SLEEP_INTERVAL)

if self.replier:
self.replier.stop(stop_loop=False)
if self.publisher:
await self.publish_queued_items()
await self.publisher.publish(
[(b'shutdown', str(reason).encode('utf-8'))]
)
self.publisher.stop(stop_loop=False)
self.publisher = None
if self.curve_auth:
self.curve_auth.stop() # stop the authentication thread
if self.loop and self.loop.is_running():
Expand All @@ -230,8 +238,11 @@ async def stop(self, reason):

self.stopped = True

def operate(self):
def operate(self) -> None:
"""Orchestrate the receive, send, publish of messages."""
# Note: this cannot be an async method because the response part
# of the listener runs the event loop synchronously
# (in graphql AsyncioExecutor)
while True:
# process messages from the scheduler.
if self.queue.qsize():
Expand All @@ -243,15 +254,18 @@ def operate(self):

# Gather and respond to any requests.
self.replier.listener()

# Publish all requested/queued.
while self.publish_queue.qsize():
articles = self.publish_queue.get()
self.loop.run_until_complete(self.publisher.publish(articles))
self.loop.run_until_complete(self.publish_queued_items())

# Yield control to other threads
sleep(self.OPERATE_SLEEP_INTERVAL)

async def publish_queued_items(self):
"""Publish all queued items."""
while self.publish_queue.qsize():
articles = self.publish_queue.get()
await self.publisher.publish(articles)

def receiver(self, message):
"""Process incoming messages and coordinate response.

Expand Down Expand Up @@ -371,12 +385,19 @@ def graphql(
if executed.errors:
errors: List[Any] = []
for error in executed.errors:
LOG.error(error)
if hasattr(error, '__traceback__'):
import traceback
errors.append({'error': {
'message': str(error),
'traceback': traceback.format_exception(
error.__class__, error, error.__traceback__)}})
formatted_tb = traceback.format_exception(
type(error), error, error.__traceback__
)
LOG.error("".join(formatted_tb))
errors.append({
'error': {
'message': str(error),
'traceback': formatted_tb
}
})
continue
errors.append(getattr(error, 'message', None))
return errors
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1712,7 +1712,7 @@ def check_workflow_stalled(self) -> bool:
self.timers[self.EVENT_STALL_TIMEOUT].reset()
return self.is_stalled

async def shutdown(self, reason: Exception) -> None:
async def shutdown(self, reason: BaseException) -> None:
"""Gracefully shut down the scheduler."""
# At the moment this method must be called from the main_loop.
# In the future it should shutdown the main_loop itself but
Expand All @@ -1733,7 +1733,7 @@ async def shutdown(self, reason: Exception) -> None:
# Re-raise exception to be caught higher up (sets the exit code)
raise exc from None

async def _shutdown(self, reason: Exception) -> None:
async def _shutdown(self, reason: BaseException) -> None:
"""Shutdown the workflow."""
shutdown_msg = "Workflow shutting down"
with patch_log_level(LOG):
Expand Down Expand Up @@ -1995,7 +1995,7 @@ def _check_startup_opts(self) -> None:
f"option --{opt}=reload is only valid for restart"
)

async def handle_exception(self, exc: Exception) -> NoReturn:
async def handle_exception(self, exc: BaseException) -> NoReturn:
"""Gracefully shut down the scheduler given a caught exception.

Re-raises the exception to be caught higher up (sets the exit code).
Expand Down
12 changes: 12 additions & 0 deletions tests/integration/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Callable
from async_timeout import timeout
import asyncio
from getpass import getuser

import pytest

from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.scheduler import Scheduler


@pytest.fixture(scope='module')
Expand Down Expand Up @@ -135,3 +137,13 @@ def _api(*args, **kwargs):
raise Exception('foo')
one.server.api = _api
assert 'error' in one.server.receiver(msg)


async def test_publish_before_shutdown(
one: Scheduler, start: Callable
):
"""Test that the server publishes final deltas before shutting down."""
async with start(one):
one.server.publish_queue.put([(b'fake', b'blah')])
await one.server.stop('i said stop!')
assert not one.server.publish_queue.qsize()
2 changes: 0 additions & 2 deletions tests/integration/test_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from time import sleep

import pytest
import zmq

Expand Down
4 changes: 1 addition & 3 deletions tests/unit/network/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from time import sleep

from cylc.flow.network.publisher import WorkflowPublisher, serialize_data
from cylc.flow.network.publisher import serialize_data


def test_serialize_data():
Expand Down