Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Also run callbacks on outgoing telegrams, but filter them out b… #572

Merged
merged 4 commits into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased changes

### HA integration

- knx_event: fire also for outgoing telegrams

### Devices

- BinarySensor: return `None` for `BinarySensor.counter` when context timeout is not used (and don't calculate it)
Expand Down
1 change: 1 addition & 0 deletions home-assistant-plugin/custom_components/xknx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ def register_callback(self) -> TelegramQueue.Callback:
self.telegram_received_cb,
address_filters=address_filters,
group_addresses=[],
match_for_outgoing_telegrams=True,
)

async def service_event_register_modify(self, call):
Expand Down
54 changes: 53 additions & 1 deletion test/core_tests/telegram_queue_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,62 @@ def test_register(self):
payload=GroupValueWrite(DPTBinary(1)),
)
self.loop.run_until_complete(
xknx.telegram_queue.process_telegram_incoming(telegram)
xknx.telegram_queue.process_telegram_incoming(telegram),
)
async_telegram_received_cb.assert_called_once_with(telegram)

@patch("xknx.io.KNXIPInterface")
def test_register_with_outgoing_telegrams(self, if_mock):
"""Test telegram_received_callback with outgoing telegrams."""
# pylint: disable=no-self-use
xknx = XKNX()
async_telegram_received_cb = AsyncMock()

async_if_send_telegram = asyncio.Future()
async_if_send_telegram.set_result(None)
if_mock.send_telegram.return_value = async_if_send_telegram

xknx.telegram_queue.register_telegram_received_cb(
async_telegram_received_cb, None, None, True
)

telegram = Telegram(
destination_address=GroupAddress("1/2/3"),
direction=TelegramDirection.OUTGOING,
payload=GroupValueWrite(DPTBinary(1)),
)

xknx.knxip_interface = if_mock
self.loop.run_until_complete(
xknx.telegram_queue.process_telegram_outgoing(telegram)
)
async_telegram_received_cb.assert_called_once_with(telegram)

@patch("xknx.io.KNXIPInterface")
def test_register_with_outgoing_telegrams_does_not_trigger(self, if_mock):
"""Test telegram_received_callback with outgoing telegrams."""
# pylint: disable=no-self-use
xknx = XKNX()
async_telegram_received_cb = AsyncMock()

async_if_send_telegram = asyncio.Future()
async_if_send_telegram.set_result(None)
if_mock.send_telegram.return_value = async_if_send_telegram

xknx.telegram_queue.register_telegram_received_cb(async_telegram_received_cb)

telegram = Telegram(
destination_address=GroupAddress("1/2/3"),
direction=TelegramDirection.OUTGOING,
payload=GroupValueWrite(DPTBinary(1)),
)

xknx.knxip_interface = if_mock
self.loop.run_until_complete(
xknx.telegram_queue.process_telegram_outgoing(telegram)
)
async_telegram_received_cb.assert_not_called()

#
# TEST UNREGISTER
#
Expand Down
15 changes: 14 additions & 1 deletion xknx/core/telegram_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,22 @@ def __init__(
callback: "AsyncTelegramCallback",
address_filters: Optional[List[AddressFilter]] = None,
group_addresses: Optional[List[GroupAddress]] = None,
match_for_outgoing_telegrams: bool = False,
):
"""Initialize Callback class."""
self.callback = callback
self._match_all = address_filters is None and group_addresses is None
self._match_outgoing = match_for_outgoing_telegrams
self.address_filters = [] if address_filters is None else address_filters
self.group_addresses = [] if group_addresses is None else group_addresses

def is_within_filter(self, telegram: Telegram) -> bool:
"""Test if callback is filtering for group address."""
if (
not self._match_outgoing
and telegram.direction == TelegramDirection.OUTGOING
):
return False
if self._match_all:
return True
if isinstance(telegram.destination_address, GroupAddress):
Expand All @@ -69,12 +76,14 @@ def register_telegram_received_cb(
telegram_received_cb: "AsyncTelegramCallback",
address_filters: Optional[List[AddressFilter]] = None,
group_addresses: Optional[List[GroupAddress]] = None,
match_for_outgoing: bool = False,
) -> Callback:
"""Register callback for a telegram beeing received from KNX bus."""
"""Register callback for a telegram being received from KNX bus."""
callback = TelegramQueue.Callback(
telegram_received_cb,
address_filters=address_filters,
group_addresses=group_addresses,
match_for_outgoing_telegrams=match_for_outgoing,
)
self.telegram_received_cbs.append(callback)
return callback
Expand Down Expand Up @@ -165,6 +174,10 @@ async def process_telegram_outgoing(self, telegram: Telegram) -> None:
await self.xknx.knxip_interface.send_telegram(telegram)
if isinstance(telegram.payload, GroupValueWrite):
await self.xknx.devices.process(telegram)

for telegram_received_cb in self.telegram_received_cbs:
if telegram_received_cb.is_within_filter(telegram):
await telegram_received_cb.callback(telegram)
else:
raise CommunicationError("No KNXIP interface defined")

Expand Down