Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore unknown CEMI Messages #271

Merged
merged 1 commit into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions test/knxip_tests/cemi_frame_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pytest import fixture, raises

from xknx.dpt import DPTBinary
from xknx.exceptions import CouldNotParseKNXIP
from xknx.exceptions import CouldNotParseKNXIP, UnsupportedCEMIMessage
from xknx.knxip.cemi_frame import CEMIFrame
from xknx.knxip.knxip_enum import APCICommand, CEMIMessageCode
from xknx.telegram import PhysicalAddress
Expand Down Expand Up @@ -50,8 +50,8 @@ def test_valid_command(frame):

def test_invalid_tpci_apci(frame):
"""Test for invalid APCICommand"""
with raises(CouldNotParseKNXIP, match=r".*APCI not supported: .*"):
frame.from_knx(get_data(0x29, 0, 0, 0, 0, 1, 0xFFC0, []))
with raises(UnsupportedCEMIMessage, match=r".*APCI not supported: .*"):
frame.from_knx_data_link_layer(get_data(0x29, 0, 0, 0, 0, 1, 0xFFC0, []))


def test_invalid_apdu_len(frame):
Expand All @@ -62,11 +62,5 @@ def test_invalid_apdu_len(frame):

def test_invalid_invalid_len(frame):
"""Test for invalid cemi len"""
with raises(CouldNotParseKNXIP, match=r".*CEMI too small"):
frame.from_knx(get_data(0x29, 0, 0, 0, 0, 2, 0, [])[:5])


def test_invalid_invalid_code(frame):
"""Test for invalid cemi code"""
with raises(CouldNotParseKNXIP, match=r".*Could not understand CEMIMessageCode"):
frame.from_knx(get_data(0x0, 0, 0, 0, 0, 2, 0, []))
with raises(UnsupportedCEMIMessage, match=r".*CEMI too small.*"):
frame.from_knx_data_link_layer(get_data(0x29, 0, 0, 0, 0, 2, 0, [])[:5])
3 changes: 2 additions & 1 deletion xknx/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
# flake8: noqa
from .exception import (
ConversionError, CouldNotParseAddress, CouldNotParseKNXIP,
CouldNotParseTelegram, DeviceIllegalValue, XKNXException)
CouldNotParseTelegram, DeviceIllegalValue, UnsupportedCEMIMessage,
XKNXException)
16 changes: 15 additions & 1 deletion xknx/exceptions/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class CouldNotParseKNXIP(XKNXException):
"""Exception class for wrong KNXIP data."""

def __init__(self, description=""):
"""Initialize CouldNotParseTelegram class."""
"""Initialize CouldNotParseKNXIP class."""
super().__init__()
self.description = description

Expand All @@ -49,6 +49,20 @@ def __str__(self):
.format(self.description)


class UnsupportedCEMIMessage(XKNXException):
"""Exception class for unsupported CEMI Messages."""

def __init__(self, description=""):
"""Initialize UnsupportedCEMIMessage class."""
super().__init__()
self.description = description

def __str__(self):
"""Return object as readable string."""
return '<UnsupportedCEMIMessage description="{0}" />' \
.format(self.description)


class ConversionError(XKNXException):
"""Exception class for error while converting one type to another."""

Expand Down
30 changes: 18 additions & 12 deletions xknx/knxip/cemi_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
File: AN117 v02.01 KNX IP Communication Medium DV.pdf
"""
from xknx.dpt import DPTArray, DPTBinary
from xknx.exceptions import ConversionError, CouldNotParseKNXIP
from xknx.exceptions import (
ConversionError, CouldNotParseKNXIP, UnsupportedCEMIMessage)
from xknx.telegram import GroupAddress, PhysicalAddress, Telegram, TelegramType

from .body import KNXIPBody
Expand Down Expand Up @@ -106,20 +107,25 @@ def calculated_length(self):
def from_knx(self, raw):
"""Parse/deserialize from KNX/IP raw data."""
try:
self.code = CEMIMessageCode(raw[0])
except ValueError:
raise CouldNotParseKNXIP("Could not understand CEMIMessageCode: {0} ".format(raw[0]))

if self.code == CEMIMessageCode.L_DATA_IND or \
self.code == CEMIMessageCode.L_Data_REQ or \
self.code == CEMIMessageCode.L_DATA_CON:
return self.from_knx_data_link_layer(raw)
raise CouldNotParseKNXIP("Could not understand CEMIMessageCode: {0} / {1}".format(self.code, raw[0]))
try:
self.code = CEMIMessageCode(raw[0])
except ValueError:
raise UnsupportedCEMIMessage("CEMIMessageCode not implemented: {0} ".format(raw[0]))

if self.code == CEMIMessageCode.L_DATA_IND or \
self.code == CEMIMessageCode.L_Data_REQ or \
self.code == CEMIMessageCode.L_DATA_CON:
return self.from_knx_data_link_layer(raw)
raise UnsupportedCEMIMessage("Could not handle CEMIMessageCode: {0} / {1}".format(self.code, raw[0]))
except UnsupportedCEMIMessage as unsupported_cemi_err:
self.xknx.logger.warning("Ignoring not implemented CEMI: %s", unsupported_cemi_err)
return len(raw)
farmio marked this conversation as resolved.
Show resolved Hide resolved

def from_knx_data_link_layer(self, cemi):
"""Parse L_DATA_IND, CEMIMessageCode.L_Data_REQ, CEMIMessageCode.L_DATA_CON."""
if len(cemi) < 11:
raise CouldNotParseKNXIP("CEMI too small")
# eg. ETS Line-Scan issues L_DATA_IND with length 10
raise UnsupportedCEMIMessage("CEMI too small. Length: {0}; CEMI: {1}".format(len(cemi), cemi))

# AddIL (Additional Info Length), as specified within
# KNX Chapter 3.6.3/4.1.4.3 "Additional information."
Expand All @@ -146,7 +152,7 @@ def from_knx_data_link_layer(self, cemi):
try:
self.cmd = APCICommand(tpci_apci & 0xFFC0)
except ValueError:
raise CouldNotParseKNXIP(
raise UnsupportedCEMIMessage(
"APCI not supported: {0:#012b}".format(tpci_apci & 0xFFC0))

apdu = cemi[10 + addil:]
Expand Down