-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpanel-emu.py
95 lines (76 loc) · 2.99 KB
/
panel-emu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# SPDX-FileCopyrightText: 2021 Diego Elio Pettenò
#
# SPDX-License-Identifier: 0BSD
import asyncio
import dataclasses
import datetime
from typing import Optional
import aioconsole
import click
import serial
import structs
class PanelEmu:
_serial: serial.Serial
_next_settings: structs.Settings
_last_settings: Optional[structs.Settings]
_period: datetime.timedelta
def __init__(self, serial_port: str, period: datetime.timedelta) -> None:
self._serial = serial.Serial(serial_port, baudrate=104, timeout=1.5)
self._last_settings = None
self._next_settings = structs.Settings()
self._period = period
async def bus_loop(self):
while True:
await asyncio.sleep(self._period.total_seconds())
if self._last_settings != self._next_settings:
packet = self._next_settings.to_packet(changed=True)
self._last_settings = self._next_settings
else:
packet = self._last_settings.to_packet(changed=False)
self._serial.write(packet)
result = self._serial.read(12)
assert len(result) == 12
command = structs.HVAC_CONTROL.parse(result[0:6])
response = structs.HVAC_RESPONSE.parse(result[6:12])
await aioconsole.aprint(f"{command.data.value}\n{response.data.value}")
async def user_loop(self):
while True:
try:
cmd = await aioconsole.ainput()
attribute, value = cmd.split("=", 1)
if attribute in {
"resistor_heating",
"running",
"plasma",
"swivel",
"swirl",
}:
value = value.lower() in {"y", "yes", "on", "true"}
elif attribute in {"room_temperature", "set_temperature"}:
value = float(value)
elif attribute == "mode":
value = structs.Mode[value.upper()]
elif attribute == "fan_speed":
value = structs.FanSpeed[value.upper()]
else:
await aioconsole.aprint(f"I don't know {cmd}")
continue
self._next_settings = dataclasses.replace(
self._last_settings, **{attribute: value}
)
except Exception as error:
await aioconsole.aprint(f"ooops: {error}")
def run(self, loop: asyncio.AbstractEventLoop):
loop.create_task(self.bus_loop())
loop.run_until_complete(self.user_loop())
@click.command()
@click.option("--period-sec", type=click.IntRange(min=2), default=4)
@click.argument(
"serial-port", type=click.Path(dir_okay=False, file_okay=True, writable=True)
)
def main(period_sec: int, serial_port: str) -> None:
loop = asyncio.get_event_loop()
emu = PanelEmu(serial_port, datetime.timedelta(seconds=period_sec))
emu.run(loop)
if __name__ == "__main__":
main()