Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jobs:
- run: uv sync --all-extras --dev
- run: uv run ruff check
- run: uv run ruff format --check
- run: uv run pyrefly check

docs:
runs-on: ubuntu-latest
Expand Down
16 changes: 11 additions & 5 deletions monitorcontrol/monitorcontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def set_input_source(self, value: Union[int, str, InputSource]):
self._set_vcp_feature(vcp_codes.input_select, mode_value)


def get_vcps() -> List[Type[vcp.VCP]]:
def get_vcps() -> List[vcp.VCP]:
"""
Discovers virtual control panels.

Expand Down Expand Up @@ -587,18 +587,19 @@ def _parse_capabilities(caps_str: str) -> dict:
"""
Converts the capabilities string into a nice dict
"""
caps_dict = {

caps_dict: dict[str, str | list | dict[int, dict]] = {
# Used to specify the protocol class
"prot": "",
# Identifies the type of display
"type": "",
# The display model number
"model": "",
# A list of supported VCP codes. Somehow not the same as "vcp"
"cmds": "",
"cmds": {},
# A list of supported VCP codes with a list of supported values
# for each nc code
"vcp": "",
"vcp": {},
# undocumented
"mswhql": "",
# undocumented
Expand All @@ -612,19 +613,24 @@ def _parse_capabilities(caps_str: str) -> dict:
# Alternate name to be used for control
"vcpname": "",
# Parsed input sources into text. Not part of capabilities string.
"inputs": "",
"inputs": [],
# Parsed color presets into text. Not part of capabilities string.
"color_presets": "",
}

for key in caps_dict:
# The "cmds" and "vcp" caps can be a mapping
if key in ["cmds", "vcp"]:
caps_dict[key] = _convert_to_dict(_extract_a_cap(caps_str, key))
else:
caps_dict[key] = _extract_a_cap(caps_str, key)

# Parse the input sources into a text list for readability
input_source_cap = vcp_codes.input_select.value

# Put this check here to appease the type checker
if isinstance(caps_dict["vcp"], str):
raise ValueError("VCP capabilities dictionary is the wrong type!")
if input_source_cap in caps_dict["vcp"]:
caps_dict["inputs"] = []
input_val_list = list(caps_dict["vcp"][input_source_cap].keys())
Expand Down
8 changes: 6 additions & 2 deletions monitorcontrol/vcp/vcp_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class VCPPermissionError(VCPError):
pass


class VCP(abc.ABC):
class VCP[T](abc.ABC):
@abc.abstractmethod
def __enter__(self):
def __enter__(self) -> T:
pass

@abc.abstractmethod
Expand Down Expand Up @@ -64,3 +64,7 @@ def get_vcp_feature(self, code: int) -> Tuple[int, int]:
VCPError: Failed to get VCP feature.
"""
pass

@abc.abstractmethod
def get_vcp_capabilities(self) -> str:
pass
21 changes: 14 additions & 7 deletions monitorcontrol/vcp/vcp_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(self, bus_number: int):
"""
self.logger = logging.getLogger(__name__)
self.bus_number = bus_number
self.fd: Optional[str] = None
self.fd: Optional[int] = None
self.fp: str = f"/dev/i2c-{self.bus_number}"
# time of last feature set call
self.last_set: Optional[float] = None
Expand All @@ -64,7 +64,7 @@ def __enter__(self):
def cleanup(fd: Optional[int]):
if fd is not None:
try:
os.close(self.fd)
os.close(fd)
except OSError:
pass

Expand All @@ -90,7 +90,8 @@ def __exit__(
exception_traceback: Optional[TracebackType],
) -> Optional[bool]:
try:
os.close(self.fd)
if self.fd:
os.close(self.fd)
except OSError as e:
raise VCPIOError("unable to close descriptor") from e
self.fd = None
Expand Down Expand Up @@ -359,11 +360,14 @@ def read_bytes(self, num_bytes: int) -> bytes:
VCPIOError: unable to read data
"""
try:
return os.read(self.fd, num_bytes)
if self.fd:
return os.read(self.fd, num_bytes)
else:
raise VCPIOError("unable read from I2C bus: no open file descriptor")
except OSError as e:
raise VCPIOError("unable to read from I2C bus") from e

def write_bytes(self, data: bytes):
def write_bytes(self, data: bytes | bytearray):
"""
Writes bytes to the I2C bus.

Expand All @@ -374,12 +378,15 @@ def write_bytes(self, data: bytes):
VCPIOError: unable to write data
"""
try:
os.write(self.fd, data)
if self.fd:
os.write(self.fd, data)
else:
VCPIOError("unable write to I2C bus: no open file descriptor found")
except OSError as e:
raise VCPIOError("unable write to I2C bus") from e


def get_vcps() -> List[LinuxVCP]:
def get_vcps() -> List[VCP]:
"""
Interrogates I2C buses to determine if they are DDC-CI capable.

Expand Down
56 changes: 31 additions & 25 deletions monitorcontrol/vcp/vcp_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
WCHAR,
)

# Move some aliases here so we can have our type
# ignoring in one place

# pyrefly: ignore[missing-attribute]
c_windll = ctypes.windll
# pyrefly: ignore[missing-attribute]
c_formaterror = ctypes.FormatError


# structure type for a physical monitor
class PhysicalMonitor(ctypes.Structure):
Expand All @@ -37,11 +45,12 @@ def __init__(self, handle: HANDLE, description: str):
description: Text description of the physical monitor.
"""
self.logger = logging.getLogger(__name__)
self.handle = handle
self.handle_p = handle
self.handle = int(handle)
self.description = description

def __del__(self):
WindowsVCP._destroy_physical_monitor(self.handle)
WindowsVCP._destroy_physical_monitor(self.handle_p)

def __enter__(self):
pass
Expand Down Expand Up @@ -70,10 +79,10 @@ def set_vcp_feature(self, code: int, value: int):
extra=dict(code=code, value=value),
)
try:
if not ctypes.windll.dxva2.SetVCPFeature(
HANDLE(self.handle), BYTE(code), DWORD(value)
if not c_windll.dxva2.SetVCPFeature(
HANDLE(int(self.handle)), BYTE(code), DWORD(value)
):
raise VCPError("failed to set VCP feature: " + ctypes.FormatError())
raise VCPError("failed to set VCP feature: " + c_formaterror())
except OSError as e:
raise VCPError("failed to close handle") from e

Expand All @@ -97,14 +106,14 @@ def get_vcp_feature(self, code: int) -> Tuple[int, int]:
extra=dict(code=code),
)
try:
if not ctypes.windll.dxva2.GetVCPFeatureAndVCPFeatureReply(
if not c_windll.dxva2.GetVCPFeatureAndVCPFeatureReply(
HANDLE(self.handle),
BYTE(code),
None,
ctypes.byref(feature_current),
ctypes.byref(feature_max),
):
raise VCPError("failed to get VCP feature: " + ctypes.FormatError())
raise VCPError("failed to get VCP feature: " + c_formaterror())
except OSError as e:
raise VCPError("failed to get VCP feature") from e
self.logger.debug(
Expand All @@ -131,20 +140,16 @@ def get_vcp_capabilities(self):
cap_length = DWORD()
self.logger.debug("GetCapabilitiesStringLength")
try:
if not ctypes.windll.dxva2.GetCapabilitiesStringLength(
if not c_windll.dxva2.GetCapabilitiesStringLength(
HANDLE(self.handle), ctypes.byref(cap_length)
):
raise VCPError(
"failed to get VCP capabilities: " + ctypes.FormatError()
)
raise VCPError("failed to get VCP capabilities: " + c_formaterror())
cap_string = (ctypes.c_char * cap_length.value)()
self.logger.debug("CapabilitiesRequestAndCapabilitiesReply")
if not ctypes.windll.dxva2.CapabilitiesRequestAndCapabilitiesReply(
if not c_windll.dxva2.CapabilitiesRequestAndCapabilitiesReply(
HANDLE(self.handle), cap_string, cap_length
):
raise VCPError(
"failed to get VCP capabilities: " + ctypes.FormatError()
)
raise VCPError("failed to get VCP capabilities: " + c_formaterror())
except OSError as e:
raise VCPError("failed to get VCP capabilities") from e
return cap_string.value.decode("ascii")
Expand All @@ -167,19 +172,20 @@ def _get_hmonitors() -> List[HMONITOR]:
"""
Calls the Windows `EnumDisplayMonitors` API in Python-friendly form.
"""
hmonitors = [] # type: List[HMONITOR]
hmonitors: List[HMONITOR] = []
try:

def _callback(hmonitor, hdc, lprect, lparam):
hmonitors.append(HMONITOR(hmonitor))
del hmonitor, hdc, lprect, lparam
return True # continue enumeration

# pyrefly: ignore[missing-attribute]
MONITORENUMPROC = ctypes.WINFUNCTYPE( # noqa: N806
BOOL, HMONITOR, HDC, ctypes.POINTER(RECT), LPARAM
)
callback = MONITORENUMPROC(_callback)
if not ctypes.windll.user32.EnumDisplayMonitors(0, 0, callback, 0):
if not c_windll.user32.EnumDisplayMonitors(0, 0, callback, 0):
raise VCPError("Call to EnumDisplayMonitors failed")
except OSError as e:
raise VCPError("failed to enumerate VCPs") from e
Expand All @@ -194,12 +200,13 @@ def _physical_monitors_from_hmonitor(
"""
num_physical = DWORD()
try:
if not ctypes.windll.dxva2.GetNumberOfPhysicalMonitorsFromHMONITOR(
# pyrefly: ignore[missing-attribute]
if not c_windll.dxva2.GetNumberOfPhysicalMonitorsFromHMONITOR(
hmonitor, ctypes.byref(num_physical)
):
raise VCPError(
"Call to GetNumberOfPhysicalMonitorsFromHMONITOR failed: "
+ ctypes.FormatError()
+ c_formaterror()
)
except OSError as e:
raise VCPError(
Expand All @@ -208,17 +215,16 @@ def _physical_monitors_from_hmonitor(

physical_monitors = (PhysicalMonitor * num_physical.value)()
try:
if not ctypes.windll.dxva2.GetPhysicalMonitorsFromHMONITOR(
if not c_windll.dxva2.GetPhysicalMonitorsFromHMONITOR(
hmonitor, num_physical.value, physical_monitors
):
raise VCPError(
"Call to GetPhysicalMonitorsFromHMONITOR failed: "
+ ctypes.FormatError()
"Call to GetPhysicalMonitorsFromHMONITOR failed: " + c_formaterror()
)
except OSError as e:
raise VCPError("failed to open physical monitor handle") from e
return (
[physical_monitor.handle, physical_monitor.description]
(physical_monitor.handle, physical_monitor.description)
for physical_monitor in physical_monitors
)

Expand All @@ -228,9 +234,9 @@ def _destroy_physical_monitor(handle: HANDLE) -> None:
Calls the Windows `DestroyPhysicalMonitor` API in Python-friendly form.
"""
try:
if not ctypes.windll.dxva2.DestroyPhysicalMonitor(handle):
if not c_windll.dxva2.DestroyPhysicalMonitor(handle):
raise VCPError(
"Call to DestroyPhysicalMonitor failed: " + ctypes.FormatError()
"Call to DestroyPhysicalMonitor failed: " + c_formaterror()
)
except OSError as e:
raise VCPError("failed to close handle") from e
Expand Down
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,17 @@ dependencies = [
"pyudev>=0.23.3 ; sys_platform != 'win32'",
]

[tool.pyrefly]
project-includes = ["**/*"]
project-excludes = [
"**/__pycache__",
"**/*venv/**/*",
]

[dependency-groups]
dev = [
"coveralls>=4.0.1",
"pyrefly>=v0.37.0",
"pytest>=8.3.4",
"pytest-cov>=6.0.0",
"ruff>=0.11.11",
Expand Down
1 change: 1 addition & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .test_monitorcontrol import UnitTestVCP
import monitorcontrol.__main__
from monitorcontrol import Monitor
from monitorcontrol.__main__ import main, count_to_level
from unittest import mock
Expand Down
6 changes: 4 additions & 2 deletions tests/test_monitorcontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def __exit__(


def test_context_manager_assert():
m = Monitor(None)
vcps = get_test_vcps()
m = Monitor(vcps[0])
with pytest.raises(AssertionError):
m.get_power_mode()

Expand All @@ -65,7 +66,7 @@ def test_get_monitors():
get_monitors()


def get_test_vcps() -> List[Type[vcp.VCP]]:
def get_test_vcps() -> List[vcp.VCP]:
if USE_ATTACHED_MONITORS:
return get_vcps()
else:
Expand Down Expand Up @@ -222,6 +223,7 @@ def test_input_source_issue_59(monitor: Monitor):

def test_input_source_type_error(monitor: Monitor):
with pytest.raises(TypeError):
# pyrefly: ignore[bad-argument-type]
monitor.set_input_source([])


Expand Down
3 changes: 3 additions & 0 deletions tests/test_windows_vcp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import sys
import pytest
from unittest.mock import patch

if not sys.platform.startswith("win"):
pytest.skip("skipping windows-only tests", allow_module_level=True)

from monitorcontrol.vcp.vcp_windows import WindowsVCP

Expand Down
Loading