Skip to content

Commit

Permalink
Ensure pending writes are dispatched in order and only from correct t…
Browse files Browse the repository at this point in the history
…hread
  • Loading branch information
philippjfr committed Mar 5, 2024
1 parent 6608244 commit ad33ca6
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions panel/io/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import threading

from collections import defaultdict
from contextlib import contextmanager
from functools import partial, wraps
from typing import (
Expand Down Expand Up @@ -46,6 +47,7 @@

WRITE_TASKS = []
WRITE_LOCK = asyncio.Lock()
_pending_writes = defaultdict(list)

@dataclasses.dataclass
class Request:
Expand Down Expand Up @@ -126,19 +128,20 @@ def _cleanup_doc(doc, destroy=True):
# Destroy document
doc.destroy(None)

async def _run_write_futures(futures):
async def _run_write_futures():
"""
Ensure that all write_message calls are awaited and handled.
"""
from tornado.websocket import WebSocketClosedError
async with WRITE_LOCK:
for future in futures:
for future in _pending_writes[state._current_thread]:
try:
await future
except WebSocketClosedError:
logger.warning("Failed sending message as connection was closed")
except Exception as e:
logger.warning(f"Failed sending message due to following error: {e}")
_pending_writes[state._current_thread].clear()

def _dispatch_write_task(doc, func, *args, **kwargs):
"""
Expand Down Expand Up @@ -167,7 +170,8 @@ async def _dispatch_msgs(doc, msgs):
futures = dispatch_tornado(conn, msg=msg)
else:
futures = dispatch_django(conn, msg=msg)
await _run_write_futures(futures)
_pending_writes[state._current_thread].extend(futures)
await _run_write_futures()
if not remaining:
return
await asyncio.sleep(0.01)
Expand Down Expand Up @@ -289,7 +293,8 @@ def unlocked() -> Iterator:
curdoc = state.curdoc
session_context = getattr(curdoc, 'session_context', None)
session = getattr(session_context, 'session', None)
if curdoc is None or session_context is None or session is None or state._jupyter_kernel_context:
if (curdoc is None or session_context is None or session is None or
state._jupyter_kernel_context or state._current_thread != state._thread_id):
yield
return
elif curdoc.callbacks.hold_value:
Expand Down Expand Up @@ -328,12 +333,13 @@ def unlocked() -> Iterator:
futures += dispatch_tornado(conn, dispatch_events)
else:
futures += dispatch_django(conn, dispatch_events)
_pending_writes[state._current_thread].extend(futures)

if futures:
if state._unblocked(curdoc):
_dispatch_write_task(curdoc, _run_write_futures, futures)
_dispatch_write_task(curdoc, _run_write_futures)
else:
curdoc.add_next_tick_callback(partial(_run_write_futures, futures))
curdoc.add_next_tick_callback(_run_write_futures)

curdoc.callbacks._held_events = remaining_events
finally:
Expand Down

0 comments on commit ad33ca6

Please sign in to comment.