Run the heartbeat event loop on a separate thread #147
Comments
Implementation 1: Create a new thread, create and set an event loop for the thread, provide the loop to the
Implementation:
import asyncio
import threading
import time
from typing import Awaitable, Callable

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

# now_epoch_millis() is not shown in this snippet; presumably a project helper.


class Heartbeat:
    """
    Sends a heartbeat timestamp in epoch milliseconds to an async function at a specified
    interval.
    This class is not thread safe.
    """

    def __init__(self, heartbeat_function: Callable[[int], Awaitable[None]], interval_sec: int):
        """
        Create the heartbeat instance.

        heartbeat_function - the async function to call on each heartbeat. Accepts a single
            argument which is the heartbeat timestamp in milliseconds since the Unix epoch.
        interval_sec - the interval between heartbeats in seconds.
        """
        print("create hb", time.time())
        self._hbf = heartbeat_function
        # Start the scheduler in a new thread with a new loop so that if the main thread / loop
        # has long wait times between `await` calls the heartbeat is still sent
        ready = threading.Event()
        print(ready.is_set())
        threading.Thread(
            target=self._start_background_scheduler,
            args=(ready,),
            daemon=True
        ).start()
        ready.wait()
        self._job_id = self._schd.add_job(
            func=self._heartbeat,
            trigger=IntervalTrigger(seconds=interval_sec),
            coalesce=True,
        ).id
        self._schd.start(paused=True)
        print("finish hb", time.time())

    def _start_background_scheduler(self, ready: threading.Event):
        print("start background", time.time())
        # Create the loop in the thread that's going to use it
        # https://discuss.python.org/t/is-it-possible-to-detect-the-thread-an-event-loop-is-running-in/2892/6
        # There are lots of examples out there that violate this rule
        self._loop = asyncio.new_event_loop()  # no loop in new thread
        asyncio.set_event_loop(self._loop)
        self._schd = AsyncIOScheduler(event_loop=self._loop)
        print("pre set", time.time())
        ready.set()  # go
        print("post set", time.time())
        self._loop.run_forever()
        print("thread out", time.time())
        # TODO NOW check time.sleep in main thread

    async def _heartbeat(self):
        print("heartbeat", time.time())
        await self._hbf(now_epoch_millis())

    def start(self):
        """
        Start the heartbeat, sending the first heartbeat after one interval.
        """
        if not self._schd:
            raise ValueError("Heartbeat has been destroyed")
        print("start schd", time.time())
        print(self._schd.running)
        print(self._schd.get_job(self._job_id))
        self._schd.resume()
        print(self._schd.running)
        print(self._schd.get_job(self._job_id))

    def destroy(self):
        """
        Stop the heartbeat and destroy any resources associated with it.
        """
        self._schd.shutdown(wait=True)
        self._schd.remove_job(self._job_id)
        self._schd = None
        self._loop.stop()  # thread should end here
        self._loop = None
Implementation 2: Create a new thread and use
Implementation:
import asyncio
import threading
from typing import Awaitable, Callable

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

# now_epoch_millis() is not shown in this snippet; presumably a project helper.


class Heartbeat:
    """
    Sends a heartbeat timestamp in epoch milliseconds to an async function at a specified
    interval.
    This class is not thread safe.
    """

    def __init__(self, heartbeat_function: Callable[[int], Awaitable[None]], interval_sec: int):
        """
        Create the heartbeat instance.

        heartbeat_function - the async function to call on each heartbeat. Accepts a single
            argument which is the heartbeat timestamp in milliseconds since the Unix epoch.
        interval_sec - the interval between heartbeats in seconds.
        """
        self._hbf = heartbeat_function
        # Start the scheduler in a new thread with a new loop so that if the main thread / loop
        # has long wait times between `await` calls the heartbeat is still sent
        ready = threading.Event()
        self._thread = threading.Thread(
            target=self._start_background_scheduler,
            args=(ready,),
            daemon=True
        )
        self._thread.start()
        ready.wait()
        self._job_id = self._schd.add_job(
            func=self._heartbeat,
            trigger=IntervalTrigger(seconds=interval_sec),
            coalesce=True,
        ).id
        self._schd.start(paused=True)

    def _start_background_scheduler(self, ready: threading.Event) -> None:
        # For reasons I don't understand, run() here works but calling new_event_loop() in the
        # new thread doesn't, and results in async methods in the main thread throwing errors like
        # ```
        # RuntimeError: <asyncio.locks.Event object at 0x7f2444066b90 [unset]> is bound to a
        # different event loop
        # ```
        # Note that passing loops from thread to thread is not advised, apparently:
        # https://discuss.python.org/t/is-it-possible-to-detect-the-thread-an-event-loop-is-running-in/2892/6
        # I see lots of examples doing exactly that though
        asyncio.run(self._start_background_scheduler_async(ready))

    async def _start_background_scheduler_async(self, ready: threading.Event) -> None:
        self._loop = asyncio.get_event_loop()
        self._schd = AsyncIOScheduler()
        ready.set()  # go

    async def _heartbeat(self) -> None:
        await self._hbf(now_epoch_millis())

    def start(self) -> None:
        """
        Start the heartbeat, sending the first heartbeat after one interval.
        """
        if not self._schd:
            raise ValueError("Heartbeat has been destroyed")
        self._schd.resume()

    def destroy(self) -> None:
        """
        Stop the heartbeat and destroy any resources associated with it.
        """
        self._schd.shutdown(wait=True)
        self._schd.remove_job(self._job_id)
        self._schd = None
        self._loop.stop()  # thread should end here
        self._loop = None
        # not really necessary, but good to ensure we're not leaving threads around
        self._thread.join()
        self._thread = None
Implementation 3: Use a
Implementation:
import asyncio
from typing import Awaitable, Callable

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

# now_epoch_millis() is not shown in this snippet; presumably a project helper.


class Heartbeat:
    """
    Sends a heartbeat timestamp in epoch milliseconds to an async function at a specified
    interval, running in the default asyncio event loop.
    """

    def __init__(self, heartbeat_function: Callable[[int], Awaitable[None]], interval_sec: int):
        """
        Create the heartbeat instance.

        heartbeat_function - the async function to call on each heartbeat. Accepts a single
            argument which is the heartbeat timestamp in milliseconds since the Unix epoch.
        interval_sec - the interval between heartbeats in seconds.
        """
        self._hbf = heartbeat_function
        # Use a separate thread (and therefore event loop) just in case some of the async
        # matching operations have long non-interruptible sections
        # Tried using an AsyncIOScheduler set to use an event loop running in a separate thread
        # to avoid calling asyncio.run() on every heartbeat but got errors like
        # "RuntimeError: <asyncio.locks.Event object at 0x7f2444066b90 [unset]> is bound
        # to a different event loop"
        # Not sure why
        self._schd = BackgroundScheduler()
        self._schd.add_job(
            func=self._heartbeat,
            trigger=IntervalTrigger(seconds=interval_sec),
            coalesce=True,
        )
        self._schd.start(paused=True)

    def _heartbeat(self):
        asyncio.run(self._hbf(now_epoch_millis()))

    def start(self):
        """
        Start the heartbeat, sending the first heartbeat after one interval.
        """
        if not self._schd:
            raise ValueError("Heartbeat has been destroyed")
        self._schd.resume()

    def destroy(self):
        """
        Stop the heartbeat and destroy any resources associated with it.
        """
        self._schd.shutdown(wait=True)
        self._schd = None
Currently the heartbeat [1] for matching / selection processes runs in the same event loop as the main matching process. That means that if there's a long period in the main process between `await`s, the heartbeat won't be sent. This probably isn't a huge problem, as the heartbeat is currently sent every 10 seconds and the process is considered dead only if there's no heartbeat for 60 seconds, but the "correct" solution would run the heartbeat in a separate thread with a separate event loop, in case there is a long, CPU-intensive stage in a match process that could exceed 60 seconds.

So far I haven't been able to get this to work correctly. I've tried three different implementations, but they all have their own problems. Each post below will describe one implementation.
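The starvation described above can be reproduced in isolation. The following is a minimal sketch using only the standard library, with no APScheduler and none of the project's code; all names are hypothetical, and the intervals are shortened from 10 s / 60 s to fractions of a second. It counts heartbeats while the main work either blocks (`time.sleep`) or yields (`asyncio.sleep`), and compares that with a heartbeat running on its own thread and loop.

```python
import asyncio
import threading
import time
from typing import List


async def heartbeat(beats: List[float], interval_sec: float) -> None:
    """Record a timestamp every interval_sec seconds until cancelled."""
    while True:
        await asyncio.sleep(interval_sec)
        beats.append(time.monotonic())


async def count_beats_same_loop(blocking: bool) -> int:
    """Run the heartbeat in this loop alongside 0.5 s of simulated work."""
    beats: List[float] = []
    task = asyncio.create_task(heartbeat(beats, 0.1))
    if blocking:
        time.sleep(0.5)  # stand-in for CPU-bound work: never yields to the loop
    else:
        await asyncio.sleep(0.5)  # cooperative wait: the heartbeat task can run
    count = len(beats)
    task.cancel()
    return count


def count_beats_separate_thread() -> int:
    """Run the heartbeat on its own thread and loop while this thread blocks."""
    beats: List[float] = []
    stop = threading.Event()

    async def background_main() -> None:
        task = asyncio.create_task(heartbeat(beats, 0.1))
        while not stop.is_set():  # poll the stop flag without blocking the loop
            await asyncio.sleep(0.01)
        task.cancel()

    thread = threading.Thread(target=lambda: asyncio.run(background_main()), daemon=True)
    thread.start()
    time.sleep(0.5)  # the calling thread is blocked for the whole period
    count = len(beats)
    stop.set()
    thread.join()
    return count


if __name__ == "__main__":
    print("same loop, blocking work:   ", asyncio.run(count_beats_same_loop(True)))
    print("same loop, cooperative work:", asyncio.run(count_beats_same_loop(False)))
    print("separate thread and loop:   ", count_beats_separate_thread())
```

In this sketch the heartbeat only appends to a list, so it has no ties to the main loop. A real heartbeat function that shares loop-bound objects with the main process would need more care.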
I generally test by bringing up the services via docker compose, adding a 60 second `sleep()` (via either `time` or `asyncio`, as appropriate for the test) in the taxa count matching process, and running a taxa count match, deleting the match record from the database between tests.

[1] collections/src/service/processing.py, line 57 in e301b2b