diff --git a/labthings/core/event.py b/labthings/core/event.py index a6fcc23b..778360a9 100644 --- a/labthings/core/event.py +++ b/labthings/core/event.py @@ -4,6 +4,7 @@ import logging from gevent.monkey import get_original +from gevent.lock import Semaphore # Guarantee that Task threads will always be proper system threads, regardless of Gevent patches Event = get_original("threading", "Event") @@ -19,6 +20,7 @@ class ClientEvent(object): def __init__(self): self.events = {} + self._setting_lock = Semaphore(value=1) def wait(self, timeout: int = 5): """Wait for the next data frame (invoked from each client's thread).""" @@ -40,23 +42,24 @@ def wait(self, timeout: int = 5): def set(self, timeout=5): """Signal that a new frame is available.""" - now = time.time() - remove = None - for ident, event in self.events.items(): - if not event[0].is_set(): - # if this client's event is not set, then set it - # also update the last set timestamp to now - event[0].set() - event[1] = now - else: - # if the client's event is already set, it means the client - # did not process a previous frame - # if the event stays set for more than `timeout` seconds, then assume - # the client is gone and remove it - if now - event[1] >= timeout: - remove = ident - if remove: - del self.events[remove] + with self._setting_lock: + now = time.time() + remove_keys = set() + for ident, event in self.events.items(): + if not event[0].is_set(): + # if this client's event is not set, then set it + # also update the last set timestamp to now + event[0].set() + event[1] = now + else: + # if the client's event is already set, it means the client + # did not process a previous frame + # if the event stays set for more than `timeout` seconds, then + # assume the client is gone and remove it + if now - event[1] >= timeout: + remove_keys.add(ident) + if remove_keys: + del self.events[ident] def clear(self): """Clear frame event, once processed."""