Skip to content

Commit

Permalink
Merge pull request #2 from mrctrifork/master
Browse files Browse the repository at this point in the history
Fixes error behaviour of open_group_socket() and iterate_group_telegrams() and adds ability to set timeouts for a knxdclient.KNXDConnection
  • Loading branch information
mhthies authored Dec 2, 2023
2 parents 027aecc + 34e21ff commit 08d82c6
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 11 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def handler(packet: knxdclient.ReceivedGroupAPDU) -> None:
print("Received group telegram: {}".format(packet))

async def main() -> None:
connection = knxdclient.KNXDConnection()
# Raises a TimeoutError after 30 seconds of not receiving any traffic. This argument is optional
connection = knxdclient.KNXDConnection(timeout=30.0)
connection.set_group_apdu_handler(handler)
await connection.connect()
# Connection was successful. Start receive loop:
Expand Down
79 changes: 69 additions & 10 deletions knxdclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,22 @@ def handler(packet: knxdclient.ReceivedGroupAPDU) -> None:
# Let's stop the connection and wait for graceful termination of the receive loop:
await connection.stop()
await run_task
:param timeout: Maximum time between packets received from KNXD. If the timeout is exceeded, the `run()` coroutine
will terminate with a TimeoutError or asyncio.TimeoutError (depending on Python version).
If None (default), no timeout is applied.
Typically, no timeout should be required, as connection failures are detected by the OS and
signalled by closing the network socket.
Make sure to only use the `timeout` parameter, if regular packets from KNXD is expected (e.g. due
to regular activity on the KNX bus).
"""
def __init__(self):
def __init__(self, timeout: Optional[float] = None):
self._group_apdu_handler: Optional[Callable[[ReceivedGroupAPDU], Any]] = None
self.closing = False
self._current_response: Optional[KNXDPacket] = None
self._reader: Optional[asyncio.StreamReader] = None
self._writer: Optional[asyncio.StreamWriter] = None
self._timeout: Optional[float] = timeout
# A lock to ensure, that only one synchronous action is performed on the KNXD connection at once. Synchronous
# actions are for example EIB_OPEN_GROUPCON. The lock should be acquired before sending the synchronous request
# packet to KNXD and only released after receiving the response packet from KNXD.
Expand All @@ -78,6 +87,8 @@ def __init__(self):
# ``wait()`` on it. As soon as a response is received by the :meth:`run` coroutine, it will store the response
# in ``_current_response` and inform the waiting method by setting the event.
self._response_ready = asyncio.Event()
# An (asyncio) event that checks whether "run" coroutine has exited or is still executing
self._run_exited = asyncio.Event()

async def connect(self, host: str = 'localhost', port: int = 6720, sock: Optional[str] = None):
"""
Expand All @@ -94,6 +105,8 @@ async def connect(self, host: str = 'localhost', port: int = 6720, sock: Optiona
"""
# Close previous connection gracefully if any
if self._writer is not None:
# In case of a reconnect unset the _run_exited asyncio.Event
self._run_exited.clear()
if not self._writer.is_closing():
self._writer.close()
await self._writer.wait_closed()
Expand All @@ -106,6 +119,11 @@ async def connect(self, host: str = 'localhost', port: int = 6720, sock: Optiona
self._reader, self._writer = await asyncio.open_connection(host=host, port=port)
logger.info("Connecting to KNXd successful")

async def _read_raw_knxpacket(self) -> bytes:
assert self._reader is not None
length = int.from_bytes(await self._reader.readexactly(2), byteorder='big')
return await self._reader.readexactly(length)

async def run(self):
"""
Coroutine for running the receive loop for incoming packets from EIBD/KNXD.
Expand All @@ -125,17 +143,23 @@ async def run(self):
:raises ConnectionAbortedError: in case of an unexpected EOF (connection closed without ``stop()`` being called)
:raises ConnectionError: in case such an error occurs while reading
:raises ConnectionError: when no connection has been established yet or the previous connection reached an EOF.
:raises TimeoutError: If given `timeout` is exceeded between packets received from KNXD (Python >= 3.11)
:raises asyncio.TimeoutError: If given `timeout` is exceeded between packets received from KNXD (Python < 3.11)
"""
logger.info("Entering KNXd client receive loop ...")

if self._reader is None or self._reader.at_eof():
raise ConnectionError("No connection to KNXD has been established yet or the previous connection's "
"StreamReader is at EOF")
logger.info("Entering KNXd client receive loop ...")

while True:
try:
length = int.from_bytes(await self._reader.readexactly(2), byteorder='big')
data = await self._reader.readexactly(length)
data = None
if self._timeout is not None:
read_task = self._read_raw_knxpacket()
data = await asyncio.wait_for(read_task, self._timeout)
else:
data = await self._read_raw_knxpacket()

packet = KNXDPacket.decode(data)
logger.debug("Received packet from KNXd: %s", packet)
if packet.type is KNXDPacketTypes.EIB_GROUP_PACKET:
Expand All @@ -147,17 +171,25 @@ async def run(self):
self._current_response = packet
self._response_ready.set()
except asyncio.IncompleteReadError as e:
self._run_exited.set()
if self.closing:
logger.info("KNXd connection reached EOF. KNXd client is stopped.")
return
else:
raise ConnectionAbortedError("KNXd connection was closed with EOF unexpectedly.") from e
except ConnectionError:
# A connection error typically means we cannot proceed further with this connection. Thus we abort the
# receive loop execution with the exception.
# From python 3.11 it will raise a stdlib TimeoutError instead of asyncio.TimeoutError
except (ConnectionError, TimeoutError, asyncio.TimeoutError, asyncio.CancelledError) as error:
# A connection, timeout or cancellation errors
# typically mean we cannot proceed further with this connection.
# Thus we abort the receive loop execution with the exception.
logger.error("A connection, timeout or cancellation error has occurred. "
"Aborting current connection: %s", error)
self._run_exited.set()
raise
except Exception as e:
logger.error("Error while receiving KNX packets:", exc_info=e)
self._run_exited.set()
raise

async def stop(self):
"""
Expand Down Expand Up @@ -217,6 +249,7 @@ async def iterate_group_telegrams(self) -> AsyncIterator[ReceivedGroupAPDU]:
:meth:`set_group_apdu_handler`.
:raises RuntimerError: When a custom handler function has been registered or another iterator is already active
:raises ConnectionAbortedError: in case the `run()` loop exited unexpectedly while waiting for messages
"""
if self._group_apdu_handler:
raise RuntimeError("A custom group APDU handler has already been registered or iterate_group_telegrams() is"
Expand All @@ -225,9 +258,24 @@ async def iterate_group_telegrams(self) -> AsyncIterator[ReceivedGroupAPDU]:
queue: asyncio.Queue[ReceivedGroupAPDU] = asyncio.Queue()
self._group_apdu_handler = queue.put_nowait
try:
run_exited = asyncio.create_task(self._run_exited.wait())
while True:
yield await queue.get()
try:
next_message_task = asyncio.create_task(queue.get())
done, _pending = await asyncio.wait((next_message_task, run_exited),
return_when=asyncio.FIRST_COMPLETED)

if run_exited in done:
raise ConnectionAbortedError("KNXDConnection was closed and is no longer sending messages")

yield next_message_task.result()
finally:
next_message_task.cancel()
except Exception as ex:
logger.error(ex)
raise
finally:
run_exited.cancel()
self._group_apdu_handler = None

async def open_group_socket(self, write_only=False) -> None:
Expand All @@ -243,13 +291,24 @@ async def open_group_socket(self, write_only=False) -> None:
:param write_only: If True, KNXD is requested to open the Group Socket in write-only mode, i.e. no incoming
group telegrams will be received.
:raises RuntimeError: when KNXD responds with an error message or an unexpected response packet.
:raises ConnectionAbortedError: in case the `run()` loop exited unexpectedly while waiting for the response
"""
logger.info("Opening KNX group socket for sending to group addresses ...")
async with self._lock:
self._response_ready.clear()
await self._send_eibd_packet(KNXDPacket(KNXDPacketTypes.EIB_OPEN_GROUPCON,
bytes([0, 0xff if write_only else 0, 0])))
await self._response_ready.wait() # TODO add timeout and Exception on timeout

run_exited = asyncio.create_task(self._run_exited.wait())
response_ready = asyncio.create_task(self._response_ready.wait())

done, _pending = await asyncio.wait((run_exited, response_ready), return_when=asyncio.FIRST_COMPLETED)

if run_exited in done:
response_ready.cancel()
raise ConnectionAbortedError("KNXDConnection was closed and is no longer sending messages")
run_exited.cancel()

response = self._current_response
assert response is not None
if response.type is not KNXDPacketTypes.EIB_OPEN_GROUPCON:
Expand Down
27 changes: 27 additions & 0 deletions test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import unittest.mock
import time
from contextlib import suppress
import sys

import knxdclient
from ._helper import async_test
Expand Down Expand Up @@ -229,6 +230,7 @@ async def iter_packets():
await run_task


@unittest.skipIf(shutil.which("knxd") is None, "knxd is not available in PATH")
class KNXDClientTCPTest(unittest.TestCase):
def setUp(self) -> None:
self.knxd_process = subprocess.Popen(["knxd", "--listen-tcp=16720", "-e", "0.5.1", "-E",
Expand All @@ -254,3 +256,28 @@ async def test_connect_tcp(self) -> None:
await connection.stop()
with suppress(asyncio.CancelledError):
await run_task


class KNXDClientTimeoutTest(unittest.TestCase):
@async_test
async def test_timeout(self) -> None:
# Create TCP server
async def client_handler(reader, writer):
# We never respond anything, so the client should run into timeout
await reader.read(-1)
server = await asyncio.start_server(client_handler, host="127.0.0.1", port=16720)

try:
# Setup connection and receive loop task
connection = knxdclient.KNXDConnection(timeout=1)
await connection.connect(host="127.0.0.1", port=16720)
run_task = asyncio.create_task(connection.run())

with self.assertRaises(ConnectionAbortedError):
await connection.open_group_socket()

with self.assertRaises(TimeoutError if sys.version_info >= (3, 11) else asyncio.TimeoutError):
await run_task
finally:
server.close()
await server.wait_closed()

0 comments on commit 08d82c6

Please sign in to comment.