diff --git a/.gitignore b/.gitignore index 35fa75659..4283fcb0f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,18 @@ +# Ignore version file .gitversion + +# Ignore macOS DS_Store files .DS_Store + +# Ignore build artifacts build -ChangeLog.html +dist.win32/ dist.* + +# Ignore generated ChangeLog.html file +ChangeLog.html + +# Ignore files dump *.bak *.pyc @@ -11,20 +21,37 @@ dump *.pdb *.msi *.wixobj +*.zip + +# Ignore Update Things EDMarketConnector_Installer_*.exe appcast_win_*.xml appcast_mac_*.xml -EDMarketConnector.VisualElementsManifest.xml -*.zip EDMC_Installer_Config.iss +EDMarketConnector.wxs +wix/components.wxs +# Ignore Visual Elements Manifest file for Windows +EDMarketConnector.VisualElementsManifest.xml + +# Ignore IDE and editor configuration files .idea .vscode + +# Ignore virtual environments .venv/ venv/ +venv2 + +# Ignore workspace file for Visual Studio Code *.code-workspace + +# Ignore coverage reports htmlcov/ .ignored .coverage -EDMarketConnector.wxs -wix/components.wxs +pylintrc +pylint.txt + +# Ignore Submodule data directory +coriolis-data/ diff --git a/config/__init__.py b/config/__init__.py index a31cdd89e..ca964274e 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,11 +1,15 @@ """ -Code dealing with the configuration of the program. +__init__.py - Code dealing with the configuration of the program. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. Windows uses the Registry to store values in a flat manner. Linux uses a file, but for commonality it's still a flat data structure. macOS uses a 'defaults' object. """ - +from __future__ import annotations __all__ = [ # defined in the order they appear in the file @@ -40,10 +44,8 @@ import traceback import warnings from abc import abstractmethod -from typing import Any, Callable, Optional, Type, TypeVar - +from typing import Any, Callable, Type, TypeVar import semantic_version - from constants import GITVERSION_FILE, applongname, appname # Any of these may be imported by plugins @@ -52,9 +54,9 @@ # # Major.Minor.Patch(-prerelease)(+buildmetadata) # NB: Do *not* import this, use the functions appversion() and appversion_nobuild() -_static_appversion = '5.9.5' +_static_appversion = '5.10.0-alpha0' -_cached_version: Optional[semantic_version.Version] = None +_cached_version: semantic_version.Version | None = None copyright = '© 2015-2019 Jonathan Harris, 2020-2023 EDCD' update_feed = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml' @@ -66,7 +68,7 @@ trace_on: list[str] = [] capi_pretend_down: bool = False -capi_debug_access_token: Optional[str] = None +capi_debug_access_token: str | None = None # This must be done here in order to avoid an import cycle with EDMCLogging. # Other code should use EDMCLogging.get_main_logger if os.getenv("EDMC_NO_UI"): @@ -79,7 +81,6 @@ _T = TypeVar('_T') -########################################################################### def git_shorthash_from_head() -> str: """ Determine short hash for current git HEAD. @@ -91,13 +92,14 @@ def git_shorthash_from_head() -> str: shorthash: str = None # type: ignore try: - git_cmd = subprocess.Popen('git rev-parse --short HEAD'.split(), - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT - ) + git_cmd = subprocess.Popen( + "git rev-parse --short HEAD".split(), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) out, err = git_cmd.communicate() - except Exception as e: + except subprocess.CalledProcessError as e: logger.info(f"Couldn't run git command for short hash: {e!r}") else: @@ -131,7 +133,7 @@ def appversion() -> semantic_version.Version: if getattr(sys, 'frozen', False): # Running frozen, so we should have a .gitversion file # Yes, .parent because if frozen we're inside library.zip - with open(pathlib.Path(sys.path[0]).parent / GITVERSION_FILE, 'r', encoding='utf-8') as gitv: + with open(pathlib.Path(sys.path[0]).parent / GITVERSION_FILE, encoding='utf-8') as gitv: shorthash = gitv.read() else: @@ -157,11 +159,14 @@ def appversion_nobuild() -> semantic_version.Version: :return: App version without any build meta data. """ return appversion().truncate('prerelease') -########################################################################### class AbstractConfig(abc.ABC): - """Abstract root class of all platform specific Config implementations.""" + """ + Abstract root class of all platform specific Config implementations. + + Commented lines are no longer supported or replaced. + """ OUT_EDDN_SEND_STATION_DATA = 1 # OUT_MKT_BPC = 2 # No longer supported @@ -185,7 +190,6 @@ class AbstractConfig(abc.ABC): respath_path: pathlib.Path home_path: pathlib.Path default_journal_dir_path: pathlib.Path - identifier: str __in_shutdown = False # Is the application currently shutting down ? @@ -241,7 +245,7 @@ def set_eddn_url(self, eddn_url: str): self.__eddn_url = eddn_url @property - def eddn_url(self) -> Optional[str]: + def eddn_url(self) -> str | None: """ Provide the custom EDDN URL. @@ -296,14 +300,14 @@ def default_journal_dir(self) -> str: def _suppress_call( func: Callable[..., _T], exceptions: Type[BaseException] | list[Type[BaseException]] = Exception, *args: Any, **kwargs: Any - ) -> Optional[_T]: + ) -> _T | None: if exceptions is None: exceptions = [Exception] if not isinstance(exceptions, list): exceptions = [exceptions] - with contextlib.suppress(*exceptions): # type: ignore # it works fine, mypy + with contextlib.suppress(*exceptions): return func(*args, **kwargs) return None @@ -326,16 +330,16 @@ def get( if (a_list := self._suppress_call(self.get_list, ValueError, key, default=None)) is not None: return a_list - elif (a_str := self._suppress_call(self.get_str, ValueError, key, default=None)) is not None: + if (a_str := self._suppress_call(self.get_str, ValueError, key, default=None)) is not None: return a_str - elif (a_bool := self._suppress_call(self.get_bool, ValueError, key, default=None)) is not None: + if (a_bool := self._suppress_call(self.get_bool, ValueError, key, default=None)) is not None: return a_bool - elif (an_int := self._suppress_call(self.get_int, ValueError, key, default=None)) is not None: + if (an_int := self._suppress_call(self.get_int, ValueError, key, default=None)) is not None: return an_int - return default # type: ignore + return default @abstractmethod def get_list(self, key: str, *, default: list | None = None) -> list: @@ -462,16 +466,15 @@ def get_config(*args, **kwargs) -> AbstractConfig: from .darwin import MacConfig return MacConfig(*args, **kwargs) - elif sys.platform == "win32": # pragma: sys-platform-win32 + if sys.platform == "win32": # pragma: sys-platform-win32 from .windows import WinConfig return WinConfig(*args, **kwargs) - elif sys.platform == "linux": # pragma: sys-platform-linux + if sys.platform == "linux": # pragma: sys-platform-linux from .linux import LinuxConfig return LinuxConfig(*args, **kwargs) - else: # pragma: sys-platform-not-known - raise ValueError(f'Unknown platform: {sys.platform=}') + raise ValueError(f'Unknown platform: {sys.platform=}') config = get_config() diff --git a/config/darwin.py b/config/darwin.py index 895218a89..9c15ec32d 100644 --- a/config/darwin.py +++ b/config/darwin.py @@ -1,13 +1,19 @@ -"""Darwin/macOS implementation of AbstractConfig.""" +""" +darwin.py - Darwin/macOS implementation of AbstractConfig. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" +from __future__ import annotations + import pathlib import sys -from typing import Any, Dict, List, Union - +from typing import Any from Foundation import ( # type: ignore NSApplicationSupportDirectory, NSBundle, NSDocumentDirectory, NSSearchPathForDirectoriesInDomains, NSUserDefaults, NSUserDomainMask ) - from config import AbstractConfig, appname, logger assert sys.platform == 'darwin' @@ -48,14 +54,14 @@ def __init__(self) -> None: self.default_journal_dir_path = support_path / 'Frontier Developments' / 'Elite Dangerous' self._defaults: Any = NSUserDefaults.standardUserDefaults() - self._settings: Dict[str, Union[int, str, list]] = dict( + self._settings: dict[str, int | str | list] = dict( self._defaults.persistentDomainForName_(self.identifier) or {} ) # make writeable if (out_dir := self.get_str('out_dir')) is None or not pathlib.Path(out_dir).exists(): self.set('outdir', NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0]) - def __raw_get(self, key: str) -> Union[None, list, str, int]: + def __raw_get(self, key: str) -> None | list | str | int: """ Retrieve the raw data for the given key. @@ -82,7 +88,7 @@ def get_str(self, key: str, *, default: str = None) -> str: """ res = self.__raw_get(key) if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default if not isinstance(res, str): raise ValueError(f'unexpected data returned from __raw_get: {type(res)=} {res}') @@ -97,9 +103,9 @@ def get_list(self, key: str, *, default: list = None) -> list: """ res = self.__raw_get(key) if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, list): + if not isinstance(res, list): raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}') return res @@ -114,7 +120,7 @@ def get_int(self, key: str, *, default: int = 0) -> int: if res is None: return default - elif not isinstance(res, (str, int)): + if not isinstance(res, (str, int)): raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}') try: @@ -122,7 +128,7 @@ def get_int(self, key: str, *, default: int = 0) -> int: except ValueError as e: logger.error(f'__raw_get returned {res!r} which cannot be parsed to an int: {e}') - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default def get_bool(self, key: str, *, default: bool = None) -> bool: """ @@ -132,14 +138,14 @@ def get_bool(self, key: str, *, default: bool = None) -> bool: """ res = self.__raw_get(key) if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, bool): + if not isinstance(res, bool): raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}') return res - def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: + def set(self, key: str, val: int | str | list[str] | bool) -> None: """ Set the given key's data to the given value. diff --git a/config/linux.py b/config/linux.py index 5d543d3fa..73100800f 100644 --- a/config/linux.py +++ b/config/linux.py @@ -1,9 +1,14 @@ -"""Linux config implementation.""" +""" +linux.py - Linux config implementation. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" import os import pathlib import sys from configparser import ConfigParser - from config import AbstractConfig, appname, logger assert sys.platform == 'linux' diff --git a/config/windows.py b/config/windows.py index f7590a923..7fb53a372 100644 --- a/config/windows.py +++ b/config/windows.py @@ -1,6 +1,12 @@ -"""Windows config implementation.""" +""" +windows.py - Windows config implementation. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" +from __future__ import annotations -# spell-checker: words folderid deps hkey edcd import ctypes import functools import pathlib @@ -8,8 +14,7 @@ import uuid import winreg from ctypes.wintypes import DWORD, HANDLE -from typing import List, Literal, Optional, Union - +from typing import Literal from config import AbstractConfig, applongname, appname, logger, update_interval assert sys.platform == 'win32' @@ -29,7 +34,7 @@ CoTaskMemFree.argtypes = [ctypes.c_void_p] -def known_folder_path(guid: uuid.UUID) -> Optional[str]: +def known_folder_path(guid: uuid.UUID) -> str | None: """Look up a Windows GUID to actual folder path name.""" buf = ctypes.c_wchar_p() if SHGetKnownFolderPath(ctypes.create_string_buffer(guid.bytes_le), 0, 0, ctypes.byref(buf)): @@ -43,7 +48,8 @@ class WinConfig(AbstractConfig): """Implementation of AbstractConfig for Windows.""" def __init__(self, do_winsparkle=True) -> None: - self.app_dir_path = pathlib.Path(str(known_folder_path(FOLDERID_LocalAppData))) / appname + super().__init__() + self.app_dir_path = pathlib.Path(known_folder_path(FOLDERID_LocalAppData)) / appname # type: ignore self.app_dir_path.mkdir(exist_ok=True) self.plugin_dir_path = self.app_dir_path / 'plugins' @@ -52,19 +58,17 @@ def __init__(self, do_winsparkle=True) -> None: if getattr(sys, 'frozen', False): self.respath_path = pathlib.Path(sys.executable).parent self.internal_plugin_dir_path = self.respath_path / 'plugins' - else: self.respath_path = pathlib.Path(__file__).parent.parent self.internal_plugin_dir_path = self.respath_path / 'plugins' self.home_path = pathlib.Path.home() - journal_dir_str = known_folder_path(FOLDERID_SavedGames) - journaldir = pathlib.Path(journal_dir_str) if journal_dir_str is not None else None - self.default_journal_dir_path = None # type: ignore - if journaldir is not None: - self.default_journal_dir_path = journaldir / 'Frontier Developments' / 'Elite Dangerous' + journal_dir_path = pathlib.Path( + known_folder_path(FOLDERID_SavedGames)) / 'Frontier Developments' / 'Elite Dangerous' # type: ignore + self.default_journal_dir_path = journal_dir_path if journal_dir_path.is_dir() else None # type: ignore + REGISTRY_SUBKEY = r'Software\Marginal\EDMarketConnector' # noqa: N806 create_key_defaults = functools.partial( winreg.CreateKeyEx, key=winreg.HKEY_CURRENT_USER, @@ -72,20 +76,18 @@ def __init__(self, do_winsparkle=True) -> None: ) try: - self.__reg_handle: winreg.HKEYType = create_key_defaults( - sub_key=r'Software\Marginal\EDMarketConnector' - ) + self.__reg_handle: winreg.HKEYType = create_key_defaults(sub_key=REGISTRY_SUBKEY) if do_winsparkle: self.__setup_winsparkle() except OSError: - logger.exception('could not create required registry keys') + logger.exception('Could not create required registry keys') raise self.identifier = applongname if (outdir_str := self.get_str('outdir')) is None or not pathlib.Path(outdir_str).is_dir(): docs = known_folder_path(FOLDERID_Documents) - self.set('outdir', docs if docs is not None else self.home) + self.set("outdir", docs if docs is not None else self.home) def __setup_winsparkle(self): """Ensure the necessary Registry keys for WinSparkle are present.""" @@ -94,32 +96,30 @@ def __setup_winsparkle(self): key=winreg.HKEY_CURRENT_USER, access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY, ) - try: - edcd_handle: winreg.HKEYType = create_key_defaults(sub_key=r'Software\EDCD\EDMarketConnector') - winsparkle_reg: winreg.HKEYType = winreg.CreateKeyEx( - edcd_handle, sub_key='WinSparkle', access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY - ) + try: + with create_key_defaults(sub_key=r'Software\EDCD\EDMarketConnector') as edcd_handle: + with winreg.CreateKeyEx(edcd_handle, sub_key='WinSparkle', + access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY) as winsparkle_reg: + # Set WinSparkle defaults - https://github.com/vslavik/winsparkle/wiki/Registry-Settings + UPDATE_INTERVAL_NAME = 'UpdateInterval' # noqa: N806 + CHECK_FOR_UPDATES_NAME = 'CheckForUpdates' # noqa: N806 + REG_SZ = winreg.REG_SZ # noqa: N806 + + winreg.SetValueEx(winsparkle_reg, UPDATE_INTERVAL_NAME, REG_RESERVED_ALWAYS_ZERO, REG_SZ, + str(update_interval)) + + try: + winreg.QueryValueEx(winsparkle_reg, CHECK_FOR_UPDATES_NAME) + except FileNotFoundError: + # Key doesn't exist, set it to a default + winreg.SetValueEx(winsparkle_reg, CHECK_FOR_UPDATES_NAME, REG_RESERVED_ALWAYS_ZERO, REG_SZ, + '1') except OSError: - logger.exception('could not open WinSparkle handle') + logger.exception('Could not open WinSparkle handle') raise - # set WinSparkle defaults - https://github.com/vslavik/winsparkle/wiki/Registry-Settings - winreg.SetValueEx( - winsparkle_reg, 'UpdateInterval', REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, str(update_interval) - ) - - try: - winreg.QueryValueEx(winsparkle_reg, 'CheckForUpdates') - - except FileNotFoundError: - # Key doesn't exist, set it to a default - winreg.SetValueEx(winsparkle_reg, 'CheckForUpdates', REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, '1') - - winsparkle_reg.Close() - edcd_handle.Close() - - def __get_regentry(self, key: str) -> Union[None, list, str, int]: + def __get_regentry(self, key: str) -> None | list | str | int: """Access the Registry for the raw entry.""" try: value, _type = winreg.QueryValueEx(self.__reg_handle, key) @@ -127,20 +127,18 @@ def __get_regentry(self, key: str) -> Union[None, list, str, int]: # Key doesn't exist return None - # The type returned is actually as we'd expect for each of these. The casts are here for type checkers and # For programmers who want to actually know what is going on if _type == winreg.REG_SZ: return str(value) - elif _type == winreg.REG_DWORD: + if _type == winreg.REG_DWORD: return int(value) - elif _type == winreg.REG_MULTI_SZ: + if _type == winreg.REG_MULTI_SZ: return list(value) - else: - logger.warning(f'registry key {key=} returned unknown type {_type=} {value=}') - return None + logger.warning(f'Registry key {key=} returned unknown type {_type=} {value=}') + return None def get_str(self, key: str, *, default: str | None = None) -> str: """ @@ -152,7 +150,7 @@ def get_str(self, key: str, *, default: str | None = None) -> str: if res is None: return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, str): + if not isinstance(res, str): raise ValueError(f'Data from registry is not a string: {type(res)=} {res=}') return res @@ -167,7 +165,7 @@ def get_list(self, key: str, *, default: list | None = None) -> list: if res is None: return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, list): + if not isinstance(res, list): raise ValueError(f'Data from registry is not a list: {type(res)=} {res}') return res @@ -195,11 +193,11 @@ def get_bool(self, key: str, *, default: bool | None = None) -> bool: """ res = self.get_int(key, default=default) # type: ignore if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default return bool(res) - def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: + def set(self, key: str, val: int | str | list[str] | bool) -> None: """ Set the given key's data to the given value. @@ -209,9 +207,8 @@ def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: reg_type: Literal[1] | Literal[4] | Literal[7] if isinstance(val, str): reg_type = winreg.REG_SZ - winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, val) - elif isinstance(val, int): # The original code checked for numbers.Integral, I dont think that is needed. + elif isinstance(val, int): reg_type = winreg.REG_DWORD elif isinstance(val, list): @@ -224,7 +221,6 @@ def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: else: raise ValueError(f'Unexpected type for value {type(val)=}') - # Its complaining about the list, it works, tested on windows, ignored. winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, reg_type, val) # type: ignore def delete(self, key: str, *, suppress=False) -> None: diff --git a/plugins/coriolis.py b/plugins/coriolis.py index 7161a4e07..283b49d8b 100644 --- a/plugins/coriolis.py +++ b/plugins/coriolis.py @@ -19,6 +19,7 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations import base64 import gzip @@ -26,7 +27,7 @@ import json import tkinter as tk from tkinter import ttk -from typing import TYPE_CHECKING, Union, Optional +from typing import TYPE_CHECKING import myNotebook as nb # noqa: N813 # its not my fault. from EDMCLogging import get_main_logger from plug import show_error @@ -80,7 +81,7 @@ def plugin_start3(path: str) -> str: return 'Coriolis' -def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame: +def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: """Set up plugin preferences.""" PADX = 10 # noqa: N806 @@ -130,7 +131,7 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk return conf_frame -def prefs_changed(cmdr: Optional[str], is_beta: bool) -> None: +def prefs_changed(cmdr: str | None, is_beta: bool) -> None: """ Update URLs and override mode based on user preferences. @@ -175,7 +176,7 @@ def _get_target_url(is_beta: bool) -> str: return coriolis_config.normal_url -def shipyard_url(loadout, is_beta) -> Union[str, bool]: +def shipyard_url(loadout, is_beta) -> str | bool: """Return a URL for the current ship.""" # most compact representation string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8') diff --git a/plugins/eddn.py b/plugins/eddn.py index d2e7d01ef..ee70d6e9b 100644 --- a/plugins/eddn.py +++ b/plugins/eddn.py @@ -18,6 +18,8 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations + import http import itertools import json @@ -37,9 +39,6 @@ Iterator, Mapping, MutableMapping, - Optional, - Dict, - List, ) from typing import OrderedDict as OrderedDictT from typing import Tuple, Union @@ -86,27 +85,27 @@ def __init__(self): self.odyssey = False # Track location to add to Journal events - self.system_address: Optional[str] = None - self.system_name: Optional[str] = None - self.coordinates: Optional[Tuple] = None - self.body_name: Optional[str] = None - self.body_id: Optional[int] = None - self.body_type: Optional[int] = None - self.station_name: Optional[str] = None - self.station_type: Optional[str] = None - self.station_marketid: Optional[str] = None + self.system_address: str | None = None + self.system_name: str | None = None + self.coordinates: tuple | None = None + self.body_name: str | None = None + self.body_id: int | None = None + self.body_type: int | None = None + self.station_name: str | None = None + self.station_type: str | None = None + self.station_marketid: str | None = None # Track Status.json data - self.status_body_name: Optional[str] = None + self.status_body_name: str | None = None # Avoid duplicates - self.marketId: Optional[str] = None - self.commodities: Optional[List[OrderedDictT[str, Any]]] = None - self.outfitting: Optional[Tuple[bool, List[str]]] = None - self.shipyard: Optional[Tuple[bool, List[Mapping[str, Any]]]] = None + self.marketId: str | None = None + self.commodities: list[OrderedDictT[str, Any]] | None = None + self.outfitting: Tuple[bool, list[str]] | None = None + self.shipyard: Tuple[bool, list[Mapping[str, Any]]] | None = None self.fcmaterials_marketid: int = 0 - self.fcmaterials: Optional[List[OrderedDictT[str, Any]]] = None + self.fcmaterials: list[OrderedDictT[str, Any]] | None = None self.fcmaterials_capi_marketid: int = 0 - self.fcmaterials_capi: Optional[List[OrderedDictT[str, Any]]] = None + self.fcmaterials_capi: list[OrderedDictT[str, Any]] | None = None # For the tkinter parent window, so we can call update_idletasks() self.parent: tk.Tk @@ -395,7 +394,7 @@ def send_message(self, msg: str) -> bool: """ logger.trace_if("plugin.eddn.send", "Sending message") should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg)) if should_return: @@ -404,7 +403,7 @@ def send_message(self, msg: str) -> bool: # Even the smallest possible message compresses somewhat, so always compress encoded, compressed = text.gzip(json.dumps(new_data, separators=(',', ':')), max_size=0) - headers: Optional[Dict[str, str]] = None + headers: dict[str, str] | None = None if compressed: headers = {'Content-Encoding': 'gzip'} @@ -612,7 +611,7 @@ def __init__(self, parent: tk.Tk): self.sender = EDDNSender(self, self.eddn_url) - self.fss_signals: List[Mapping[str, Any]] = [] + self.fss_signals: list[Mapping[str, Any]] = [] def close(self): """Close down the EDDN class instance.""" @@ -636,7 +635,7 @@ def export_commodities(self, data: CAPIData, is_beta: bool) -> None: # noqa: CC :param is_beta: whether or not we're currently in beta mode """ should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./market', {}) if should_return: logger.warning("capi.request./market has been disabled by killswitch. Returning.") @@ -653,7 +652,7 @@ def export_commodities(self, data: CAPIData, is_beta: bool) -> None: # noqa: CC modules, ships ) - commodities: List[OrderedDictT[str, Any]] = [] + commodities: list[OrderedDictT[str, Any]] = [] for commodity in data['lastStarport'].get('commodities') or []: # Check 'marketable' and 'not prohibited' if (category_map.get(commodity['categoryname'], True) @@ -726,7 +725,7 @@ def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[dict, dict]: :param data: The raw CAPI data. :return: Sanity-checked data. """ - modules: Dict[str, Any] = data['lastStarport'].get('modules') + modules: dict[str, Any] = data['lastStarport'].get('modules') if modules is None or not isinstance(modules, dict): if modules is None: logger.debug('modules was None. FC or Damaged Station?') @@ -743,13 +742,13 @@ def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[dict, dict]: # Set a safe value modules = {} - ships: Dict[str, Any] = data['lastStarport'].get('ships') + ships: dict[str, Any] = data['lastStarport'].get('ships') if ships is None or not isinstance(ships, dict): if ships is None: logger.debug('ships was None') else: - logger.error(f'ships was neither None nor a Dict! Type = {type(ships)}') + logger.error(f'ships was neither None nor a dict! Type = {type(ships)}') # Set a safe value ships = {'shipyard_list': {}, 'unavailable_list': []} @@ -769,7 +768,7 @@ def export_outfitting(self, data: CAPIData, is_beta: bool) -> None: :param is_beta: whether or not we're currently in beta mode """ should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) if should_return: logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") @@ -796,7 +795,7 @@ def export_outfitting(self, data: CAPIData, is_beta: bool) -> None: modules.values() ) - outfitting: List[str] = sorted( + outfitting: list[str] = sorted( self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search ) @@ -837,7 +836,7 @@ def export_shipyard(self, data: CAPIData, is_beta: bool) -> None: :param is_beta: whether or not we are in beta mode """ should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) if should_return: logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") @@ -856,7 +855,7 @@ def export_shipyard(self, data: CAPIData, is_beta: bool) -> None: ships ) - shipyard: List[Mapping[str, Any]] = sorted( + shipyard: list[Mapping[str, Any]] = sorted( itertools.chain( (ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()), (ship['name'].lower() for ship in ships['unavailable_list'] or {}), @@ -899,8 +898,8 @@ def export_journal_commodities(self, cmdr: str, is_beta: bool, entry: Mapping[st :param is_beta: whether or not we're in beta mode :param entry: the journal entry containing the commodities data """ - items: List[Mapping[str, Any]] = entry.get('Items') or [] - commodities: List[OrderedDictT[str, Any]] = sorted((OrderedDict([ + items: list[Mapping[str, Any]] = entry.get('Items') or [] + commodities: list[OrderedDictT[str, Any]] = sorted((OrderedDict([ ('name', self.canonicalise(commodity['Name'])), ('meanPrice', commodity['MeanPrice']), ('buyPrice', commodity['BuyPrice']), @@ -947,11 +946,11 @@ def export_journal_outfitting(self, cmdr: str, is_beta: bool, entry: Mapping[str :param is_beta: Whether or not we're in beta mode :param entry: The relevant journal entry """ - modules: List[Mapping[str, Any]] = entry.get('Items', []) + modules: list[Mapping[str, Any]] = entry.get('Items', []) horizons: bool = entry.get('Horizons', False) # outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name']) # for module in modules if module['Name'] != 'int_planetapproachsuite']) - outfitting: List[str] = sorted( + outfitting: list[str] = sorted( self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules) ) @@ -986,7 +985,7 @@ def export_journal_shipyard(self, cmdr: str, is_beta: bool, entry: Mapping[str, :param is_beta: Whether or not we're in beta mode :param entry: the relevant journal entry """ - ships: List[Mapping[str, Any]] = entry.get('PriceList') or [] + ships: list[Mapping[str, Any]] = entry.get('Pricelist') or [] horizons: bool = entry.get('Horizons', False) shipyard = sorted(ship['ShipType'] for ship in ships) # Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard. @@ -1042,7 +1041,7 @@ def send_message(self, cmdr: str, msg: MutableMapping[str, Any]) -> None: self.sender.send_message_by_id(msg_id) def standard_header( - self, game_version: Optional[str] = None, game_build: Optional[str] = None + self, game_version: str | None = None, game_build: str | None = None ) -> MutableMapping[str, Any]: """ Return the standard header for an EDDN message, given tracked state. @@ -1134,7 +1133,7 @@ def entry_augment_system_data( def export_journal_fssdiscoveryscan( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSDiscoveryScan to EDDN on the correct schema. @@ -1176,7 +1175,7 @@ def export_journal_fssdiscoveryscan( def export_journal_navbeaconscan( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an NavBeaconScan to EDDN on the correct schema. @@ -1218,7 +1217,7 @@ def export_journal_navbeaconscan( def export_journal_codexentry( # noqa: CCR001 self, cmdr: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send a CodexEntry to EDDN on the correct schema. @@ -1320,7 +1319,7 @@ def export_journal_codexentry( # noqa: CCR001 def export_journal_scanbarycentre( self, cmdr: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send a ScanBaryCentre to EDDN on the correct schema. @@ -1374,7 +1373,7 @@ def export_journal_scanbarycentre( def export_journal_navroute( self, cmdr: str, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send a NavRoute to EDDN on the correct schema. @@ -1447,7 +1446,7 @@ def export_journal_navroute( def export_journal_fcmaterials( self, cmdr: str, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FCMaterials message to EDDN on the correct schema. @@ -1531,7 +1530,7 @@ def export_journal_fcmaterials( def export_capi_fcmaterials( self, data: CAPIData, is_beta: bool, horizons: bool - ) -> Optional[str]: + ) -> str | None: """ Send CAPI-sourced 'onfootmicroresources' data on `fcmaterials/1` schema. @@ -1594,7 +1593,7 @@ def export_capi_fcmaterials( def export_journal_approachsettlement( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an ApproachSettlement to EDDN on the correct schema. @@ -1669,7 +1668,7 @@ def export_journal_approachsettlement( def export_journal_fssallbodiesfound( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSAllBodiesFound message to EDDN on the correct schema. @@ -1719,7 +1718,7 @@ def export_journal_fssallbodiesfound( def export_journal_fssbodysignals( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSBodySignals message to EDDN on the correct schema. @@ -1789,7 +1788,7 @@ def enqueue_journal_fsssignaldiscovered(self, entry: MutableMapping[str, Any]) - def export_journal_fsssignaldiscovered( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSSignalDiscovered message to EDDN on the correct schema. @@ -1892,7 +1891,7 @@ def canonicalise(self, item: str) -> str: match = self.CANONICALISE_RE.match(item) return match and match.group(1) or item - def capi_gameversion_from_host_endpoint(self, capi_host: Optional[str], capi_endpoint: str) -> str: + def capi_gameversion_from_host_endpoint(self, capi_host: str | None, capi_endpoint: str) -> str: """ Return the correct CAPI gameversion string for the given host/endpoint. @@ -1910,7 +1909,7 @@ def capi_gameversion_from_host_endpoint(self, capi_host: Optional[str], capi_end gv = 'CAPI-Legacy-' else: - # Technically incorrect, but it will inform Listeners + # Technically incorrect, but it will inform listeners logger.error(f"{capi_host=} lead to bad gameversion") gv = 'CAPI-UNKNOWN-' ####################################################################### @@ -1924,7 +1923,7 @@ def capi_gameversion_from_host_endpoint(self, capi_host: Optional[str], capi_end gv += 'shipyard' else: - # Technically incorrect, but it will inform Listeners + # Technically incorrect, but it will inform listeners logger.error(f"{capi_endpoint=} lead to bad gameversion") gv += 'UNKNOWN' ####################################################################### @@ -1943,7 +1942,7 @@ def plugin_start3(plugin_dir: str) -> str: return 'EDDN' -def plugin_app(parent: tk.Tk) -> Optional[tk.Frame]: +def plugin_app(parent: tk.Tk) -> tk.Frame | None: """ Set up any plugin-specific UI. @@ -2183,7 +2182,7 @@ def filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]: """ Recursively remove any dict keys with names ending `_Localised` from a dict. - :param d: Dict to filter keys of. + :param d: dict to filter keys of. :return: The filtered dict. """ filtered: OrderedDictT[str, Any] = OrderedDict() @@ -2207,7 +2206,7 @@ def capi_filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]: """ Recursively remove any dict keys for known CAPI 'localised' names. - :param d: Dict to filter keys of. + :param d: dict to filter keys of. :return: The filtered dict. """ filtered: OrderedDictT[str, Any] = OrderedDict() @@ -2234,7 +2233,7 @@ def journal_entry( # noqa: C901, CCR001 station: str, entry: MutableMapping[str, Any], state: Mapping[str, Any] -) -> Optional[str]: +) -> str | None: """ Process a new Journal entry. @@ -2491,7 +2490,7 @@ def journal_entry( # noqa: C901, CCR001 return None -def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> Optional[str]: +def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> str | None: """ Process new CAPI data for Legacy galaxy. @@ -2510,7 +2509,7 @@ def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> Optional[str]: return cmdr_data(data, is_beta) -def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001 +def cmdr_data(data: CAPIData, is_beta: bool) -> str | None: # noqa: CCR001 """ Process new CAPI data for not-Legacy galaxy (might be beta). @@ -2611,7 +2610,7 @@ def capi_is_horizons(economies: MAP_STR_ANY, modules: MAP_STR_ANY, ships: MAP_ST return economies_colony or modules_horizons or ship_horizons -def dashboard_entry(cmdr: str, is_beta: bool, entry: Dict[str, Any]) -> None: +def dashboard_entry(cmdr: str, is_beta: bool, entry: dict[str, Any]) -> None: """ Process Status.json data to track things like current Body. diff --git a/plugins/edsm.py b/plugins/edsm.py index 2f644dcb9..33af692bb 100644 --- a/plugins/edsm.py +++ b/plugins/edsm.py @@ -18,6 +18,8 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations + import json import threading import tkinter as tk @@ -26,7 +28,7 @@ from threading import Thread from time import sleep from tkinter import ttk -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Literal, Mapping, MutableMapping, cast import requests import killswitch import monitor @@ -72,27 +74,27 @@ def __init__(self): self.game_build = "" # Handle only sending Live galaxy data - self.legacy_galaxy_last_notified: Optional[datetime] = None + self.legacy_galaxy_last_notified: datetime | None = None self.session: requests.Session = requests.Session() self.session.headers['User-Agent'] = user_agent self.queue: Queue = Queue() # Items to be sent to EDSM by worker thread - self.discarded_events: Set[str] = set() # List discarded events from EDSM - self.lastlookup: Dict[str, Any] # Result of last system lookup + self.discarded_events: set[str] = set() # List discarded events from EDSM + self.lastlookup: dict[str, Any] # Result of last system lookup # Game state self.multicrew: bool = False # don't send captain's ship info to EDSM while on a crew - self.coordinates: Optional[Tuple[int, int, int]] = None + self.coordinates: tuple[int, int, int] | None = None self.newgame: bool = False # starting up - batch initial burst of events self.newgame_docked: bool = False # starting up while docked self.navbeaconscan: int = 0 # batch up burst of Scan events after NavBeaconScan - self.system_link: Optional[tk.Widget] = None - self.system_name: Optional[tk.Tk] = None - self.system_address: Optional[int] = None # Frontier SystemAddress - self.system_population: Optional[int] = None - self.station_link: Optional[tk.Widget] = None - self.station_name: Optional[str] = None - self.station_marketid: Optional[int] = None # Frontier MarketID + self.system_link: tk.Widget | None = None + self.system_name: tk.Tk | None = None + self.system_address: int | None = None # Frontier SystemAddress + self.system_population: int | None = None + self.station_link: tk.Widget | None = None + self.station_name: str | None = None + self.station_marketid: int | None = None # Frontier MarketID self.on_foot = False self._IMG_KNOWN = None @@ -100,21 +102,21 @@ def __init__(self): self._IMG_NEW = None self._IMG_ERROR = None - self.thread: Optional[threading.Thread] = None + self.thread: threading.Thread | None = None - self.log: Optional[tk.IntVar] = None - self.log_button: Optional[ttk.Checkbutton] = None + self.log: tk.IntVar | None = None + self.log_button: ttk.Checkbutton | None = None - self.label: Optional[tk.Widget] = None + self.label: tk.Widget | None = None - self.cmdr_label: Optional[nb.Label] = None - self.cmdr_text: Optional[nb.Label] = None + self.cmdr_label: nb.Label | None = None + self.cmdr_text: nb.Label | None = None - self.user_label: Optional[nb.Label] = None - self.user: Optional[nb.Entry] = None + self.user_label: nb.Label | None = None + self.user: nb.Entry | None = None - self.apikey_label: Optional[nb.Label] = None - self.apikey: Optional[nb.Entry] = None + self.apikey_label: nb.Label | None = None + self.apikey: nb.Entry | None = None this = This() @@ -277,7 +279,7 @@ def toggle_password_visibility(): this.apikey.config(show="*") # type: ignore -def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame: +def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: """ Plugin preferences setup hook. @@ -361,7 +363,7 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk return frame -def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR001 +def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001 """ Handle the Commander name changing whilst Settings was open. @@ -390,7 +392,7 @@ def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR # LANG: We have no data on the current commander this.cmdr_text['text'] = _('None') - to_set: Union[Literal['normal'], Literal['disabled']] = tk.DISABLED + to_set: Literal['normal'] | Literal['disabled'] = tk.DISABLED if cmdr and not is_beta and this.log and this.log.get(): to_set = tk.NORMAL @@ -440,9 +442,9 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: config.set('edsm_out', this.log.get()) if cmdr and not is_beta: - cmdrs: List[str] = config.get_list('edsm_cmdrs', default=[]) - usernames: List[str] = config.get_list('edsm_usernames', default=[]) - apikeys: List[str] = config.get_list('edsm_apikeys', default=[]) + cmdrs: list[str] = config.get_list('edsm_cmdrs', default=[]) + usernames: list[str] = config.get_list('edsm_usernames', default=[]) + apikeys: list[str] = config.get_list('edsm_apikeys', default=[]) if this.user and this.apikey: if cmdr in cmdrs: @@ -460,7 +462,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: config.set('edsm_apikeys', apikeys) -def credentials(cmdr: str) -> Optional[Tuple[str, str]]: +def credentials(cmdr: str) -> tuple[str, str] | None: """ Get credentials for the given commander, if they exist. @@ -635,7 +637,7 @@ def journal_entry( # noqa: C901, CCR001 # Update system data -def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001 +def cmdr_data(data: CAPIData, is_beta: bool) -> str | None: # noqa: CCR001 """ Process new CAPI data. @@ -722,7 +724,7 @@ def worker() -> None: # noqa: CCR001 C901 :return: None """ logger.debug('Starting...') - pending: List[Mapping[str, Any]] = [] # Unsent events + pending: list[Mapping[str, Any]] = [] # Unsent events closing = False cmdr: str = "" last_game_version = "" @@ -744,7 +746,7 @@ def worker() -> None: # noqa: CCR001 C901 logger.debug(f'{this.shutting_down=}, so setting closing = True') closing = True - item: Optional[Tuple[str, str, str, Mapping[str, Any]]] = this.queue.get() + item: tuple[str, str, str, Mapping[str, Any]] | None = this.queue.get() if item: (cmdr, game_version, game_build, entry) = item logger.trace_if(CMDR_EVENTS, f'De-queued ({cmdr=}, {game_version=}, {game_build=}, {entry["event"]=})') @@ -756,7 +758,7 @@ def worker() -> None: # noqa: CCR001 C901 retrying = 0 while retrying < 3: if item is None: - item = cast(Tuple[str, str, str, Mapping[str, Any]], ("", {})) + item = cast(tuple[str, str, str, Mapping[str, Any]], ("", {})) should_skip, new_item = killswitch.check_killswitch( 'plugins.edsm.worker', item, @@ -909,7 +911,7 @@ def worker() -> None: # noqa: CCR001 C901 last_game_build = game_build -def should_send(entries: List[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001 +def should_send(entries: list[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001 """ Whether or not any of the given entries should be sent to EDSM. diff --git a/plugins/edsy.py b/plugins/edsy.py index 0c78a4292..a02d34248 100644 --- a/plugins/edsy.py +++ b/plugins/edsy.py @@ -18,11 +18,13 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations + import base64 import gzip import io import json -from typing import Any, Mapping, Union +from typing import Any, Mapping def plugin_start3(plugin_dir: str) -> str: @@ -36,7 +38,7 @@ def plugin_start3(plugin_dir: str) -> str: # Return a URL for the current ship -def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> Union[bool, str]: +def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> bool | str: """ Construct a URL for ship loadout. diff --git a/plugins/inara.py b/plugins/inara.py index efe010df6..2a124f549 100644 --- a/plugins/inara.py +++ b/plugins/inara.py @@ -18,6 +18,7 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations import json import threading @@ -29,9 +30,8 @@ from operator import itemgetter from threading import Lock, Thread from tkinter import ttk -from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional +from typing import TYPE_CHECKING, Any, Callable, Deque, Mapping, NamedTuple, Sequence, cast, Union from typing import OrderedDict as OrderedDictT -from typing import Sequence, Union, cast import requests import edmc_data import killswitch @@ -63,8 +63,8 @@ def _(x: str) -> str: class Credentials(NamedTuple): """Credentials holds the set of credentials required to identify an inara API payload to inara.""" - cmdr: Optional[str] - fid: Optional[str] + cmdr: str | None + fid: str | None api_key: str @@ -89,25 +89,25 @@ def __init__(self): self.parent: tk.Tk # Handle only sending Live galaxy data - self.legacy_galaxy_last_notified: Optional[datetime] = None + self.legacy_galaxy_last_notified: datetime | None = None self.lastlocation = None # eventData from the last Commander's Flight Log event self.lastship = None # eventData from the last addCommanderShip or setCommanderShip event # Cached Cmdr state - self.cmdr: Optional[str] = None - self.FID: Optional[str] = None # Frontier ID + self.cmdr: str | None = None + self.FID: str | None = None # Frontier ID self.multicrew: bool = False # don't send captain's ship info to Inara while on a crew self.newuser: bool = False # just entered API Key - send state immediately self.newsession: bool = True # starting a new session - wait for Cargo event self.undocked: bool = False # just undocked self.suppress_docked = False # Skip initial Docked event if started docked - self.cargo: Optional[List[OrderedDictT[str, Any]]] = None - self.materials: Optional[List[OrderedDictT[str, Any]]] = None + self.cargo: list[OrderedDictT[str, Any]] | None = None + self.materials: list[OrderedDictT[str, Any]] | None = None self.last_credits: int = 0 # Send credit update soon after Startup / new game - self.storedmodules: Optional[List[OrderedDictT[str, Any]]] = None - self.loadout: Optional[OrderedDictT[str, Any]] = None - self.fleet: Optional[List[OrderedDictT[str, Any]]] = None + self.storedmodules: list[OrderedDictT[str, Any]] | None = None + self.loadout: OrderedDictT[str, Any] | None = None + self.fleet: list[OrderedDictT[str, Any]] | None = None self.shipswap: bool = False # just swapped ship self.on_foot = False @@ -115,9 +115,9 @@ def __init__(self): # Main window clicks self.system_link: tk.Widget = None # type: ignore - self.system_name: Optional[str] = None # type: ignore - self.system_address: Optional[str] = None # type: ignore - self.system_population: Optional[int] = None + self.system_name: str | None = None # type: ignore + self.system_address: str | None = None # type: ignore + self.system_population: int | None = None self.station_link: tk.Widget = None # type: ignore self.station = None self.station_marketid = None @@ -129,7 +129,7 @@ def __init__(self): self.apikey: nb.Entry self.apikey_label: tk.Label - self.events: Dict[Credentials, Deque[Event]] = defaultdict(deque) + self.events: dict[Credentials, Deque[Event]] = defaultdict(deque) self.event_lock: Lock = threading.Lock() # protects events, for use when rewriting events def filter_events(self, key: Credentials, predicate: Callable[[Event], bool]) -> None: @@ -361,7 +361,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: ) -def credentials(cmdr: Optional[str]) -> Optional[str]: +def credentials(cmdr: str | None) -> str | None: """ Get the credentials for the current commander. @@ -383,7 +383,7 @@ def credentials(cmdr: Optional[str]) -> Optional[str]: def journal_entry( # noqa: C901, CCR001 - cmdr: str, is_beta: bool, system: str, station: str, entry: Dict[str, Any], state: Dict[str, Any] + cmdr: str, is_beta: bool, system: str, station: str, entry: dict[str, Any], state: dict[str, Any] ) -> str: """ Journal entry hook. @@ -394,7 +394,7 @@ def journal_entry( # noqa: C901, CCR001 # causing users to spam Inara with 'URL provider' queries, and we want to # stop that. should_return: bool - new_entry: Dict[str, Any] = {} + new_entry: dict[str, Any] = {} should_return, new_entry = killswitch.check_killswitch('plugins.inara.journal', entry, logger) if should_return: @@ -813,7 +813,7 @@ def journal_entry( # noqa: C901, CCR001 # Fleet if event_name == 'StoredShips': - fleet: List[OrderedDictT[str, Any]] = sorted( + fleet: list[OrderedDictT[str, Any]] = sorted( [OrderedDict({ 'shipType': x['ShipType'], 'shipGameID': x['ShipID'], @@ -860,7 +860,7 @@ def journal_entry( # noqa: C901, CCR001 # Stored modules if event_name == 'StoredModules': items = {mod['StorageSlot']: mod for mod in entry['Items']} # Impose an order - modules: List[OrderedDictT[str, Any]] = [] + modules: list[OrderedDictT[str, Any]] = [] for slot in sorted(items): item = items[slot] module: OrderedDictT[str, Any] = OrderedDict([ @@ -1088,7 +1088,7 @@ def journal_entry( # noqa: C901, CCR001 # # So we're going to do a lot of checking here and bail out if we dont like the look of ANYTHING here - to_send_data: Optional[Dict[str, Any]] = {} # This is a glorified sentinel until lower down. + to_send_data: dict[str, Any] | None = {} # This is a glorified sentinel until lower down. # On Horizons, neither of these exist on TouchDown star_system_name = entry.get('StarSystem', this.system_name) body_name = entry.get('Body', state['Body'] if state['BodyType'] == 'Planet' else None) @@ -1370,7 +1370,7 @@ def cmdr_data(data: CAPIData, is_beta): # noqa: CCR001, reanalyze me later pass -def make_loadout(state: Dict[str, Any]) -> OrderedDictT[str, Any]: # noqa: CCR001 +def make_loadout(state: dict[str, Any]) -> OrderedDictT[str, Any]: # noqa: CCR001 """ Construct an inara loadout from an event. @@ -1440,8 +1440,8 @@ def new_add_event( name: str, timestamp: str, data: EVENT_DATA, - cmdr: Optional[str] = None, - fid: Optional[str] = None + cmdr: str | None = None, + fid: str | None = None ): """ Add a journal event to the queue, to be sent to inara at the next opportunity. @@ -1470,11 +1470,11 @@ def new_add_event( this.events[key].append(Event(name, timestamp, data)) -def clean_event_list(event_list: List[Event]) -> List[Event]: +def clean_event_list(event_list: list[Event]) -> list[Event]: """ Check for killswitched events and remove or modify them as requested. - :param event_list: List of events to clean + :param event_list: list of events to clean :return: Cleaned list of events """ cleaned_events = [] @@ -1533,14 +1533,14 @@ def new_worker(): logger.debug('Done.') -def get_events(clear: bool = True) -> Dict[Credentials, List[Event]]: +def get_events(clear: bool = True) -> dict[Credentials, list[Event]]: """ Fetch a copy of all events from the current queue. :param clear: whether to clear the queues as we go, defaults to True :return: a copy of the event dictionary """ - events_copy: Dict[Credentials, List[Event]] = {} + events_copy: dict[Credentials, list[Event]] = {} with this.event_lock: for key, events in this.events.items(): @@ -1590,7 +1590,7 @@ def send_data(url: str, data: Mapping[str, Any]) -> bool: return True # Regardless of errors above, we DID manage to send it, therefore inform our caller as such -def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any]) -> None: +def handle_api_error(data: Mapping[str, Any], status: int, reply: dict[str, Any]) -> None: """ Handle API error response. @@ -1604,7 +1604,7 @@ def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any] plug.show_error(_('Error: Inara {MSG}').format(MSG=error_message)) -def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None: +def handle_success_reply(data: Mapping[str, Any], reply: dict[str, Any]) -> None: """ Handle successful API response. @@ -1619,7 +1619,7 @@ def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None handle_special_events(data_event, reply_event) -def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply_text: str) -> None: +def handle_individual_error(data_event: dict[str, Any], reply_status: int, reply_text: str) -> None: """ Handle individual API error. @@ -1638,7 +1638,7 @@ def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply )) -def handle_special_events(data_event: Dict[str, Any], reply_event: Dict[str, Any]) -> None: +def handle_special_events(data_event: dict[str, Any], reply_event: dict[str, Any]) -> None: """ Handle special events in the API response. diff --git a/tests/EDMCLogging.py/test_logging_classvar.py b/tests/EDMCLogging.py/test_logging_classvar.py index 24ab009ee..89d3220e9 100644 --- a/tests/EDMCLogging.py/test_logging_classvar.py +++ b/tests/EDMCLogging.py/test_logging_classvar.py @@ -37,7 +37,7 @@ def test_class_logger(caplog: 'LogCaptureFixture') -> None: ClassVarLogger.set_logger(logger) ClassVarLogger.logger.debug('test') # type: ignore # its there ClassVarLogger.logger.info('test2') # type: ignore # its there - log_stuff('test3') # type: ignore # its there + log_stuff('test3') # Dont move these, it relies on the line numbres. assert 'EDMarketConnector.EDMCLogging.py:test_logging_classvar.py:38 test' in caplog.text diff --git a/tests/config/_old_config.py b/tests/config/_old_config.py index 211c1e72f..22f0b18f0 100644 --- a/tests/config/_old_config.py +++ b/tests/config/_old_config.py @@ -1,12 +1,13 @@ -# type: ignore +"""Old Configuration Test File.""" +from __future__ import annotations + import numbers import sys import warnings from configparser import NoOptionError from os import getenv, makedirs, mkdir, pardir from os.path import dirname, expanduser, isdir, join, normpath -from typing import TYPE_CHECKING, Optional, Union - +from typing import TYPE_CHECKING from config import applongname, appname, update_interval from EDMCLogging import get_main_logger @@ -81,7 +82,7 @@ RegDeleteValue.restype = LONG RegDeleteValue.argtypes = [HKEY, LPCWSTR] - def known_folder_path(guid: uuid.UUID) -> Optional[str]: + def known_folder_path(guid: uuid.UUID) -> str | None: """Look up a Windows GUID to actual folder path name.""" buf = ctypes.c_wchar_p() if SHGetKnownFolderPath(ctypes.create_string_buffer(guid.bytes_le), 0, 0, ctypes.byref(buf)): @@ -95,7 +96,7 @@ def known_folder_path(guid: uuid.UUID) -> Optional[str]: from configparser import RawConfigParser -class OldConfig(): +class OldConfig: """Object that holds all configuration data.""" OUT_EDDN_SEND_STATION_DATA = 1 @@ -153,20 +154,19 @@ def __init__(self): if not self.get('outdir') or not isdir(str(self.get('outdir'))): self.set('outdir', NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0]) - def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: + def get(self, key: str, default: None | list | str = None) -> None | list | str: """Look up a string configuration value.""" val = self.settings.get(key) if val is None: return default - elif isinstance(val, str): + if isinstance(val, str): return str(val) - elif isinstance(val, list): + if isinstance(val, list): return list(val) # make writeable - else: - return default + return default def getint(self, key: str, default: int = 0) -> int: """Look up an integer configuration value.""" @@ -181,7 +181,7 @@ def getint(self, key: str, default: int = 0) -> int: logger.debug('The exception type is ...', exc_info=e) return default - def set(self, key: str, val: Union[int, str, list]) -> None: + def set(self, key: str, val: int | str | list) -> None: """Set value on the specified configuration key.""" self.settings[key] = val @@ -202,7 +202,7 @@ def close(self) -> None: elif sys.platform == 'win32': def __init__(self): - self.app_dir = join(known_folder_path(FOLDERID_LocalAppData), appname) # type: ignore # Not going to change + self.app_dir = join(known_folder_path(FOLDERID_LocalAppData), appname) # type: ignore if not isdir(self.app_dir): mkdir(self.app_dir) @@ -279,10 +279,10 @@ def __init__(self): RegSetValueEx(sparklekey, 'UpdateInterval', 0, 1, buf, len(buf) * 2) RegCloseKey(sparklekey) - if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore # Not going to change + if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore self.set('outdir', known_folder_path(FOLDERID_Documents) or self.home) - def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: + def get(self, key: str, default: None | list | str = None) -> None | list | str: """Look up a string configuration value.""" key_type = DWORD() key_size = DWORD() @@ -304,11 +304,10 @@ def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, l if RegQueryValueEx(self.hkey, key, 0, ctypes.byref(key_type), buf, ctypes.byref(key_size)): return default - elif key_type.value == REG_MULTI_SZ: + if key_type.value == REG_MULTI_SZ: return list(ctypes.wstring_at(buf, len(buf)-2).split('\x00')) - else: - return str(buf.value) + return str(buf.value) def getint(self, key: str, default: int = 0) -> int: """Look up an integer configuration value.""" @@ -328,10 +327,9 @@ def getint(self, key: str, default: int = 0) -> int: ): return default - else: - return key_val.value + return key_val.value - def set(self, key: str, val: Union[int, str, list]) -> None: + def set(self, key: str, val: int | str | list) -> None: """Set value on the specified configuration key.""" if isinstance(val, str): buf = ctypes.create_unicode_buffer(val) @@ -388,7 +386,7 @@ def __init__(self): self.config = RawConfigParser(comment_prefixes=('#',)) try: - with codecs.open(self.filename, 'r') as h: + with codecs.open(self.filename) as h: self.config.read_file(h) except Exception as e: @@ -398,7 +396,7 @@ def __init__(self): if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore # Not going to change self.set('outdir', expanduser('~')) - def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: + def get(self, key: str, default: None | list | str = None) -> None | list | str: """Look up a string configuration value.""" try: val = self.config.get(self.SECTION, key) @@ -407,8 +405,7 @@ def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, l # so we add a spurious ';' entry in set() and remove it here assert val.split('\n')[-1] == ';', val.split('\n') return [self._unescape(x) for x in val.split('\n')[:-1]] - else: - return self._unescape(val) + return self._unescape(val) except NoOptionError: logger.debug(f'attempted to get key {key} that does not exist') @@ -434,13 +431,13 @@ def getint(self, key: str, default: int = 0) -> int: return default - def set(self, key: str, val: Union[int, str, list]) -> None: + def set(self, key: str, val: int | str | list) -> None: """Set value on the specified configuration key.""" if isinstance(val, bool): - self.config.set(self.SECTION, key, val and '1' or '0') # type: ignore # Not going to change + self.config.set(self.SECTION, key, val and '1' or '0') - elif isinstance(val, str) or isinstance(val, numbers.Integral): - self.config.set(self.SECTION, key, self._escape(val)) # type: ignore # Not going to change + elif isinstance(val, (numbers.Integral, str)): + self.config.set(self.SECTION, key, self._escape(val)) elif isinstance(val, list): self.config.set(self.SECTION, key, '\n'.join([self._escape(x) for x in val] + [';'])) @@ -460,7 +457,7 @@ def save(self) -> None: def close(self) -> None: """Close the configuration.""" self.save() - self.config = None + self.config = None # type: ignore def _escape(self, val: str) -> str: """Escape a string for storage.""" diff --git a/tests/config/test_config.py b/tests/config/test_config.py index ad6bbd6f2..e839afc7a 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -7,15 +7,13 @@ Most of these tests are parity tests with the "old" config, and likely one day can be entirely removed. """ -from __future__ import annotations - import contextlib import itertools import pathlib import random import string import sys -from typing import Any, Iterable, List, cast +from typing import Any, Iterable, cast import pytest from pytest import mark @@ -30,12 +28,12 @@ from config import config # noqa: E402 -def _fuzz_list(length: int) -> List[str]: +def _fuzz_list(length: int) -> list[str]: out = [] for _ in range(length): out.append(_fuzz_generators[str](random.randint(0, 1337))) - return cast(List[str], out) + return cast(list[str], out) _fuzz_generators = { # Type annotating this would be a nightmare. @@ -72,7 +70,7 @@ def _get_fuzz(_type: Any, num_values=50, value_length=(0, 10)) -> list: big_int = int(0xFFFFFFFF) # 32 bit int -def _make_params(args: List[Any], id_name: str = 'random_test_{i}') -> list: +def _make_params(args: list[Any], id_name: str = 'random_test_{i}') -> list: return [pytest.param(x, id=id_name.format(i=i)) for i, x in enumerate(args)] @@ -81,7 +79,7 @@ def _build_test_list(static_data, random_data, random_id_name='random_test_{i}') class TestNewConfig: - """Test the new config with an array of hand picked and random data.""" + """Test the new config with an array of hand-picked and random data.""" def __update_linuxconfig(self) -> None: """On linux config uses ConfigParser, which doesn't update from disk changes. Force the update here.""" @@ -117,7 +115,7 @@ def test_string(self, string: str) -> None: config.delete(name) @mark.parametrize("lst", _build_test_list(list_tests, _get_fuzz(list))) - def test_list(self, lst: List[str]) -> None: + def test_list(self, lst: list[str]) -> None: """Save a list and then ask for it back.""" name = f'list_test_{ hash("".join(lst)) }' config.set(name, lst) @@ -216,7 +214,7 @@ def test_string(self, string: str) -> None: assert res == string @mark.parametrize("lst", _build_test_list(list_tests, _get_fuzz(list))) - def test_list(self, lst: List[str]) -> None: + def test_list(self, lst: list[str]) -> None: """Save a list though the old config, recall it using the new config.""" lst = [x.replace("\r", "") for x in lst] # OldConfig on linux fails to store these correctly if sys.platform == 'win32': diff --git a/tests/journal_lock.py/test_journal_lock.py b/tests/journal_lock.py/test_journal_lock.py index c617d52c2..649140c87 100644 --- a/tests/journal_lock.py/test_journal_lock.py +++ b/tests/journal_lock.py/test_journal_lock.py @@ -1,13 +1,13 @@ """Tests for journal_lock.py code.""" +from __future__ import annotations + import multiprocessing as mp import os import pathlib import sys from typing import Generator - import pytest from pytest import MonkeyPatch, TempdirFactory, TempPathFactory - from config import config from journal_lock import JournalLock, JournalLockResult @@ -142,7 +142,7 @@ def mock_journaldir_changing( def get_str(key: str, *, default: str | None = None) -> str: """Mock config.*Config get_str to provide fake journaldir.""" if key == 'journaldir': - return tmp_path_factory.mktemp("changing") + return tmp_path_factory.mktemp("changing") # type: ignore print('Other key, calling up ...') return config.get_str(key) # Call the non-mocked @@ -301,7 +301,7 @@ def test_obtain_lock_already_locked(self, mock_journaldir: TempPathFactory): # Need to release any handles on the lockfile else the sub-process # might not be able to clean up properly, and that will impact # on later tests. - jlock.journal_dir_lockfile.close() + jlock.journal_dir_lockfile.close() # type: ignore print('Telling sub-process to quit...') exit_q.put('quit') diff --git a/tests/killswitch.py/test_apply.py b/tests/killswitch.py/test_apply.py index c199ec485..ab9bb2679 100644 --- a/tests/killswitch.py/test_apply.py +++ b/tests/killswitch.py/test_apply.py @@ -1,6 +1,8 @@ """Test the apply functions used by killswitch to modify data.""" +from __future__ import annotations + import copy -from typing import Any, Optional +from typing import Any import pytest @@ -33,11 +35,11 @@ def test_apply(source: UPDATABLE_DATA, key: str, action: str, to_set: Any, resul def test_apply_errors() -> None: """_apply should fail when passed something that isn't a Sequence or MutableMapping.""" with pytest.raises(ValueError, match=r'Dont know how to'): - killswitch._apply(set(), '0', None, False) # type: ignore # Its intentional that its broken - killswitch._apply(None, '', None) # type: ignore # Its intentional that its broken + killswitch._apply(set(), '0') # type: ignore # Its intentional that its broken + killswitch._apply(None, '') # type: ignore # Its intentional that its broken with pytest.raises(ValueError, match=r'Cannot use string'): - killswitch._apply([], 'test', None, False) + killswitch._apply([], 'test') def test_apply_no_error() -> None: @@ -61,7 +63,7 @@ def test_apply_no_error() -> None: (False, 0), (str((1 << 63)-1), (1 << 63)-1), (True, 1), (str(1 << 1337), 1 << 1337) ] ) -def test_get_int(input: str, expected: Optional[int]) -> None: +def test_get_int(input: str, expected: int | None) -> None: """Check that _get_int doesn't throw when handed bad data.""" assert expected == killswitch._get_int(input) diff --git a/tests/killswitch.py/test_killswitch.py b/tests/killswitch.py/test_killswitch.py index cda672ac7..3c681ded7 100644 --- a/tests/killswitch.py/test_killswitch.py +++ b/tests/killswitch.py/test_killswitch.py @@ -1,7 +1,7 @@ """Tests of killswitch behaviour.""" -import copy -from typing import Optional +from __future__ import annotations +import copy import pytest import semantic_version @@ -34,7 +34,7 @@ ], ) def test_killswitch( - input: killswitch.UPDATABLE_DATA, kill: str, should_pass: bool, result: Optional[killswitch.UPDATABLE_DATA], + input: killswitch.UPDATABLE_DATA, kill: str, should_pass: bool, result: killswitch.UPDATABLE_DATA | None, version: str ) -> None: """Simple killswitch tests."""