Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Creation of devices #2493

Merged
merged 2 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions lib/logitech_receiver/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

from typing import Callable
from typing import Optional
from typing import Protocol
from typing import cast

import hidapi

Expand All @@ -46,23 +48,49 @@
_IR = hidpp10_constants.INFO_SUBREGISTERS


class LowLevelInterface(Protocol):
def open_path(self, path):
...

def ping(self, handle, number, long_message: bool):
...

def request(self, handle, devnumber, request_id, *params, **kwargs):
...

def close(self, handle, *args, **kwargs) -> bool:
...


low_level_interface = cast(LowLevelInterface, base)


class DeviceFactory:
@staticmethod
def create_device(device_info, setting_callback=None):
def create_device(low_level: LowLevelInterface, device_info, setting_callback=None):
"""Opens a Logitech Device found attached to the machine, by Linux device path.
:returns: An open file handle for the found receiver, or None.
"""
try:
handle = base.open_path(device_info.path)
handle = low_level.open_path(device_info.path)
if handle:
# a direct connected device might not be online (as reported by user)
return Device(None, None, None, handle=handle, device_info=device_info, setting_callback=setting_callback)
return Device(
low_level,
None,
None,
None,
handle=handle,
device_info=device_info,
setting_callback=setting_callback,
)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)
raise


class Device:
Expand All @@ -72,6 +100,7 @@ class Device:

def __init__(
self,
low_level: LowLevelInterface,
receiver,
number,
online,
Expand All @@ -83,6 +112,7 @@ def __init__(
assert receiver or device_info
if receiver:
assert 0 < number <= 15 # some receivers have devices past their max # of devices
self.low_level = low_level
self.number = number # will be None at this point for directly connected devices
self.online = online # is the device online? - gates many atempts to contact the device
self.descriptor = None
Expand Down Expand Up @@ -129,11 +159,11 @@ def __init__(
self.path = hidapi.find_paired_node(receiver.path, number, 1) if receiver else None
if not self.handle:
try:
self.handle = base.open_path(self.path) if self.path else None
self.handle = self.low_level.open_path(self.path) if self.path else None
except Exception: # maybe the device wasn't set up
try:
time.sleep(1)
self.handle = base.open_path(self.path) if self.path else None
self.handle = self.low_level.open_path(self.path) if self.path else None
except Exception: # give up
self.handle = None # should this give up completely?

Expand Down Expand Up @@ -484,7 +514,7 @@ def request(self, request_id, *params, no_reply=False):
long = self.hidpp_long is True or (
self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0)
)
return base.request(
return self.low_level.request(
self.handle or self.receiver.handle,
self.number,
request_id,
Expand All @@ -503,7 +533,8 @@ def ping(self):
long = self.hidpp_long is True or (
self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0)
)
protocol = base.ping(self.handle or self.receiver.handle, self.number, long_message=long)
handle = self.handle or self.receiver.handle
protocol = self.low_level.ping(handle, self.number, long_message=long)
self.online = protocol is not None
if protocol:
self._protocol = protocol
Expand All @@ -519,7 +550,7 @@ def close(self):
if hasattr(self, "cleanups"):
for cleanup in self.cleanups:
cleanup(self)
return handle and base.close(handle)
return handle and self.low_level.close(handle)

def __index__(self):
return self.number
Expand Down
21 changes: 20 additions & 1 deletion lib/logitech_receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from dataclasses import dataclass
from typing import Callable
from typing import Optional
from typing import Protocol
from typing import cast

import hidapi

Expand All @@ -42,6 +44,23 @@
_IR = hidpp10_constants.INFO_SUBREGISTERS


class LowLevelInterface(Protocol):
def open_path(self, path):
...

def ping(self, handle, number, long_message=False):
...

def request(self, handle, devnumber, request_id, *params, **kwargs):
...

def close(self, handle):
...


low_level_interface = cast(LowLevelInterface, base)


@dataclass
class Pairing:
"""Information about the current or most recent pairing"""
Expand Down Expand Up @@ -228,7 +247,7 @@ def register_new_device(self, number, notification=None):
logger.warning("mismatch on device kind %s %s", info["kind"], nkind)
else:
online = True
dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback)
dev = Device(low_level_interface, self, number, online, pairing_info=info, setting_callback=self.setting_callback)
if logger.isEnabledFor(logging.INFO):
logger.info("%s: found new device %d (%s)", self, number, dev.wpid)
self._devices[number] = dev
Expand Down
9 changes: 4 additions & 5 deletions lib/solaar/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import logitech_receiver.device as _device
import logitech_receiver.receiver as _receiver

from logitech_receiver.base import receivers
from logitech_receiver.base import receivers_and_devices
from logitech_receiver import base

from solaar import NAME

Expand Down Expand Up @@ -108,7 +107,7 @@ def _create_parser():


def _receivers(dev_path=None):
for dev_info in receivers():
for dev_info in base.receivers():
if dev_path is not None and dev_path != dev_info.path:
continue
try:
Expand All @@ -123,12 +122,12 @@ def _receivers(dev_path=None):


def _receivers_and_devices(dev_path=None):
for dev_info in receivers_and_devices():
for dev_info in base.receivers_and_devices():
if dev_path is not None and dev_path != dev_info.path:
continue
try:
if dev_info.isDevice:
d = _device.DeviceFactory.create_device(dev_info)
d = _device.DeviceFactory.create_device(base, dev_info)
else:
d = _receiver.ReceiverFactory.create_receiver(dev_info)

Expand Down
2 changes: 1 addition & 1 deletion lib/solaar/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def _start(device_info):
if not isDevice:
receiver = _receiver.ReceiverFactory.create_receiver(device_info, _setting_callback)
else:
receiver = _device.DeviceFactory.create_device(device_info, _setting_callback)
receiver = _device.DeviceFactory.create_device(_base, device_info, _setting_callback)
if receiver:
configuration.attach_to(receiver)
if receiver.bluetooth and receiver.hid_serial:
Expand Down
Loading
Loading