Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure execution order of callbacks that are expected to be called immediately (such as call_soon) #123

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions qasync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import time
from concurrent.futures import Future
from queue import Queue
from collections import deque

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -294,6 +295,79 @@ def __log_debug(self, *args, **kwargs):
self._logger.debug(*args, **kwargs)


@with_logger
class _CallSoonQueue(QtCore.QObject):
def __init__(self):
super().__init__()
# Contains asyncio.Handle objects
# Use a deque instead of Queue, as we don't require
# synchronization between threads here.
self.__callbacks = deque()

# Keep track of the current timer.
# The queue can only have a single timer that services it.
# Once fired, all pending callbacks will be processed.
self.__timer_id = None
self.__stopped = False
self.__debug_enabled = False

def add_callback(self, handle):
# handle must be an asyncio.Handle
self.__callbacks.append(handle)
self.__log_debug("Registering call_soon handle %s", id(handle))

# Create a timer if it doesn't yet exist
if self.__timer_id is None:
# Set a 0-delay timer on itself, this will ensure thats
# it gets fired immediately after window events are processed the next time.
# See https://doc.qt.io/qt-6/qtimer.html#interval-prop
self.__timer_id = self.startTimer(0)
self.__log_debug("Registering call_soon timer %s", self.__timer_id)
return handle

def timerEvent(self, event):
timerId = event.timerId()
# We should have only one timer active at the same time, so
# this assert will get hit only when something's very bad
assert timerId == self.__timer_id

# Stop timer if stopped
if self.__stopped:
self.__log_debug("call_soon queue stopped, clearing handles")
# TODO: Do we need to del the handles or somehow invalidate them?
self.__callbacks.clear()
self.killTimer(timerId)
self.__timer_id = None
return

# Iterate over pending callbacks
# TODO: Runtime deadline, don't process the entire queue if it takes too long?
while len(self.__callbacks) > 0:
handle = self.__callbacks.popleft()
self.__log_debug("Calling call_soon handle %s", id(handle))
handle._run()

# No more callbacks exist, we can dispose this timer.
# It will be recreated once a callback is registered again.
# It's should be safe to assume that another thread isn't calling
# add_callback during the lifetime of timerEvent
self.__log_debug("Stopping call_soon timer %s", timerId)
self.killTimer(timerId)
self.__timer_id = None
assert len(self.__callbacks) == 0

def stop(self):
self.__log_debug("Stopping call_soon queue")
self.__stopped = True

def set_debug(self, enabled):
self.__debug_enabled = enabled

def __log_debug(self, *args, **kwargs):
if self.__debug_enabled:
self._logger.debug(*args, **kwargs)


def _fileno(fd):
if isinstance(fd, int):
return fd
Expand Down Expand Up @@ -339,6 +413,7 @@ def __init__(self, app=None, set_running_loop=False, already_running=False):
self._read_notifiers = {}
self._write_notifiers = {}
self._timer = _SimpleTimer()
self._call_soon_queue = _CallSoonQueue()

self.__call_soon_signaller = signaller = _make_signaller(QtCore, object, tuple)
self.__call_soon_signal = signaller.signal
Expand Down Expand Up @@ -441,6 +516,7 @@ def close(self):
super().close()

self._timer.stop()
self._call_soon_queue.stop()
self.__app = None

for notifier in itertools.chain(
Expand Down Expand Up @@ -474,6 +550,11 @@ def call_later(self, delay, callback, *args, context=None):
return self._add_callback(asyncio.Handle(callback, args, self), delay)

def _add_callback(self, handle, delay=0):
if delay == 0:
# To ensure that we can guarantee the execution order of
# 0-delay callbacks, add them to a special queue, rather than
# assume that Qt will fire the timerEvents in order
return self._call_soon_queue.add_callback(handle)
return self._timer.add_callback(handle, delay)

def call_soon(self, callback, *args, context=None):
Expand Down Expand Up @@ -717,6 +798,7 @@ def set_debug(self, enabled):
super().set_debug(enabled)
self.__debug_enabled = enabled
self._timer.set_debug(enabled)
self._call_soon_queue.set_debug(enabled)

def __enter__(self):
return self
Expand Down
Loading