Skip to content

Commit

Permalink
Use __init_subclass__ cf. decorators to register device types
Browse files Browse the repository at this point in the history
  • Loading branch information
alexdewar committed Oct 9, 2023
1 parent f2834e8 commit 443a42b
Show file tree
Hide file tree
Showing 17 changed files with 277 additions and 264 deletions.
34 changes: 3 additions & 31 deletions finesse/hardware/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,18 @@
from datetime import datetime

from finesse.config import NUM_TEMPERATURE_MONITOR_CHANNELS, TEMPERATURE_MONITOR_TOPIC
from finesse.device_info import DeviceBaseTypeInfo, DeviceInstanceRef, DeviceTypeInfo
from finesse.device_info import DeviceInstanceRef

from . import data_file_writer # noqa: F401
from .plugins import load_device_types
from .device import get_device_type_registry
from .plugins.temperature import get_temperature_monitor_instance

_opus: OPUSInterface


def _get_device_type_info() -> dict[DeviceBaseTypeInfo, list[DeviceTypeInfo]]:
"""Return info about device types grouped according to their base type."""
base_types, device_types = load_device_types()

# Get the base type info and sort it alphabetically by description
base_types_info = sorted(
(t.get_device_base_type_info() for t in base_types),
key=lambda info: info.description,
)

# Preallocate dict with empty lists
out: dict[DeviceBaseTypeInfo, list[DeviceTypeInfo]] = {
info: [] for info in base_types_info
}

# Get device type info and group by base type
for device_type in device_types:
out[device_type.get_device_base_type_info()].append(
device_type.get_device_type_info()
)

# Sort the device types by name
for infos in out.values():
infos.sort(key=lambda info: info.description)

return out


def _broadcast_device_types() -> None:
"""Broadcast the available device types via pubsub."""
pub.sendMessage("device.list", device_types=_get_device_type_info())
pub.sendMessage("device.list", device_types=get_device_type_registry())


def _try_get_temperatures() -> list[Decimal] | None:
Expand Down
171 changes: 171 additions & 0 deletions finesse/hardware/device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""Provides base classes for all types of devices.
The Device class is the top-level base class from which all devices ultimately inherit.
The DeviceBaseType is a base class for *types* of devices (e.g. a stepper motor). All
concrete classes for devices must inherit from a particular DeviceBaseType.
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import Any

from finesse.device_info import DeviceBaseTypeInfo, DeviceParameter, DeviceTypeInfo

from .plugins import load_all_plugins

_base_types: set[type[DeviceBaseType]] = set()
"""Registry of device base types."""

_device_types: set[type[Device]] = set()
"""Registry of concrete device types."""


def get_device_type_registry() -> dict[DeviceBaseTypeInfo, list[DeviceTypeInfo]]:
"""Return info about device types grouped according to their base type."""
# Ensure all base types and device types have been registered
load_all_plugins()

# Get the base type info and sort it alphabetically by description
base_types_info = sorted(
(t.get_device_base_type_info() for t in _base_types),
key=lambda info: info.description,
)

# Preallocate dict with empty lists
out: dict[DeviceBaseTypeInfo, list[DeviceTypeInfo]] = {
info: [] for info in base_types_info
}

# Get device type info and group by base type
for device_type in _device_types:
out[device_type.get_device_base_type_info()].append(
device_type.get_device_type_info()
)

# Sort the device types by name
for infos in out.values():
infos.sort(key=lambda info: info.description)

return out


class Device(ABC):
"""A base class for all devices."""

_device_base_type_info: DeviceBaseTypeInfo
"""Information about the device's base type."""
_device_description: str
"""A human-readable name."""
_device_parameters: list[DeviceParameter] = []
"""Possible parameters that this device type accepts.
The key represents the parameter name and the value is a list of possible values.
"""

def __init_subclass__(cls, description: str | None = None, **kwargs: Any) -> None:
"""Initialise a device type class.
While the description argument has to be optional to allow for non-concrete
classes to inherit from Device, it is mandatory to include it, else the device
type class will not be added to the registry.
Args:
description: Human-readable name for this device type.
"""
# Forward keyword args to allow for multiple inheritance
super().__init_subclass__(**kwargs)

# **HACK**: Allow callers to omit this arg in the case that they are
# non-concrete
if not description:
return

# Set device description for this class
cls._device_description = description

# Add the class to the registry of device types
_device_types.add(cls)

def __init__(self, name: str | None = None) -> None:
"""Create a new DeviceBase.
Args:
name: A name to distinguish devices of the same type.
"""
self.topic = f"device.{self._device_base_type_info.name}"
"""The name of the root pubsub topic on which this device will broadcast."""

self.name = name
"""The (optional) name of this device to use in pubsub messages."""

if not self._device_base_type_info.names_short:
if name:
raise RuntimeError(
"Name provided for device which cannot accept names."
)
return

if name not in self._device_base_type_info.names_short:
raise RuntimeError("Invalid name given for device")

self.topic += f".{name}"

@abstractmethod
def close(self) -> None:
"""Close the connection to the device."""

@classmethod
def get_device_base_type_info(cls) -> DeviceBaseTypeInfo:
"""Get information about the base type for this device type."""
return cls._device_base_type_info

@classmethod
def get_device_type_info(cls) -> DeviceTypeInfo:
"""Get information about this device type."""
return DeviceTypeInfo(
cls._device_description,
cls._device_parameters,
cls.__module__,
cls.__name__,
)

@classmethod
def from_params(cls, **kwargs: Any) -> Device:
"""Create an instance of this object from the specified keyword args."""
return cls(**kwargs)


class DeviceBaseType(Device):
"""A class representing a type of device (e.g. a stepper motor)."""

def __init_subclass__(
cls,
**kwargs,
) -> None:
"""Initialise a class representing a device base type."""
# Only classes which inherit from DeviceBaseClass *directly* are counted as base
# types
if DeviceBaseType in cls.__bases__:
cls._init_base_type(**kwargs)
else:
# For all other cases, just call the parent's __init_subclass__
super().__init_subclass__(**kwargs)

@classmethod
def _init_base_type(
cls,
name: str,
description: str,
names_short: Sequence[str] = (),
names_long: Sequence[str] = (),
**kwargs,
) -> None:
super().__init_subclass__(**kwargs)
# Store metadata about this base class
cls._device_base_type_info = DeviceBaseTypeInfo(
name, description, names_short, names_long
)

# Add the class to the registry of base types
_base_types.add(cls)
76 changes: 0 additions & 76 deletions finesse/hardware/device_base.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

from finesse.device_info import DeviceInstanceRef

from .device_base import DeviceBase
from .device import Device, DeviceBaseType

_devices: dict[DeviceInstanceRef, DeviceBase] = {}
_devices: dict[DeviceInstanceRef, Device] = {}

_T = TypeVar("_T", bound=DeviceBase)
_T = TypeVar("_T", bound=DeviceBaseType)


def get_device_instance(base_type: type[_T], name: str | None = None) -> _T | None:
Expand Down Expand Up @@ -40,7 +40,7 @@ def _open_device(
"""
# Assume this is safe because module and class_name will not be provided directly by
# the user
cls: DeviceBase = getattr(import_module(module), class_name)
cls: Device = getattr(import_module(module), class_name)

logging.info(f"Opening device of type {instance.base_type}: {class_name}")

Expand Down
Loading

0 comments on commit 443a42b

Please sign in to comment.