Expose scheduler idle via RPC and HTTP API #7642

Merged
merged 2 commits on Mar 13, 2023
17 changes: 17 additions & 0 deletions distributed/http/scheduler/api.py
@@ -63,9 +63,26 @@ def get(self):
            self.write(json.dumps({"Error": "Internal Server Error"}))


+class CheckIdleHandler(RequestHandler):
+    def get(self):
+        self.set_header("Content-Type", "application/json")
+        scheduler = self.server
+        try:
+            idle_since = scheduler.check_idle()
+            response = {
+                "idle": idle_since is not None,
+                "idle_since": idle_since,
+            }
+            self.write(json.dumps(response))
+        except Exception as e:
+            self.set_status(500, str(e))
+            self.write(json.dumps({"Error": "Internal Server Error"}))


routes: list[tuple] = [
    ("/api/v1", APIHandler, {}),
    ("/api/v1/retire_workers", RetireWorkersHandler, {}),
    ("/api/v1/get_workers", GetWorkersHandler, {}),
    ("/api/v1/adaptive_target", AdaptiveTargetHandler, {}),
+    ("/api/v1/check_idle", CheckIdleHandler, {}),
]
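
For reference, this is how the new endpoint might be queried once the api module is enabled. A minimal sketch using only the Python standard library; the localhost:8787 address is an assumption (the default dashboard port), not something this PR configures:

import json
import urllib.request

# Query the check_idle endpoint registered in the routes above.
with urllib.request.urlopen("http://localhost:8787/api/v1/check_idle") as resp:
    payload = json.load(resp)

# "idle" is a boolean; "idle_since" is a timestamp in seconds, or None
# (null in JSON) while the scheduler is busy.
print(payload["idle"], payload["idle_since"])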
25 changes: 25 additions & 0 deletions distributed/http/scheduler/tests/test_scheduler_http.py
@@ -481,3 +481,28 @@ async def test_adaptive_target(c, s, a, b):
            assert resp.headers["Content-Type"] == "application/json"
            num_workers = json.loads(await resp.text())["workers"]
            assert num_workers == 0


+@gen_cluster(
+    client=True,
+    clean_kwargs={"threads": False},
+    config={
+        "distributed.scheduler.http.routes": DEFAULT_ROUTES
+        + ["distributed.http.scheduler.api"]
+    },
+)
+async def test_check_idle(c, s, a, b):
+    aiohttp = pytest.importorskip("aiohttp")
+
+    async with aiohttp.ClientSession() as session:
+        async with session.get(
+            "http://localhost:%d/api/v1/check_idle" % s.http_server.port
+        ) as resp:
+            assert resp.status == 200
+            assert resp.headers["Content-Type"] == "application/json"
+            response = json.loads(await resp.text())
+            assert isinstance(response["idle"], bool)
+            assert (
+                isinstance(response["idle_since"], (int, float))
+                or response["idle_since"] is None
+            )
32 changes: 17 additions & 15 deletions distributed/scheduler.py
@@ -3643,6 +3643,7 @@ def __init__(
"dump_cluster_state_to_url": self.dump_cluster_state_to_url,
"benchmark_hardware": self.benchmark_hardware,
"get_story": self.get_story,
"check_idle": self.check_idle,
Collaborator:

@jacobtomlinson just curious, where is this RPC used? I don't see a corresponding client method or any use on the workers. Are you planning to call it in dask-k8s?

Member (Author):

Yeah, using it in dask-kubernetes, but only as a fallback if the HTTP API is not enabled.

Member:

Good point, I actually missed this. I think "best practice" should be to register this in dask-k8s with a scheduler extension. If I were just looking at this code base, it would look like dead code and I might remove it.

We rarely do these refactorings, but there is nothing here that tells us what is actually "public" and what isn't, so there is always a risk.
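
For illustration, a rough sketch of the kind of scheduler extension being suggested here; the class name and handler key are hypothetical, not actual dask-kubernetes code:

class IdleCheckExtension:
    # Hypothetical extension that dask-k8s could register on scheduler startup.
    def __init__(self, scheduler):
        self.scheduler = scheduler
        # Expose a handler over the scheduler RPC, following the pattern used
        # by the built-in extensions; "check_idle_custom" is an invented key.
        scheduler.handlers["check_idle_custom"] = self.check_idle

    def check_idle(self, comm=None):
        # Reads scheduler internals (Scheduler.idle_since), which is exactly
        # the coupling concern raised in the reply below.
        return self.scheduler.idle_since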

Member (Author):

The fallback of the fallback is to register a scheduler extension so that we can support older versions of distributed. My worry is that scheduler extensions still depend on scheduler internals like Scheduler.idle_since, which could also change.

There are tests in dask-kubernetes that cover this, so in terms of risk there is some mitigation. But it would be really nice to try and move towards a world where we have well-defined public APIs in terms of the scheduler object (for extensions), the RPC, and the HTTP API. Otherwise it is always going to be risky developing against the scheduler.

We could also expose this via the client and treat that as the public API instead of prodding the RPC directly.

Member:

> There are tests in dask-kubernetes that cover this, so in terms of risk there is some mitigation. But it would be really nice to try and move towards a world where we have well-defined public APIs in terms of the scheduler object (for extensions), the RPC, and the HTTP API. Otherwise it is always going to be risky developing against the scheduler.

This is a much bigger thing than I want it to be, but yes, I would love to see nothing of the actual Scheduler object be public. It is just too big, there are too many small things, it's changing too quickly, ...

> We could also expose this via the client and treat that as the public API instead of prodding the RPC directly.

That's typically something I feel very comfortable with, since it is very explicit that this is intended for external usage.


Just to be very clear: it's fine keeping it like this. "Fixing" this API leakage is very hard. I'm open to any suggestions, but handing users (both in extensions and especially in plugins) the entire object isn't maintainable, particularly as long as there is no proper discipline around using underscores.

Member (Author):

> This is a much bigger thing than I want it to be, but yes, I would love to see nothing of the actual Scheduler object be public.

I totally sympathise with that, although scheduler plugins/extensions/run_on_scheduler are all well-established patterns, so backing out of those at this point will be hard.

> That's typically something I feel very comfortable with, since it is very explicit that this is intended for external usage.

Most, if not all, of the cluster managers open an RPC to the scheduler and invoke methods directly. I think part of the challenge here is that some cluster managers live in distributed and some live in dask/dask-{kubernetes,jobqueue,yarn,gateway,cloudprovider,etc}.

For a long time the dask-foo projects were considered part of core Dask, and it wasn't unusual to make a coupled PR to both distributed and the cluster manager you were working on. However, this is a maintenance challenge because tests for code in distributed run in other repos.

Perhaps a good step forward would be for dask-foo projects to only interact with the scheduler via the Client and the HTTP API. These are API surfaces that we can consider to be public.

The question is: what do we do with the base Cluster and SpecCluster classes in distributed? These both use the RPC directly and are intended to be subclassed by third-party libraries. Maybe these classes should be updated to stop using the RPC and use a Client instead?
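
As a sketch of the Client-mediated path discussed above (illustrative only; client.scheduler is the client's RPC proxy to the scheduler, and check_idle is the handler added in this PR):

from distributed import Client

async def scheduler_is_idle(address: str) -> bool:
    async with Client(address, asynchronous=True) as client:
        # Calls the "check_idle" RPC handler registered by this PR; it
        # returns the idle-since timestamp, or None if the scheduler is busy.
        idle_since = await client.scheduler.check_idle()
        return idle_since is not None

A dedicated Client method wrapping this call (or the HTTP endpoint above) would make the public surface explicit instead of relying on the RPC proxy.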

        }

        connection_limit = get_fileno_limit() / 2
@@ -3675,9 +3676,8 @@ def __init__(
        pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl * 1000)
        self.periodic_callbacks["worker-ttl"] = pc

-        if self.idle_timeout:
-            pc = PeriodicCallback(self.check_idle, self.idle_timeout * 1000 / 4)
-            self.periodic_callbacks["idle-timeout"] = pc
+        pc = PeriodicCallback(self.check_idle, (self.idle_timeout or 1) * 1000 / 4)
+        self.periodic_callbacks["idle-timeout"] = pc

        if extensions is None:
            extensions = DEFAULT_EXTENSIONS.copy()
@@ -7708,34 +7708,36 @@ async def check_worker_ttl(self):
                )
                await self.remove_worker(address=ws.address, stimulus_id=stimulus_id)

-    def check_idle(self):
-        assert self.idle_timeout
+    def check_idle(self) -> float | None:
        if self.status in (Status.closing, Status.closed):
-            return
+            return None

        if self.transition_counter != self._idle_transition_counter:
            self._idle_transition_counter = self.transition_counter
            self.idle_since = None
-            return
+            return None

        if (
            self.queued
            or self.unrunnable
            or any([ws.processing for ws in self.workers.values()])
        ):
            self.idle_since = None
-            return
+            return None

        if not self.idle_since:
            self.idle_since = time()
+            return self.idle_since

-        if time() > self.idle_since + self.idle_timeout:
-            assert self.idle_since
-            logger.info(
-                "Scheduler closing after being idle for %s",
-                format_time(self.idle_timeout),
-            )
-            self._ongoing_background_tasks.call_soon(self.close)
+        if self.idle_timeout:
+            if time() > self.idle_since + self.idle_timeout:
+                assert self.idle_since
+                logger.info(
+                    "Scheduler closing after being idle for %s",
+                    format_time(self.idle_timeout),
+                )
+                self._ongoing_background_tasks.call_soon(self.close)
+        return self.idle_since

    def adaptive_target(self, target_duration=None):
        """Desired number of workers based on the current workload
20 changes: 10 additions & 10 deletions distributed/tests/test_scheduler.py
@@ -2288,6 +2288,8 @@ async def test_idle_timeout(c, s, a, b):
    pc.start()
    await future
    assert s.idle_since is None or s.idle_since > beginning
+    _idle_since = s.check_idle()
+    assert _idle_since == s.idle_since

    with captured_logger("distributed.scheduler") as logs:
        start = time()
@@ -2312,36 +2314,34 @@
    nthreads=[],
)
async def test_idle_timeout_no_workers(c, s):
+    # Cancel the idle check periodic timeout so we can step through manually
+    s.periodic_callbacks["idle-timeout"].stop()
+
    s.idle_timeout = 0.1
    future = c.submit(inc, 1)
    while not s.tasks:
        await asyncio.sleep(0.1)

-    s.check_idle()
-    assert not s.idle_since
+    assert not s.check_idle()

    for _ in range(10):
        await asyncio.sleep(0.01)
-        s.check_idle()
-    assert not s.idle_since
+        assert not s.check_idle()
    assert s.tasks

    async with Worker(s.address):
        await future
-        s.check_idle()
-        assert not s.idle_since
+        assert not s.check_idle()
    del future

    while s.tasks:
        await asyncio.sleep(0.1)

    # We only set idleness once nothing happened between two consecutive
    # check_idle calls
-    s.check_idle()
-    assert not s.idle_since
+    assert not s.check_idle()

-    s.check_idle()
-    assert s.idle_since
+    assert s.check_idle()


@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "100 GB"})