Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding support for async callbacks and page layouts #3089

Open
wants to merge 15 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 239 additions & 1 deletion dash/_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from functools import wraps
from typing import Callable, Optional, Any

import asyncio
import flask

from .dependencies import (
Expand Down Expand Up @@ -39,6 +40,16 @@
from ._callback_context import context_value


async def _async_invoke_callback(
func, *args, **kwargs
): # used to mark the frame for the debugger
# Check if the function is a coroutine function
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs) # %% callback invoked %%
# If the function is not a coroutine, call it directly
return func(*args, **kwargs) # %% callback invoked %%


def _invoke_callback(func, *args, **kwargs): # used to mark the frame for the debugger
return func(*args, **kwargs) # %% callback invoked %%

Expand Down Expand Up @@ -353,6 +364,230 @@ def wrap_func(func):
)

@wraps(func)
async def async_add_context(*args, **kwargs):
output_spec = kwargs.pop("outputs_list")
app_callback_manager = kwargs.pop("long_callback_manager", None)

callback_ctx = kwargs.pop(
"callback_context", AttributeDict({"updated_props": {}})
)
app = kwargs.pop("app", None)
callback_manager = long and long.get("manager", app_callback_manager)
error_handler = on_error or kwargs.pop("app_on_error", None)
original_packages = set(ComponentRegistry.registry)

if has_output:
_validate.validate_output_spec(insert_output, output_spec, Output)

context_value.set(callback_ctx)

func_args, func_kwargs = _validate.validate_and_group_input_args(
args, inputs_state_indices
)

response: dict = {"multi": True}
has_update = False

if long is not None:
if not callback_manager:
raise MissingLongCallbackManagerError(
"Running `long` callbacks requires a manager to be installed.\n"
"Available managers:\n"
"- Diskcache (`pip install dash[diskcache]`) to run callbacks in a separate Process"
" and store results on the local filesystem.\n"
"- Celery (`pip install dash[celery]`) to run callbacks in a celery worker"
" and store results on redis.\n"
)

progress_outputs = long.get("progress")
cache_key = flask.request.args.get("cacheKey")
job_id = flask.request.args.get("job")
old_job = flask.request.args.getlist("oldJob")

current_key = callback_manager.build_cache_key(
func,
# Inputs provided as dict is kwargs.
func_args if func_args else func_kwargs,
long.get("cache_args_to_ignore", []),
)

if old_job:
for job in old_job:
callback_manager.terminate_job(job)

if not cache_key:
cache_key = current_key

job_fn = callback_manager.func_registry.get(long_key)

ctx_value = AttributeDict(**context_value.get())
ctx_value.ignore_register_page = True
ctx_value.pop("background_callback_manager")
ctx_value.pop("dash_response")

job = callback_manager.call_job_fn(
cache_key,
job_fn,
func_args if func_args else func_kwargs,
ctx_value,
)

data = {
"cacheKey": cache_key,
"job": job,
}

cancel = long.get("cancel")
if cancel:
data["cancel"] = cancel

progress_default = long.get("progressDefault")
if progress_default:
data["progressDefault"] = {
str(o): x
for o, x in zip(progress_outputs, progress_default)
}
return to_json(data)
if progress_outputs:
# Get the progress before the result as it would be erased after the results.
progress = callback_manager.get_progress(cache_key)
if progress:
response["progress"] = {
str(x): progress[i] for i, x in enumerate(progress_outputs)
}

output_value = callback_manager.get_result(cache_key, job_id)
# Must get job_running after get_result since get_results terminates it.
job_running = callback_manager.job_running(job_id)
if not job_running and output_value is callback_manager.UNDEFINED:
# Job canceled -> no output to close the loop.
output_value = NoUpdate()

elif (
isinstance(output_value, dict)
and "long_callback_error" in output_value
):
error = output_value.get("long_callback_error", {})
exc = LongCallbackError(
f"An error occurred inside a long callback: {error['msg']}\n{error['tb']}"
)
if error_handler:
output_value = error_handler(exc)

if output_value is None:
output_value = NoUpdate()
# set_props from the error handler uses the original ctx
# instead of manager.get_updated_props since it runs in the
# request process.
has_update = (
_set_side_update(callback_ctx, response)
or output_value is not None
)
else:
raise exc

if job_running and output_value is not callback_manager.UNDEFINED:
# cached results.
callback_manager.terminate_job(job_id)

if multi and isinstance(output_value, (list, tuple)):
output_value = [
NoUpdate() if NoUpdate.is_no_update(r) else r
for r in output_value
]
updated_props = callback_manager.get_updated_props(cache_key)
if len(updated_props) > 0:
response["sideUpdate"] = updated_props
has_update = True

if output_value is callback_manager.UNDEFINED:
return to_json(response)
else:
try:
output_value = await _async_invoke_callback(
func, *func_args, **func_kwargs
)
except PreventUpdate as err:
raise err
except Exception as err: # pylint: disable=broad-exception-caught
if error_handler:
output_value = error_handler(err)

# If the error returns nothing, automatically puts NoUpdate for response.
if output_value is None and has_output:
output_value = NoUpdate()
else:
raise err

component_ids = collections.defaultdict(dict)

if has_output:
if not multi:
output_value, output_spec = [output_value], [output_spec]
flat_output_values = output_value
else:
if isinstance(output_value, (list, tuple)):
# For multi-output, allow top-level collection to be
# list or tuple
output_value = list(output_value)

if NoUpdate.is_no_update(output_value):
flat_output_values = [output_value]
else:
# Flatten grouping and validate grouping structure
flat_output_values = flatten_grouping(output_value, output)

if not NoUpdate.is_no_update(output_value):
_validate.validate_multi_return(
output_spec, flat_output_values, callback_id
)

for val, spec in zip(flat_output_values, output_spec):
if NoUpdate.is_no_update(val):
continue
for vali, speci in (
zip(val, spec) if isinstance(spec, list) else [[val, spec]]
):
if not NoUpdate.is_no_update(vali):
has_update = True
id_str = stringify_id(speci["id"])
prop = clean_property_name(speci["property"])
component_ids[id_str][prop] = vali
else:
if output_value is not None:
raise InvalidCallbackReturnValue(
f"No-output callback received return value: {output_value}"
)
output_value = []
flat_output_values = []

if not long:
has_update = _set_side_update(callback_ctx, response) or has_update

if not has_update:
raise PreventUpdate

response["response"] = component_ids

if len(ComponentRegistry.registry) != len(original_packages):
diff_packages = list(
set(ComponentRegistry.registry).difference(original_packages)
)
if not allow_dynamic_callbacks:
raise ImportedInsideCallbackError(
f"Component librar{'y' if len(diff_packages) == 1 else 'ies'} was imported during callback.\n"
"You can set `_allow_dynamic_callbacks` to allow for development purpose only."
)
dist = app.get_dist(diff_packages)
response["dist"] = dist

try:
jsonResponse = to_json(response)
except TypeError:
_validate.fail_callback_output(output_value, output)

return jsonResponse

def add_context(*args, **kwargs):
output_spec = kwargs.pop("outputs_list")
app_callback_manager = kwargs.pop("long_callback_manager", None)
Expand Down Expand Up @@ -575,7 +810,10 @@ def add_context(*args, **kwargs):

return jsonResponse

callback_map[callback_id]["callback"] = add_context
if asyncio.iscoroutinefunction(func):
callback_map[callback_id]["callback"] = async_add_context
else:
callback_map[callback_id]["callback"] = add_context

return func

Expand Down
Loading