Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup sense_link code and fix W1201 warning #64

Merged
merged 1 commit into from
Sep 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions sense_energy/sense_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,40 @@
# SenseLink is a tool that emulates the energy monitoring functionality of TP-Link Kasa HS110 Smart Plugs,
# and allows you to report "custom" power usage to your Sense Home Energy Monitor based on other parameters.

import logging
import asyncio
import logging
from typing import Callable, Iterator, Optional, Tuple, Union

import orjson
from kasa_crypt import encrypt as tp_link_encrypt, decrypt as tp_link_decrypt
from kasa_crypt import decrypt as tp_link_decrypt
from kasa_crypt import encrypt as tp_link_encrypt

from .plug_instance import PlugInstance

SENSE_TP_LINK_PORT = 9999

_LOGGER = logging.getLogger(__name__)


class SenseLinkServerProtocol:
def __init__(self, devices):
"""Class to represent a SenseLink server."""

def __init__(self, devices: Callable[[], Iterator[PlugInstance]]) -> None:
"""Initialize the SenseLink server."""
self._devices = devices
self.should_respond = True
self.transport: Optional[asyncio.DatagramTransport] = None

def connection_made(self, transport):
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
"""Handle new connection."""
self.transport = transport

def connection_lost(self, exc):
def connection_lost(self, exc) -> None:
"""Handle lost connection."""
pass

def datagram_received(self, data, addr):
def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]) -> None:
"""Handle incoming UDP datagram."""
decrypted_data = tp_link_decrypt(data)

try:
Expand All @@ -49,42 +61,48 @@ def datagram_received(self, data, addr):
# Build response
response = plug.generate_response()
json_resp = orjson.dumps(response)
encrypted_resp = tp_link_encrypt(json_resp.decode('utf-8'))
encrypted_resp = tp_link_encrypt(json_resp.decode("utf-8"))
# Strip leading 4 bytes for...some reason
encrypted_resp = encrypted_resp[4:]

# Allow disabling response
if self.should_respond:
# Send response
logging.debug(f"Sending response: {response}")
logging.debug("Sending response: %s", response)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was converting the response payload to a string and than throwing it away if debug logging was not enabled

self.transport.sendto(encrypted_resp, addr)
else:
# Do not send response, but log for debugging
logging.debug(f"SENSE_RESPONSE disabled, response content: {response}")
_LOGGER.debug("SENSE_RESPONSE disabled, response content: %s", response)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was converting the json payload to a string and than throwing it away if debug logging was not enabled

else:
logging.debug(f"Ignoring non-emeter JSON from {addr}: {json_data}")
_LOGGER.debug(f"Ignoring non-emeter JSON from %s: %s", addr, json_data)
Copy link
Contributor Author

@bdraco bdraco Sep 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was converting the response payload to a string and than throwing it away if debug logging was not enabled


# Appears to not be JSON
except ValueError:
logging.debug("Did not receive valid json")
_LOGGER.debug("Did not receive valid json")


class SenseLink:
"""Class to represent a SenseLink server."""

_devices = []

def __init__(self, devices, port=SENSE_TP_LINK_PORT):
def __init__(self, devices: Callable[[], Iterator[PlugInstance]], port=SENSE_TP_LINK_PORT) -> None:
"""Initialize the SenseLink server."""
self.port = port
self._devices = devices

def print_instance_wattages(self):
def print_instance_wattages(self) -> None:
"""Log the current wattages of all instances."""
for inst in self._devices():
logging.info(f"Plug {inst.alias} power: {inst.power}")

async def start(self):
async def start(self) -> None:
"""Start the SenseLink server."""
loop = asyncio.get_running_loop()
self.transport, self.protocol = await loop.create_datagram_endpoint(
lambda: SenseLinkServerProtocol(self._devices), local_addr=("0.0.0.0", self.port)
)

async def stop(self):
async def stop(self) -> None:
"""Stop the SenseLink server."""
self.transport.close()