From ba28da9e520a317f494219fe0a4dcdef77220243 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Sun, 11 Aug 2024 12:15:12 +0200 Subject: [PATCH 1/3] Ensure events are dispatched sequentially --- panel/io/document.py | 70 ++++++++++++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/panel/io/document.py b/panel/io/document.py index 646152a7bb..67fb86cacb 100644 --- a/panel/io/document.py +++ b/panel/io/document.py @@ -23,7 +23,7 @@ from bokeh.document.document import Document from bokeh.document.events import ( ColumnDataChangedEvent, ColumnsPatchedEvent, ColumnsStreamedEvent, - DocumentChangedEvent, ModelChangedEvent, + DocumentChangedEvent, MessageSentEvent, ModelChangedEvent, ) from bokeh.model.util import visit_immediate_value_references from bokeh.models import CustomJS @@ -45,10 +45,13 @@ DISPATCH_EVENTS = ( ColumnDataChangedEvent, ColumnsPatchedEvent, ColumnsStreamedEvent, - ModelChangedEvent + ModelChangedEvent, MessageSentEvent ) GC_DEBOUNCE = 5 _WRITE_LOCK = None +_WRITE_FUTURES = weakref.WeakKeyDictionary() +_WRITE_MSGS = weakref.WeakKeyDictionary() +_WRITE_BLOCK = weakref.WeakKeyDictionary() def WRITE_LOCK(): global _WRITE_LOCK @@ -140,12 +143,13 @@ def _cleanup_doc(doc, destroy=True): # Destroy document doc.destroy(None) -async def _run_write_futures(futures): +async def _run_write_futures(doc): """ Ensure that all write_message calls are awaited and handled. """ from tornado.websocket import WebSocketClosedError async with WRITE_LOCK(): + futures = _WRITE_FUTURES.pop(doc, []) for future in futures: try: await future @@ -165,29 +169,44 @@ def _dispatch_write_task(doc, func, *args, **kwargs): except RuntimeError: doc.add_next_tick_callback(partial(func, *args, **kwargs)) -async def _dispatch_msgs(doc, msgs): +async def _dispatch_msgs(doc): """ Writes messages to a socket, ensuring that the write_lock is not set, otherwise re-schedules the write task on the event loop. """ from tornado.websocket import WebSocketHandler remaining = {} - for conn, msg in msgs.items(): + futures = [] + conn_msgs = _WRITE_MSGS.pop(doc, {}) + for conn, msgs in conn_msgs.items(): socket = conn._socket if hasattr(socket, 'write_lock') and socket.write_lock._block._value == 0: - remaining[conn] = msg + remaining[conn] = msgs continue - if isinstance(conn._socket, WebSocketHandler): - futures = dispatch_tornado(conn, msg=msg) - elif (socket_type:= type(conn._socket)) in extra_socket_handlers: - futures = extra_socket_handlers[socket_type](conn, msg=msg) + for msg in msgs: + if isinstance(conn._socket, WebSocketHandler): + futures += dispatch_tornado(conn, msg=msg) + elif (socket_type:= type(conn._socket)) in extra_socket_handlers: + futures += extra_socket_handlers[socket_type](conn, msg=msg) + else: + futures += dispatch_django(conn, msg=msg) + if futures: + if doc in _WRITE_FUTURES: + _WRITE_FUTURES[doc] += futures else: - futures = dispatch_django(conn, msg=msg) - await _run_write_futures(futures) + _WRITE_FUTURES[doc] = futures + await _run_write_futures(doc) if not remaining: + if doc in _WRITE_BLOCK: + del _WRITE_BLOCK[doc] return + for conn, msgs in remaining.items(): + if doc in _WRITE_MSGS: + _WRITE_MSGS[doc][conn] = msgs + _WRITE_MSGS[doc].get(conn, []) + else: + _WRITE_MSGS[doc] = {conn: msgs} await asyncio.sleep(0.01) - _dispatch_write_task(doc, _dispatch_msgs, doc, remaining) + _dispatch_write_task(doc, _dispatch_msgs, doc) def _garbage_collect(): if (new_time:= time.monotonic()-_panel_last_cleanup) < GC_DEBOUNCE: @@ -387,7 +406,7 @@ def unlocked() -> Iterator: remaining_events, dispatch_events = [], [] try: yield - locked = False + locked = curdoc in _WRITE_MSGS or curdoc in _WRITE_BLOCK for conn in connections: socket = conn._socket if hasattr(socket, 'write_lock') and socket.write_lock._block._value == 0: @@ -414,11 +433,16 @@ def unlocked() -> Iterator: else: futures += dispatch_django(conn, dispatch_events) + if curdoc in _WRITE_FUTURES: + _WRITE_FUTURES[curdoc] += futures + else: + _WRITE_FUTURES[curdoc] = futures + if futures: if state._unblocked(curdoc): - _dispatch_write_task(curdoc, _run_write_futures, futures) + _dispatch_write_task(curdoc, _run_write_futures, curdoc) else: - curdoc.add_next_tick_callback(partial(_run_write_futures, futures)) + curdoc.add_next_tick_callback(partial(_run_write_futures, curdoc)) except Exception as e: # If we error out during the yield, there won't be any events # captured so we end up simply calling curdoc.unhold() and @@ -440,14 +464,22 @@ def unlocked() -> Iterator: leftover_events = [e for e in remaining_events if not isinstance(e, Serializable)] remaining_events = [e for e in remaining_events if isinstance(e, Serializable)] + # Set up write locks + if remaining_events: + _WRITE_BLOCK[curdoc] = True + _WRITE_MSGS[curdoc] = msgs = _WRITE_MSGS.get(curdoc, {}) # Create messages for remaining events - msgs = {} for conn in connections: if not remaining_events: continue # Create a protocol message for any events that cannot be immediately dispatched - msgs[conn] = conn.protocol.create('PATCH-DOC', remaining_events) - _dispatch_write_task(curdoc, _dispatch_msgs, curdoc, msgs) + msg = conn.protocol.create('PATCH-DOC', remaining_events) + if conn in msgs: + msgs[conn].append(msg) + else: + msgs[conn] = [msg] + + _dispatch_write_task(curdoc, _dispatch_msgs, curdoc) curdoc.callbacks._held_events += leftover_events curdoc.unhold() From ef6d25789d849e30dcf2d71f4ed4554e37e2aaaf Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Sun, 11 Aug 2024 12:50:25 +0200 Subject: [PATCH 2/3] Cleanup --- panel/io/document.py | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/panel/io/document.py b/panel/io/document.py index 67fb86cacb..744a805a53 100644 --- a/panel/io/document.py +++ b/panel/io/document.py @@ -48,17 +48,10 @@ ModelChangedEvent, MessageSentEvent ) GC_DEBOUNCE = 5 -_WRITE_LOCK = None _WRITE_FUTURES = weakref.WeakKeyDictionary() _WRITE_MSGS = weakref.WeakKeyDictionary() _WRITE_BLOCK = weakref.WeakKeyDictionary() -def WRITE_LOCK(): - global _WRITE_LOCK - if _WRITE_LOCK is None: - _WRITE_LOCK = asyncio.Lock() - return _WRITE_LOCK - _panel_last_cleanup = None _write_tasks = [] @@ -148,15 +141,14 @@ async def _run_write_futures(doc): Ensure that all write_message calls are awaited and handled. """ from tornado.websocket import WebSocketClosedError - async with WRITE_LOCK(): - futures = _WRITE_FUTURES.pop(doc, []) - for future in futures: - try: - await future - except WebSocketClosedError: - logger.warning("Failed sending message as connection was closed") - except Exception as e: - logger.warning(f"Failed sending message due to following error: {e}") + futures = _WRITE_FUTURES.pop(doc, []) + for future in futures: + try: + await future + except WebSocketClosedError: + logger.warning("Failed sending message as connection was closed") + except Exception as e: + logger.warning(f"Failed sending message due to following error: {e}") def _dispatch_write_task(doc, func, *args, **kwargs): """ @@ -433,12 +425,13 @@ def unlocked() -> Iterator: else: futures += dispatch_django(conn, dispatch_events) - if curdoc in _WRITE_FUTURES: - _WRITE_FUTURES[curdoc] += futures - else: - _WRITE_FUTURES[curdoc] = futures if futures: + if curdoc in _WRITE_FUTURES: + _WRITE_FUTURES[curdoc] += futures + else: + _WRITE_FUTURES[curdoc] = futures + if state._unblocked(curdoc): _dispatch_write_task(curdoc, _run_write_futures, curdoc) else: From 57a0b1ba5f6828dc1d9d6debc079077dfed17e2f Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Sun, 11 Aug 2024 13:25:10 +0200 Subject: [PATCH 3/3] Pin textual for now --- pixi.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pixi.toml b/pixi.toml index 6cbdb658cb..cdf3b56d5d 100644 --- a/pixi.toml +++ b/pixi.toml @@ -116,7 +116,7 @@ ipywidgets_bokeh = "*" numba = "*" reacton = "*" scipy = "*" -textual = "*" +textual = "<0.76" # Temporary fix [feature.test-unit-task.tasks] # So it is not showing up in the test-ui environment test-unit = 'pytest panel/tests -n logical --dist loadgroup'