Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make the DoIP stack work #148

Merged
merged 1 commit into from
May 31, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 44 additions & 26 deletions src/gallia/transports/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ def unpack(cls, data: bytes) -> GenericHeader:
raise ValueError("inverse protocol_version is invalid")
return cls(
protocol_version,
inverse_protocol_version,
PayloadTypes(payload_type),
payload_length,
b"",
)


Expand Down Expand Up @@ -167,13 +167,19 @@ class DiagnosticMessage:
UserData: bytes

def pack(self) -> bytes:
return struct.pack(
"!HHp", self.SourceAddress, self.TargetAddress, self.UserData
return (
struct.pack(
"!HH",
self.SourceAddress,
self.TargetAddress,
)
+ self.UserData
)

@classmethod
def unpack(cls, data: bytes) -> DiagnosticMessage:
source_address, target_address, data = struct.unpack("!HHp", data)
source_address, target_address = struct.unpack("!HH", data[:4])
data = data[4:]
return cls(source_address, target_address, data)


Expand All @@ -185,12 +191,14 @@ class DiagnosticMessageAcknowledgement:
PreviousDiagnosticMessageData: bytes

def pack(self) -> bytes:
return struct.pack(
"!HHBp",
self.SourceAddress,
self.TargetAddress,
self.ACKCode,
self.PreviousDiagnosticMessageData,
return (
struct.pack(
"!HHB",
self.SourceAddress,
self.TargetAddress,
self.ACKCode,
)
+ self.PreviousDiagnosticMessageData
)


Expand All @@ -199,9 +207,9 @@ class DiagnosticMessagePositiveAcknowledgement(DiagnosticMessageAcknowledgement)

@classmethod
def unpack(cls, data: bytes) -> DiagnosticMessagePositiveAcknowledgement:
source_address, target_address, ack_code, prev_data = struct.unpack(
"!HHBp", data
)
source_address, target_address, ack_code = struct.unpack("!HHB", data[:5])
prev_data = data[5:]

return cls(
source_address,
target_address,
Expand All @@ -215,9 +223,9 @@ class DiagnosticMessageNegativeAcknowledgement(DiagnosticMessageAcknowledgement)

@classmethod
def unpack(cls, data: bytes) -> DiagnosticMessageNegativeAcknowledgement:
source_address, target_address, ack_code, prev_data = struct.unpack(
"!HHBp", data
)
source_address, target_address, ack_code = struct.unpack("!HHB", data[:5])
prev_data = data[5:]

return cls(
source_address,
target_address,
Expand Down Expand Up @@ -310,8 +318,7 @@ async def _read_frame(self) -> DoIPFrame:
elif hdr.PayloadType == PayloadTypes.AliveCheckRequest:
payload = AliveCheckRequest()
else:
raise BrokenPipeError(f"unexpected DoIP message: {hdr} {payload}")
self.logger.log_trace(f"hdr: {hdr}, data: {payload}")
raise BrokenPipeError(f"unexpected DoIP message: {hdr} {payload_buf.hex()}")
return hdr, payload

async def _read_worker(self) -> None:
Expand Down Expand Up @@ -368,15 +375,18 @@ async def _read_ack(self, prev_data: bytes) -> None:
f"unexpected DoIP message: {hdr} {payload}, expected positive ACK"
)

if payload.SourceAddress != self.src_addr:
if payload.SourceAddress != self.target_addr:
self.logger.log_warning(
f"ack: unexpected src_addr: {payload.SourceAddress:#04x}"
)
if payload.TargetAddress != self.target_addr:
if payload.TargetAddress != self.src_addr:
self.logger.log_warning(
f"ack: unexpected dst_addr: {payload.TargetAddress:#04x}"
)
if prev_data != payload.PreviousDiagnosticMessageData:
if (
len(payload.PreviousDiagnosticMessageData) > 0
and prev_data != payload.PreviousDiagnosticMessageData
):
self.logger.log_warning("ack: previous data differs from request")
self.logger.log_warning(
f"ack: got: {payload.PreviousDiagnosticMessageData.hex()} expected {prev_data.hex()}"
Expand Down Expand Up @@ -412,12 +422,14 @@ async def write_request_raw(self, hdr: GenericHeader, payload: DoIPOutData) -> N
# Now an ACK message is expected.
await asyncio.wait_for(
self._read_ack(payload.UserData),
TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout,
TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout
/ 1000,
)
elif isinstance(payload, RoutingActivationRequest):
await asyncio.wait_for(
self._read_routing_activation_response(),
TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout,
TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout
/ 1000,
)
except asyncio.TimeoutError as e:
await self.close()
Expand Down Expand Up @@ -539,7 +551,9 @@ async def close(self) -> None:
await self.connection.close()

async def read(
self, timeout: Optional[float] = None, tags: Optional[list[str]] = None
self,
timeout: Optional[float] = None,
tags: Optional[list[str]] = None,
) -> bytes:
assert self.connection is not None, assertion_str

Expand Down Expand Up @@ -575,15 +589,19 @@ async def sendto(
PayloadTypeSpecificMessageContent=b"",
)
payload = DiagnosticMessage(
SourceAddress=self.args["src_addr"], TargetAddress=dst, UserData=data
SourceAddress=self.args["src_addr"],
TargetAddress=dst,
UserData=data,
)
await asyncio.wait_for(self.connection.write_request_raw(hdr, payload), timeout)
self.logger.log_write(f"{dst:x}#{data.hex()}", tags)

return len(data)

async def recvfrom(
self, timeout: Optional[float] = None, tags: Optional[list[str]] = None
self,
timeout: Optional[float] = None,
tags: Optional[list[str]] = None,
) -> tuple[int, bytes]:
assert self.connection is not None, assertion_str

Expand Down