Skip to content

Commit

Permalink
Add miot simulator (#1539)
Browse files Browse the repository at this point in the history
This adds a simple, full-functioning miot simulator based on miotspec files, making it possible to test both python-miio and downstream implementations. The simulator keeps an internal state constructed based on the defined property constraints (e.g., ranges for int properties or choices for enums). The values are currently randomly generated based on the constraints the spec file does.

The simulator implements the main commands a miot device uses:
* get_properties (get property values)
* set_properties (set property values)
* action (call actions, noop returning success)
* miIO.info

Additionally, the available services and properties can be dumped using `dump_services` and `dump_properties` commands.

Using `miiocli devtools miot-simulator` requires defining the model (`--model`) and the path to a miotspec json file (`--file`).
  • Loading branch information
rytilahti authored Sep 26, 2022
1 parent a1ff575 commit 639a008
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 6 deletions.
6 changes: 2 additions & 4 deletions miio/devtools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

from .pcapparser import parse_pcap
from .propertytester import test_properties
from .simulators import miio_simulator

# from .simulators import miot_simulator
from .simulators import miio_simulator, miot_simulator

_LOGGER = logging.getLogger(__name__)

Expand All @@ -21,4 +19,4 @@ def devtools(ctx: click.Context):
devtools.add_command(parse_pcap)
devtools.add_command(test_properties)
devtools.add_command(miio_simulator)
# devtools.add_command(miot_simulator)
devtools.add_command(miot_simulator)
4 changes: 2 additions & 2 deletions miio/devtools/simulators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# from .miotsimulator import miot_simulator
from .miiosimulator import miio_simulator
from .miotsimulator import miot_simulator

__all__ = ["miio_simulator"]
__all__ = ["miio_simulator", "miot_simulator"]
207 changes: 207 additions & 0 deletions miio/devtools/simulators/miotsimulator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import asyncio
import logging
import random
from collections import defaultdict
from typing import List, Union

import click
from pydantic import Field, validator

from miio import PushServer

from .common import create_info_response, mac_from_model
from .models import DeviceModel, MiotProperty, MiotService

_LOGGER = logging.getLogger(__name__)
UNSET = -10000


def create_random(values):
"""Create random value for the given mapping."""
piid = values["piid"]
if values["format"] == str:
return f"piid {piid}"

if values["choices"] is not None:
choices = values["choices"]
choice = choices[random.randint(0, len(choices) - 1)] # nosec
_LOGGER.debug("Got enum %r for %s", choice, piid)
return choice.value

if values["range"] is not None:
range = values["range"]
value = random.randint(range[0], range[1]) # nosec
_LOGGER.debug("Got value %r from %s for piid %s", value, range, piid)
return value

if values["format"] == bool:
value = bool(random.randint(0, 1)) # nosec
_LOGGER.debug("Got bool %r for piid %s", value, piid)
return value


class SimulatedMiotProperty(MiotProperty):
"""Simulates a property.
* Creates dummy values based on the property information.
* Validates inputs for set_properties
"""

current_value: Union[int, str, bool] = Field(default=UNSET)

@validator("current_value", pre=True, always=True)
def verify_value(cls, v, values):
"""This verifies that the type of the value conforms with the mapping
definition.
This will also create random values for the mapping when the device is
initialized.
"""
if v == UNSET:
return create_random(values)
if "write" not in values["access"]:
raise ValueError("Tried to set read-only property")

try:
casted_value = values["format"](v)
except Exception as ex:
raise TypeError("Invalid type") from ex

range = values["range"]
if range is not None and not (range[0] <= casted_value <= range[1]):
raise ValueError(f"{casted_value} not in range {range}")

choices = values["choices"]
if choices is not None:
return choices[casted_value]

return casted_value

class Config:
validate_assignment = True
smart_union = True


class SimulatedMiotService(MiotService):
"""Overridden to allow simulated properties."""

properties: List[SimulatedMiotProperty] = Field(default=[], repr=False)


class SimulatedDeviceModel(DeviceModel):
"""Overridden to allow simulated properties."""

services: List[SimulatedMiotService]


class MiotSimulator:
"""MiOT device simulator.
This class implements a barebone simulator for a given devicemodel instance created
from a miot schema file.
"""

def __init__(self, device_model):
self._model: SimulatedDeviceModel = device_model
self._state = defaultdict(defaultdict)
self.initialize_state()

def initialize_state(self):
"""Create initial state for the device."""
for serv in self._model.services:
for act in serv.actions:
_LOGGER.debug("Found action: %s", act)
for prop in serv.properties:
self._state[serv.siid][prop.piid] = prop

def get_properties(self, payload):
"""Handle get_properties method."""
_LOGGER.info("Got get_properties call with %s", payload)
response = []
params = payload["params"]
for p in params:
res = p.copy()
res["value"] = self._state[res["siid"]][res["piid"]].current_value
res["code"] = 0
response.append(res)

return {"result": response}

def set_properties(self, payload):
"""Handle set_properties method."""
_LOGGER.info("Received set_properties call with %s", payload)
params = payload["params"]
for param in params:
siid = param["siid"]
piid = param["piid"]
value = param["value"]
self._state[siid][piid].current_value = value
_LOGGER.info("Set %s:%s to %s", siid, piid, self._state[siid][piid])

return {"result": 0}

def dump_services(self, payload):
"""Dumps the available services."""
servs = {}
for serv in self._model.services:
servs[serv.siid] = {"siid": serv.siid, "description": serv.description}

return {"services": servs}

def dump_properties(self, payload):
"""Dumps the available properties.
This is not implemented on real devices, but can be used for debugging.
"""
props = []
params = payload["params"]
if "siid" not in params:
raise ValueError("missing 'siid'")

siid = params["siid"]
if siid not in self._state:
raise ValueError(f"non-existing 'siid' {siid}")

for piid, prop in self._state[siid].items():
props.append(
{
"siid": siid,
"piid": piid,
"prop": prop.description,
"value": prop.current_value,
}
)
return {"result": props}

def action(self, payload):
"""Handle action method."""
_LOGGER.info("Got called %s", payload)
return {"result": 0}


async def main(dev, model):
server = PushServer()

mac = mac_from_model(model)
simulator = MiotSimulator(device_model=dev)
server.add_method("miIO.info", create_info_response(model, mac))
server.add_method("action", simulator.action)
server.add_method("get_properties", simulator.get_properties)
server.add_method("set_properties", simulator.set_properties)
server.add_method("dump_properties", simulator.dump_properties)
server.add_method("dump_services", simulator.dump_services)

transport, proto = await server.start()


@click.command()
@click.option("--file", type=click.File("r"), required=True)
@click.option("--model", type=str, required=True, default=None)
def miot_simulator(file, model):
"""Simulate miot device."""
data = file.read()
dev = SimulatedDeviceModel.parse_raw(data)
loop = asyncio.get_event_loop()
random.seed(1) # nosec
loop.run_until_complete(main(dev, model=model))
loop.run_forever()
106 changes: 106 additions & 0 deletions miio/devtools/simulators/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import logging
from typing import Any, List, Optional

from pydantic import BaseModel, Field

_LOGGER = logging.getLogger(__name__)


class MiotFormat(type):
"""Custom type to convert textual presentation to python type."""

@classmethod
def __get_validators__(cls):
yield cls.convert_type

@classmethod
def convert_type(cls, input: str):
if input.startswith("uint") or input.startswith("int"):
return int
type_map = {
"bool": bool,
"string": str,
"float": float,
}
return type_map[input]

@classmethod
def serialize(cls, v):
return str(v)


class MiotEvent(BaseModel):
"""Presentation of miot event."""

description: str
eiid: int = Field(alias="iid")
urn: str = Field(alias="type")
arguments: Any

class Config:
extra = "forbid"


class MiotEnumValue(BaseModel):
"""Enum value for miot."""

description: str
value: int

class Config:
extra = "forbid"


class MiotAction(BaseModel):
"""Action presentation for miot."""

description: str
aiid: int = Field(alias="iid")
urn: str = Field(alias="type")
inputs: Any = Field(alias="in")
output: Any = Field(alias="out")

class Config:
extra = "forbid"


class MiotProperty(BaseModel):
"""Property presentation for miot."""

description: str
piid: int = Field(alias="iid")
urn: str = Field(alias="type")
unit: str = Field(default="unknown")
format: MiotFormat
access: Any = Field(default=["read"])
range: Optional[List[int]] = Field(alias="value-range")
choices: Optional[List[MiotEnumValue]] = Field(alias="value-list")

class Config:
extra = "forbid"


class MiotService(BaseModel):
"""Service presentation for miot."""

description: str
siid: int = Field(alias="iid")
urn: str = Field(alias="type")
properties: List[MiotProperty] = Field(default=[], repr=False)
events: Optional[List[MiotEvent]] = Field(default=[], repr=False)
actions: Optional[List[MiotAction]] = Field(default=[], repr=False)

class Config:
extra = "forbid"


class DeviceModel(BaseModel):
"""Device presentation for miot."""

description: str
urn: str = Field(alias="type")
services: List[MiotService] = Field(repr=False)
model: Optional[str] = None

class Config:
extra = "forbid"

0 comments on commit 639a008

Please sign in to comment.