Skip to content

Commit

Permalink
Add support for async
Browse files Browse the repository at this point in the history
  • Loading branch information
karlkar authored and KarolKsionek committed Sep 28, 2020
1 parent 83ed9a6 commit 04b36c7
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 251 deletions.
182 changes: 33 additions & 149 deletions aircon/__main__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
from aiohttp import web
import argparse
import base64
from http import HTTPStatus
from http.client import HTTPConnection, InvalidURL
import asyncio
import json
import logging
import logging.handlers
from retry import retry
import signal
import socket
import sys
import threading
import time
import _thread
from urllib.parse import parse_qs, urlparse, ParseResult

from . import aircon
from .app_mappings import SECRET_MAP
Expand All @@ -22,127 +14,21 @@
from .aircon import BaseDevice, AcDevice, FglDevice, FglBDevice, HumidifierDevice
from .discovery import perform_discovery
from .mqtt_client import MqttClient
from .notifier import Notifier
from .query_handlers import QueryHandlers

class KeepAliveThread(threading.Thread):
"""Thread to preiodically generate keep-alive requests."""

_KEEP_ALIVE_INTERVAL = 10.0

def __init__(self, port: int, devices: [BaseDevice]):
self.run_lock = threading.Condition()
self._alive = False
self._data = []

for device in devices:
header = {
'Accept': 'application/json',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'Host': device.ip_address,
'Accept-Encoding': 'gzip'
}
self._data.append({
'device': device,
'headers': header,
'conn': None,
'last_timestamp': 0
})

local_ip = self._get_local_ip()
self._json = {
'local_reg': {
'ip': local_ip,
'notify': 0,
'port': port,
'uri': "/local_lan"
}
}
super(KeepAliveThread, self).__init__(name='Keep Alive thread')

def _get_local_ip(self):
sock = None
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.connect(('10.255.255.255', 1))
return sock.getsockname()[0]
finally:
if sock:
sock.close()

@retry(exceptions=ConnectionError, delay=0.5, max_delay=20, backoff=1.5, logger=logging)
def _establish_connection(self, conn: HTTPConnection, headers: dict, device: BaseDevice) -> None:
method = 'PUT' if self._alive else 'POST'
self._json['local_reg']['notify'] = int(device.commands_queue.qsize() > 0)
logging.debug('[KeepAlive] %s %s/local_reg.json %s', method, conn.host, json.dumps(self._json))
try:
conn.request(method, '/local_reg.json', json.dumps(self._json), headers)
resp = conn.getresponse()
if resp.status != HTTPStatus.ACCEPTED:
raise ConnectionError('Recieved invalid response for local_reg: %d, %s', resp.status, resp.read())
resp.read()
except:
self._alive = False
raise
finally:
conn.close()
self._alive = True

def run(self) -> None:
with self.run_lock:
for entry in self._data:
try:
conn = HTTPConnection(entry['device'].ip_address, timeout=5)
entry['conn'] = conn
except InvalidURL:
logging.exception('[KeepAlive] Invalid IP provided.')
_thread.interrupt_main()
return
while True:
should_run_again = False
try:
for entry in self._data:
now = time.time()
queue_size = entry['device'].commands_queue.qsize()
if now - entry['last_timestamp'] >= self._KEEP_ALIVE_INTERVAL or queue_size > 0:
self._establish_connection(entry['conn'], entry['headers'], entry['device'])
entry['last_timestamp'] = now
if queue_size > 1:
should_run_again = True
except:
logging.exception('[KeepAlive] Failed to send local_reg keep alive to the AC.')
if not should_run_again:
logging.debug('[KeepAlive] Waiting for notification or timeout')
self.run_lock.wait(self._KEEP_ALIVE_INTERVAL)

class QueryStatusThread(threading.Thread):
"""Thread to preiodically query the status of all properties.
After start-up, essentially all updates should be pushed to the server due
to the keep alive, so this is just a belt and suspenders.
"""

async def query_status_worker(devices: [BaseDevice]):
_STATUS_UPDATE_INTERVAL = 600.0
_WAIT_FOR_EMPTY_QUEUE = 10.0

def __init__(self, devices: [BaseDevice]):
super(QueryStatusThread, self).__init__(name='Query Status thread')
self._devices = devices

def run(self) -> None:
while True:
# In case the AC is stuck, and not fetching commands, avoid flooding
# the queue with status updates.
for device in self._devices:
while device.commands_queue.qsize() > 10:
time.sleep(self._WAIT_FOR_EMPTY_QUEUE)
device.queue_status()
if _keep_alive:
with _keep_alive.run_lock:
logging.debug('QueryStatusThread triggered KeepAlive notify')
_keep_alive.run_lock.notify()
time.sleep(self._STATUS_UPDATE_INTERVAL)
while True:
# In case the AC is stuck, and not fetching commands, avoid flooding
# the queue with status updates.
for device in devices:
while device.commands_queue.qsize() > 10:
await asyncio.sleep(_WAIT_FOR_EMPTY_QUEUE)
print('I should put some tasks here, but for gods sake...')
#device.queue_status()
await asyncio.sleep(_STATUS_UPDATE_INTERVAL)

def ParseArguments() -> argparse.Namespace:
"""Parse command line arguments."""
Expand Down Expand Up @@ -199,8 +85,8 @@ def setup_logger(log_level):
logging_handler = logging.handlers.SysLogHandler(address='/var/run/syslog')
elif sys.platform.lower() in ['windows', 'win32']:
logging_handler = logging.handlers.SysLogHandler()
else: # Unknown platform, revert to stderr
logging_handler = logging.StreamHandler(sys.stderr)
#else: # Unknown platform, revert to stderr
logging_handler = logging.StreamHandler(sys.stderr)
logging_handler.setFormatter(
logging.Formatter(fmt='{levelname[0]}{asctime}.{msecs:03.0f} '
'{filename}:{lineno}] {message}',
Expand All @@ -209,7 +95,7 @@ def setup_logger(log_level):
logger.setLevel(log_level)
logger.addHandler(logging_handler)

def setup_and_run_http_server(parsed_args, devices: [BaseDevice]):
async def setup_and_run_http_server(parsed_args, devices: [BaseDevice]):
# TODO: Handle these if needed.
# '/local_lan/node/conn_status.json': _query_handlers.connection_status_handler,
# '/local_lan/connect_status': _query_handlers.module_request_handler,
Expand All @@ -222,19 +108,23 @@ def setup_and_run_http_server(parsed_args, devices: [BaseDevice]):
query_handlers = QueryHandlers(devices)
app = web.Application()
app.add_routes([web.get('/hisense/status', query_handlers.get_status_handler),
web.post('/hisense/command', query_handlers.queue_command_handler),
web.get('/hisense/command', query_handlers.queue_command_handler),
web.post('/local_lan/key_exchange.json', query_handlers.key_exchange_handler),
web.post('/local_lan/commands.json', query_handlers.command_handler),
web.post('/local_lan/property/datapoint.json', query_handlers.property_update_handler),
web.post('/local_lan/property/datapoint/ack.json', query_handlers.property_update_handler),
web.post('/local_lan/node/property/datapoint.json', query_handlers.property_update_handler),
web.post('/local_lan/node/property/datapoint/ack.json', query_handlers.property_update_handler)])
web.run_app(app, port = parsed_args.port)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, port=parsed_args.port)
await site.start()

def run(parsed_args):
async def run(parsed_args):
if (len(parsed_args.type) != len(parsed_args.config)):
raise ValueError("Each device has to have specified type and config file")

notifier = Notifier(parsed_args.port)
devices = []
for i in range(len(parsed_args.config)):
with open(parsed_args.config[i], 'rb') as f:
Expand All @@ -244,16 +134,17 @@ def run(parsed_args):
lanip_key = data['lanip_key']
lanip_key_id = data['lanip_key_id']
if parsed_args.type[i] == 'ac':
device = AcDevice(name, ip, lanip_key, lanip_key_id)
device = AcDevice(name, ip, lanip_key, lanip_key_id, notifier.notify)
elif parsed_args.type[i] == 'fgl':
device = FglDevice(name, ip, lanip_key, lanip_key_id)
device = FglDevice(name, ip, lanip_key, lanip_key_id, notifier.notify)
elif parsed_args.type[i] == 'fgl_b':
device = FglBDevice(name, ip, lanip_key, lanip_key_id)
device = FglBDevice(name, ip, lanip_key, lanip_key_id, notifier.notify)
elif parsed_args.type[i] == 'humidifier':
device = HumidifierDevice(name, ip, lanip_key, lanip_key_id)
device = HumidifierDevice(name, ip, lanip_key, lanip_key_id, notifier.notify)
else:
logging.error('Unknown type of device: %s', parsed_args.type[i])
sys.exit(1) # Should never get here.
notifier.register_device(device)
devices.append(device)

mqtt_client = None
Expand All @@ -266,18 +157,11 @@ def run(parsed_args):
mqtt_client.connect(parsed_args.mqtt_host, parsed_args.mqtt_port)
mqtt_client.loop_start()
for device in devices:
device.change_listener = mqtt_client.mqtt_publish_update

global _keep_alive
_keep_alive = None # type: typing.Optional[KeepAliveThread]

query_status = QueryStatusThread(devices)
query_status.start()

_keep_alive = KeepAliveThread(parsed_args.port, devices)
_keep_alive.start()

setup_and_run_http_server(parsed_args, devices)
device.property_change_listener = mqtt_client.mqtt_publish_update

await asyncio.gather(setup_and_run_http_server(parsed_args, devices),
query_status_worker(devices),
notifier.start())

def _escape_name(name: str):
safe_name = name.replace(' ', '_').lower()
Expand All @@ -301,6 +185,6 @@ def discovery(parsed_args):
setup_logger(parsed_args.log_level)

if (parsed_args.cmd == 'run'):
run(parsed_args)
asyncio.run(run(parsed_args))
elif (parsed_args.cmd == 'discovery'):
discovery(parsed_args)
33 changes: 21 additions & 12 deletions aircon/aircon.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
HumidifierProperties, Properties, Power, AcWorkMode, Quiet, TemperatureUnit)

class BaseDevice:
def __init__(self, name: str, ip_address: str, lanip_key: str, lanip_key_id: str, properties: Properties):
def __init__(self, name: str, ip_address: str, lanip_key: str, lanip_key_id: str,
properties: Properties, notifier: Callable[[None], None]):
self.name = name
self.ip_address = ip_address
self._config = Config(lanip_key, lanip_key_id)
self._properties = properties
self._properties_lock = threading.Lock()
self._queue_listener = notifier

self._next_command_id = 0

Expand All @@ -37,7 +39,7 @@ def __init__(self, name: str, ip_address: str, lanip_key: str, lanip_key_id: str
self._updates_seq_no = 0
self._updates_seq_no_lock = threading.Lock()

self.change_listener: Callable[[str, str], None] = None
self.property_change_listener: Callable[[str, str], None] = None

def get_all_properties(self) -> Properties:
with self._properties_lock:
Expand All @@ -58,8 +60,8 @@ def update_property(self, name: str, value) -> None:
if value != old_value:
setattr(self._properties, name, value)
logging.debug('Updated properties: %s' % self._properties)
if self.change_listener:
self.change_listener(self.name, name, value)
if self.property_change_listener:
self.property_change_listener(self.name, name, value)

def get_command_seq_no(self) -> int:
with self._commands_seq_no_lock:
Expand Down Expand Up @@ -113,6 +115,8 @@ def queue_command(self, name: str, value) -> None:
self.queue_command('t_fan_mute', 'OFF')
self.queue_command('t_sleep', 'STOP')
self.queue_command('t_temp_eight', 'OFF')

self._queue_listener()

def _build_command(self, name: str, data_value: int):
base_type = self._properties.get_base_type(name)
Expand Down Expand Up @@ -145,6 +149,7 @@ def queue_status(self) -> None:
}
self._next_command_id += 1
self.commands_queue.put_nowait((command, None))
self._queue_listener()

def update_key(self, key: dict) -> dict:
return self._config.update(key)
Expand All @@ -156,8 +161,9 @@ def get_dev_encryption(self) -> Encryption:
return self._config.dev

class AcDevice(BaseDevice):
def __init__(self, name: str, ip_address: str, lanip_key: str, lanip_key_id: str):
super().__init__(name, ip_address, lanip_key, lanip_key_id, AcProperties())
def __init__(self, name: str, ip_address: str, lanip_key: str, lanip_key_id: str,
notifier: Callable[[None], None]):
super().__init__(name, ip_address, lanip_key, lanip_key_id, AcProperties(), notifier)

def get_env_temp(self) -> int:
return self.get_property('f_temp_in')
Expand Down Expand Up @@ -338,13 +344,16 @@ def _convert_to_control_value(self, name: str, value) -> int:
raise ValueError()

class FglDevice(BaseDevice):
def __init__(self, name: str, ip_address: str, lanip_key: str, lanip_key_id: str):
super().__init__(name, ip_address, lanip_key, lanip_key_id, FglProperties())
def __init__(self, name: str, ip_address: str, lanip_key: str,
lanip_key_id: str, notifier: Callable[[None], None]):
super().__init__(name, ip_address, lanip_key, lanip_key_id, FglProperties(), notifier)

class FglBDevice(BaseDevice):
def __init__(self, name: str, ip_address: str, lanip_key: str, lanip_key_id: str):
super().__init__(name, ip_address, lanip_key, lanip_key_id, FglBProperties())
def __init__(self, name: str, ip_address: str, lanip_key: str,
lanip_key_id: str, notifier: Callable[[None], None]):
super().__init__(name, ip_address, lanip_key, lanip_key_id, FglBProperties(), notifier)

class HumidifierDevice(BaseDevice):
def __init__(self, name: str, ip_address: str, lanip_key: str, lanip_key_id: str):
super().__init__(name, ip_address, lanip_key, lanip_key_id, HumidifierProperties())
def __init__(self, name: str, ip_address: str, lanip_key: str,
lanip_key_id: str, notifier: Callable[[None], None]):
super().__init__(name, ip_address, lanip_key, lanip_key_id, HumidifierProperties(), notifier)
Loading

0 comments on commit 04b36c7

Please sign in to comment.