Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add thread parameter to connection method, allowed "queued connections" #200

Merged
merged 20 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ docs = [
"mkdocs==1.4.2",
"mkdocstrings-python==0.8.3",
"mkdocstrings==0.20.0",
"mkdocs-spellcheck[all]"
"mkdocs-spellcheck[all]",
]
proxy = ["wrapt"]
pydantic = ["pydantic"]
Expand Down Expand Up @@ -104,14 +104,13 @@ dependencies = [
"types-attrs",
"msgspec ; python_version >= '3.8'",
]
include = [
"src/psygnal/_dataclass_utils.py",
"src/psygnal/_evented_decorator.py",
"src/psygnal/_group_descriptor.py",
"src/psygnal/_group.py",
"src/psygnal/_signal.py",
"src/psygnal/_throttler.py",
"src/psygnal/_weak_callback.py",
exclude = [
"src/psygnal/__init__.py",
"src/psygnal/_evented_model.py",
"src/psygnal/utils.py",
"src/psygnal/containers",
"src/psygnal/qt.py",
"src/psygnal/_pyinstaller_util",
]

[tool.cibuildwheel]
Expand Down Expand Up @@ -168,7 +167,7 @@ testpaths = ["tests"]
filterwarnings = [
"error",
"ignore:The distutils package is deprecated:DeprecationWarning:",
"ignore:.*BackendFinder.find_spec()", # pyinstaller import
"ignore:.*BackendFinder.find_spec()", # pyinstaller import
]

# https://mypy.readthedocs.io/en/stable/config_file.html
Expand Down
8 changes: 6 additions & 2 deletions src/psygnal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def version(package: str) -> str:
"debounced",
"EmissionInfo",
"EmitLoopError",
"emit_queued",
"evented",
"EventedModel",
"get_evented_namespace",
Expand All @@ -53,17 +54,20 @@ def version(package: str) -> str:
"PSYGNAL_UNCOMPILED no longer has any effect. If you wish to run psygnal "
"without compiled files, you can run:\n\n"
'python -c "import psygnal.utils; psygnal.utils.decompile()"\n\n'
"(You will need to reinstall psygnal to get the compiled version back.)"
"(You will need to reinstall psygnal to get the compiled version back.)",
stacklevel=2,
)

from ._evented_decorator import evented
from ._exceptions import EmitLoopError
from ._group import EmissionInfo, SignalGroup
from ._group_descriptor import (
SignalGroupDescriptor,
get_evented_namespace,
is_evented,
)
from ._signal import EmitLoopError, Signal, SignalInstance, _compiled
from ._queue import emit_queued
from ._signal import Signal, SignalInstance, _compiled
from ._throttler import debounced, throttled


Expand Down
11 changes: 11 additions & 0 deletions src/psygnal/_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class EmitLoopError(Exception):
"""Error type raised when an exception occurs during a callback."""

def __init__(self, slot_repr: str, args: tuple, exc: BaseException) -> None:
self.slot_repr = slot_repr
self.args = args
self.__cause__ = exc # mypyc doesn't set this, but uncompiled code would
super().__init__(
f"calling {self.slot_repr} with args={args!r} caused "
f"{type(exc).__name__}: {exc}."
)
2 changes: 1 addition & 1 deletion src/psygnal/_pyinstaller_util/hook-psygnal.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def binary_files(file_list: Iterable[Union[PackagePath, Path]]) -> List[Path]:


def create_hiddenimports() -> List[str]:
res = ["mypy_extensions", "__future__"]
res = ["queue", "mypy_extensions", "__future__"]

try:
files_list = package_files("psygnal")
Expand Down
95 changes: 95 additions & 0 deletions src/psygnal/_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from __future__ import annotations

from queue import Queue
from threading import Thread, current_thread, main_thread
from typing import TYPE_CHECKING, Any, Callable, ClassVar, DefaultDict, Tuple

if TYPE_CHECKING:
from typing_extensions import Literal

from ._exceptions import EmitLoopError
from ._weak_callback import WeakCallback

Callback = Callable[[Tuple[Any, ...]], Any]
CbArgsTuple = Tuple[Callback, tuple]


class QueuedCallback(WeakCallback):
"""WeakCallback that queues the callback to be called on a different thread.

(...rather than invoking it immediately.)

Parameters
----------
wrapped : WeakCallback
The actual callback to be invoked.
thread : Thread | Literal["main", "current"] | None
The thread on which to invoke the callback. If not provided, the main
thread will be used.
"""

_GLOBAL_QUEUE: ClassVar[DefaultDict[Thread, Queue[CbArgsTuple]]] = DefaultDict(
Queue
)

def __init__(
self,
wrapped: WeakCallback,
thread: Thread | Literal["main", "current"] | None = None,
) -> None:
self._wrapped = wrapped
# keeping the wrapped key allows this slot to be disconnected
# regardless of whether it was connected with type='queue' or 'direct' ...
self._key: str = wrapped._key
self._max_args: int | None = wrapped._max_args
self._alive: bool = wrapped._alive
self._on_ref_error = wrapped._on_ref_error
tlambert03 marked this conversation as resolved.
Show resolved Hide resolved
tlambert03 marked this conversation as resolved.
Show resolved Hide resolved

if thread is None or thread == "main":
thread = main_thread()
elif thread == "current":
thread = current_thread()
elif not isinstance(thread, Thread): # pragma: no cover
raise TypeError(
f"`thread` must be a Thread instance, not {type(thread).__name__}"
)
# NOTE: for some strange reason, mypyc crashes if we use `self._thread` here
# so we use `self._thred` instead
self._thred = thread

def cb(self, args: tuple = ()) -> None:
if current_thread() is self._thred:
self._wrapped.cb(args)
else:
QueuedCallback._GLOBAL_QUEUE[self._thred].put((self._wrapped.cb, args))

def dereference(self) -> Callable | None:
return self._wrapped.dereference()


def emit_queued(thread: Thread | None = None) -> None:
"""Trigger emissions of all callbacks queued in the current thread.

Parameters
----------
thread : Thread, optional
The thread on which to invoke the callback. If not provided, the main
thread will be used.

Raises
------
EmitLoopError
If an exception is raised while invoking a queued callback.
This exception can be caught and optionally supressed or handled by the caller,
allowing the emission of other queued callbacks to continue even if one of them
raises an exception.
"""
_thread = current_thread() if thread is None else thread
queue = QueuedCallback._GLOBAL_QUEUE[_thread]

tlambert03 marked this conversation as resolved.
Show resolved Hide resolved
while not queue.empty():
cb, args = queue.get()
try:
cb(args)
except Exception as e: # pragma: no cover
raise EmitLoopError(slot_repr=repr(cb), args=args, exc=e) from e
47 changes: 27 additions & 20 deletions src/psygnal/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@
from mypy_extensions import mypyc_attr
from typing_extensions import get_args, get_origin

from psygnal._weak_callback import (
WeakCallback,
_WeakSetattr,
_WeakSetitem,
weak_callback,
)
from ._exceptions import EmitLoopError
from ._queue import QueuedCallback
from ._weak_callback import WeakCallback, _WeakSetattr, _WeakSetitem, weak_callback

if TYPE_CHECKING:
from typing_extensions import Literal
Expand All @@ -47,19 +44,6 @@
F = TypeVar("F", bound=Callable)


class EmitLoopError(Exception):
"""Error type raised when an exception occurs during a callback."""

def __init__(self, slot_repr: str, args: tuple, exc: BaseException) -> None:
self.slot_repr = slot_repr
self.args = args
self.__cause__ = exc # mypyc doesn't set this, but uncompiled code would
super().__init__(
f"calling {self.slot_repr} with args={args!r} caused "
f"{type(exc).__name__}: {exc}."
)


class Signal:
"""Declares a signal emitter on a class.

Expand Down Expand Up @@ -356,6 +340,7 @@ def __repr__(self) -> str:
def connect(
self,
*,
thread: threading.Thread | Literal["main", "current"] | None = ...,
check_nargs: bool | None = ...,
check_types: bool | None = ...,
unique: bool | str = ...,
Expand All @@ -369,6 +354,7 @@ def connect(
self,
slot: F,
*,
thread: threading.Thread | Literal["main", "current"] | None = ...,
check_nargs: bool | None = ...,
check_types: bool | None = ...,
unique: bool | str = ...,
Expand All @@ -381,6 +367,7 @@ def connect(
self,
slot: F | None = None,
*,
thread: threading.Thread | Literal["main", "current"] | None = None,
check_nargs: bool | None = None,
check_types: bool | None = None,
unique: bool | str = False,
Expand All @@ -406,6 +393,13 @@ def my_function():
...
```

!!!important
If a signal is connected with `thread != None`, then it is up to the user
to ensure that `psygnal.emit_queued` is called, or that one of the backend
convenience functions is used (e.g. `psygnal.qt.start_emitting_from_queue`).
Otherwise, callbacks that are connected to signals that are emitted from
another thread will never be called.

Parameters
----------
slot : Callable
Expand All @@ -415,6 +409,16 @@ def my_function():
check_nargs : Optional[bool]
If `True` and the provided `slot` requires more positional arguments than
the signature of this Signal, raise `TypeError`. by default `True`.
thread: Thread | Literal["main", "current"] | None
If `None` (the default), this slot will be invoked immediately when a signal
is emitted, from whatever thread emitted the signal. If a thread object is
provided, then the callback will only be immediately invoked if the signal
is emitted from that thread. Otherwise, the callback will be added to a
queue. **Note!**, when using the `thread` parameter, the user is responsible
for calling `psygnal.emit_queued()` in the corresponding thread, otherwise
the slot will never be invoked. (See note above). (The strings `"main"` and
`"current"` are also accepted, and will be interpreted as the
`threading.main_thread()` and `threading.current_thread()`, respectively).
check_types : Optional[bool]
If `True`, An additional check will be performed to make sure that types
declared in the slot signature are compatible with the signature
Expand Down Expand Up @@ -487,7 +491,10 @@ def _wrapper(
finalize=self._try_discard,
on_ref_error=_on_ref_err,
)
self._slots.append(cb)
if thread is None:
self._slots.append(cb)
else:
self._slots.append(QueuedCallback(cb, thread=thread))
return slot

return _wrapper if slot is None else _wrapper(slot)
Expand Down
76 changes: 76 additions & 0 deletions src/psygnal/qt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Module that provides Qt-specific functionality for psygnal.

This module provides convenience functions for starting and stopping a QTimer that
will monitor "queued" signals and invoke their callbacks. This is useful when
psygnal is used in a Qt application, and you'd like to emit signals from a thread
but have their callbacks invoked in the main thread.
"""
from __future__ import annotations
tlambert03 marked this conversation as resolved.
Show resolved Hide resolved

from threading import Thread, current_thread

from ._queue import emit_queued

try:
from qtpy.QtCore import Qt, QTimer
except ImportError:
tlambert03 marked this conversation as resolved.
Show resolved Hide resolved
raise ImportError(
"The psygnal.qt module requires qtpy and some Qt backend to be installed"
) from None

_TIMERS: dict[Thread, QTimer] = {}


def start_emitting_from_queue(
msec: int = 0,
timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer,
thread: Thread | None = None,
) -> None:
"""Start a QTimer that will monitor the global emission queue.

If a QTimer is already running in the current thread, then this function will
update the interval and timer type of that QTimer. (It is safe to call this
function multiple times in the same thread.)

When callbacks are connected to signals with `connect(type='queued')`, then they
are not invoked immediately, but rather added to a global queue. This function
starts a QTimer that will periodically check the queue and invoke any callbacks
that are waiting to be invoked (in whatever thread this QTimer is running in).

Parameters
----------
msec : int, optional
The interval (in milliseconds) at which the QTimer will check the global
emission queue. By default, the QTimer will check the queue as often as
possible (i.e. 0 milliseconds).
timer_type : Qt.TimerType, optional
The type of timer to use. By default, Qt.PreciseTimer is used, which is
the most accurate timer available on the system.
thread : Thread, optional
The thread in which to start the QTimer. By default, the QTimer will be
started in the thread from which this function is called.
"""
_thread = current_thread() if thread is None else thread
if _thread not in _TIMERS:
_TIMERS[_thread] = QTimer()

_TIMERS[_thread].timeout.connect(emit_queued)

_TIMERS[_thread].setTimerType(timer_type)
if _TIMERS[_thread].isActive():
_TIMERS[_thread].setInterval(msec)
else:
_TIMERS[_thread].start(msec)


def stop_emitting_from_queue(thread: Thread | None = None) -> None:
"""Stop the QTimer that monitors the global emission queue.

thread : Thread, optional
The thread in which to stop the QTimer. By default, will stop any QTimers
in the thread from which this function is called.
"""
_thread = current_thread() if thread is None else thread
timer = _TIMERS.get(_thread)
if timer is not None:
timer.stop()
Loading