Async dask operation only finishing when I change things in the UI. #4235

Closed
MarcSkovMadsen opened this issue Dec 23, 2022 · 1 comment
Labels
type: bug Something isn't correct or isn't working

Comments

MarcSkovMadsen commented Dec 23, 2022

I'm trying to create a How to guide for Panel+Dask in #4234.

I have created an async example. The problem is that the awaited operation only completes after a very long delay, unless I drag an unrelated slider, in which case it completes almost immediately.

tasks.py

import time
import numpy as np
import logging
from datetime import datetime as dt


def blocking_computation(x: float) -> float:
    start = dt.now()
    print("starting blocking_computation", start)
    samples = []
    for _ in range(1000):
        time.sleep(0.001)
        samples.append(np.random.normal(loc=1.0, scale=1.0))
    result = x + int(np.ceil(np.mean(samples)))
    end = dt.now()
    print("ending blocking_computation", end)
    print("duration", (end-start))
    return result

app.py

from datetime import datetime as dt

import param

from dask.distributed import Client
from tasks import blocking_computation

import panel as pn

pn.extension()

@pn.cache
def get_client():
    return Client()

get_client() # For some unknown reason this is needed. Otherwise we get ModuleNotFoundError: No module named 'panel_dask_cluster'

async def submit(func, *args, **kwargs):
    client = get_client()
    future = client.submit(func, *args, **kwargs)
    return await client.gather(future, asynchronous=True)

float_input = pn.widgets.FloatInput(value=0, name="Input")
submit_button = pn.widgets.Button(name="Run in background", button_type="primary")
other_widget = pn.widgets.IntSlider(value=0, start=0, end=10, name="Non blocked slider")
pn.Column(float_input, submit_button, other_widget, other_widget.param.value).servable()

def start():
    submit_button.disabled=True
    float_input.disabled=True
    float_input.loading=True
    

def stop():
    float_input.disabled=False
    float_input.loading=False
    submit_button.disabled=False

@pn.depends(submit_button, watch=True)
async def run(_):
    start()
    s1 = dt.now()
    print("start", s1)
    float_input.value = await submit(blocking_computation, float_input.value)
    s2 = dt.now()
    print("stop", s2)
    print("total duration", (s2-s1))
    stop()

Run the app with:

panel serve app.py

You can see in the video that the async operation finishes after ~10 seconds. Most of that time (~9 seconds) is spent after the blocking_computation run by Dask has already finished. If I drag the slider, you can see it finish after ~1 second.
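
For comparison, here is a minimal standard-library sketch of the expected behaviour: awaiting a blocking function that runs off the event loop should return as soon as the function finishes, while the loop stays free for other work. It uses neither Panel nor Dask. The names `measure` and `ticker` and the shortened 0.2 s sleep are assumptions made purely for illustration.

```python
import asyncio
import time


def blocking_computation(x: float) -> float:
    # Stand-in for the task above, shortened to 0.2 s (assumption).
    time.sleep(0.2)
    return x + 1


async def ticker(ticks: list) -> None:
    # Simulates unrelated UI work; it only advances while the loop is free.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def measure(x: float):
    ticks: list = []
    background = asyncio.create_task(ticker(ticks))
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    # Run the blocking function in the default thread pool and await it.
    result = await loop.run_in_executor(None, blocking_computation, x)
    elapsed = time.monotonic() - start
    background.cancel()
    try:
        await background
    except asyncio.CancelledError:
        pass
    return result, elapsed, len(ticks)


if __name__ == "__main__":
    result, elapsed, n_ticks = asyncio.run(measure(1.0))
    print(f"result={result}, elapsed={elapsed:.2f}s, ticks while waiting={n_ticks}")
```

If the elapsed time is close to the computation time and the ticker keeps advancing, the await is behaving as intended. In the video, the extra ~9 seconds shows that this is not the case in the Panel+Dask setup above.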

strange_panel_dask.mp4
MarcSkovMadsen commented Dec 24, 2022

I found that if I start the Dask server in a separate Python file/process, I don't experience this. I also run into other issues when starting the server from the same file that panel serve runs.

I believe we should just instruct Panel+Dask users to separate the two. This will make things simpler.

I'm closing this one.

For reference, this is the solution:

# app.py
from datetime import datetime as dt
import asyncio
import param

from dask.distributed import Client
from tasks import blocking_computation

import panel as pn

pn.extension()

DASK_ADDRESS = "tcp://127.0.0.1:64719"

async def get_client():
    # Create the asynchronous client once and reuse it via Panel's cache (pn.state.cache).
    if "dask-client" not in pn.state.cache:
        pn.state.cache["dask-client"] = await Client(DASK_ADDRESS, asynchronous=True)
    return pn.state.cache["dask-client"]

async def submit(func, *args, **kwargs):
    client = await get_client()
    return await client.submit(func, *args, **kwargs)

float_input = pn.widgets.FloatInput(value=0, name="Input")
submit_button = pn.widgets.Button(name="Run in background", button_type="primary")
other_widget = pn.widgets.IntSlider(value=0, start=0, end=10, name="Non blocked slider")
pn.Column(float_input, submit_button, other_widget, other_widget.param.value).servable()

def start():
    submit_button.disabled=True
    float_input.disabled=True
    float_input.loading=True
    

def stop():
    float_input.disabled=False
    float_input.loading=False
    submit_button.disabled=False

@pn.depends(submit_button, watch=True)
async def run(_):
    start()
    s1 = dt.now()
    print("start", s1)
    float_input.value = await submit(blocking_computation, float_input.value)
    s2 = dt.now()
    print("stop", s2)
    print("total duration", (s2-s1))
    stop()
# tasks.py
import time
import numpy as np
import logging
from datetime import datetime as dt
from dask.distributed import LocalCluster

SCHEDULER_PORT = 64719

def blocking_computation(x: float) -> float:
    start = dt.now()
    print("starting blocking_computation", start)
    samples = []
    for _ in range(1000):
        time.sleep(0.001)
        samples.append(np.random.normal(loc=1.0, scale=1.0))
    result = x + int(np.ceil(np.mean(samples)))
    end = dt.now()
    print("ending blocking_computation", end)
    print("duration", (end-start))
    return result

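if __name__ == '__main__':
    # Start the cluster in its own process: run `python tasks.py` before `panel serve app.py`.
    cluster = LocalCluster(scheduler_port=SCHEDULER_PORT, n_workers=4)
    print(cluster.scheduler_address)
    input()  # Block so the cluster stays alive until Enter is pressed.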
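
Because the app now depends on a cluster started in another process, it can help to fail fast when nothing is listening at `DASK_ADDRESS`. The following is a minimal sketch using only the standard library. `scheduler_is_reachable` is a hypothetical helper, not part of Dask or Panel, and it only checks that a TCP connection can be opened, not that a Dask scheduler is actually answering.

```python
import socket
from urllib.parse import urlparse


def scheduler_is_reachable(address: str, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to an address like tcp://host:port succeeds."""
    try:
        parsed = urlparse(address)
        host, port = parsed.hostname, parsed.port  # .port raises ValueError if malformed
    except ValueError:
        return False
    if host is None or port is None:
        return False
    try:
        # Open and immediately close a connection; no Dask protocol is spoken.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Same address as DASK_ADDRESS in app.py above.
    print(scheduler_is_reachable("tcp://127.0.0.1:64719"))
```

Calling such a check in app.py before creating the client would let the app show a clear message when tasks.py has not been started yet, rather than waiting for the client connection to time out.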
