diff --git a/panel/io/state.py b/panel/io/state.py index 350d9f0b86..efbfe8bb11 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -397,7 +397,7 @@ def _on_load(self, doc: Optional[Document] = None) -> None: self.param.trigger('_profiles') self._loaded[doc] = True - async def _scheduled_cb(self, name: str) -> None: + async def _scheduled_cb(self, name: str, threaded: bool = False) -> None: if name not in self._scheduled: return diter, cb = self._scheduled[name] @@ -411,9 +411,7 @@ async def _scheduled_cb(self, name: str) -> None: call_time_seconds = (at - now) self._ioloop.call_later(delay=call_time_seconds, callback=partial(self._scheduled_cb, name)) try: - res = cb() - if inspect.isawaitable(res): - await res + self.execute(cb, schedule='thread' if threaded else 'auto') except Exception as e: self._handle_exception(e) @@ -777,7 +775,8 @@ def reset(self): def schedule_task( self, name: str, callback: Callable[[], None], at: Tat =None, - period: str | dt.timedelta = None, cron: Optional[str] = None + period: str | dt.timedelta = None, cron: Optional[str] = None, + threaded : bool = False ) -> None: """ Schedules a task at a specific time or on a schedule. @@ -825,6 +824,9 @@ def schedule_task( cron: str A cron expression (requires croniter to parse) + threaded: bool + Whether the callback should be run on a thread (requires + config.nthreads to be set). """ if name in self._scheduled: if callback is not self._scheduled[name][1]: @@ -877,7 +879,7 @@ def dgen(): return self._scheduled[name] = (diter, callback) self._ioloop.call_later( - delay=call_time_seconds, callback=partial(self._scheduled_cb, name) + delay=call_time_seconds, callback=partial(self._scheduled_cb, name, threaded) ) def sync_busy(self, indicator: BooleanIndicator) -> None: diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index d965e991f1..cc2272865c 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -295,6 +295,24 @@ def app(): wait_until(lambda: state.cache['count'] > 0) +def test_server_schedule_threaded(threads): + counts = [] + def periodic_cb(count=[0]): + count[0] += 1 + counts.append(count[0]) + time.sleep(0.5) + count[0] += -1 + + def app(): + state.schedule_task('periodic1', periodic_cb, period='0.5s', threaded=True) + state.schedule_task('periodic2', periodic_cb, period='0.5s', threaded=True) + return '# state.schedule test' + + serve_and_request(app) + + # Checks whether scheduled callback was executed concurrently + wait_until(lambda: len(counts) > 0 and max(counts) > 1) + def test_server_schedule_at(): def periodic_cb():