Skip to content

Commit

Permalink
Allow running scheduled tasks on threads (#5879)
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr authored Nov 15, 2023
1 parent f73973d commit 4bfe371
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
14 changes: 8 additions & 6 deletions panel/io/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def _on_load(self, doc: Optional[Document] = None) -> None:
self.param.trigger('_profiles')
self._loaded[doc] = True

async def _scheduled_cb(self, name: str) -> None:
async def _scheduled_cb(self, name: str, threaded: bool = False) -> None:
if name not in self._scheduled:
return
diter, cb = self._scheduled[name]
Expand All @@ -411,9 +411,7 @@ async def _scheduled_cb(self, name: str) -> None:
call_time_seconds = (at - now)
self._ioloop.call_later(delay=call_time_seconds, callback=partial(self._scheduled_cb, name))
try:
res = cb()
if inspect.isawaitable(res):
await res
self.execute(cb, schedule='thread' if threaded else 'auto')
except Exception as e:
self._handle_exception(e)

Expand Down Expand Up @@ -777,7 +775,8 @@ def reset(self):

def schedule_task(
self, name: str, callback: Callable[[], None], at: Tat =None,
period: str | dt.timedelta = None, cron: Optional[str] = None
period: str | dt.timedelta = None, cron: Optional[str] = None,
threaded : bool = False
) -> None:
"""
Schedules a task at a specific time or on a schedule.
Expand Down Expand Up @@ -825,6 +824,9 @@ def schedule_task(
cron: str
A cron expression (requires croniter to parse)
threaded: bool
Whether the callback should be run on a thread (requires
config.nthreads to be set).
"""
if name in self._scheduled:
if callback is not self._scheduled[name][1]:
Expand Down Expand Up @@ -877,7 +879,7 @@ def dgen():
return
self._scheduled[name] = (diter, callback)
self._ioloop.call_later(
delay=call_time_seconds, callback=partial(self._scheduled_cb, name)
delay=call_time_seconds, callback=partial(self._scheduled_cb, name, threaded)
)

def sync_busy(self, indicator: BooleanIndicator) -> None:
Expand Down
18 changes: 18 additions & 0 deletions panel/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,24 @@ def app():

wait_until(lambda: state.cache['count'] > 0)

def test_server_schedule_threaded(threads):
counts = []
def periodic_cb(count=[0]):
count[0] += 1
counts.append(count[0])
time.sleep(0.5)
count[0] += -1

def app():
state.schedule_task('periodic1', periodic_cb, period='0.5s', threaded=True)
state.schedule_task('periodic2', periodic_cb, period='0.5s', threaded=True)
return '# state.schedule test'

serve_and_request(app)

# Checks whether scheduled callback was executed concurrently
wait_until(lambda: len(counts) > 0 and max(counts) > 1)


def test_server_schedule_at():
def periodic_cb():
Expand Down

0 comments on commit 4bfe371

Please sign in to comment.