From 639a008325dc10e80de78fd585d9ce02aafbeb8c Mon Sep 17 00:00:00 2001 From: Teemu R Date: Mon, 26 Sep 2022 20:14:42 +0200 Subject: [PATCH] Add miot simulator (#1539) This adds a simple, full-functioning miot simulator based on miotspec files, making it possible to test both python-miio and downstream implementations. The simulator keeps an internal state constructed based on the defined property constraints (e.g., ranges for int properties or choices for enums). The values are currently randomly generated based on the constraints the spec file does. The simulator implements the main commands a miot device uses: * get_properties (get property values) * set_properties (set property values) * action (call actions, noop returning success) * miIO.info Additionally, the available services and properties can be dumped using `dump_services` and `dump_properties` commands. Using `miiocli devtools miot-simulator` requires defining the model (`--model`) and the path to a miotspec json file (`--file`). --- miio/devtools/__init__.py | 6 +- miio/devtools/simulators/__init__.py | 4 +- miio/devtools/simulators/miotsimulator.py | 207 ++++++++++++++++++++++ miio/devtools/simulators/models.py | 106 +++++++++++ 4 files changed, 317 insertions(+), 6 deletions(-) create mode 100644 miio/devtools/simulators/miotsimulator.py create mode 100644 miio/devtools/simulators/models.py diff --git a/miio/devtools/__init__.py b/miio/devtools/__init__.py index 936e12934..6a1de31f2 100644 --- a/miio/devtools/__init__.py +++ b/miio/devtools/__init__.py @@ -5,9 +5,7 @@ from .pcapparser import parse_pcap from .propertytester import test_properties -from .simulators import miio_simulator - -# from .simulators import miot_simulator +from .simulators import miio_simulator, miot_simulator _LOGGER = logging.getLogger(__name__) @@ -21,4 +19,4 @@ def devtools(ctx: click.Context): devtools.add_command(parse_pcap) devtools.add_command(test_properties) devtools.add_command(miio_simulator) -# devtools.add_command(miot_simulator) +devtools.add_command(miot_simulator) diff --git a/miio/devtools/simulators/__init__.py b/miio/devtools/simulators/__init__.py index 692d1e2cd..4ae1b7903 100644 --- a/miio/devtools/simulators/__init__.py +++ b/miio/devtools/simulators/__init__.py @@ -1,4 +1,4 @@ -# from .miotsimulator import miot_simulator from .miiosimulator import miio_simulator +from .miotsimulator import miot_simulator -__all__ = ["miio_simulator"] +__all__ = ["miio_simulator", "miot_simulator"] diff --git a/miio/devtools/simulators/miotsimulator.py b/miio/devtools/simulators/miotsimulator.py new file mode 100644 index 000000000..f92c7e16e --- /dev/null +++ b/miio/devtools/simulators/miotsimulator.py @@ -0,0 +1,207 @@ +import asyncio +import logging +import random +from collections import defaultdict +from typing import List, Union + +import click +from pydantic import Field, validator + +from miio import PushServer + +from .common import create_info_response, mac_from_model +from .models import DeviceModel, MiotProperty, MiotService + +_LOGGER = logging.getLogger(__name__) +UNSET = -10000 + + +def create_random(values): + """Create random value for the given mapping.""" + piid = values["piid"] + if values["format"] == str: + return f"piid {piid}" + + if values["choices"] is not None: + choices = values["choices"] + choice = choices[random.randint(0, len(choices) - 1)] # nosec + _LOGGER.debug("Got enum %r for %s", choice, piid) + return choice.value + + if values["range"] is not None: + range = values["range"] + value = random.randint(range[0], range[1]) # nosec + _LOGGER.debug("Got value %r from %s for piid %s", value, range, piid) + return value + + if values["format"] == bool: + value = bool(random.randint(0, 1)) # nosec + _LOGGER.debug("Got bool %r for piid %s", value, piid) + return value + + +class SimulatedMiotProperty(MiotProperty): + """Simulates a property. + + * Creates dummy values based on the property information. + * Validates inputs for set_properties + """ + + current_value: Union[int, str, bool] = Field(default=UNSET) + + @validator("current_value", pre=True, always=True) + def verify_value(cls, v, values): + """This verifies that the type of the value conforms with the mapping + definition. + + This will also create random values for the mapping when the device is + initialized. + """ + if v == UNSET: + return create_random(values) + if "write" not in values["access"]: + raise ValueError("Tried to set read-only property") + + try: + casted_value = values["format"](v) + except Exception as ex: + raise TypeError("Invalid type") from ex + + range = values["range"] + if range is not None and not (range[0] <= casted_value <= range[1]): + raise ValueError(f"{casted_value} not in range {range}") + + choices = values["choices"] + if choices is not None: + return choices[casted_value] + + return casted_value + + class Config: + validate_assignment = True + smart_union = True + + +class SimulatedMiotService(MiotService): + """Overridden to allow simulated properties.""" + + properties: List[SimulatedMiotProperty] = Field(default=[], repr=False) + + +class SimulatedDeviceModel(DeviceModel): + """Overridden to allow simulated properties.""" + + services: List[SimulatedMiotService] + + +class MiotSimulator: + """MiOT device simulator. + + This class implements a barebone simulator for a given devicemodel instance created + from a miot schema file. + """ + + def __init__(self, device_model): + self._model: SimulatedDeviceModel = device_model + self._state = defaultdict(defaultdict) + self.initialize_state() + + def initialize_state(self): + """Create initial state for the device.""" + for serv in self._model.services: + for act in serv.actions: + _LOGGER.debug("Found action: %s", act) + for prop in serv.properties: + self._state[serv.siid][prop.piid] = prop + + def get_properties(self, payload): + """Handle get_properties method.""" + _LOGGER.info("Got get_properties call with %s", payload) + response = [] + params = payload["params"] + for p in params: + res = p.copy() + res["value"] = self._state[res["siid"]][res["piid"]].current_value + res["code"] = 0 + response.append(res) + + return {"result": response} + + def set_properties(self, payload): + """Handle set_properties method.""" + _LOGGER.info("Received set_properties call with %s", payload) + params = payload["params"] + for param in params: + siid = param["siid"] + piid = param["piid"] + value = param["value"] + self._state[siid][piid].current_value = value + _LOGGER.info("Set %s:%s to %s", siid, piid, self._state[siid][piid]) + + return {"result": 0} + + def dump_services(self, payload): + """Dumps the available services.""" + servs = {} + for serv in self._model.services: + servs[serv.siid] = {"siid": serv.siid, "description": serv.description} + + return {"services": servs} + + def dump_properties(self, payload): + """Dumps the available properties. + + This is not implemented on real devices, but can be used for debugging. + """ + props = [] + params = payload["params"] + if "siid" not in params: + raise ValueError("missing 'siid'") + + siid = params["siid"] + if siid not in self._state: + raise ValueError(f"non-existing 'siid' {siid}") + + for piid, prop in self._state[siid].items(): + props.append( + { + "siid": siid, + "piid": piid, + "prop": prop.description, + "value": prop.current_value, + } + ) + return {"result": props} + + def action(self, payload): + """Handle action method.""" + _LOGGER.info("Got called %s", payload) + return {"result": 0} + + +async def main(dev, model): + server = PushServer() + + mac = mac_from_model(model) + simulator = MiotSimulator(device_model=dev) + server.add_method("miIO.info", create_info_response(model, mac)) + server.add_method("action", simulator.action) + server.add_method("get_properties", simulator.get_properties) + server.add_method("set_properties", simulator.set_properties) + server.add_method("dump_properties", simulator.dump_properties) + server.add_method("dump_services", simulator.dump_services) + + transport, proto = await server.start() + + +@click.command() +@click.option("--file", type=click.File("r"), required=True) +@click.option("--model", type=str, required=True, default=None) +def miot_simulator(file, model): + """Simulate miot device.""" + data = file.read() + dev = SimulatedDeviceModel.parse_raw(data) + loop = asyncio.get_event_loop() + random.seed(1) # nosec + loop.run_until_complete(main(dev, model=model)) + loop.run_forever() diff --git a/miio/devtools/simulators/models.py b/miio/devtools/simulators/models.py new file mode 100644 index 000000000..f6928542a --- /dev/null +++ b/miio/devtools/simulators/models.py @@ -0,0 +1,106 @@ +import logging +from typing import Any, List, Optional + +from pydantic import BaseModel, Field + +_LOGGER = logging.getLogger(__name__) + + +class MiotFormat(type): + """Custom type to convert textual presentation to python type.""" + + @classmethod + def __get_validators__(cls): + yield cls.convert_type + + @classmethod + def convert_type(cls, input: str): + if input.startswith("uint") or input.startswith("int"): + return int + type_map = { + "bool": bool, + "string": str, + "float": float, + } + return type_map[input] + + @classmethod + def serialize(cls, v): + return str(v) + + +class MiotEvent(BaseModel): + """Presentation of miot event.""" + + description: str + eiid: int = Field(alias="iid") + urn: str = Field(alias="type") + arguments: Any + + class Config: + extra = "forbid" + + +class MiotEnumValue(BaseModel): + """Enum value for miot.""" + + description: str + value: int + + class Config: + extra = "forbid" + + +class MiotAction(BaseModel): + """Action presentation for miot.""" + + description: str + aiid: int = Field(alias="iid") + urn: str = Field(alias="type") + inputs: Any = Field(alias="in") + output: Any = Field(alias="out") + + class Config: + extra = "forbid" + + +class MiotProperty(BaseModel): + """Property presentation for miot.""" + + description: str + piid: int = Field(alias="iid") + urn: str = Field(alias="type") + unit: str = Field(default="unknown") + format: MiotFormat + access: Any = Field(default=["read"]) + range: Optional[List[int]] = Field(alias="value-range") + choices: Optional[List[MiotEnumValue]] = Field(alias="value-list") + + class Config: + extra = "forbid" + + +class MiotService(BaseModel): + """Service presentation for miot.""" + + description: str + siid: int = Field(alias="iid") + urn: str = Field(alias="type") + properties: List[MiotProperty] = Field(default=[], repr=False) + events: Optional[List[MiotEvent]] = Field(default=[], repr=False) + actions: Optional[List[MiotAction]] = Field(default=[], repr=False) + + class Config: + extra = "forbid" + + +class DeviceModel(BaseModel): + """Device presentation for miot.""" + + description: str + urn: str = Field(alias="type") + services: List[MiotService] = Field(repr=False) + model: Optional[str] = None + + class Config: + extra = "forbid"