From 3f3aa8f2726edeb545ab6e21d093501bd0dd3a39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natt=C5=8Dsai=20Mit=C5=8D?= Date: Sun, 14 Aug 2022 10:53:43 +0900 Subject: [PATCH] add 'watch_touch', and let 'rest_of_touch_moves' use it --- asynckivy/__init__.py | 2 +- asynckivy/_touch.py | 156 ++++++++++++++++++++++++++++++ tests/test_rest_of_touch_moves.py | 38 +++----- tests/test_watch_touch.py | 141 +++++++++++++++++++++++++++ 4 files changed, 314 insertions(+), 23 deletions(-) create mode 100644 asynckivy/_touch.py create mode 100644 tests/test_watch_touch.py diff --git a/asynckivy/__init__.py b/asynckivy/__init__.py index 9e1bd60..83685e2 100644 --- a/asynckivy/__init__.py +++ b/asynckivy/__init__.py @@ -5,7 +5,7 @@ from ._event import * from ._animation import * from ._interpolate import * -from ._rest_of_touch_moves import * +from ._touch import * from ._threading import * from ._start_soon import * from ._n_frames import * diff --git a/asynckivy/_touch.py b/asynckivy/_touch.py new file mode 100644 index 0000000..8c6f2ab --- /dev/null +++ b/asynckivy/_touch.py @@ -0,0 +1,156 @@ +__all__ = ('watch_touch', 'rest_of_touch_moves', ) + +import types +import functools +import asynckivy as ak + + +class watch_touch: + ''' + Return an async context manager that provides an easy way to handle touch events. + + Usage + ----- + + .. code-block:: python + + async with watch_touch(widget, touch) as is_touch_move: + while await is_touch_move(): + print('on_touch_move') + else: + print('on_touch_up') + + Restriction + ----------- + + 1. The only thing you can 'await' inside the context manager is that the return value of the callable that is + bound to the identifier in the as-clause. + + .. code-block:: python + + async with watch_touch(widget, touch) as is_touch_move: + await is_touch_move() # ALLOWED + await something_else # NOT ALLOWED + + 2. Since the context manager grabs/ungrabs the ``touch``, the ``widget`` must NOT grab/ungrab it. Most of the + widgets that interact to touches (``Button``, ``ScrollView`` and ``Carousel``, for instance) wouldn't work with + this context manager unless you use it in a specific way. + ''' + __slots__ = ('_widget', '_touch', '_stop_dispatching', '_timeout', '_uid_up', '_uid_move', '_no_cleanup', ) + + def __init__(self, widget, touch, stop_dispatching=False, timeout=1.): + self._widget = widget + self._touch = touch + self._stop_dispatching = stop_dispatching + self._timeout = timeout + self._no_cleanup = False + + def _on_touch_up_sd(step_coro, touch, w, t): + if t is touch: + if t.grab_current is w: + t.ungrab(w) + step_coro(False) + return True + + def _on_touch_move_sd(step_coro, touch, w, t): + if t is touch: + if t.grab_current is w: + step_coro(True) + return True + + def _on_touch_up(step_coro, touch, w, t): + if t.grab_current is w and t is touch: + t.ungrab(w) + step_coro(False) + return True + + def _on_touch_move(step_coro, touch, w, t): + if t.grab_current is w and t is touch: + step_coro(True) + return True + + _callbacks = ((_on_touch_up_sd, _on_touch_move_sd, ), (_on_touch_up, _on_touch_move, ), ) + del _on_touch_up, _on_touch_move, _on_touch_up_sd, _on_touch_move_sd + + @types.coroutine + def _true_if_touch_move_false_if_touch_up() -> bool: + return (yield lambda step_coro: None)[0][0] + + @types.coroutine + def _always_false() -> bool: + return False + yield # just to make this function a generator function + + async def __aenter__( + self, get_step_coro=ak.get_step_coro, partial=functools.partial, _callbacks=_callbacks, ak=ak, + _always_false=_always_false, _true_if_touch_move_false_if_touch_up=_true_if_touch_move_false_if_touch_up, + ): + touch = self._touch + widget = self._widget + if touch.time_end != -1: + # `on_touch_up` might have been already fired so we need to find out it actually was or not. + tasks = await ak.or_( + ak.sleep(self._timeout), + ak.event(widget, 'on_touch_up', filter=lambda w, t: t is touch), + ) + if tasks[0].done: + raise ak.MotionEventAlreadyEndedError(f"MotionEvent(uid={touch.uid}) has already ended") + else: + self._no_cleanup = True + return _always_false + step_coro = await get_step_coro() + on_touch_up, on_touch_move = _callbacks[not self._stop_dispatching] + touch.grab(widget) + self._uid_up = widget.fbind('on_touch_up', partial(on_touch_up, step_coro, touch)) + self._uid_move = widget.fbind('on_touch_move', partial(on_touch_move, step_coro, touch)) + assert self._uid_up + assert self._uid_move + return _true_if_touch_move_false_if_touch_up + + del _always_false, _true_if_touch_move_false_if_touch_up, _callbacks + + async def __aexit__(self, *args): + if self._no_cleanup: + return + w = self._widget + self._touch.ungrab(w) + w.unbind_uid('on_touch_up', self._uid_up) + w.unbind_uid('on_touch_move', self._uid_move) + + +async def rest_of_touch_moves(widget, touch, *, stop_dispatching=False, timeout=1.): + ''' + Wrap ``watch_touch()`` in a more intuitive interface. + + Usage + ----- + + .. code-block:: python + + async for __ in rest_of_touch_moves(widget, touch): + print('on_touch_move') + else: + print('on_touch_up') + + Restriction + ----------- + + 1. You are not allowed to 'await' anything during the iterations. + + .. code-block:: python + + async for __ in rest_of_touch_moves(widget, touch): + await something # <- NOT ALLOWED + + 2. Like ``watch_touch``, this wouldn't work with the widgets that interact to touches. + + Downside compared to ``watch_touch`` + ------------------------------------ + + 1. Since this creates an async generator, it may not work if Kivy is running in asyncio/trio mode. + See https://peps.python.org/pep-0525/#finalization for details. + ''' + + async with watch_touch(widget, touch, stop_dispatching, timeout) as is_touch_move: + while await is_touch_move(): + yield diff --git a/tests/test_rest_of_touch_moves.py b/tests/test_rest_of_touch_moves.py index b7c740c..1e09beb 100644 --- a/tests/test_rest_of_touch_moves.py +++ b/tests/test_rest_of_touch_moves.py @@ -2,12 +2,12 @@ @pytest.mark.parametrize('n_touch_moves', [0, 1, 10]) -def test_a_number_of_on_touch_moves_fired(n_touch_moves): +def test_a_number_of_touch_moves(n_touch_moves): from kivy.uix.widget import Widget from kivy.tests.common import UnitTestTouch import asynckivy as ak - async def _test(w, t): + async def async_fn(w, t): n = 0 async for __ in ak.rest_of_touch_moves(w, t): n += 1 @@ -15,7 +15,7 @@ async def _test(w, t): w = Widget() t = UnitTestTouch(0, 0) - task = ak.start(_test(w, t)) + task = ak.start(async_fn(w, t)) for __ in range(n_touch_moves): t.grab_current = None w.dispatch('on_touch_move', t) @@ -33,7 +33,7 @@ def test_break_during_a_for_loop(): from kivy.tests.common import UnitTestTouch import asynckivy as ak - async def _test(w, t): + async def async_fn(w, t): import weakref nonlocal n_touch_moves weak_w = weakref.ref(w) @@ -49,7 +49,7 @@ async def _test(w, t): n_touch_moves = 0 w = Widget() t = UnitTestTouch(0, 0) - task = ak.start(_test(w, t)) + task = ak.start(async_fn(w, t)) for expected in (1, 2, 2, ): t.grab_current = None w.dispatch('on_touch_move', t) @@ -75,7 +75,7 @@ def test_stop_dispatching(stop_dispatching, expectation): from kivy.tests.common import UnitTestTouch import asynckivy as ak - async def _test(parent, t): + async def async_fn(parent, t): async for __ in ak.rest_of_touch_moves( parent, t, stop_dispatching=stop_dispatching): pass @@ -93,7 +93,7 @@ def on_touch_up(*args): ) parent.add_widget(child) t = UnitTestTouch(0, 0) - task = ak.start(_test(parent, t)) + task = ak.start(async_fn(parent, t)) for i in range(2): t.grab_current = None @@ -109,33 +109,27 @@ def on_touch_up(*args): assert task.done +@pytest.mark.parametrize('timeout', (.2, 1.)) @pytest.mark.parametrize('actually_ended', (True, False)) -def test_the_touch_that_might_already_ended(actually_ended): - import time - from kivy.clock import Clock +def test_a_touch_that_might_have_already_ended(sleep_then_tick, timeout, actually_ended): + from contextlib import nullcontext from kivy.uix.widget import Widget from kivy.tests.common import UnitTestTouch import asynckivy as ak from asynckivy.exceptions import MotionEventAlreadyEndedError - Clock.tick() - - async def job(w, t): - if actually_ended: - with pytest.raises(MotionEventAlreadyEndedError): - async for __ in ak.rest_of_touch_moves(w, t): - pass - else: - async for __ in ak.rest_of_touch_moves(w, t): + + async def async_fn(w, t): + with pytest.raises(MotionEventAlreadyEndedError) if actually_ended else nullcontext(): + async for __ in ak.rest_of_touch_moves(w, t, timeout=timeout): pass w = Widget() t = UnitTestTouch(0, 0) t.time_end = 1 # something other than -1 - task = ak.start(job(w, t)) + task = ak.start(async_fn(w, t)) if actually_ended: - time.sleep(2.) - Clock.tick() + sleep_then_tick(timeout) else: t.grab_current = None w.dispatch('on_touch_up', t) diff --git a/tests/test_watch_touch.py b/tests/test_watch_touch.py new file mode 100644 index 0000000..bfe44aa --- /dev/null +++ b/tests/test_watch_touch.py @@ -0,0 +1,141 @@ +import pytest + + +@pytest.mark.parametrize('n_touch_moves', [0, 1, 10]) +def test_a_number_of_touch_moves(n_touch_moves): + from kivy.uix.widget import Widget + from kivy.tests.common import UnitTestTouch + import asynckivy as ak + + async def async_fn(w, t): + n = 0 + async with ak.watch_touch(w, t) as is_touch_move: + while await is_touch_move(): + n += 1 + assert n == n_touch_moves + + w = Widget() + t = UnitTestTouch(0, 0) + task = ak.start(async_fn(w, t)) + for __ in range(n_touch_moves): + t.grab_current = None + w.dispatch('on_touch_move', t) + t.grab_current = w + w.dispatch('on_touch_move', t) + t.grab_current = None + w.dispatch('on_touch_up', t) + t.grab_current = w + w.dispatch('on_touch_up', t) + assert task.done + + +def test_stop_watching_before_touch_ends(): + from kivy.uix.widget import Widget + from kivy.tests.common import UnitTestTouch + import asynckivy as ak + + async def async_fn(w, t): + import weakref + nonlocal n_touch_moves + weak_w = weakref.ref(w) + assert weak_w not in t.grab_list + async with ak.watch_touch(w, t) as is_touch_move: + while await is_touch_move(): + assert weak_w in t.grab_list + n_touch_moves += 1 + if n_touch_moves == 2: + break + assert weak_w not in t.grab_list + await ak.event(w, 'on_touch_up') + + n_touch_moves = 0 + w = Widget() + t = UnitTestTouch(0, 0) + task = ak.start(async_fn(w, t)) + for expected in (1, 2, 2, ): + t.grab_current = None + w.dispatch('on_touch_move', t) + t.grab_current = w + w.dispatch('on_touch_move', t) + assert n_touch_moves == expected + assert not task.done + t.grab_current = None + w.dispatch('on_touch_up', t) + t.grab_current = w + w.dispatch('on_touch_up', t) + assert n_touch_moves == 2 + assert task.done + + +@pytest.mark.parametrize( + 'stop_dispatching, expectation', [ + (True, [0, 0, 0, ], ), + (False, [1, 2, 1, ], ), + ]) +def test_stop_dispatching(stop_dispatching, expectation): + from kivy.uix.widget import Widget + from kivy.tests.common import UnitTestTouch + import asynckivy as ak + + async def async_fn(parent, t): + async with ak.watch_touch(parent, t, stop_dispatching=stop_dispatching) as is_touch_move: + while await is_touch_move(): + pass + + n_touches = {'move': 0, 'up': 0, } + def on_touch_move(*args): + n_touches['move'] += 1 + def on_touch_up(*args): + n_touches['up'] += 1 + + parent = Widget() + child = Widget( + on_touch_move=on_touch_move, + on_touch_up=on_touch_up, + ) + parent.add_widget(child) + t = UnitTestTouch(0, 0) + task = ak.start(async_fn(parent, t)) + + for i in range(2): + t.grab_current = None + parent.dispatch('on_touch_move', t) + t.grab_current = parent + parent.dispatch('on_touch_move', t) + assert n_touches['move'] == expectation[i] + t.grab_current = None + parent.dispatch('on_touch_up', t) + t.grab_current = parent + parent.dispatch('on_touch_up', t) + assert n_touches['up'] == expectation[2] + assert task.done + + +@pytest.mark.parametrize('timeout', (.2, 1.)) +@pytest.mark.parametrize('actually_ended', (True, False)) +def test_a_touch_that_might_have_already_ended(sleep_then_tick, timeout, actually_ended): + from contextlib import nullcontext + from kivy.uix.widget import Widget + from kivy.tests.common import UnitTestTouch + import asynckivy as ak + from asynckivy.exceptions import MotionEventAlreadyEndedError + + async def async_fn(w, t): + with pytest.raises(MotionEventAlreadyEndedError) if actually_ended else nullcontext(): + async with ak.watch_touch(w, t, timeout=timeout) as is_touch_move: + while await is_touch_move(): + pass + + w = Widget() + t = UnitTestTouch(0, 0) + t.time_end = 1 # something other than -1 + task = ak.start(async_fn(w, t)) + + if actually_ended: + sleep_then_tick(timeout) + else: + t.grab_current = None + w.dispatch('on_touch_up', t) + t.grab_current = w + w.dispatch('on_touch_up', t) + assert task.done