From a4c65541f71e8e56ef89a604aee636e152bc3ebc Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Jul 2021 15:13:47 +0100 Subject: [PATCH 01/11] Add synced dict between cluster and scheduler to store cluster info --- distributed/deploy/cluster.py | 43 +++++++++++++++++++++++--- distributed/deploy/tests/test_local.py | 14 +++++++++ 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 37ddc31f39b..bc513297f19 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -1,4 +1,6 @@ import asyncio +import contextlib +import copy import datetime import logging import threading @@ -13,7 +15,15 @@ from ..core import Status from ..objects import SchedulerInfo -from ..utils import Log, Logs, format_dashboard_link, log_errors, sync, thread_state +from ..utils import ( + Log, + Logs, + format_dashboard_link, + log_errors, + sync, + thread_state, + typename, +) from .adaptive import Adaptive logger = logging.getLogger(__name__) @@ -51,17 +61,26 @@ def __init__(self, asynchronous, quiet=False, name=None): self._asynchronous = asynchronous self._watch_worker_status_comm = None self._watch_worker_status_task = None + self._sync_cluster_info_task = None self._cluster_manager_logs = [] self.quiet = quiet self.scheduler_comm = None self._adaptive = None - if name is not None: - self.name = name - elif self.name is None: - self.name = str(uuid.uuid4())[:8] + if name is None: + name = str(uuid.uuid4())[:8] + + self.cluster_info = {"name": name, "type": typename(type(self))} self.status = Status.created + @property + def name(self): + return self.cluster_info["name"] + + @name.setter + def name(self, name): + self.cluster_info["name"] = name + async def _start(self): comm = await self.scheduler_comm.live_comm() await comm.write({"op": "subscribe_worker_status"}) @@ -70,8 +89,22 @@ async def _start(self): self._watch_worker_status_task = asyncio.ensure_future( 
self._watch_worker_status(comm) ) + + with contextlib.suppress(KeyError): + self.cluster_info.update( + (await self.scheduler_comm.get_metadata(keys=["cluster-manager-info"])) + ) + self._sync_cluster_info_task = asyncio.ensure_future(self._sync_cluster_info()) self.status = Status.running + async def _sync_cluster_info(self): + while True: + await self.scheduler_comm.set_metadata( + keys=["cluster-manager-info"], + value=copy.copy(self.cluster_info), + ) + await asyncio.sleep(1) + async def _close(self): if self.status == Status.closed: return diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 13a3a16d928..930d5156a76 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -1079,3 +1079,17 @@ async def test_local_cluster_redundant_kwarg(nanny): async with Client(cluster) as c: f = c.submit(sleep, 0) await f + + +@pytest.mark.asyncio +async def test_cluster_info_sync(): + async with LocalCluster(processes=False, asynchronous=True) as cluster: + assert cluster.cluster_info["name"] == cluster.name + info = await cluster.scheduler_comm.get_metadata(keys=["cluster-manager-info"]) + assert info["name"] == cluster.name + info = cluster.scheduler.get_metadata(keys=["cluster-manager-info"]) + assert info["name"] == cluster.name + + cluster.cluster_info["foo"] = "bar" + await asyncio.sleep(2) + assert "foo" in cluster.scheduler.get_metadata(keys=["cluster-manager-info"]) From 42f363ecc5c4414eb18d38bef0abb3b079f6db82 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Jul 2021 16:00:14 +0100 Subject: [PATCH 02/11] Add suppress comment --- distributed/deploy/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index bc513297f19..c7fb0c64494 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -90,7 +90,7 @@ async def _start(self): self._watch_worker_status(comm) ) 
- with contextlib.suppress(KeyError): + with contextlib.suppress(KeyError): # The scheduler might not have any info self.cluster_info.update( (await self.scheduler_comm.get_metadata(keys=["cluster-manager-info"])) ) From 459dde72a384aa2bf110bf438a21aba118a3cfe7 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Jul 2021 16:03:30 +0100 Subject: [PATCH 03/11] Add cancellation on stop of sync task --- distributed/deploy/cluster.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index c7fb0c64494..fefbca3e590 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -98,12 +98,15 @@ async def _start(self): self.status = Status.running async def _sync_cluster_info(self): - while True: - await self.scheduler_comm.set_metadata( - keys=["cluster-manager-info"], - value=copy.copy(self.cluster_info), - ) - await asyncio.sleep(1) + try: + while True: + await self.scheduler_comm.set_metadata( + keys=["cluster-manager-info"], + value=copy.copy(self.cluster_info), + ) + await asyncio.sleep(1) + except asyncio.CancelledError: + return async def _close(self): if self.status == Status.closed: @@ -116,6 +119,8 @@ async def _close(self): await self._watch_worker_status_comm.close() if self._watch_worker_status_task: await self._watch_worker_status_task + if self._sync_cluster_info_task: + await self._sync_cluster_info_task.cancel() for pc in self.periodic_callbacks.values(): pc.stop() From d0ba7f6e7bd4441206f878b1f71f46735281365d Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 7 Jul 2021 16:05:51 +0100 Subject: [PATCH 04/11] Remove extraneous await --- distributed/deploy/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index fefbca3e590..747a9c9c900 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -120,7 +120,7 @@ async def 
_close(self): if self._watch_worker_status_task: await self._watch_worker_status_task if self._sync_cluster_info_task: - await self._sync_cluster_info_task.cancel() + self._sync_cluster_info_task.cancel() for pc in self.periodic_callbacks.values(): pc.stop() From 276ae107126e3450eb7c4e2e45ca1bda3baae3a3 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 8 Jul 2021 12:25:11 +0200 Subject: [PATCH 05/11] Use PC instead of future --- distributed/deploy/cluster.py | 34 +++++++++++++------------- distributed/deploy/local.py | 2 ++ distributed/deploy/spec.py | 2 ++ distributed/deploy/tests/test_local.py | 7 ++++-- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 747a9c9c900..5587c79ba5b 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -8,7 +8,7 @@ from contextlib import suppress from inspect import isawaitable -from tornado.ioloop import PeriodicCallback +from tornado.ioloop import IOLoop, PeriodicCallback import dask.config from dask.utils import _deprecated, format_bytes, parse_timedelta @@ -55,7 +55,7 @@ class Cluster: _supports_scaling = True name = None - def __init__(self, asynchronous, quiet=False, name=None): + def __init__(self, asynchronous, quiet=False, name=None, scheduler_sync_interval=1): self.scheduler_info = {"workers": {}} self.periodic_callbacks = {} self._asynchronous = asynchronous @@ -66,6 +66,9 @@ def __init__(self, asynchronous, quiet=False, name=None): self.quiet = quiet self.scheduler_comm = None self._adaptive = None + self._sync_interval = parse_timedelta( + scheduler_sync_interval, default="seconds" + ) if name is None: name = str(uuid.uuid4())[:8] @@ -94,19 +97,18 @@ async def _start(self): self.cluster_info.update( (await self.scheduler_comm.get_metadata(keys=["cluster-manager-info"])) ) - self._sync_cluster_info_task = asyncio.ensure_future(self._sync_cluster_info()) + self.loop = IOLoop.current() + 
self.periodic_callbacks["sync-cluster-info"] = pc = PeriodicCallback( + self._sync_cluster_info, self._sync_interval * 1000 + ) + self.loop.add_callback(pc.start) self.status = Status.running async def _sync_cluster_info(self): - try: - while True: - await self.scheduler_comm.set_metadata( - keys=["cluster-manager-info"], - value=copy.copy(self.cluster_info), - ) - await asyncio.sleep(1) - except asyncio.CancelledError: - return + await self.scheduler_comm.set_metadata( + keys=["cluster-manager-info"], + value=copy.copy(self.cluster_info), + ) async def _close(self): if self.status == Status.closed: @@ -119,15 +121,13 @@ async def _close(self): await self._watch_worker_status_comm.close() if self._watch_worker_status_task: await self._watch_worker_status_task - if self._sync_cluster_info_task: - self._sync_cluster_info_task.cancel() - - for pc in self.periodic_callbacks.values(): - pc.stop() if self.scheduler_comm: await self.scheduler_comm.close_rpc() + for pc in self.periodic_callbacks.values(): + pc.stop() + self.status = Status.closed def close(self, timeout=None): diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index e759d8b1216..d5fc94e9174 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -120,6 +120,7 @@ def __init__( interface=None, worker_class=None, scheduler_kwargs=None, + scheduler_sync_interval=1, **worker_kwargs, ): if ip is not None: @@ -240,6 +241,7 @@ def __init__( asynchronous=asynchronous, silence_logs=silence_logs, security=security, + scheduler_sync_interval=scheduler_sync_interval, ) def start_worker(self, *args, **kwargs): diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index aa3dc3f7b84..33ef1c843f8 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -241,6 +241,7 @@ def __init__( silence_logs=False, name=None, shutdown_on_close=True, + scheduler_sync_interval=1, ): self._created = weakref.WeakSet() @@ -270,6 +271,7 @@ def __init__( 
super().__init__( asynchronous=asynchronous, name=name, + scheduler_sync_interval=scheduler_sync_interval, ) if not self.asynchronous: diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 930d5156a76..20392239b92 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -1083,13 +1083,16 @@ async def test_local_cluster_redundant_kwarg(nanny): @pytest.mark.asyncio async def test_cluster_info_sync(): - async with LocalCluster(processes=False, asynchronous=True) as cluster: + async with LocalCluster( + processes=False, asynchronous=True, scheduler_sync_interval="2ms" + ) as cluster: assert cluster.cluster_info["name"] == cluster.name + await asyncio.sleep(0.005) info = await cluster.scheduler_comm.get_metadata(keys=["cluster-manager-info"]) assert info["name"] == cluster.name info = cluster.scheduler.get_metadata(keys=["cluster-manager-info"]) assert info["name"] == cluster.name cluster.cluster_info["foo"] = "bar" - await asyncio.sleep(2) + await asyncio.sleep(0.005) assert "foo" in cluster.scheduler.get_metadata(keys=["cluster-manager-info"]) From 7a4980ec33204722555fe3da5336b161f530782e Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 8 Jul 2021 14:38:39 +0100 Subject: [PATCH 06/11] Update distributed/deploy/cluster.py Co-authored-by: Florian Jetter --- distributed/deploy/cluster.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 5587c79ba5b..7e46ecc9f88 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -61,7 +61,6 @@ def __init__(self, asynchronous, quiet=False, name=None, scheduler_sync_interval self._asynchronous = asynchronous self._watch_worker_status_comm = None self._watch_worker_status_task = None - self._sync_cluster_info_task = None self._cluster_manager_logs = [] self.quiet = quiet self.scheduler_comm = None From 2ef3b256ca0202f3eefcd7cc0614f730a0da8e5f Mon Sep 
17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 8 Jul 2021 16:49:46 +0100 Subject: [PATCH 07/11] Start callbacks all together (by @jrbourbeau) --- distributed/deploy/cluster.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 7e46ecc9f88..c9862c00d0f 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -8,7 +8,7 @@ from contextlib import suppress from inspect import isawaitable -from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.ioloop import PeriodicCallback import dask.config from dask.utils import _deprecated, format_bytes, parse_timedelta @@ -96,11 +96,11 @@ async def _start(self): self.cluster_info.update( (await self.scheduler_comm.get_metadata(keys=["cluster-manager-info"])) ) - self.loop = IOLoop.current() - self.periodic_callbacks["sync-cluster-info"] = pc = PeriodicCallback( + self.periodic_callbacks["sync-cluster-info"] = PeriodicCallback( self._sync_cluster_info, self._sync_interval * 1000 ) - self.loop.add_callback(pc.start) + for pc in self.periodic_callbacks.values(): + pc.start() self.status = Status.running async def _sync_cluster_info(self): From d451d36ed61269d5b49b6ce05ce122285cfebf78 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 9 Jul 2021 15:19:55 +0100 Subject: [PATCH 08/11] Instead of suppressing a KeyError set a default value of {} instead --- distributed/deploy/cluster.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index c9862c00d0f..3eb9acd08e5 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -1,5 +1,4 @@ import asyncio -import contextlib import copy import datetime import logging @@ -92,10 +91,11 @@ async def _start(self): self._watch_worker_status(comm) ) - with contextlib.suppress(KeyError): # The scheduler might not have any info - self.cluster_info.update( - 
(await self.scheduler_comm.get_metadata(keys=["cluster-manager-info"])) - ) + info = await self.scheduler_comm.get_metadata( + keys=["cluster-manager-info"], default={} + ) + self.cluster_info.update(info) + self.periodic_callbacks["sync-cluster-info"] = PeriodicCallback( self._sync_cluster_info, self._sync_interval * 1000 ) From 9a215a30a2b26ef54c03f6c57da87875393f62a1 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 16 Jul 2021 11:30:08 +0100 Subject: [PATCH 09/11] Shortening interval and increasing sleep to allow for more syncs before asserting --- distributed/deploy/tests/test_local.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 6cb85f126a3..182465adfd2 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -1082,15 +1082,15 @@ async def test_local_cluster_redundant_kwarg(nanny): @pytest.mark.asyncio async def test_cluster_info_sync(): async with LocalCluster( - processes=False, asynchronous=True, scheduler_sync_interval="2ms" + processes=False, asynchronous=True, scheduler_sync_interval="1ms" ) as cluster: assert cluster.cluster_info["name"] == cluster.name - await asyncio.sleep(0.005) + await asyncio.sleep(0.01) info = await cluster.scheduler_comm.get_metadata(keys=["cluster-manager-info"]) assert info["name"] == cluster.name info = cluster.scheduler.get_metadata(keys=["cluster-manager-info"]) assert info["name"] == cluster.name cluster.cluster_info["foo"] = "bar" - await asyncio.sleep(0.005) + await asyncio.sleep(0.01) assert "foo" in cluster.scheduler.get_metadata(keys=["cluster-manager-info"]) From 367b1855faab659ed07da01ba58206d280d85c4c Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 25 Aug 2021 14:38:01 +0100 Subject: [PATCH 10/11] Poke CI From 933b0d1897a5febc5004b3919b95245ccf95a83b Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 26 Aug 2021 10:30:07 +0100 Subject: 
[PATCH 11/11] Make cluster_info private and make tests more robust --- distributed/deploy/cluster.py | 23 +++++++---------------- distributed/deploy/tests/test_local.py | 20 +++++++++++++++----- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 71a4de12aa5..8beef70f6ba 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -10,20 +10,12 @@ from tornado.ioloop import PeriodicCallback import dask.config -from dask.utils import _deprecated, format_bytes, parse_timedelta +from dask.utils import _deprecated, format_bytes, parse_timedelta, typename from dask.widgets import get_template from ..core import Status from ..objects import SchedulerInfo -from ..utils import ( - Log, - Logs, - format_dashboard_link, - log_errors, - sync, - thread_state, - typename, -) +from ..utils import Log, Logs, format_dashboard_link, log_errors, sync, thread_state from .adaptive import Adaptive logger = logging.getLogger(__name__) @@ -53,7 +45,6 @@ class Cluster: """ _supports_scaling = True - name = None def __init__(self, asynchronous, quiet=False, name=None, scheduler_sync_interval=1): self.scheduler_info = {"workers": {}} @@ -72,16 +63,16 @@ def __init__(self, asynchronous, quiet=False, name=None, scheduler_sync_interval if name is None: name = str(uuid.uuid4())[:8] - self.cluster_info = {"name": name, "type": typename(type(self))} + self._cluster_info = {"name": name, "type": typename(type(self))} self.status = Status.created @property def name(self): - return self.cluster_info["name"] + return self._cluster_info["name"] @name.setter def name(self, name): - self.cluster_info["name"] = name + self._cluster_info["name"] = name async def _start(self): comm = await self.scheduler_comm.live_comm() @@ -95,7 +86,7 @@ async def _start(self): info = await self.scheduler_comm.get_metadata( keys=["cluster-manager-info"], default={} ) - self.cluster_info.update(info) + 
self._cluster_info.update(info) self.periodic_callbacks["sync-cluster-info"] = PeriodicCallback( self._sync_cluster_info, self._sync_interval * 1000 @@ -107,7 +98,7 @@ async def _start(self): async def _sync_cluster_info(self): await self.scheduler_comm.set_metadata( keys=["cluster-manager-info"], - value=copy.copy(self.cluster_info), + value=copy.copy(self._cluster_info), ) async def _close(self): diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 846d64fd7ed..3f77bd46df9 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -1080,13 +1080,23 @@ async def test_cluster_info_sync(): async with LocalCluster( processes=False, asynchronous=True, scheduler_sync_interval="1ms" ) as cluster: - assert cluster.cluster_info["name"] == cluster.name - await asyncio.sleep(0.01) + assert cluster._cluster_info["name"] == cluster.name + + while "name" not in cluster.scheduler.get_metadata( + keys=["cluster-manager-info"] + ): + await asyncio.sleep(0.01) + info = await cluster.scheduler_comm.get_metadata(keys=["cluster-manager-info"]) assert info["name"] == cluster.name info = cluster.scheduler.get_metadata(keys=["cluster-manager-info"]) assert info["name"] == cluster.name - cluster.cluster_info["foo"] = "bar" - await asyncio.sleep(0.01) - assert "foo" in cluster.scheduler.get_metadata(keys=["cluster-manager-info"]) + cluster._cluster_info["foo"] = "bar" + while "foo" not in cluster.scheduler.get_metadata( + keys=["cluster-manager-info"] + ): + await asyncio.sleep(0.01) + + info = cluster.scheduler.get_metadata(keys=["cluster-manager-info"]) + assert info["foo"] == "bar"