Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ability to set timeouts for a knxdclient.KNXDConnection #2

Merged
merged 24 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
aa34d5e
Adds the ability to pass a timeout to the connection
NanoSpicer Nov 27, 2023
eba2006
Replace complex logic into a multicatch
NanoSpicer Nov 27, 2023
b7837b0
Adds fix for python 3.11
NanoSpicer Nov 27, 2023
42f4e45
Include null-check
NanoSpicer Nov 27, 2023
d1c7cca
Update readme
NanoSpicer Nov 27, 2023
7467f1e
Annotates return type of "_read_raw_knxpacket"
NanoSpicer Nov 28, 2023
4a81f2c
Notify coroutines from iterate_group_telegrams
NanoSpicer Nov 28, 2023
7451cdb
Apply same mechanism on open_group_socket
NanoSpicer Nov 28, 2023
a1cccfe
Make task name shorter
NanoSpicer Nov 28, 2023
859ecf2
Fix: Bug where it would exit automatically
NanoSpicer Nov 28, 2023
aa2aa16
Asserts self._reader is not None
NanoSpicer Nov 29, 2023
a453745
Suppress the connection aborted exceptions
NanoSpicer Nov 29, 2023
71c959a
Reset the _run_exited flag
NanoSpicer Nov 29, 2023
ca9b4f4
Replace duplicated assertion with checks in _read_raw_knxpacket
NanoSpicer Nov 29, 2023
324c8dc
Adds docstring
NanoSpicer Nov 30, 2023
b2b6bf8
Adds docstring and removes too many blank lines between methods
NanoSpicer Nov 30, 2023
c38e417
Fix: Linting
NanoSpicer Nov 30, 2023
15b2f98
Fix: Replace reader checking with not null assertion
NanoSpicer Nov 30, 2023
26e06a6
Improve documentation of timeout parameter and exceptions
mhthies Nov 30, 2023
b5882cc
Improve code style, using string literal merging and logger's built-i…
mhthies Nov 30, 2023
73c0aa5
Fix stale asyncio Tasks from parallel waiting
mhthies Nov 30, 2023
9e2e56c
Add test for timeout handling
mhthies Nov 30, 2023
43e7c0e
Fix tests if no knxd is present on host
mhthies Nov 30, 2023
34e21ff
Improve code style
mhthies Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def handler(packet: knxdclient.ReceivedGroupAPDU) -> None:
print("Received group telegram: {}".format(packet))

async def main() -> None:
connection = knxdclient.KNXDConnection()
# Raises a TimeoutError after 30 seconds of not receiving any traffic. This argument is optional
connection = knxdclient.KNXDConnection(timeout=30.0)
connection.set_group_apdu_handler(handler)
await connection.connect()
# Connection was successful. Start receive loop:
Expand Down
45 changes: 37 additions & 8 deletions knxdclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ def handler(packet: knxdclient.ReceivedGroupAPDU) -> None:
await connection.stop()
await run_task
"""
def __init__(self):
def __init__(self, timeout: Optional[float] = None):
self._group_apdu_handler: Optional[Callable[[ReceivedGroupAPDU], Any]] = None
self.closing = False
self._current_response: Optional[KNXDPacket] = None
self._reader: Optional[asyncio.StreamReader] = None
self._writer: Optional[asyncio.StreamWriter] = None
self._timeout: Optional[float] = timeout
# A lock to ensure, that only one synchronous action is performed on the KNXD connection at once. Synchronous
# actions are for example EIB_OPEN_GROUPCON. The lock should be acquired before sending the synchronous request
# packet to KNXD and only released after receiving the response packet from KNXD.
Expand Down Expand Up @@ -106,6 +107,14 @@ async def connect(self, host: str = 'localhost', port: int = 6720, sock: Optiona
self._reader, self._writer = await asyncio.open_connection(host=host, port=port)
logger.info("Connecting to KNXd successful")


async def _read_raw_knxpacket(self):
mrctrifork marked this conversation as resolved.
Show resolved Hide resolved
length = int.from_bytes(await self._reader.readexactly(2), byteorder='big')
# data is "bytes"
return await self._reader.readexactly(length)



async def run(self):
"""
Coroutine for running the receive loop for incoming packets from EIBD/KNXD.
Expand Down Expand Up @@ -134,8 +143,13 @@ async def run(self):

while True:
try:
length = int.from_bytes(await self._reader.readexactly(2), byteorder='big')
data = await self._reader.readexactly(length)
data = None
if self._timeout is not None:
read_task = self._read_raw_knxpacket()
data = await asyncio.wait_for(read_task, self._timeout)
else:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be cool to have some kind of "keep alive" feature here, i.e. trigger some request to KNXD every few seconds or so, so that we can be sure to receive a packet from KNXD regularly. Then we don't have to rely on KNX bus activity to avoid false timeouts and can choose an even shorter timeout value.

I will take a look at the KNXD protocol spec to see if there is a function that we can (mis)use as a keepalive.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Unfortunately, according to the BCUSDK docs about EIBD's/KNXD's interface, there is no appropriate function to use as a keepalive, i.e. one that reliably sends a response packet once we have opened the 'group socket' connection.

I'm somewhat reluctant to rely on KNX bus traffic to avoid running into timeouts.

Have you tested whether the event notification mechanism, you implemented today, is sufficient to detect the connection loss and stop the receive loop, when your KNXD server reboots – without the timeout here in the run() coroutine (or with a very very long timeout)?
If it doesn't work: Can you test whether setting up TCP keepalive for the socket here allows us to detect the connection loss reliably?

sock = self._writer.get_extra_info("socket")
# The following seems to depend on your OS and is specific for Linux. See https://stackoverflow.com/q/12248132/10315508
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested the mechanism manually by connecting to a Debian Docker image with KNXD installed and using knxtool to simulate some traffic, which I then stopped; it seems to work.

It's possible that something about coroutines / tasks is going over my head, as I am seeing the CI/CD checks failing.

The definitive test will come in 5h and 20 minutes :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tested whether the event notification mechanism, you implemented today, is sufficient to detect the connection loss and stop the receive loop, when your KNXD server reboots – without the timeout here in the run() coroutine (or with a very very long timeout)?

It was enough as I kept receiving data throughout the night! 🎉

But I see how setting the TCP props on the socket will also be of use.

data = await self._read_raw_knxpacket()

packet = KNXDPacket.decode(data)
logger.debug("Received packet from KNXd: %s", packet)
if packet.type is KNXDPacketTypes.EIB_GROUP_PACKET:
Expand All @@ -152,9 +166,11 @@ async def run(self):
return
else:
raise ConnectionAbortedError("KNXd connection was closed with EOF unexpectedly.") from e
except ConnectionError:
# A connection error typically means we cannot proceed further with this connection. Thus we abort the
# receive loop execution with the exception.
# From python 3.11 it will raise a stdlib TimeoutError instead of asyncio.TimeoutError
except (ConnectionError, TimeoutError, asyncio.TimeoutError, asyncio.CancelledError) as error:
# Connection, timeout or cancellation errors typically mean we cannot proceed further with this connection.
# Thus we abort the receive loop execution with the exception.
logger.error(f"A connection, timeout or cancellation error has occurred. Aborting current connection. {error}")
raise
except Exception as e:
logger.error("Error while receiving KNX packets:", exc_info=e)
Expand Down Expand Up @@ -226,7 +242,14 @@ async def iterate_group_telegrams(self) -> AsyncIterator[ReceivedGroupAPDU]:
self._group_apdu_handler = queue.put_nowait
try:
while True:
yield await queue.get()
if self._timeout is not None:
next_message_task = queue.get()
yield await asyncio.wait_for(next_message_task, self._timeout)
else:
mrctrifork marked this conversation as resolved.
Show resolved Hide resolved
yield await queue.get()
# From Python 3.11 a stdlib TimeoutError is thrown instead
except (asyncio.TimeoutError, TimeoutError):
logger.error(f"Timeout while awaiting for KNX messages")
finally:
self._group_apdu_handler = None

Expand All @@ -249,7 +272,13 @@ async def open_group_socket(self, write_only=False) -> None:
self._response_ready.clear()
await self._send_eibd_packet(KNXDPacket(KNXDPacketTypes.EIB_OPEN_GROUPCON,
bytes([0, 0xff if write_only else 0, 0])))
await self._response_ready.wait() # TODO add timeout and Exception on timeout

if self._timeout is not None:
wait_task = self._response_ready.wait()
await asyncio.wait_for(wait_task, self._timeout)
else:
await self._response_ready.wait()

response = self._current_response
assert response is not None
if response.type is not KNXDPacketTypes.EIB_OPEN_GROUPCON:
Expand Down
Loading