From e21145e0caf8c58f8f90b2451af715dafe483822 Mon Sep 17 00:00:00 2001 From: Tom Whitwell Date: Mon, 29 Nov 2021 17:26:37 +0000 Subject: [PATCH] Change to use asyncio rather than threading This allows event-driven things to be far more easily driven (ie. pulsectl) Also, Rather than defining a source / sync for the mute buttons, use the current default one - how often do you need to immediately mute a device that's not in use? --- .gitignore | 1 + .python-version | 1 + devdeck/controls/clock_control.py | 25 ++--- devdeck/controls/command_control.py | 3 +- devdeck/controls/mic_mute_control.py | 72 ++++++++------ devdeck/controls/name_list_control.py | 2 +- devdeck/controls/timer_control.py | 98 +++++++++++-------- devdeck/controls/volume_level_control.py | 15 ++- devdeck/controls/volume_mute_control.py | 86 +++++++++------- devdeck/decks/single_page_deck_controller.py | 1 + devdeck/main.py | 65 +++++++----- requirements.txt | 2 +- .../devdeck/controls/test_command_control.py | 6 +- .../controls/test_name_list_control.py | 24 +++-- tests/devdeck/controls/test_timer_control.py | 28 ++++-- .../devdeck/settings/test_devdeck_settings.py | 1 - tests/devdeck/test_deck_manager.py | 2 +- 17 files changed, 262 insertions(+), 170 deletions(-) create mode 100644 .python-version diff --git a/.gitignore b/.gitignore index 7179f70..43fbf92 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__/ htmlcov/ venv/ +.vscode diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..30291cb --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.10.0 diff --git a/devdeck/controls/clock_control.py b/devdeck/controls/clock_control.py index be59a37..81b4e3e 100644 --- a/devdeck/controls/clock_control.py +++ b/devdeck/controls/clock_control.py @@ -1,24 +1,23 @@ +import asyncio +from asyncio.events import get_event_loop import threading from datetime import datetime -from time import sleep +from asyncio import sleep from devdeck_core.controls.deck_control import DeckControl class ClockControl(DeckControl): - def __init__(self, key_no, **kwargs): + def __init__(self, key_no: int, **kwargs): + self.loop = get_event_loop() super().__init__(key_no, **kwargs) - self.thread = None - self.running = False def initialize(self): - self.thread = threading.Thread(target=self._update_display) - self.running = True - self.thread.start() + self.loop.create_task(self._update_display()) - def _update_display(self): - while self.running is True: + async def _update_display(self): + while True: with self.deck_context() as context: now = datetime.now() @@ -33,10 +32,4 @@ def _update_display(self): .center_vertically(100) \ .font_size(75) \ .end() - sleep(1) - - def dispose(self): - self.running = False - if self.thread: - self.thread.join() - + await sleep(1) diff --git a/devdeck/controls/command_control.py b/devdeck/controls/command_control.py index fdbabe4..9c47372 100644 --- a/devdeck/controls/command_control.py +++ b/devdeck/controls/command_control.py @@ -1,3 +1,4 @@ +import asyncio import logging import os from subprocess import Popen, DEVNULL @@ -19,4 +20,4 @@ def pressed(self): try: Popen(self.settings['command'], stdout=DEVNULL, stderr=DEVNULL) except Exception as ex: - self.__logger.error("Error executing command %s: %s", self.settings['command'], str(ex)) \ No newline at end of file + self.__logger.error("Error executing command %s: %s", self.settings['command'], str(ex)) diff --git a/devdeck/controls/mic_mute_control.py b/devdeck/controls/mic_mute_control.py index c9f4c8f..e5393e9 100644 --- a/devdeck/controls/mic_mute_control.py +++ b/devdeck/controls/mic_mute_control.py @@ -1,7 +1,10 @@ +from asyncio import sleep +from asyncio.events import get_event_loop import logging import os -from pulsectl import pulsectl +import pulsectl +import pulsectl_asyncio from devdeck_core.controls.deck_control import DeckControl @@ -9,39 +12,53 @@ class MicMuteControl(DeckControl): def __init__(self, key_no, **kwargs): + self.loop = get_event_loop() self.pulse = None self.__logger = logging.getLogger('devdeck') super().__init__(key_no, **kwargs) - def initialize(self): + async def _init(self): if self.pulse is None: - self.pulse = pulsectl.Pulse('MicMuteControl') - self.__render_icon() + self.pulse = pulsectl_asyncio.PulseAsync('MicMuteControl') + await self.pulse.connect() + self.loop.create_task(self._update_display()) + self.loop.create_task(self._listen_mute()) + + async def _listen_mute(self): + async for event in self.pulse.subscribe_events('source'): + if event.t == pulsectl.PulseEventTypeEnum.change: + await self._update_display() + + def initialize(self): + self.loop.create_task(self._init()) def pressed(self): - mic = self.__get_mic() - if mic is None: - return - self.pulse.source_mute(mic.index, mute=(not mic.mute)) - self.__render_icon() + self.loop.create_task(self._handle_mute()) - def __get_mic(self): - sources = self.pulse.source_list() + async def _handle_mute(self): + mic = await self._get_source() + await self.pulse.source_mute(mic.index, mute=(not mic.mute)) + await self._update_display() - selected_mic = [mic for mic in sources if mic.description == self.settings['microphone']] - if len(selected_mic) == 0: - possible_mics = [output.description for output in sources] - self.__logger.warning("Microphone '%s' not found in list of possible inputs:\n%s", - self.settings['microphone'], - '\n'.join(possible_mics)) - return None - return selected_mic[0] + async def _get_source(self): + sources = await self.pulse.source_list() + server_info = await self.pulse.server_info() + default_source_name = server_info.default_source_name + return next((source for source in sources if source.name == default_source_name), None) - def __render_icon(self): + async def _update_display(self): with self.deck_context() as context: - mic = self.__get_mic() - if mic is None: - with context.renderer() as r: + mic = await self._get_source() + with context.renderer() as r: + try: + match mic.mute: + case 0: + r.image(os.path.join(os.path.dirname(__file__), + "../assets/font-awesome", 'microphone.png')).end() + case 1: + r.image(os.path.join(os.path.dirname(__file__), + "../assets/font-awesome", 'microphone-mute.png')).end() + except AttributeError: r \ .text('MIC \nNOT FOUND') \ .color('red') \ @@ -50,17 +67,10 @@ def __render_icon(self): .font_size(85) \ .text_align('center') \ .end() - return - if mic.mute == 0: - with context.renderer() as r: - r.image(os.path.join(os.path.dirname(__file__), "../assets/font-awesome", 'microphone.png')).end() - else: - with context.renderer() as r: - r.image(os.path.join(os.path.dirname(__file__), "../assets/font-awesome", 'microphone-mute.png')).end() def settings_schema(self): return { 'microphone': { 'type': 'string' } - } \ No newline at end of file + } diff --git a/devdeck/controls/name_list_control.py b/devdeck/controls/name_list_control.py index ff0874b..5a64461 100644 --- a/devdeck/controls/name_list_control.py +++ b/devdeck/controls/name_list_control.py @@ -1,8 +1,8 @@ +import asyncio import os from devdeck_core.controls.deck_control import DeckControl - class NameListControl(DeckControl): def __init__(self, key_no, **kwargs): diff --git a/devdeck/controls/timer_control.py b/devdeck/controls/timer_control.py index 5af7a96..b7c0630 100644 --- a/devdeck/controls/timer_control.py +++ b/devdeck/controls/timer_control.py @@ -1,62 +1,80 @@ -import datetime +from asyncio.events import get_event_loop +from datetime import datetime import os -import threading +import enum +import asyncio from time import sleep from devdeck_core.controls.deck_control import DeckControl +class TimerState(enum.Enum): + RUNNING = 1 + STOPPED = 2 + RESET = 3 + + class TimerControl(DeckControl): def __init__(self, key_no, **kwargs): - self.start_time = None - self.end_time = None - self.thread = None - super().__init__(key_no, **kwargs) + self.loop = get_event_loop() + self.start_time: datetime = None + self.end_time: datetime = None + self.state = TimerState.RESET + super().__init__(key_no, ** kwargs) def initialize(self): + self.loop.create_task(self._update_display()) with self.deck_context() as context: with context.renderer() as r: - r.image(os.path.join(os.path.dirname(__file__), "../assets/font-awesome", 'stopwatch.png')).end() + r.image(os.path.join(os.path.dirname(__file__), + "../assets/font-awesome", 'stopwatch.png')).end() def pressed(self): - if self.start_time is None: - self.start_time = datetime.datetime.now() - self.thread = threading.Thread(target=self._update_display) - self.thread.start() - elif self.end_time is None: - self.end_time = datetime.datetime.now() - self.thread.join() - with self.deck_context() as context: - with context.renderer() as r: - r.text(TimerControl.time_diff_to_str(self.end_time - self.start_time))\ - .font_size(120)\ - .color('red')\ - .center_vertically().center_horizontally().end() - else: - self.start_time = None - self.end_time = None - with self.deck_context() as context: - with context.renderer() as r: - r.image(os.path.join( - os.path.join(os.path.dirname(__file__), "../assets/font-awesome", 'stopwatch.png'))).end() - - def _update_display(self): - while self.end_time is None: - if self.start_time is None: - sleep(1) - continue - cutoff = datetime.datetime.now() if self.end_time is None else self.end_time + match self.state: + case TimerState.RESET: + self.start_time = datetime.now() + self.end_time = None + self.state = TimerState.RUNNING + case TimerState.RUNNING: + if not self.start_time: + raise Exception("how did you get here?") + self.end_time = datetime.now() + self.state = TimerState.STOPPED + case TimerState.STOPPED: + self.start_time = self.end_time = None + self.state = TimerState.RESET + + async def _update_display(self, repeat=True): + while True: with self.deck_context() as context: with context.renderer() as r: - r.text(TimerControl.time_diff_to_str(cutoff - self.start_time)) \ - .font_size(120) \ - .center_vertically().center_horizontally().end() - sleep(1) + match self.state: + case TimerState.RUNNING: + r.text(TimerControl.time_diff_to_str(datetime.now() - self.start_time))\ + .font_size(120)\ + .color('red')\ + .center_vertically().center_horizontally().end() + case TimerState.STOPPED: + r.text(TimerControl.time_diff_to_str(self.end_time - self.start_time))\ + .font_size(120)\ + .color('yellow')\ + .center_vertically().center_horizontally().end() + case _: + r.image(os.path.join( + os.path.join(os.path.dirname(__file__), "../assets/font-awesome", 'stopwatch.png'))).end() + if repeat: + await asyncio.sleep(0.1) + else: + return @staticmethod def time_diff_to_str(diff): - seconds = diff.total_seconds() - minutes, seconds = divmod(seconds, 60) + total_seconds = diff.total_seconds() + minutes, seconds = divmod(total_seconds, 60) hours, minutes = divmod(minutes, 60) + if total_seconds < 60: + return f'{int(seconds):02d}' + elif total_seconds < 3600: + return f'{int(minutes):02d}:{int(seconds):02d}' return f'{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}' diff --git a/devdeck/controls/volume_level_control.py b/devdeck/controls/volume_level_control.py index 5ba5885..dca27a4 100644 --- a/devdeck/controls/volume_level_control.py +++ b/devdeck/controls/volume_level_control.py @@ -1,3 +1,5 @@ +import asyncio +from asyncio.events import get_event_loop import logging import os @@ -8,11 +10,12 @@ class VolumeLevelControl(DeckControl): - def __init__(self, key_no, **kwargs): + def __init__(self, key_no, **kwargs): + self.loop = get_event_loop(), self.pulse = None self.volume = None self.__logger = logging.getLogger('devdeck') - super().__init__(key_no, **kwargs) + super().__init__(key_no, ** kwargs) def initialize(self): if self.pulse is None: @@ -29,10 +32,12 @@ def pressed(self): def __get_output(self): sinks = self.pulse.sink_list() - selected_output = [output for output in sinks if output.description == self.settings['output']] + selected_output = [ + output for output in sinks if output.description == self.settings['output']] if len(selected_output) == 0: possible_ouputs = [output.description for output in sinks] - self.__logger.warning("Output '%s' not found in list of possible outputs:\n%s", self.settings['output'], '\n'.join(possible_ouputs)) + self.__logger.warning("Output '%s' not found in list of possible outputs:\n%s", + self.settings['output'], '\n'.join(possible_ouputs)) return None return selected_output[0] @@ -72,4 +77,4 @@ def settings_schema(self): 'volume': { 'type': 'integer' } - } \ No newline at end of file + } diff --git a/devdeck/controls/volume_mute_control.py b/devdeck/controls/volume_mute_control.py index a7d05cf..a3d261b 100644 --- a/devdeck/controls/volume_mute_control.py +++ b/devdeck/controls/volume_mute_control.py @@ -1,7 +1,10 @@ +import asyncio +from asyncio.events import get_event_loop import logging import os -from pulsectl import pulsectl +import pulsectl +import pulsectl_asyncio from devdeck_core.controls.deck_control import DeckControl @@ -9,47 +12,58 @@ class VolumeMuteControl(DeckControl): def __init__(self, key_no, **kwargs): + self.loop = get_event_loop() self.pulse = None self.__logger = logging.getLogger('devdeck') super().__init__(key_no, **kwargs) + async def _init(self): + if self.pulse is None: + self.pulse = pulsectl_asyncio.PulseAsync('VolumeMuteControl') + await self.pulse.connect() + self.loop.create_task(self._update_display()) + self.loop.create_task(self._listen_mute()) + + async def _listen_mute(self): + async for event in self.pulse.subscribe_events('sink'): + if event.t == pulsectl.PulseEventTypeEnum.change: + await self._update_display() + def initialize(self): - self.pulse = pulsectl.Pulse() - self.__render_icon() + self.loop.create_task(self._init()) def pressed(self): - output = self.__get_output() - if output is None: - return - self.pulse.sink_mute(output.index, mute=(not output.mute)) - self.__render_icon() - - def __get_output(self): - sinks = self.pulse.sink_list() - selected_output = [output for output in sinks if output.description == self.settings['output']] - if len(selected_output) == 0: - possible_ouputs = [output.description for output in sinks] - self.__logger.warning("Output '%s' not found in list of possible outputs:\n%s", self.settings['output'], '\n'.join(possible_ouputs)) - return None - return selected_output[0] - - def __render_icon(self): + self.loop.create_task(self._handle_mute()) + + async def _handle_mute(self): + output = await self._get_sink() + await self.pulse.sink_mute(output.index, mute=(not output.mute)) + await self._update_display() + + async def _get_sink(self): + outputs = await self.pulse.sink_list() + server_info = await self.pulse.server_info() + default_sink_name = server_info.default_sink_name + return next((output for output in outputs if output.name == default_sink_name), None) + + async def _update_display(self): with self.deck_context() as context: - sink = self.__get_output() - if sink is None: - with context.renderer() as r: - r\ - .text('OUTPUT \nNOT FOUND')\ - .color('red')\ - .center_vertically()\ - .center_horizontally()\ - .font_size(85)\ - .text_align('center')\ + sink = await self._get_sink() + with context.renderer() as r: + try: + match sink.mute: + case 0: + r.image(os.path.join(os.path.dirname(__file__), + "../assets/font-awesome", 'volume-up-solid.png')).end() + case 1: + r.image(os.path.join(os.path.dirname(__file__), + "../assets/font-awesome", 'volume-off-solid.png')).end() + except AttributeError: + r \ + .text('OUTPUT \nNOT FOUND') \ + .color('red') \ + .center_vertically() \ + .center_horizontally() \ + .font_size(85) \ + .text_align('center') \ .end() - return - if sink.mute == 0: - with context.renderer() as r: - r.image(os.path.join(os.path.dirname(__file__), "../assets/font-awesome", 'volume-up-solid.png')).end() - else: - with context.renderer() as r: - r.image(os.path.join(os.path.dirname(__file__), "../assets/font-awesome", 'volume-off-solid.png')).end() diff --git a/devdeck/decks/single_page_deck_controller.py b/devdeck/decks/single_page_deck_controller.py index 78b1c03..b13f075 100644 --- a/devdeck/decks/single_page_deck_controller.py +++ b/devdeck/decks/single_page_deck_controller.py @@ -1,3 +1,4 @@ +import asyncio import logging import os diff --git a/devdeck/main.py b/devdeck/main.py index 1747404..a614912 100644 --- a/devdeck/main.py +++ b/devdeck/main.py @@ -1,7 +1,8 @@ import logging import os import sys -import threading +import signal +import asyncio from logging.handlers import RotatingFileHandler from pathlib import Path @@ -12,14 +13,19 @@ from devdeck.settings.devdeck_settings import DevDeckSettings from devdeck.settings.validation_error import ValidationError +deck = None +deck_manager = None +loop = asyncio.new_event_loop() -def main(): +async def main(): + global deck, deck_manager os.makedirs(os.path.join(str(Path.home()), '.devdeck'), exist_ok=True) root = logging.getLogger('devdeck') root.setLevel(logging.DEBUG) - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') info_handler = logging.StreamHandler(sys.stdout) info_handler.setLevel(logging.INFO) @@ -39,17 +45,19 @@ def main(): streamdecks = DeviceManager().enumerate() - settings_filename = os.path.join(str(Path.home()), '.devdeck', 'settings.yml') + settings_filename = os.path.join( + str(Path.home()), '.devdeck', 'settings.yml') if not os.path.exists(settings_filename): root.warning("No settings file detected!") serial_numbers = [] - for index, deck in enumerate(streamdecks): + for _, deck in enumerate(streamdecks): deck.open() serial_numbers.append(deck.get_serial_number()) deck.close() if len(serial_numbers) > 0: - root.info("Generating a setting file as none exist: %s", settings_filename) + root.info("Generating a setting file as none exist: %s", + settings_filename) DevDeckSettings.generate_default(settings_filename, serial_numbers) else: root.info("""No stream deck connected. Please connect a stream deck to generate an initial config file. \n @@ -62,13 +70,15 @@ def main(): except ValidationError as validation_error: print(validation_error) - for index, deck in enumerate(streamdecks): + for _, deck in enumerate(streamdecks): deck.open() - root.info('Connecting to deck: %s (S/N: %s)', deck.id(), deck.get_serial_number()) + root.info('Connecting to deck: %s (S/N: %s)', + deck.id(), deck.get_serial_number()) deck_settings = settings.deck(deck.get_serial_number()) if deck_settings is None: - root.info("Skipping deck %s (S/N: %s) - no settings present", deck.id(), deck.get_serial_number()) + root.info("Skipping deck %s (S/N: %s) - no settings present", + deck.id(), deck.get_serial_number()) deck.close() continue @@ -77,21 +87,32 @@ def main(): # Instantiate deck main_deck = deck_settings.deck_class()(None, **deck_settings.settings()) deck_manager.set_active_deck(main_deck) - - for t in threading.enumerate(): - if t is threading.currentThread(): - continue - - if t.is_alive(): - try: - t.join() - except KeyboardInterrupt as ex: - deck_manager.close() - deck.close() - if len(streamdecks) == 0: root.info("No streamdecks detected, exiting.") +def handle_shutdown(_signo=None, _stack_frame=None): + def shutdown_exception_handler(loop, context): + if "exception" not in context \ + or not isinstance(context["exception"], asyncio.CancelledError): + loop.default_exception_handler(context) + loop.set_exception_handler(shutdown_exception_handler) + tasks = asyncio.gather(*asyncio.all_tasks(loop)) + tasks.add_done_callback(lambda t: loop.stop()) + tasks.cancel() + while not tasks.done() and not loop.is_closed(): + loop.run_forever() + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() if __name__ == '__main__': - main() + signal.signal(signal.SIGTERM, handle_shutdown) + loop.create_task(main(), name="main") + try: + sys.exit(loop.run_forever()) + except KeyboardInterrupt: + print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True) + handle_shutdown() + finally: + deck_manager.close() + deck.close() + \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index ec2d440..a271a87 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ devdeck-core==1.0.7 emoji jsonschema pillow -pulsectl +pulsectl-asyncio pytest pyyaml requests diff --git a/tests/devdeck/controls/test_command_control.py b/tests/devdeck/controls/test_command_control.py index 6984074..0538c6d 100644 --- a/tests/devdeck/controls/test_command_control.py +++ b/tests/devdeck/controls/test_command_control.py @@ -1,3 +1,6 @@ +import asyncio +import pytest + from devdeck_core.mock_deck_context import mock_context, assert_rendered from devdeck.controls.command_control import CommandControl @@ -6,7 +9,8 @@ class TestCommandControl: def test_initialize_sets_icon(self): - control = CommandControl(0, **{'icon': TestingUtils.get_filename('test-icon.png')}) + control = CommandControl( + 0, **{'icon': TestingUtils.get_filename('test-icon.png')}) with mock_context(control) as ctx: control.initialize() assert_rendered(ctx, TestingUtils.get_filename('test-icon.png')) diff --git a/tests/devdeck/controls/test_name_list_control.py b/tests/devdeck/controls/test_name_list_control.py index 5e23c32..11a2573 100644 --- a/tests/devdeck/controls/test_name_list_control.py +++ b/tests/devdeck/controls/test_name_list_control.py @@ -1,4 +1,6 @@ +import asyncio from assertpy import assert_that +import pytest from devdeck_core.renderer import Renderer from devdeck.controls.name_list_control import NameListControl @@ -6,7 +8,6 @@ from tests.testing_utils import TestingUtils - class TestNameListControl: def test_initialize_sets_icon_and_resets_name_index(self): name_list_control = NameListControl(0) @@ -14,30 +15,37 @@ def test_initialize_sets_icon_and_resets_name_index(self): with mock_context(name_list_control) as ctx: name_list_control.initialize() assert_that(name_list_control.name_index).is_equal_to(0) - assert_rendered(ctx, TestingUtils.get_filename('../devdeck/assets/font-awesome/users.png')) + assert_rendered(ctx, TestingUtils.get_filename( + '../devdeck/assets/font-awesome/users.png')) def test_pressed_iterates_initials(self): - settings = {'names': ['Sarah Mcgrath', 'Eduardo Sanders', 'Ellis Banks']} + settings = {'names': ['Sarah Mcgrath', + 'Eduardo Sanders', 'Ellis Banks']} name_list_control = NameListControl(0, **settings) with mock_context(name_list_control) as ctx: # Initial state assert_that(name_list_control.name_index).is_equal_to(0) name_list_control.pressed() - assert_rendered(ctx, Renderer().text('SM').font_size(256).center_vertically().center_horizontally().end()) + assert_rendered(ctx, Renderer().text('SM').font_size( + 256).center_vertically().center_horizontally().end()) assert_that(name_list_control.name_index).is_equal_to(1) name_list_control.pressed() - assert_rendered(ctx, Renderer().text('ES').font_size(256).center_vertically().center_horizontally().end()) + assert_rendered(ctx, Renderer().text('ES').font_size( + 256).center_vertically().center_horizontally().end()) assert_that(name_list_control.name_index).is_equal_to(2) name_list_control.pressed() - assert_rendered(ctx, Renderer().text('EB').font_size(256).center_vertically().center_horizontally().end()) + assert_rendered(ctx, Renderer().text('EB').font_size( + 256).center_vertically().center_horizontally().end()) assert_that(name_list_control.name_index).is_equal_to(3) name_list_control.pressed() - assert_rendered(ctx, TestingUtils.get_filename('../devdeck/assets/font-awesome/users.png')) + assert_rendered(ctx, TestingUtils.get_filename( + '../devdeck/assets/font-awesome/users.png')) name_list_control.pressed() - assert_rendered(ctx, Renderer().text('SM').font_size(256).center_vertically().center_horizontally().end()) + assert_rendered(ctx, Renderer().text('SM').font_size( + 256).center_vertically().center_horizontally().end()) assert_that(name_list_control.name_index).is_equal_to(1) diff --git a/tests/devdeck/controls/test_timer_control.py b/tests/devdeck/controls/test_timer_control.py index 3cf8f57..86f2804 100644 --- a/tests/devdeck/controls/test_timer_control.py +++ b/tests/devdeck/controls/test_timer_control.py @@ -1,29 +1,41 @@ +import asyncio from datetime import datetime from assertpy import assert_that +import pytest -from devdeck.controls.timer_control import TimerControl +from devdeck.controls.timer_control import TimerControl, TimerState from devdeck_core.mock_deck_context import mock_context, assert_rendered -from tests.testing_utils import TestingUtils +from tests.testing_utils import TestingUtils class TestTimerControl: - def test_initialize_sets_icon_and_resets_name_index(self): + def test_initialize_sets_icon(self): timer_control = TimerControl(0) with mock_context(timer_control) as ctx: timer_control.initialize() - assert_rendered(ctx, TestingUtils.get_filename('../devdeck/assets/font-awesome/stopwatch.png')) + assert_rendered(ctx, TestingUtils.get_filename( + '../devdeck/assets/font-awesome/stopwatch.png')) def test_initial_state(self): timer_control = TimerControl(0) assert_that(timer_control.start_time).is_none() assert_that(timer_control.end_time).is_none() + assert_that(timer_control.state).is_equal_to(TimerState.RESET) def test_time_diff_to_str(self): start_time = datetime(2020, 12, 22, 10, 48, 0, 0) + end_time = datetime(2020, 12, 22, 10, 48, 3, 0) + assert_that(TimerControl.time_diff_to_str( + end_time - start_time)).is_equal_to("03") + end_time = datetime(2020, 12, 22, 10, 50, 3, 0) + assert_that(TimerControl.time_diff_to_str( + end_time - start_time)).is_equal_to("02:03") end_time = datetime(2020, 12, 22, 11, 50, 3, 0) - assert_that(TimerControl.time_diff_to_str(end_time - start_time)).is_equal_to("01:02:03") + assert_that(TimerControl.time_diff_to_str( + end_time - start_time)).is_equal_to("01:02:03") + def test_pressed_transitions_state_correctly(self): timer_control = TimerControl(0) @@ -32,14 +44,18 @@ def test_pressed_transitions_state_correctly(self): timer_control.pressed() assert_that(timer_control.start_time).is_not_none() assert_that(timer_control.end_time).is_none() + assert_that(timer_control.state).is_equal_to(TimerState.RUNNING) # Time ends timer_control.pressed() assert_that(timer_control.start_time).is_not_none() assert_that(timer_control.end_time).is_not_none() - assert_that(timer_control.end_time).is_greater_than_or_equal_to(timer_control.start_time) + assert_that(timer_control.end_time).is_greater_than_or_equal_to( + timer_control.start_time) + assert_that(timer_control.state).is_equal_to(TimerState.STOPPED) # Timer resets state timer_control.pressed() assert_that(timer_control.start_time).is_none() assert_that(timer_control.end_time).is_none() + assert_that(timer_control.state).is_equal_to(TimerState.RESET) diff --git a/tests/devdeck/settings/test_devdeck_settings.py b/tests/devdeck/settings/test_devdeck_settings.py index c56e391..ded185c 100644 --- a/tests/devdeck/settings/test_devdeck_settings.py +++ b/tests/devdeck/settings/test_devdeck_settings.py @@ -6,7 +6,6 @@ from devdeck.settings.validation_error import ValidationError from tests.testing_utils import TestingUtils - class TestDevDeckSettings: def test_empty_config(self): devdeck_settings = DevDeckSettings.load( diff --git a/tests/devdeck/test_deck_manager.py b/tests/devdeck/test_deck_manager.py index ab9acea..720562c 100644 --- a/tests/devdeck/test_deck_manager.py +++ b/tests/devdeck/test_deck_manager.py @@ -57,4 +57,4 @@ def test_key_callback_propogates_to_active_deck(self, first_mock_deck): # Released dev_deck.key_callback(first_mock_deck, 23, False) - first_mock_deck.released.called_pnce_with(23) \ No newline at end of file + first_mock_deck.released.called_pnce_with(23)