Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ability to set timeouts for a knxdclient.KNXDConnection #2

Merged
merged 24 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
aa34d5e
Adds the ability to pass a timeout to the connection
NanoSpicer Nov 27, 2023
eba2006
Replace complex logic into a multicatch
NanoSpicer Nov 27, 2023
b7837b0
Adds fix for python 3.11
NanoSpicer Nov 27, 2023
42f4e45
Include null-check
NanoSpicer Nov 27, 2023
d1c7cca
Update readme
NanoSpicer Nov 27, 2023
7467f1e
Annotates return type of "_read_raw_knxpacket"
NanoSpicer Nov 28, 2023
4a81f2c
Notify coroutines from iterate_group_telegrams
NanoSpicer Nov 28, 2023
7451cdb
Apply same mechanism on open_group_socket
NanoSpicer Nov 28, 2023
a1cccfe
Make task name shorter
NanoSpicer Nov 28, 2023
859ecf2
Fix: Bug where it would exit automatically
NanoSpicer Nov 28, 2023
aa2aa16
Assers self._reader is not None
NanoSpicer Nov 29, 2023
a453745
Suppress the connection aborted exceptions
NanoSpicer Nov 29, 2023
71c959a
Reset the _run_exited flag
NanoSpicer Nov 29, 2023
ca9b4f4
Replace dupplicated assertion with checks in _read_raw_knxpacket
NanoSpicer Nov 29, 2023
324c8dc
Adds docstring
NanoSpicer Nov 30, 2023
b2b6bf8
Adds docstring and removes too many blank lines between methods
NanoSpicer Nov 30, 2023
c38e417
Fix: Linting
NanoSpicer Nov 30, 2023
15b2f98
Fix: Replace reader checking with not null assertion
NanoSpicer Nov 30, 2023
26e06a6
Improve documentation of timeout parameter and exceptions
mhthies Nov 30, 2023
b5882cc
Improve code style, using string literal merging and logger's built-i…
mhthies Nov 30, 2023
73c0aa5
Fix stale asyncio Tasks from parallel waiting
mhthies Nov 30, 2023
9e2e56c
Add test for timeout handling
mhthies Nov 30, 2023
43e7c0e
Fix tests if no knxd is present on host
mhthies Nov 30, 2023
34e21ff
Improve code style
mhthies Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def handler(packet: knxdclient.ReceivedGroupAPDU) -> None:
print("Received group telegram: {}".format(packet))

async def main() -> None:
connection = knxdclient.KNXDConnection()
# Raises a TimeoutError after 30 seconds of not receiving any traffic. This argument is optional
connection = knxdclient.KNXDConnection(timeout=30.0)
connection.set_group_apdu_handler(handler)
await connection.connect()
# Connection was successful. Start receive loop:
Expand Down
79 changes: 69 additions & 10 deletions knxdclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,22 @@ def handler(packet: knxdclient.ReceivedGroupAPDU) -> None:
# Let's stop the connection and wait for graceful termination of the receive loop:
await connection.stop()
await run_task

:param timeout: Maximum time between packets received from KNXD. If the timeout is exceeded, the `run()` coroutine
will terminate with a TimeoutError or asyncio.TimeoutError (depending on Python version).
If None (default), no timeout is applied.
Typically, no timeout should be required, as connection failures are detected by the OS and
signalled by closing the network socket.
Make sure to only use the `timeout` parameter, if regular packets from KNXD is expected (e.g. due
to regular activity on the KNX bus).
"""
def __init__(self):
def __init__(self, timeout: Optional[float] = None):
self._group_apdu_handler: Optional[Callable[[ReceivedGroupAPDU], Any]] = None
self.closing = False
self._current_response: Optional[KNXDPacket] = None
self._reader: Optional[asyncio.StreamReader] = None
self._writer: Optional[asyncio.StreamWriter] = None
self._timeout: Optional[float] = timeout
# A lock to ensure, that only one synchronous action is performed on the KNXD connection at once. Synchronous
# actions are for example EIB_OPEN_GROUPCON. The lock should be acquired before sending the synchronous request
# packet to KNXD and only released after receiving the response packet from KNXD.
Expand All @@ -78,6 +87,8 @@ def __init__(self):
# ``wait()`` on it. As soon as a response is received by the :meth:`run` coroutine, it will store the response
# in ``_current_response` and inform the waiting method by setting the event.
self._response_ready = asyncio.Event()
# An (asyncio) event that checks whether "run" coroutine has exited or is still executing
self._run_exited = asyncio.Event()

async def connect(self, host: str = 'localhost', port: int = 6720, sock: Optional[str] = None):
"""
Expand All @@ -94,6 +105,8 @@ async def connect(self, host: str = 'localhost', port: int = 6720, sock: Optiona
"""
# Close previous connection gracefully if any
if self._writer is not None:
# In case of a reconnect unset the _run_exited asyncio.Event
self._run_exited.clear()
if not self._writer.is_closing():
self._writer.close()
await self._writer.wait_closed()
Expand All @@ -106,6 +119,11 @@ async def connect(self, host: str = 'localhost', port: int = 6720, sock: Optiona
self._reader, self._writer = await asyncio.open_connection(host=host, port=port)
logger.info("Connecting to KNXd successful")

async def _read_raw_knxpacket(self) -> bytes:
assert self._reader is not None
length = int.from_bytes(await self._reader.readexactly(2), byteorder='big')
return await self._reader.readexactly(length)

async def run(self):
"""
Coroutine for running the receive loop for incoming packets from EIBD/KNXD.
Expand All @@ -125,17 +143,23 @@ async def run(self):
:raises ConnectionAbortedError: in case of an unexpected EOF (connection closed without ``stop()`` being called)
:raises ConnectionError: in case such an error occurs while reading
:raises ConnectionError: when no connection has been established yet or the previous connection reached an EOF.
:raises TimeoutError: If given `timeout` is exceeded between packets received from KNXD (Python >= 3.11)
:raises asyncio.TimeoutError: If given `timeout` is exceeded between packets received from KNXD (Python < 3.11)
"""
logger.info("Entering KNXd client receive loop ...")

if self._reader is None or self._reader.at_eof():
raise ConnectionError("No connection to KNXD has been established yet or the previous connection's "
"StreamReader is at EOF")
logger.info("Entering KNXd client receive loop ...")

while True:
try:
length = int.from_bytes(await self._reader.readexactly(2), byteorder='big')
data = await self._reader.readexactly(length)
data = None
if self._timeout is not None:
read_task = self._read_raw_knxpacket()
data = await asyncio.wait_for(read_task, self._timeout)
else:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be cool to have some kind of "keep alive" feature here, i.e. trigger some request to KNXD every few seconds or so, so that we can be sure to receive a packet from KNXD regularly. Then we don't have to rely on KNX bus activity to avoid false timeouts and can chose an even shorter timeout value.

I will take a look at the KNXD protocol spec to see if there is a function that we can (mis)use as a keepalive.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Unfortunately, according to the BCUSDK docs about EIBD's/KNXD's interface, there is no appropriate function to use as a keepalive, that reliably sends a response packet, once we opened the 'group socket' connection.

I'm somehow unconvinced to rely on KNX bus traffic to avoid running into timeouts.

Have you tested whether the event notification mechanism, you implemented today, is sufficient to detect the connection loss and stop the receive loop, when your KNXD server reboots – without the timeout here in the run() coroutine (or with a very very long timeout)?
If it doesn't work: Can you test whether setting up TCP keepalive for the socket here allows us to detect the connection loss reliably?

sock = self._writer.get_extra_info("socket")
# The following seems to depend on your OS and is specific for Linux. See https://stackoverflow.com/q/12248132/10315508
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested the mechanism manually by connecting to a Debian docker image with KNXD installed and used knxtool to simulate some traffic which then I stopped and it seems to work.

It's possible that there is some stuff about coroutines / tasks that goes over my head as I am seeing the CI/CD failing.

The definitive test will come in 5h and 20 minutes :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tested whether the event notification mechanism, you implemented today, is sufficient to detect the connection loss and stop the receive loop, when your KNXD server reboots – without the timeout here in the run() coroutine (or with a very very long timeout)?

It was enough as I kept receiving data throughout the night! 🎉

But I see how setting the TCP props on the socket will also be of use.

data = await self._read_raw_knxpacket()

packet = KNXDPacket.decode(data)
logger.debug("Received packet from KNXd: %s", packet)
if packet.type is KNXDPacketTypes.EIB_GROUP_PACKET:
Expand All @@ -147,17 +171,25 @@ async def run(self):
self._current_response = packet
self._response_ready.set()
except asyncio.IncompleteReadError as e:
self._run_exited.set()
if self.closing:
logger.info("KNXd connection reached EOF. KNXd client is stopped.")
return
else:
raise ConnectionAbortedError("KNXd connection was closed with EOF unexpectedly.") from e
except ConnectionError:
# A connection error typically means we cannot proceed further with this connection. Thus we abort the
# receive loop execution with the exception.
# From python 3.11 it will raise a stdlib TimeoutError instead of asyncio.TimeoutError
except (ConnectionError, TimeoutError, asyncio.TimeoutError, asyncio.CancelledError) as error:
# A connection, timeout or cancellation errors
# typically mean we cannot proceed further with this connection.
# Thus we abort the receive loop execution with the exception.
logger.error("A connection, timeout or cancellation error has occurred. "
"Aborting current connection: %s", error)
self._run_exited.set()
raise
except Exception as e:
logger.error("Error while receiving KNX packets:", exc_info=e)
self._run_exited.set()
raise

async def stop(self):
"""
Expand Down Expand Up @@ -217,6 +249,7 @@ async def iterate_group_telegrams(self) -> AsyncIterator[ReceivedGroupAPDU]:
:meth:`set_group_apdu_handler`.

:raises RuntimerError: When a custom handler function has been registered or another iterator is already active
:raises ConnectionAbortedError: in case the `run()` loop exited unexpectedly while waiting for messages
"""
if self._group_apdu_handler:
raise RuntimeError("A custom group APDU handler has already been registered or iterate_group_telegrams() is"
Expand All @@ -225,9 +258,24 @@ async def iterate_group_telegrams(self) -> AsyncIterator[ReceivedGroupAPDU]:
queue: asyncio.Queue[ReceivedGroupAPDU] = asyncio.Queue()
self._group_apdu_handler = queue.put_nowait
try:
run_exited = asyncio.create_task(self._run_exited.wait())
while True:
yield await queue.get()
try:
next_message_task = asyncio.create_task(queue.get())
done, _pending = await asyncio.wait((next_message_task, run_exited),
return_when=asyncio.FIRST_COMPLETED)

if run_exited in done:
raise ConnectionAbortedError("KNXDConnection was closed and is no longer sending messages")

yield next_message_task.result()
finally:
next_message_task.cancel()
except Exception as ex:
logger.error(ex)
raise
finally:
run_exited.cancel()
self._group_apdu_handler = None

async def open_group_socket(self, write_only=False) -> None:
Expand All @@ -243,13 +291,24 @@ async def open_group_socket(self, write_only=False) -> None:
:param write_only: If True, KNXD is requested to open the Group Socket in write-only mode, i.e. no incoming
group telegrams will be received.
:raises RuntimeError: when KNXD responds with an error message or an unexpected response packet.
:raises ConnectionAbortedError: in case the `run()` loop exited unexpectedly while waiting for the response
"""
logger.info("Opening KNX group socket for sending to group addresses ...")
async with self._lock:
self._response_ready.clear()
await self._send_eibd_packet(KNXDPacket(KNXDPacketTypes.EIB_OPEN_GROUPCON,
bytes([0, 0xff if write_only else 0, 0])))
await self._response_ready.wait() # TODO add timeout and Exception on timeout

run_exited = asyncio.create_task(self._run_exited.wait())
response_ready = asyncio.create_task(self._response_ready.wait())

done, _pending = await asyncio.wait((run_exited, response_ready), return_when=asyncio.FIRST_COMPLETED)

if run_exited in done:
response_ready.cancel()
raise ConnectionAbortedError("KNXDConnection was closed and is no longer sending messages")
run_exited.cancel()

response = self._current_response
assert response is not None
if response.type is not KNXDPacketTypes.EIB_OPEN_GROUPCON:
Expand Down
27 changes: 27 additions & 0 deletions test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import unittest.mock
import time
from contextlib import suppress
import sys

import knxdclient
from ._helper import async_test
Expand Down Expand Up @@ -229,6 +230,7 @@ async def iter_packets():
await run_task


@unittest.skipIf(shutil.which("knxd") is None, "knxd is not available in PATH")
class KNXDClientTCPTest(unittest.TestCase):
def setUp(self) -> None:
self.knxd_process = subprocess.Popen(["knxd", "--listen-tcp=16720", "-e", "0.5.1", "-E",
Expand All @@ -254,3 +256,28 @@ async def test_connect_tcp(self) -> None:
await connection.stop()
with suppress(asyncio.CancelledError):
await run_task


class KNXDClientTimeoutTest(unittest.TestCase):
@async_test
async def test_timeout(self) -> None:
# Create TCP server
async def client_handler(reader, writer):
# We never respond anything, so the client should run into timeout
await reader.read(-1)
server = await asyncio.start_server(client_handler, host="127.0.0.1", port=16720)

try:
# Setup connection and receive loop task
connection = knxdclient.KNXDConnection(timeout=1)
await connection.connect(host="127.0.0.1", port=16720)
run_task = asyncio.create_task(connection.run())

with self.assertRaises(ConnectionAbortedError):
await connection.open_group_socket()

with self.assertRaises(TimeoutError if sys.version_info >= (3, 11) else asyncio.TimeoutError):
await run_task
finally:
server.close()
await server.wait_closed()
Loading