Outdated and/or unclear documentation about SchedulerPlugin #8719

Open
RaphaelRobidas opened this issue Jun 23, 2024 · 2 comments · May be fixed by #8729
Labels
documentation Improve or add to documentation enhancement Improve existing functionality or make things work better

Comments

@RaphaelRobidas

Describe the issue:
The current documentation regarding SchedulerPlugin with full task state access gives the following code as example:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # Get full TaskState
        ts = self.scheduler.tasks[key]

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

The dask_setup function seems to appear out of nowhere, and click isn't imported. Moreover, this approach does not seem to register the plugin correctly. For example, the following code runs successfully:

import click
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    ret = client.submit(job, 1).result()
    print(ret)

The console log reads "2", while we would expect an Exception to be raised.

It is unclear how to register a scheduler plugin that can access the full task state. client.scheduler does not seem to be serializable, so the following code does not work and crashes with the error TypeError: cannot pickle 'coroutine' object:

from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    plugin = MyPlugin(client.scheduler)
    client.register_scheduler_plugin(plugin)

    ret = client.submit(job, 1).result()
    print(ret)

Environment:

  • Dask version: 2024.4.2
  • Python version: 3.10.12
  • Operating System: Linux Mint 21.2
  • Install method (conda, pip, source): pip
@hendrikmakait
Member

hendrikmakait commented Jun 24, 2024

@RaphaelRobidas: Thanks for reporting this issue. There are several things going on here, so let me try to unpack them:

The current documentation regarding SchedulerPlugin with full task state access gives the following code as example:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # Get full TaskState
        ts = self.scheduler.tasks[key]

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

The dask_setup function seems to appear out of nowhere, and click isn't imported.

This is a good catch; the documentation is a bit unclear here. Note that the RabbitMQ Example contains this line:

Run with: dask scheduler --preload <filename.py>

From what I can tell, the example you mention copies that pattern, but we could improve it by importing click and making it clear how to run the script (e.g., by simply copying the line I quoted above). Would you be interested in contributing a PR for this?
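For illustration, a sketch of what a self-contained version of the docs example could look like: it imports click and keeps the run command as a comment at the top. The filename scheduler_preload.py is hypothetical, and the plugin body mirrors the docs (it only looks up the TaskState).

```python
# scheduler_preload.py  (hypothetical filename)
# Run with: dask scheduler --preload scheduler_preload.py
import click  # needed for the @click.command() decorator below
from distributed.diagnostics.plugin import SchedulerPlugin


class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        # In a preload script we are handed the real Scheduler object
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # Get full TaskState (inspect or record it here)
        ts = self.scheduler.tasks[key]


@click.command()
def dask_setup(scheduler):
    # Called by the preload machinery once the scheduler exists;
    # defining this function in an ordinary script does nothing on its own
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)
```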

Moreover, this approach does not seem to register the plugin correctly. For example, the following code runs successfully:

import click
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    ret = client.submit(job, 1).result()
    print(ret)

The console log reads "2", while we would expect an Exception to be raised.

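Your script indeed does not register the scheduler plugin: dask_setup is only called by the preload machinery (e.g., dask scheduler --preload <filename.py>), and nothing in your script calls it or passes the file to the cluster as a preload. That is why you don't see an exception being logged.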

As an aside, raising an exception within SchedulerPlugin.transition does not raise an exception on the client; instead, the scheduler catches it and logs it:

try:
    plugin.transition(
        key, start, actual_finish, stimulus_id=stimulus_id, **kwargs
    )
except Exception:
    logger.info("Plugin failed with exception", exc_info=True)

This is by design to avoid scheduler corruption by faulty user code.

It is unclear how to register a scheduler plugin that can access the full task state. client.scheduler does not seem to be serializable, so the following code does not work and crashes with the error TypeError: cannot pickle 'coroutine' object:

from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    plugin = MyPlugin(client.scheduler)
    client.register_scheduler_plugin(plugin)

    ret = client.submit(job, 1).result()
    print(ret)

There are two ways to register a plugin: with a scheduler preload script (which is what the example you mention at the beginning does) and via Client.register_plugin. They work slightly differently. For example, the dask_setup function in the preload script hands you a Scheduler object, whereas Client.scheduler is merely an RPC object that you can use to call remote procedures on the scheduler. With Client.register_plugin, SchedulerPlugin.start() gets called on the scheduler upon registration, and it hands you a Scheduler object as well. This adds to the confusion here. I've opened a ticket aimed at improving our documentation around this: #8721

@hendrikmakait hendrikmakait added enhancement Improve existing functionality or make things work better documentation Improve or add to documentation and removed needs triage labels Jun 24, 2024
RaphaelRobidas added a commit to RaphaelRobidas/distributed that referenced this issue Jun 24, 2024
@RaphaelRobidas
Author

@hendrikmakait Thanks for the detailed reply, see PR #8729!
