diff --git a/distributed/client.py b/distributed/client.py index 24c4b271d1..82474fb762 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -4418,16 +4418,14 @@ async def _get_task_stream( else: return msgs - async def _register_scheduler_plugin(self, plugin, name, **kwargs): - if isinstance(plugin, type): - plugin = plugin(**kwargs) - + async def _register_scheduler_plugin(self, plugin, name, idempotent=False): return await self.scheduler.register_scheduler_plugin( plugin=dumps(plugin, protocol=4), name=name, + idempotent=idempotent, ) - def register_scheduler_plugin(self, plugin, name=None): + def register_scheduler_plugin(self, plugin, name=None, idempotent=False): """Register a scheduler plugin. See https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins @@ -4439,6 +4437,8 @@ def register_scheduler_plugin(self, plugin, name=None): name : str Name for the plugin; if None, a name is taken from the plugin instance or automatically generated if not present. + idempotent : bool + Do not re-register if a plugin of the given name already exists. """ if name is None: name = _get_plugin_name(plugin) @@ -4447,6 +4447,7 @@ def register_scheduler_plugin(self, plugin, name=None): self._register_scheduler_plugin, plugin=plugin, name=name, + idempotent=idempotent, ) def register_worker_callbacks(self, setup=None):