# Dynamically scaling Panel #4183
Have you tried …?
It does not work as one would want, actually, due to https://stackoverflow.com/a/63849068/7398517. A better load-balancing solution is therefore to use https://docs.bokeh.org/en/latest/docs/user_guide/server/deploy.html#load-balancing instead. However, these options limit you in terms of optimal resource allocation: with too many servers you consume a lot of mostly idle resources, and with too few the load is not really balanced. Ideally, I would want to scale things dynamically as the load increases, maybe using some standard request-response designs or something of that sort.
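For reference, the static alternative being discussed looks roughly like the following sketch; it is illustrative only (the app, port, and `num_procs` count are assumptions, and multi-process forking works on Unix only):

```python
# Hypothetical sketch of static scaling: a fixed pool of processes is
# chosen up front, regardless of the actual load.
import panel as pn

def app():
    return pn.Column("# My app")

if __name__ == "__main__":
    # Forks the underlying Bokeh/Tornado server into 4 worker processes,
    # roughly equivalent to `panel serve app.py --num-procs 4` on the CLI.
    pn.serve(app, port=5006, num_procs=4, show=False)
```

Each process still handles its own sessions synchronously, and the process count cannot grow or shrink with demand, which is exactly the limitation motivating this issue.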
I would also very much like a way to scale Panel apps dynamically, for the same reasons mentioned by @govinda18. It would dramatically improve both the developer and user experience for production apps, which are likely to be accessed simultaneously by several clients. Threading using … By the way, the Panel docs section mentioning …
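For context, the threading mentioned above can be enabled with the `panel serve app.py --num-threads 4` CLI flag or via `pn.extension`; a minimal sketch (the widget and callback names are illustrative, and the GIL caveat still applies to CPU-bound callbacks):

```python
import time
import panel as pn

# Dispatch incoming events to a pool of 4 threads. This only helps when
# the blocking work releases the GIL (I/O, sleep, NumPy, ...); pure-Python
# CPU loops still serialize across sessions.
pn.extension(nthreads=4)

def on_click(event):
    time.sleep(5)  # releases the GIL, so other sessions stay responsive

button = pn.widgets.Button(name="Run")
button.on_click(on_click)
pn.Row(button).servable()
```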
Could you propose some language to clarify this?

@philippjfr

That would be very much appreciated!
@govinda18 & @TheoMathurin I have cobbled together a solution that uses … Below is the output, with a computation that takes roughly 10 seconds (see the code at the end). The top cell begins its execution and 10 s later produces the result. The bottom cell starts a few seconds after the top cell, but still returns its result 10 s later.

(Demo video: panel-scaling-example.webm)

I was struggling to get the callbacks correct until I stumbled across this Discourse post: https://discourse.holoviz.org/t/panel-webapp-with-dask-bokeh-tornado-and-asynchronous-tasks/2388/5. The post outlines the need for periodic callbacks in order for … To run the code, have both files in the same directory and run …

```python
# cluster.py
#
# NOTE: However you start your cluster, it needs to have access to the
# `blocking_computation` method. If you get an error that says `dask` cannot
# pickle the blocking computation, then change your `PYTHONPATH` to include the
# directory where this file lives.
#
# Ensure this file is in the same place as the `dask_example.py` file.
import time
import numpy as np
from dask.distributed import LocalCluster
def blocking_computation(x: float) -> float:
    samples = []
    for _ in range(1000):
        time.sleep(0.01)
        samples.append(np.random.normal(loc=1.0, scale=1.0))
    return x + int(np.ceil(np.mean(samples)))


if __name__ == "__main__":
    cluster = LocalCluster(scheduler_port=8786)
    print(cluster.scheduler_address)
    input()  # keep the cluster alive until Enter is pressed
```

```python
# dask_example.py
from __future__ import annotations
from datetime import datetime as dt
import panel as pn
import param
from dask.distributed import Client
from cluster import blocking_computation
pn.extension()
class DaskExample(param.Parameterized):
    input_ = param.Parameter(default=0.0, label="Input")
    output = param.Parameter(default=1, label="Input + 1")
    start = param.String(default="", label="Start time")
    end = param.String(default="", label="End time")
    compute = param.Event(label="Compute")

    def __init__(
        self: DaskExample,
        refresh_rate: int = 500,
        **params,
    ) -> None:
        super().__init__(**params)
        self.futures = []
        # Connect to the scheduler started by cluster.py (port 8786).
        self.client = Client("tcp://127.0.0.1:8786")
        self.refresh_rate = refresh_rate
        self.computing = pn.indicators.LoadingSpinner(
            value=False,
            width=100,
            height=100,
        )
        self.param.watch(self.create_futures, ["compute"])

    def start_refreshing(self: DaskExample) -> None:
        pn.state.add_periodic_callback(self.compute_futures, self.refresh_rate)

    def compute_futures(self: DaskExample, *args) -> None:
        # Poll pending futures; iterate over a copy so that removing
        # finished entries does not skip items mid-iteration.
        for future, callback in list(self.futures):
            if future.status == "finished":
                callback(future)
                self.futures.remove((future, callback))

    def callback(self: DaskExample, future) -> None:
        self.output = future.result()
        self.update_end_time()
        self.computing.value = False

    def create_futures(self: DaskExample, *args):
        self.update_start_time()
        self.computing.value = True
        future = self.client.submit(blocking_computation, self.input_)
        self.futures.append((future, self.callback))

    def update_start_time(self: DaskExample) -> None:
        start = dt.now()
        self.start = start.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
        self.end = ""

    def update_end_time(self: DaskExample) -> None:
        end = dt.now()
        self.end = end.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]


example = DaskExample()
pn.state.onload(example.start_refreshing)
pn.Row(example.param, example.computing).servable()
```
I am not experienced in Dask or async, but it seems to me that it should be possible to combine …, which would make the client code simpler. I really want to learn about using Panel and Dask in combination, so I've made a feature request here: #4233
For example, this I believe is simpler and closer to something I would use. From my experiments it adds roughly 2 × 0.1 s for transferring data between the Panel server and the Dask cluster.

cluster.py

```python
# cluster.py
from dask.distributed import LocalCluster
SCHEDULER_PORT = 64719
if __name__ == '__main__':
    cluster = LocalCluster(scheduler_port=SCHEDULER_PORT, n_workers=4)
    print(cluster.scheduler_address)
    input()  # keep the cluster alive until Enter is pressed
```

tasks.py

```python
import time
from datetime import datetime as dt
import numpy as np
def blocking_computation(x: float) -> float:
    start = dt.now()
    print(start.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3])
    samples = []
    for _ in range(1000):
        time.sleep(0.01)
        samples.append(np.random.normal(loc=1.0, scale=1.0))
    end = dt.now()
    print(end.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3])
    return x + int(np.ceil(np.mean(samples)))
```

app.py

```python
# app.py
from __future__ import annotations
import asyncio
from datetime import datetime as dt
import panel as pn
import param
from dask.distributed import Client
from tasks import blocking_computation
pn.extension(sizing_mode="stretch_width")
DASK_ADDRESS = "tcp://127.0.0.1:64719"
async def get_client():
    # Cache a single asynchronous Dask client per server process.
    if "dask-client" not in pn.state.cache:
        pn.state.cache["dask-client"] = await Client(DASK_ADDRESS, asynchronous=True)
    return pn.state.cache["dask-client"]


async def submit(func, *args, **kwargs):
    client = await get_client()
    return await client.submit(func, *args, **kwargs)
class DaskExample(param.Parameterized):
    input_ = param.Parameter(default=0.0, label="Input")
    output = param.Parameter(default=1, label="Input + 1")
    start = param.String(default="", label="Start time")
    end = param.String(default="", label="End time")
    compute = param.Event(label="Compute")

    def __init__(
        self: DaskExample,
        refresh_rate: int = 500,
        **params,
    ) -> None:
        super().__init__(**params)
        self.computing = pn.indicators.LoadingSpinner(
            value=False,
            width=100,
            height=100,
        )

    @pn.depends("compute", watch=True)
    async def create_futures(self: DaskExample, *args):
        self.update_start_time()
        self.output = await submit(blocking_computation, self.input_)
        self.update_end_time()

    def update_start_time(self: DaskExample) -> None:
        self.param.compute.constant = True
        self.computing.value = True
        start = dt.now()
        self.start = start.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
        print(self.start)
        self.end = ""

    def update_end_time(self: DaskExample) -> None:
        end = dt.now()
        self.end = end.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
        print(self.end)
        self.computing.value = False
        self.param.compute.constant = False


slider = pn.widgets.IntSlider(value=0, start=0, end=10)
example = DaskExample()

pn.template.FastListTemplate(
    main=[
        pn.Column("## Offload asynchronously to Dask", example.param, example.computing),
        pn.Column("## Not blocked", slider, slider.param.value),
    ],
    site="Awesome Panel",
    title="Offload compute intensive tasks to Dask cluster",
).servable()
```

Run with:

```
python cluster.py
panel serve app.py --autoreload
```

(Demo video: dask-offload.mp4)
I've updated the example above. After some experiments it turned out that you avoid a lot of issues if you start the Dask cluster separately from the client. That is why I created the `cluster.py` file. I don't know how to set …
Thanks @MarcSkovMadsen, this really looks like a good lead. Just a comment: the sleep operation is not CPU intensive and does not hold the GIL, so the slider value update would work just by having a threaded Panel server. Presumably threading is not enabled here, though, so what we are seeing is really the effect of having a separate Dask cluster. A tenth of a second is acceptable overhead, I'd say, although we would have to see how this scales. In any case, that's nice!
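To make the GIL point concrete, here is a small illustrative sketch contrasting the two kinds of blocking work (both functions are hypothetical stand-ins, not code from this thread):

```python
import time

def io_like(x: float) -> float:
    # time.sleep releases the GIL, so on a threaded server other
    # sessions keep running while this call waits.
    time.sleep(10)
    return x + 1

def cpu_bound(x: float) -> float:
    # A pure-Python loop holds the GIL almost continuously; threads do
    # not help here, which is where offloading to a Dask cluster pays off.
    total = 0
    for i in range(50_000_000):
        total += i
    return x + (total % 2)
```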
After some reading and experimentation, I've found … (see #4239). My reflection is that if the overhead were close to 1 ms, then you could/should almost always use Dask as the execution engine. That would potentially make Panel scale really, really well.
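One rough way to measure the round-trip overhead discussed here is to time a no-op submission; a sketch, assuming the cluster from the example above is running on port 64719:

```python
import time
from dask.distributed import Client

def identity(x):
    return x

if __name__ == "__main__":
    client = Client("tcp://127.0.0.1:64719")
    client.submit(identity, 0).result()  # warm up the connection first
    start = time.perf_counter()
    n = 100
    for i in range(n):
        # pure=False forces a fresh task instead of a cached result
        client.submit(identity, i, pure=False).result()
    print(f"mean round-trip: {(time.perf_counter() - start) / n * 1000:.1f} ms")
```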
Thanks a lot @ndmlny-qs and @MarcSkovMadsen for the work here. Apologies for the delayed response, as I was on vacation last week. Some thoughts from the first read:
From what I could gather, it is a Dask-specific restriction. I have not worked closely with Dask, but I know for one that Celery provides a way to add queues dynamically. What I mean here is that dynamically scaling workers should probably be an implementation at the developer's end, as long as we have a solution/guideline flexible enough to plug other technologies into it as well. Will spend some time playing around as well. If you have any thoughts, feel free to add.
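For comparison, the Celery equivalent of the offloading pattern might look like the following hypothetical sketch (it assumes a Redis broker on localhost and a worker started with `celery -A celery_tasks worker`; none of it is from the thread itself):

```python
# celery_tasks.py
import time
from celery import Celery

app = Celery(
    "celery_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

@app.task
def blocking_computation(x: float) -> float:
    time.sleep(10)  # stand-in for real work
    return x + 1

# In a Panel callback the work is queued rather than run in-process:
#   result = blocking_computation.delay(value)  # returns immediately
#   output = result.get(timeout=30)             # or poll result.ready()
# Workers can then be added or removed independently of the Panel server,
# which is the dynamic-scaling property discussed above.
```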
Hi @govinda18, looking forward to seeing the results from the Celery experimentation. I think we need some more examples and knowledge on making Panel really scale by outsourcing the hard work to Dask, Celery, and databases like Postgres, SQLite, DuckDB, etc.
Is your feature request related to a problem? Please describe.
Currently, two users simultaneously using a Panel app will block each other. One workaround is to use num-threads, but this is not the most elegant solution, as it requires the developer to be cautious about handling the GIL. Logging this request to explore solutions for dynamically scaling a Panel server as more users join.
Describe the solution you'd like
One idea is to have several workers at the server's disposal so that events can be redirected to them. This requires a mechanism for appropriately storing state, such that the tasks assigned to workers can be uniquely identified with the clients they are associated with.
Describe alternatives you've considered
One static-scaling solution is to put multiple Panel servers behind an nginx load balancer. This is what we are currently using.
A simple demo for the issue:
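The demo snippet did not survive in this copy of the issue, but a minimal sketch that reproduces the blocking behavior could look like this (a hypothetical reconstruction, not the author's original code):

```python
import time
import panel as pn

pn.extension()

text = pn.pane.Markdown("Click to start")
button = pn.widgets.Button(name="Compute")

def on_click(event):
    # Simulates a long computation; on a default (single-threaded)
    # server this blocks every other user's session for the full 10 s.
    time.sleep(10)
    text.object = "Done!"

button.on_click(on_click)
pn.Column(button, text).servable()
```

Opening the app in two browser tabs and clicking the button in one makes the other tab unresponsive until the sleep finishes.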