Skip to content

Commit

Permalink
Cleanup sense_link code and fix W1201 warning (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Sep 15, 2023
1 parent 7b82466 commit 5e734b4
Showing 1 changed file with 33 additions and 15 deletions.
48 changes: 33 additions & 15 deletions sense_energy/sense_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,40 @@
# SenseLink is a tool that emulates the energy monitoring functionality of TP-Link Kasa HS110 Smart Plugs,
# and allows you to report "custom" power usage to your Sense Home Energy Monitor based on other parameters.

import logging
import asyncio
import logging
from typing import Callable, Iterator, Optional, Tuple, Union

import orjson
from kasa_crypt import encrypt as tp_link_encrypt, decrypt as tp_link_decrypt
from kasa_crypt import decrypt as tp_link_decrypt
from kasa_crypt import encrypt as tp_link_encrypt

from .plug_instance import PlugInstance

SENSE_TP_LINK_PORT = 9999

_LOGGER = logging.getLogger(__name__)


class SenseLinkServerProtocol:
def __init__(self, devices):
"""Class to represent a SenseLink server."""

def __init__(self, devices: Callable[[], Iterator[PlugInstance]]) -> None:
"""Initialize the SenseLink server."""
self._devices = devices
self.should_respond = True
self.transport: Optional[asyncio.DatagramTransport] = None

def connection_made(self, transport):
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
"""Handle new connection."""
self.transport = transport

def connection_lost(self, exc):
def connection_lost(self, exc) -> None:
"""Handle lost connection."""
pass

def datagram_received(self, data, addr):
def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]) -> None:
"""Handle incoming UDP datagram."""
decrypted_data = tp_link_decrypt(data)

try:
Expand All @@ -49,42 +61,48 @@ def datagram_received(self, data, addr):
# Build response
response = plug.generate_response()
json_resp = orjson.dumps(response)
encrypted_resp = tp_link_encrypt(json_resp.decode('utf-8'))
encrypted_resp = tp_link_encrypt(json_resp.decode("utf-8"))
# Strip leading 4 bytes for...some reason
encrypted_resp = encrypted_resp[4:]

# Allow disabling response
if self.should_respond:
# Send response
logging.debug(f"Sending response: {response}")
logging.debug("Sending response: %s", response)
self.transport.sendto(encrypted_resp, addr)
else:
# Do not send response, but log for debugging
logging.debug(f"SENSE_RESPONSE disabled, response content: {response}")
_LOGGER.debug("SENSE_RESPONSE disabled, response content: %s", response)
else:
logging.debug(f"Ignoring non-emeter JSON from {addr}: {json_data}")
_LOGGER.debug(f"Ignoring non-emeter JSON from %s: %s", addr, json_data)

# Appears to not be JSON
except ValueError:
logging.debug("Did not receive valid json")
_LOGGER.debug("Did not receive valid json")


class SenseLink:
"""Class to represent a SenseLink server."""

_devices = []

def __init__(self, devices, port=SENSE_TP_LINK_PORT):
def __init__(self, devices: Callable[[], Iterator[PlugInstance]], port=SENSE_TP_LINK_PORT) -> None:
"""Initialize the SenseLink server."""
self.port = port
self._devices = devices

def print_instance_wattages(self):
def print_instance_wattages(self) -> None:
"""Log the current wattages of all instances."""
for inst in self._devices():
logging.info(f"Plug {inst.alias} power: {inst.power}")

async def start(self):
async def start(self) -> None:
"""Start the SenseLink server."""
loop = asyncio.get_running_loop()
self.transport, self.protocol = await loop.create_datagram_endpoint(
lambda: SenseLinkServerProtocol(self._devices), local_addr=("0.0.0.0", self.port)
)

async def stop(self):
async def stop(self) -> None:
"""Stop the SenseLink server."""
self.transport.close()

0 comments on commit 5e734b4

Please sign in to comment.