Skip to content

Commit

Permalink
Fix Message'First handling for checksums
Browse files Browse the repository at this point in the history
ref #503
  • Loading branch information
jklmnn committed Dec 1, 2020
1 parent 706967e commit a2575d5
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 4 deletions.
16 changes: 12 additions & 4 deletions rflx/pyrflx/typevalue.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,13 @@ def __init__(
}
)

self.__message_first_name = First("Message")
initial = self._fields[INITIAL.name]
initial.first = Number(0)
initial.typeval.assign(bytes())
self._simplified_mapping: Dict[Name, Expr] = dict.fromkeys(
[initial.name_size, initial.name_last, initial.name_first], Number(0)
[initial.name_size, initial.name_last, initial.name_first, self.__message_first_name],
Number(0),
)
self.accessible_fields: List[str] = []
if self._skip_verification:
Expand Down Expand Up @@ -903,7 +905,12 @@ def set_checksum_function(self, checksums: Dict[str, Callable]) -> None:

def _is_checksum_settable(self, checksum: "MessageValue.Checksum") -> bool:
def valid_path(value_range: ValueRange) -> bool:
expr: Dict[Expr, str] = dict.fromkeys([value_range.lower, value_range.upper])
lower = value_range.lower.substituted(
func=lambda e: self._fields[self._next_field(INITIAL.name)].name_first
if e == self.__message_first_name
else e
)
expr: Dict[Expr, str] = dict.fromkeys([lower, value_range.upper])

for e in expr:
if isinstance(e, Sub):
Expand All @@ -916,8 +923,9 @@ def valid_path(value_range: ValueRange) -> bool:
else:
assert isinstance(e, (First, Last))
expr[e] = str(e.prefix)
print(str(e), ":", expr[e])

field = expr.get(value_range.lower)
field = expr.get(lower)
assert isinstance(field, str)
upper_field_name = expr[value_range.upper]
if upper_field_name == "Message":
Expand Down Expand Up @@ -1113,7 +1121,7 @@ def __update_simplified_mapping(self, field: Optional[Field] = None) -> None:
self._simplified_mapping[self.__message_last_name] = field.last
return

self._simplified_mapping = {}
self._simplified_mapping = {self.__message_first_name: Number(0)}
for v in self._fields.values():
if isinstance(v.typeval, ScalarValue) and v.set:
self._simplified_mapping[v.name_variable] = v.typeval.expr
Expand Down
31 changes: 31 additions & 0 deletions tests/data/fixtures/pyrflx.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,37 @@ def fixture_icmp_checksum_message_value(icmp_message: model.Message) -> pyrflx.M
)


@pytest.fixture(name="icmp_checksum_message_first")
def fixture_icmp_checksum_message_first(icmp_message: model.Message) -> pyrflx.MessageValue:
return pyrflx.MessageValue(
icmp_message.copy(
structure=[
model.Link(
l.source,
l.target,
condition=expr.And(l.condition, expr.ValidChecksum("Checksum")),
)
if l.target == model.FINAL
else l
for l in icmp_message.structure
],
aspects={
ID("Checksum"): {
ID("Checksum"): [
expr.ValueRange(
expr.First("Message"), expr.Sub(expr.First("Checksum"), expr.Number(1))
),
expr.Size("Checksum"),
expr.ValueRange(
expr.Add(expr.Last("Checksum"), expr.Number(1)), expr.Last("Message")
),
]
}
},
)
)


@pytest.fixture(name="ipv4_package", scope="session")
def fixture_ipv4_package(pyrflx_: pyrflx.PyRFLX) -> pyrflx.Package:
return pyrflx_["IPv4"]
Expand Down
72 changes: 72 additions & 0 deletions tests/unit/pyrflx_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# pylint: disable=too-many-lines
from pathlib import Path

import pytest
Expand Down Expand Up @@ -790,6 +791,8 @@ def test_array_assign_invalid(

def icmp_checksum_function(message: bytes, **kwargs: object) -> int:
first_arg = kwargs.get("Tag'First .. Checksum'First - 1")
if first_arg is None:
first_arg = kwargs.get("Message'First .. Checksum'First - 1")
assert isinstance(first_arg, tuple)
tag_first, checksum_first_minus_one = first_arg
assert tag_first == 0 and checksum_first_minus_one == 15
Expand Down Expand Up @@ -926,6 +929,75 @@ def test_checksum_parse_invalid(icmp_checksum_message_value: MessageValue) -> No
assert not icmp_checksum_message_value.valid_message


def test_checksum_message_first(icmp_checksum_message_first: MessageValue) -> None:
test_data = (
b"\x47\xb4\x67\x5e\x00\x00\x00\x00"
b"\x4a\xfc\x0d\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17"
b"\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x20\x21\x22\x23\x24\x25\x26\x27"
b"\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f\x30\x31\x32\x33\x34\x35\x36\x37"
)
icmp_checksum_message_first.set_checksum_function({"Checksum": icmp_checksum_function})
icmp_checksum_message_first.set("Tag", "Echo_Request")
icmp_checksum_message_first.set("Code_Zero", 0)
icmp_checksum_message_first.set("Identifier", 5)
icmp_checksum_message_first.set("Sequence_Number", 1)
icmp_checksum_message_first.set("Data", test_data)
assert icmp_checksum_message_first.get("Checksum") == 12824
assert icmp_checksum_message_first.bytestring == b"\x08\x00\x32\x18\x00\x05\x00\x01" + test_data
assert icmp_checksum_message_first.valid_message


def test_checksum_no_verification() -> None:
# pylint: disable = protected-access
pyrflx_ = PyRFLX.from_specs(
[f"{EX_SPEC_DIR}/icmp.rflx"], skip_model_verification=True, skip_message_verification=True
)
icmp_message = pyrflx_["ICMP"]["Message"]._type
icmp_msg = MessageValue(
icmp_message.copy(
structure=[
Link(
l.source,
l.target,
condition=expr.And(l.condition, expr.ValidChecksum("Checksum")),
)
if l.target == FINAL
else l
for l in icmp_message.structure
],
aspects={
ID("Checksum"): {
ID("Checksum"): [
expr.ValueRange(
expr.First("Message"), expr.Sub(expr.First("Checksum"), expr.Number(1))
),
expr.Size("Checksum"),
expr.ValueRange(
expr.Add(expr.Last("Checksum"), expr.Number(1)), expr.Last("Message")
),
]
}
},
)
)
test_data = (
b"\x47\xb4\x67\x5e\x00\x00\x00\x00"
b"\x4a\xfc\x0d\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17"
b"\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x20\x21\x22\x23\x24\x25\x26\x27"
b"\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f\x30\x31\x32\x33\x34\x35\x36\x37"
)
icmp_msg.set_checksum_function({"Checksum": icmp_checksum_function})
icmp_msg.set("Tag", "Echo_Request")
icmp_msg.set("Code_Zero", 0)
icmp_msg.set("Identifier", 5)
icmp_msg.set("Sequence_Number", 1)
icmp_msg.set("Data", test_data)
icmp_msg.update_checksums()
assert icmp_msg.get("Checksum") == 12824
assert icmp_msg.bytestring == b"\x08\x00\x32\x18\x00\x05\x00\x01" + test_data
assert icmp_msg.valid_message


@pytest.fixture(name="tlv_checksum_package", scope="session")
def fixture_tlv_checksum_package(pyrflx_: PyRFLX) -> Package:
return pyrflx_["TLV_With_Checksum"]
Expand Down

0 comments on commit a2575d5

Please sign in to comment.