Skip to content

Commit

Permalink
Feat: Also run callbacks on outgoing telegrams, but filter them out b… (
Browse files Browse the repository at this point in the history
#572)

* Feat: Also run callbacks on outgoing telegrams, but filter them out by default - just not for HA.
  • Loading branch information
marvin-w authored Feb 16, 2021
1 parent afa30c7 commit c9b1c11
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 2 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased changes

### HA integration

- knx_event: fire also for outgoing telegrams

### Devices

- BinarySensor: return `None` for `BinarySensor.counter` when context timeout is not used (and don't calculate it)
Expand Down
1 change: 1 addition & 0 deletions home-assistant-plugin/custom_components/xknx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ def register_callback(self) -> TelegramQueue.Callback:
self.telegram_received_cb,
address_filters=address_filters,
group_addresses=[],
match_for_outgoing_telegrams=True,
)

async def service_event_register_modify(self, call):
Expand Down
54 changes: 53 additions & 1 deletion test/core_tests/telegram_queue_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,62 @@ def test_register(self):
payload=GroupValueWrite(DPTBinary(1)),
)
self.loop.run_until_complete(
xknx.telegram_queue.process_telegram_incoming(telegram)
xknx.telegram_queue.process_telegram_incoming(telegram),
)
async_telegram_received_cb.assert_called_once_with(telegram)

@patch("xknx.io.KNXIPInterface")
def test_register_with_outgoing_telegrams(self, if_mock):
"""Test telegram_received_callback with outgoing telegrams."""
# pylint: disable=no-self-use
xknx = XKNX()
async_telegram_received_cb = AsyncMock()

async_if_send_telegram = asyncio.Future()
async_if_send_telegram.set_result(None)
if_mock.send_telegram.return_value = async_if_send_telegram

xknx.telegram_queue.register_telegram_received_cb(
async_telegram_received_cb, None, None, True
)

telegram = Telegram(
destination_address=GroupAddress("1/2/3"),
direction=TelegramDirection.OUTGOING,
payload=GroupValueWrite(DPTBinary(1)),
)

xknx.knxip_interface = if_mock
self.loop.run_until_complete(
xknx.telegram_queue.process_telegram_outgoing(telegram)
)
async_telegram_received_cb.assert_called_once_with(telegram)

@patch("xknx.io.KNXIPInterface")
def test_register_with_outgoing_telegrams_does_not_trigger(self, if_mock):
"""Test telegram_received_callback with outgoing telegrams."""
# pylint: disable=no-self-use
xknx = XKNX()
async_telegram_received_cb = AsyncMock()

async_if_send_telegram = asyncio.Future()
async_if_send_telegram.set_result(None)
if_mock.send_telegram.return_value = async_if_send_telegram

xknx.telegram_queue.register_telegram_received_cb(async_telegram_received_cb)

telegram = Telegram(
destination_address=GroupAddress("1/2/3"),
direction=TelegramDirection.OUTGOING,
payload=GroupValueWrite(DPTBinary(1)),
)

xknx.knxip_interface = if_mock
self.loop.run_until_complete(
xknx.telegram_queue.process_telegram_outgoing(telegram)
)
async_telegram_received_cb.assert_not_called()

#
# TEST UNREGISTER
#
Expand Down
15 changes: 14 additions & 1 deletion xknx/core/telegram_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,22 @@ def __init__(
callback: "AsyncTelegramCallback",
address_filters: Optional[List[AddressFilter]] = None,
group_addresses: Optional[List[GroupAddress]] = None,
match_for_outgoing_telegrams: bool = False,
):
"""Initialize Callback class."""
self.callback = callback
self._match_all = address_filters is None and group_addresses is None
self._match_outgoing = match_for_outgoing_telegrams
self.address_filters = [] if address_filters is None else address_filters
self.group_addresses = [] if group_addresses is None else group_addresses

def is_within_filter(self, telegram: Telegram) -> bool:
"""Test if callback is filtering for group address."""
if (
not self._match_outgoing
and telegram.direction == TelegramDirection.OUTGOING
):
return False
if self._match_all:
return True
if isinstance(telegram.destination_address, GroupAddress):
Expand All @@ -69,12 +76,14 @@ def register_telegram_received_cb(
telegram_received_cb: "AsyncTelegramCallback",
address_filters: Optional[List[AddressFilter]] = None,
group_addresses: Optional[List[GroupAddress]] = None,
match_for_outgoing: bool = False,
) -> Callback:
"""Register callback for a telegram beeing received from KNX bus."""
"""Register callback for a telegram being received from KNX bus."""
callback = TelegramQueue.Callback(
telegram_received_cb,
address_filters=address_filters,
group_addresses=group_addresses,
match_for_outgoing_telegrams=match_for_outgoing,
)
self.telegram_received_cbs.append(callback)
return callback
Expand Down Expand Up @@ -165,6 +174,10 @@ async def process_telegram_outgoing(self, telegram: Telegram) -> None:
await self.xknx.knxip_interface.send_telegram(telegram)
if isinstance(telegram.payload, GroupValueWrite):
await self.xknx.devices.process(telegram)

for telegram_received_cb in self.telegram_received_cbs:
if telegram_received_cb.is_within_filter(telegram):
await telegram_received_cb.callback(telegram)
else:
raise CommunicationError("No KNXIP interface defined")

Expand Down

0 comments on commit c9b1c11

Please sign in to comment.