Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure events are dispatched sequentially #7128

Merged
merged 3 commits into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 59 additions & 34 deletions panel/io/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from bokeh.document.document import Document
from bokeh.document.events import (
ColumnDataChangedEvent, ColumnsPatchedEvent, ColumnsStreamedEvent,
DocumentChangedEvent, ModelChangedEvent,
DocumentChangedEvent, MessageSentEvent, ModelChangedEvent,
)
from bokeh.model.util import visit_immediate_value_references
from bokeh.models import CustomJS
Expand All @@ -45,16 +45,12 @@

DISPATCH_EVENTS = (
ColumnDataChangedEvent, ColumnsPatchedEvent, ColumnsStreamedEvent,
ModelChangedEvent
ModelChangedEvent, MessageSentEvent
)
GC_DEBOUNCE = 5
_WRITE_LOCK = None

def WRITE_LOCK():
global _WRITE_LOCK
if _WRITE_LOCK is None:
_WRITE_LOCK = asyncio.Lock()
return _WRITE_LOCK
_WRITE_FUTURES = weakref.WeakKeyDictionary()
_WRITE_MSGS = weakref.WeakKeyDictionary()
_WRITE_BLOCK = weakref.WeakKeyDictionary()

_panel_last_cleanup = None
_write_tasks = []
Expand Down Expand Up @@ -140,19 +136,19 @@ def _cleanup_doc(doc, destroy=True):
# Destroy document
doc.destroy(None)

async def _run_write_futures(futures):
async def _run_write_futures(doc):
"""
Ensure that all write_message calls are awaited and handled.
"""
from tornado.websocket import WebSocketClosedError
async with WRITE_LOCK():
for future in futures:
try:
await future
except WebSocketClosedError:
logger.warning("Failed sending message as connection was closed")
except Exception as e:
logger.warning(f"Failed sending message due to following error: {e}")
futures = _WRITE_FUTURES.pop(doc, [])
for future in futures:
try:
await future
except WebSocketClosedError:
logger.warning("Failed sending message as connection was closed")
except Exception as e:
logger.warning(f"Failed sending message due to following error: {e}")

def _dispatch_write_task(doc, func, *args, **kwargs):
"""
Expand All @@ -165,29 +161,44 @@ def _dispatch_write_task(doc, func, *args, **kwargs):
except RuntimeError:
doc.add_next_tick_callback(partial(func, *args, **kwargs))

async def _dispatch_msgs(doc, msgs):
async def _dispatch_msgs(doc):
"""
Writes messages to a socket, ensuring that the write_lock is not
set, otherwise re-schedules the write task on the event loop.
"""
from tornado.websocket import WebSocketHandler
remaining = {}
for conn, msg in msgs.items():
futures = []
conn_msgs = _WRITE_MSGS.pop(doc, {})
for conn, msgs in conn_msgs.items():
socket = conn._socket
if hasattr(socket, 'write_lock') and socket.write_lock._block._value == 0:
remaining[conn] = msg
remaining[conn] = msgs
continue
if isinstance(conn._socket, WebSocketHandler):
futures = dispatch_tornado(conn, msg=msg)
elif (socket_type:= type(conn._socket)) in extra_socket_handlers:
futures = extra_socket_handlers[socket_type](conn, msg=msg)
for msg in msgs:
if isinstance(conn._socket, WebSocketHandler):
futures += dispatch_tornado(conn, msg=msg)
elif (socket_type:= type(conn._socket)) in extra_socket_handlers:
futures += extra_socket_handlers[socket_type](conn, msg=msg)
else:
futures += dispatch_django(conn, msg=msg)
if futures:
if doc in _WRITE_FUTURES:
_WRITE_FUTURES[doc] += futures
else:
futures = dispatch_django(conn, msg=msg)
await _run_write_futures(futures)
_WRITE_FUTURES[doc] = futures
await _run_write_futures(doc)
if not remaining:
if doc in _WRITE_BLOCK:
del _WRITE_BLOCK[doc]
return
for conn, msgs in remaining.items():
if doc in _WRITE_MSGS:
_WRITE_MSGS[doc][conn] = msgs + _WRITE_MSGS[doc].get(conn, [])
else:
_WRITE_MSGS[doc] = {conn: msgs}
await asyncio.sleep(0.01)
_dispatch_write_task(doc, _dispatch_msgs, doc, remaining)
_dispatch_write_task(doc, _dispatch_msgs, doc)

def _garbage_collect():
if (new_time:= time.monotonic()-_panel_last_cleanup) < GC_DEBOUNCE:
Expand Down Expand Up @@ -387,7 +398,7 @@ def unlocked() -> Iterator:
remaining_events, dispatch_events = [], []
try:
yield
locked = False
locked = curdoc in _WRITE_MSGS or curdoc in _WRITE_BLOCK
for conn in connections:
socket = conn._socket
if hasattr(socket, 'write_lock') and socket.write_lock._block._value == 0:
Expand All @@ -414,11 +425,17 @@ def unlocked() -> Iterator:
else:
futures += dispatch_django(conn, dispatch_events)


if futures:
if curdoc in _WRITE_FUTURES:
_WRITE_FUTURES[curdoc] += futures
else:
_WRITE_FUTURES[curdoc] = futures

if state._unblocked(curdoc):
_dispatch_write_task(curdoc, _run_write_futures, futures)
_dispatch_write_task(curdoc, _run_write_futures, curdoc)
else:
curdoc.add_next_tick_callback(partial(_run_write_futures, futures))
curdoc.add_next_tick_callback(partial(_run_write_futures, curdoc))
except Exception as e:
# If we error out during the yield, there won't be any events
# captured so we end up simply calling curdoc.unhold() and
Expand All @@ -440,14 +457,22 @@ def unlocked() -> Iterator:
leftover_events = [e for e in remaining_events if not isinstance(e, Serializable)]
remaining_events = [e for e in remaining_events if isinstance(e, Serializable)]

# Set up write locks
if remaining_events:
_WRITE_BLOCK[curdoc] = True
_WRITE_MSGS[curdoc] = msgs = _WRITE_MSGS.get(curdoc, {})
# Create messages for remaining events
msgs = {}
for conn in connections:
if not remaining_events:
continue
# Create a protocol message for any events that cannot be immediately dispatched
msgs[conn] = conn.protocol.create('PATCH-DOC', remaining_events)
_dispatch_write_task(curdoc, _dispatch_msgs, curdoc, msgs)
msg = conn.protocol.create('PATCH-DOC', remaining_events)
if conn in msgs:
msgs[conn].append(msg)
else:
msgs[conn] = [msg]

_dispatch_write_task(curdoc, _dispatch_msgs, curdoc)
curdoc.callbacks._held_events += leftover_events
curdoc.unhold()

Expand Down
2 changes: 1 addition & 1 deletion pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ ipywidgets_bokeh = "*"
numba = "*"
reacton = "*"
scipy = "*"
textual = "*"
textual = "<0.76" # Temporary fix

[feature.test-unit-task.tasks] # So it is not showing up in the test-ui environment
test-unit = 'pytest panel/tests -n logical --dist loadgroup'
Expand Down
Loading