Skip to content

Commit

Permalink
Format code with ruff
Browse files Browse the repository at this point in the history
Run ruff formatter using:
ruff format .

Related pwr-Solaar#2295
  • Loading branch information
MattHag committed Feb 20, 2024
1 parent d4c8f3d commit a54dc84
Show file tree
Hide file tree
Showing 52 changed files with 6,889 additions and 6,527 deletions.
15 changes: 8 additions & 7 deletions bin/solaar
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,25 @@ def init_paths():

except UnicodeError:
sys.stderr.write(
'ERROR: Solaar cannot recognize encoding of filesystem path, '
'this may happen because non UTF-8 characters in the pathname.\n'
"ERROR: Solaar cannot recognize encoding of filesystem path, "
"this may happen because non UTF-8 characters in the pathname.\n"
)
sys.exit(1)

prefix = _path.normpath(_path.join(_path.realpath(decoded_path), '..'))
src_lib = _path.join(prefix, 'lib')
share_lib = _path.join(prefix, 'share', 'solaar', 'lib')
prefix = _path.normpath(_path.join(_path.realpath(decoded_path), ".."))
src_lib = _path.join(prefix, "lib")
share_lib = _path.join(prefix, "share", "solaar", "lib")
for location in src_lib, share_lib:
init_py = _path.join(location, 'solaar', '__init__.py')
init_py = _path.join(location, "solaar", "__init__.py")
# print ("sys.path[0]: checking", init_py)
if _path.exists(init_py):
# print ("sys.path[0]: found", location, "replacing", sys.path[0])
sys.path[0] = location
break


if __name__ == '__main__':
if __name__ == "__main__":
init_paths()
import solaar.gtk

solaar.gtk.main()
4 changes: 2 additions & 2 deletions lib/hidapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import platform as _platform

if _platform.system() in ('Darwin', 'Windows'):
if _platform.system() in ("Darwin", "Windows"):
from hidapi.hidapi import close # noqa: F401
from hidapi.hidapi import enumerate # noqa: F401
from hidapi.hidapi import find_paired_node # noqa: F401
Expand All @@ -46,4 +46,4 @@
from hidapi.udev import read # noqa: F401
from hidapi.udev import write # noqa: F401

__version__ = '0.9'
__version__ = "0.9"
162 changes: 83 additions & 79 deletions lib/hidapi/hidapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,37 @@
import ctypes
import logging
import platform as _platform

from collections import namedtuple
from threading import Thread
from time import sleep

import gi

gi.require_version('Gdk', '3.0')
gi.require_version("Gdk", "3.0")
from gi.repository import GLib # NOQA: E402

logger = logging.getLogger(__name__)

native_implementation = 'hidapi'
native_implementation = "hidapi"

# Device info as expected by Solaar
DeviceInfo = namedtuple(
'DeviceInfo', [
'path',
'bus_id',
'vendor_id',
'product_id',
'interface',
'driver',
'manufacturer',
'product',
'serial',
'release',
'isDevice',
'hidpp_short',
'hidpp_long',
]
"DeviceInfo",
[
"path",
"bus_id",
"vendor_id",
"product_id",
"interface",
"driver",
"manufacturer",
"product",
"serial",
"release",
"isDevice",
"hidpp_short",
"hidpp_long",
],
)
del namedtuple

Expand All @@ -67,8 +67,15 @@

# hidapi binary names for various platforms
_library_paths = (
'libhidapi-hidraw.so', 'libhidapi-hidraw.so.0', 'libhidapi-libusb.so', 'libhidapi-libusb.so.0',
'libhidapi-iohidmanager.so', 'libhidapi-iohidmanager.so.0', 'libhidapi.dylib', 'hidapi.dll', 'libhidapi-0.dll'
"libhidapi-hidraw.so",
"libhidapi-hidraw.so.0",
"libhidapi-libusb.so",
"libhidapi-libusb.so.0",
"libhidapi-iohidmanager.so",
"libhidapi-iohidmanager.so.0",
"libhidapi.dylib",
"hidapi.dll",
"libhidapi-0.dll",
)

for lib in _library_paths:
Expand All @@ -84,9 +91,9 @@
# Retrieve version of hdiapi library
class _cHidApiVersion(ctypes.Structure):
_fields_ = [
('major', ctypes.c_int),
('minor', ctypes.c_int),
('patch', ctypes.c_int),
("major", ctypes.c_int),
("minor", ctypes.c_int),
("patch", ctypes.c_int),
]


Expand All @@ -97,28 +104,27 @@ class _cHidApiVersion(ctypes.Structure):

# Construct device info struct based on API version
class _cDeviceInfo(ctypes.Structure):

def as_dict(self):
return {name: getattr(self, name) for name, _t in self._fields_ if name != 'next'}
return {name: getattr(self, name) for name, _t in self._fields_ if name != "next"}


# Low level hdiapi device info struct
# See https://github.com/libusb/hidapi/blob/master/hidapi/hidapi.h#L143
_cDeviceInfo_fields = [
('path', ctypes.c_char_p),
('vendor_id', ctypes.c_ushort),
('product_id', ctypes.c_ushort),
('serial_number', ctypes.c_wchar_p),
('release_number', ctypes.c_ushort),
('manufacturer_string', ctypes.c_wchar_p),
('product_string', ctypes.c_wchar_p),
('usage_page', ctypes.c_ushort),
('usage', ctypes.c_ushort),
('interface_number', ctypes.c_int),
('next', ctypes.POINTER(_cDeviceInfo)),
("path", ctypes.c_char_p),
("vendor_id", ctypes.c_ushort),
("product_id", ctypes.c_ushort),
("serial_number", ctypes.c_wchar_p),
("release_number", ctypes.c_ushort),
("manufacturer_string", ctypes.c_wchar_p),
("product_string", ctypes.c_wchar_p),
("usage_page", ctypes.c_ushort),
("usage", ctypes.c_ushort),
("interface_number", ctypes.c_int),
("next", ctypes.POINTER(_cDeviceInfo)),
]
if _hid_version.contents.major >= 0 and _hid_version.contents.minor >= 13:
_cDeviceInfo_fields.append(('bus_type', ctypes.c_int))
_cDeviceInfo_fields.append(("bus_type", ctypes.c_int))
_cDeviceInfo._fields_ = _cDeviceInfo_fields

# Set up hidapi functions
Expand Down Expand Up @@ -168,7 +174,7 @@ def as_dict(self):
# Solaar opens the same device more than once which will fail unless we
# allow non-exclusive opening. On windows opening with shared access is
# the default, for macOS we need to set it explicitly.
if _platform.system() == 'Darwin':
if _platform.system() == "Darwin":
_hidapi.hid_darwin_set_open_exclusive.argtypes = [ctypes.c_int]
_hidapi.hid_darwin_set_open_exclusive.restype = None
_hidapi.hid_darwin_set_open_exclusive(0)
Expand All @@ -179,7 +185,7 @@ class HIDError(Exception):


def _enumerate_devices():
""" Returns all HID devices which are potentially useful to us """
"""Returns all HID devices which are potentially useful to us"""
devices = []
c_devices = _hidapi.hid_enumerate(0, 0)
p = c_devices
Expand All @@ -188,19 +194,19 @@ def _enumerate_devices():
p = p.contents.next
_hidapi.hid_free_enumeration(c_devices)

keyboard_or_mouse = {d['path'] for d in devices if d['usage_page'] == 1 and d['usage'] in (6, 2)}
keyboard_or_mouse = {d["path"] for d in devices if d["usage_page"] == 1 and d["usage"] in (6, 2)}
unique_devices = {}
for device in devices:
# On macOS we cannot access keyboard or mouse devices without special permissions. Since
# we don't need them anyway we remove them so opening them doesn't cause errors later.
if device['path'] in keyboard_or_mouse:
if device["path"] in keyboard_or_mouse:
# print(f"Ignoring keyboard or mouse device: {device}")
continue

# hidapi returns separate entries for each usage page of a device.
# Deduplicate by path to only keep one device entry.
if device['path'] not in unique_devices:
unique_devices[device['path']] = device
if device["path"] not in unique_devices:
unique_devices[device["path"]] = device

unique_devices = unique_devices.values()
# print("Unique devices:\n" + '\n'.join([f"{dev}" for dev in unique_devices]))
Expand All @@ -209,7 +215,6 @@ def _enumerate_devices():

# Use a separate thread to check if devices have been removed or connected
class _DeviceMonitor(Thread):

def __init__(self, device_callback, polling_delay=5.0):
self.device_callback = device_callback
self.polling_delay = polling_delay
Expand All @@ -225,10 +230,10 @@ def run(self):
current_devices = {tuple(dev.items()): dev for dev in _enumerate_devices()}
for key, device in self.prev_devices.items():
if key not in current_devices:
self.device_callback('remove', device)
self.device_callback("remove", device)
for key, device in current_devices.items():
if key not in self.prev_devices:
self.device_callback('add', device)
self.device_callback("add", device)
self.prev_devices = current_devices
sleep(self.polling_delay)

Expand All @@ -237,29 +242,29 @@ def run(self):
# It is given the bus id, vendor id, and product id and returns a dictionary
# with the required hid_driver and usb_interface and whether this is a receiver or device.
def _match(action, device, filterfn):
vid = device['vendor_id']
pid = device['product_id']
vid = device["vendor_id"]
pid = device["product_id"]

# Translate hidapi bus_type to the bus_id values Solaar expects
if device.get('bus_type') == 0x01:
if device.get("bus_type") == 0x01:
bus_id = 0x03 # USB
elif device.get('bus_type') == 0x02:
elif device.get("bus_type") == 0x02:
bus_id = 0x05 # Bluetooth
else:
bus_id = None

# Check for hidpp support
device['hidpp_short'] = False
device['hidpp_long'] = False
device["hidpp_short"] = False
device["hidpp_long"] = False
device_handle = None
try:
device_handle = open_path(device['path'])
device_handle = open_path(device["path"])
report = get_input_report(device_handle, 0x10, 32)
if len(report) == 1 + 6 and report[0] == 0x10:
device['hidpp_short'] = True
device["hidpp_short"] = True
report = get_input_report(device_handle, 0x11, 32)
if len(report) == 1 + 19 and report[0] == 0x11:
device['hidpp_long'] = True
device["hidpp_long"] = True
except HIDError as e: # noqa: F841
if logger.isEnabledFor(logging.INFO):
logger.info(f"Error opening device {device['path']} ({bus_id}/{vid:04X}/{pid:04X}) for hidpp check: {e}") # noqa
Expand All @@ -269,41 +274,41 @@ def _match(action, device, filterfn):

if logger.isEnabledFor(logging.INFO):
logger.info(
'Found device BID %s VID %04X PID %04X HID++ %s %s', bus_id, vid, pid, device['hidpp_short'], device['hidpp_long']
"Found device BID %s VID %04X PID %04X HID++ %s %s", bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"]
)

if not device['hidpp_short'] and not device['hidpp_long']:
if not device["hidpp_short"] and not device["hidpp_long"]:
return None

filter = filterfn(bus_id, vid, pid, device['hidpp_short'], device['hidpp_long'])
filter = filterfn(bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"])
if not filter:
return
isDevice = filter.get('isDevice')
isDevice = filter.get("isDevice")

if action == 'add':
if action == "add":
d_info = DeviceInfo(
path=device['path'].decode(),
path=device["path"].decode(),
bus_id=bus_id,
vendor_id=f'{vid:04X}', # noqa
product_id=f'{pid:04X}', # noqa
vendor_id=f"{vid:04X}", # noqa
product_id=f"{pid:04X}", # noqa
interface=None,
driver=None,
manufacturer=device['manufacturer_string'],
product=device['product_string'],
serial=device['serial_number'],
release=device['release_number'],
manufacturer=device["manufacturer_string"],
product=device["product_string"],
serial=device["serial_number"],
release=device["release_number"],
isDevice=isDevice,
hidpp_short=device['hidpp_short'],
hidpp_long=device['hidpp_long'],
hidpp_short=device["hidpp_short"],
hidpp_long=device["hidpp_long"],
)
return d_info

elif action == 'remove':
elif action == "remove":
d_info = DeviceInfo(
path=device['path'].decode(),
path=device["path"].decode(),
bus_id=None,
vendor_id=f'{vid:04X}', # noqa
product_id=f'{pid:04X}', # noqa
vendor_id=f"{vid:04X}", # noqa
product_id=f"{pid:04X}", # noqa
interface=None,
driver=None,
manufacturer=None,
Expand All @@ -328,14 +333,13 @@ def find_paired_node_wpid(receiver_path, index):


def monitor_glib(callback, filterfn):

def device_callback(action, device):
# print(f"device_callback({action}): {device}")
if action == 'add':
if action == "add":
d_info = _match(action, device, filterfn)
if d_info:
GLib.idle_add(callback, action, d_info)
elif action == 'remove':
elif action == "remove":
# Removed devices will be detected by Solaar directly
pass

Expand All @@ -352,7 +356,7 @@ def enumerate(filterfn):
:returns: a list of matching ``DeviceInfo`` tuples.
"""
for device in _enumerate_devices():
d_info = _match('add', device, filterfn)
d_info = _match("add", device, filterfn)
if d_info:
yield d_info

Expand Down Expand Up @@ -463,7 +467,7 @@ def read(device_handle, bytes_count, timeout_ms=None):
def get_input_report(device_handle, report_id, size):
assert device_handle
data = ctypes.create_string_buffer(size)
data[0] = bytearray((report_id, ))
data[0] = bytearray((report_id,))
size = _hidapi.hid_get_input_report(device_handle, data, size)
if size < 0:
raise HIDError(_hidapi.hid_error(device_handle))
Expand All @@ -475,7 +479,7 @@ def _readstring(device_handle, func, max_length=255):
buf = ctypes.create_unicode_buffer(max_length)
ret = func(device_handle, buf, max_length)
if ret < 0:
raise HIDError('Error reading device property')
raise HIDError("Error reading device property")
return buf.value


Expand Down
Loading

0 comments on commit a54dc84

Please sign in to comment.