diff --git a/src/gallia/commands/discover/doip.py b/src/gallia/commands/discover/doip.py index d11a0738b..748c9f3a4 100644 --- a/src/gallia/commands/discover/doip.py +++ b/src/gallia/commands/discover/doip.py @@ -14,13 +14,15 @@ from gallia.command import AsyncScript from gallia.services.uds.core.service import ( - DiagnosticSessionControlRequest, - DiagnosticSessionControlResponse, + TesterPresentRequest, + TesterPresentResponse, ) from gallia.transports.doip import ( DiagnosticMessage, DiagnosticMessageNegativeAckCodes, DoIPConnection, + DoIPNegativeAckError, + DoIPRoutingActivationDeniedError, RoutingActivationRequestTypes, RoutingActivationResponseCodes, TimingAndCommunicationParameters, @@ -216,25 +218,22 @@ async def enumerate_routing_activation_types( self.logger.info( f"[๐Ÿคฏ] Holy moly, it actually worked for activation_type {routing_activation_type:#x} and src_addr {src_addr:#x}!!!" ) - except ConnectionAbortedError as e: - # Let's utilize Gallia's excellent error handling - error = RoutingActivationResponseCodes[str(e).split(" ")[-1]] + except DoIPRoutingActivationDeniedError as e: self.logger.info( - f"[๐ŸŒŸ] splendid, {routing_activation_type:#x} yields a {error.name}" + f"[๐ŸŒŸ] splendid, {routing_activation_type:#x} yields {e.rac_code.name}" ) - if error != RoutingActivationResponseCodes.UnsupportedActivationType: + if ( + e.rac_code + != RoutingActivationResponseCodes.UnsupportedActivationType + ): rat_not_unsupported.append(routing_activation_type) - if error == RoutingActivationResponseCodes.UnknownSourceAddress: + if e.rac_code == RoutingActivationResponseCodes.UnknownSourceAddress: rat_wrong_source.append(routing_activation_type) finally: - try: - await conn.close() - except ConnectionResetError as e: - # This triggers when the connection is closed already, as conn.close() is not handling this - self.logger.warn(f"[โ›”] Could not close connection: {e}") + await conn.close() self.logger.notice( f"[๐Ÿ’Ž] Look what RoutingActivationTypes I've found that are not 'unsupported': {', '.join([f'{x:#x}' for x in rat_not_unsupported])}" @@ -252,6 +251,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 timeout: None | float = None, ) -> None: known_targets = [] + unreachable_targets = [] responsive_targets = [] search_space = range(start, stop + 1) @@ -260,12 +260,12 @@ async def enumerate_target_addresses( # noqa: PLR0913 ) for target_addr in search_space: - self.logger.info(f"[๐Ÿšง] Attempting connection to {target_addr:#02x}") + self.logger.debug(f"[๐Ÿšง] Attempting connection to {target_addr:#x}") conn.target_addr = target_addr try: - req = DiagnosticSessionControlRequest(0x01) + req = TesterPresentRequest(suppress_response=False) await conn.write_diag_request(req.pdu) # If we reach this, the request was not denied due to unknown TargetAddress @@ -280,6 +280,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 ) as f: await f.write(f"{known_targets[-1]}\n") + self.logger.info(f"[โณ] Waiting for reply of target {target_addr:#x}") # Hardcoded loop to detect potential broadcasts while True: pot_broadcast, data = await asyncio.wait_for( @@ -302,7 +303,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 f"target_addr={target_addr:#x} yielded reply from {pot_broadcast:#x}; could also be late answer triggered by previous address!\n" ) - resp = DiagnosticSessionControlResponse.parse_static(data) + resp = TesterPresentResponse.parse_static(data) self.logger.notice( f"[๐Ÿฅณ] It cannot get nicer: {target_addr:#x} responded: {resp}" ) @@ -312,28 +313,36 @@ async def enumerate_target_addresses( # noqa: PLR0913 ) as f: await f.write(f"{known_targets[-1]}\n") - except ( - BrokenPipeError - ) as e: # Though it's obvious: this error is raised when a DoIP NACK is received - error = DiagnosticMessageNegativeAckCodes(int(str(e).split(" ")[-1], 0)) - if error == DiagnosticMessageNegativeAckCodes.UnknownTargetAddress: + except DoIPNegativeAckError as e: + if ( + e.nack_code + == DiagnosticMessageNegativeAckCodes.UnknownTargetAddress + ): self.logger.info( f"[๐Ÿซฅ] {target_addr:#x} is an unknown target address" ) continue + elif e.nack_code == DiagnosticMessageNegativeAckCodes.TargetUnreachable: + self.logger.info( + f"[๐Ÿ’ค] {target_addr:#x} is (currently?) unreachable" + ) + unreachable_targets.append(known_targets[-1]) + async with aiofiles.open( + self.artifacts_dir.joinpath("5_unreachable_targets.txt"), "a" + ) as f: + await f.write(f"{known_targets[-1]}\n") + continue else: self.logger.warning( - f"[๐Ÿคท] {target_addr:#x} is behaving strangely: {error.name}" + f"[๐Ÿคท] {target_addr:#x} is behaving strangely: {e.nack_code.name}" ) async with aiofiles.open( self.artifacts_dir.joinpath("7_targets_with_errors.txt"), "a" ) as f: - await f.write(f"{target_addr:#x}: {error.name}\n") + await f.write(f"{target_addr:#x}: {e.nack_code.name}\n") continue - except ( - asyncio.TimeoutError - ): # This triggers when no ACK is received, or ACK but no UDS reply + except asyncio.TimeoutError: # This triggers when DoIP ACK but no UDS reply self.logger.info( f"[๐Ÿ™Š] Presumably no active ECU on target address {target_addr:#x}" ) @@ -346,7 +355,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 except (ConnectionError, ConnectionResetError) as e: # Whenever this triggers, but sometimes connections are closed not by us self.logger.warn( - f"[๐Ÿซฆ] Sexy, but unexpected: {target_addr:#} triggered {e}" + f"[๐Ÿซฆ] Sexy, but unexpected: {target_addr:#x} triggered {e}" ) async with aiofiles.open( self.artifacts_dir.joinpath("7_targets_with_errors.txt"), "a" @@ -359,20 +368,22 @@ async def enumerate_target_addresses( # noqa: PLR0913 ) continue - try: - await conn.close() - except ConnectionResetError as e: - # This triggers when the connection is closed already, as conn.close() is not handling this - self.logger.warn(f"[โ›”] could not close connection: {e}") + await conn.close() self.logger.notice( - "[โš”๏ธ] It's dangerous to test alone, take one of these known targets:" + f"[โš”๏ธ] It's dangerous to test alone, take one of these {len(known_targets)} known targets:" ) for item in known_targets: self.logger.notice(item) self.logger.notice( - "[๐Ÿ’ฐ] For even more profit, try targets that actually responded:" + f"[โ“] Those {len(unreachable_targets)} targets were unreachable by the gateway (could be just temporary):" + ) + for item in unreachable_targets: + self.logger.notice(item) + + self.logger.notice( + f"[๐Ÿ’ฐ] For even more profit, try one of the {len(responsive_targets)} targets that actually responded:" ) for item in responsive_targets: self.logger.notice(item) @@ -414,7 +425,8 @@ async def read_diag_request_custom( while True: hdr, payload = await conn.read_frame() if not isinstance(payload, DiagnosticMessage): - raise BrokenPipeError(f"[๐Ÿงจ] Unexpected DoIP message: {hdr} {payload}") + self.logger.warning(f"[๐Ÿงจ] Unexpected DoIP message: {hdr} {payload}") + return None, b"" if payload.SourceAddress != conn.target_addr: return payload.SourceAddress, payload.UserData if payload.TargetAddress != conn.src_addr: @@ -449,30 +461,24 @@ async def enumerate_source_addresses( try: await conn.write_routing_activation_request(routing_activation_type) - except ConnectionAbortedError as e: - # Let's utilize Gallia's excellent error handling - error = RoutingActivationResponseCodes[str(e).split(" ")[-1]] + except DoIPRoutingActivationDeniedError as e: self.logger.info( - f"[๐ŸŒŸ] splendid, {source_address:#x} yields a {error.name}" + f"[๐ŸŒŸ] splendid, {source_address:#x} yields {e.rac_code.name}" ) - if error != RoutingActivationResponseCodes.UnknownSourceAddress: + if e.rac_code != RoutingActivationResponseCodes.UnknownSourceAddress: denied_sourceAddresses.append(source_address) async with aiofiles.open( self.artifacts_dir.joinpath("2_denied_src_addresses.txt"), "a" ) as f: await f.write( - f"activation_type={routing_activation_type:#x},src_addr={source_address:#x}: {error.name}\n" + f"activation_type={routing_activation_type:#x},src_addr={source_address:#x}: {e.rac_code.name}\n" ) continue finally: - try: - await conn.close() - except ConnectionResetError as e: - # This triggers when the connection is closed already, as conn.close() is not handling this - self.logger.warn(f"[โ›”] could not close connection: {e}") + await conn.close() self.logger.info( f"[๐Ÿคฏ] Holy moly, it actually worked for activation_type {routing_activation_type:#x} and src_addr {source_address:#x}!!!" @@ -488,7 +494,7 @@ async def enumerate_source_addresses( # Print valid SourceAddresses and suitable target string for config self.logger.notice( - f"[๐Ÿ’€] Look what SourceAddresses got denied: {', '.join([f'{x:#x}' for x in known_sourceAddresses])}" + f"[๐Ÿ’€] Look what SourceAddresses got denied: {', '.join([f'{x:#x}' for x in denied_sourceAddresses])}" ) self.logger.notice( f"[๐Ÿ’Ž] Look what valid SourceAddresses I've found: {', '.join([f'{x:#x}' for x in known_sourceAddresses])}" diff --git a/src/gallia/transports/doip.py b/src/gallia/transports/doip.py index 6efafd09e..c08a9591a 100644 --- a/src/gallia/transports/doip.py +++ b/src/gallia/transports/doip.py @@ -8,6 +8,7 @@ import struct from dataclasses import dataclass from enum import IntEnum, unique +from typing import Any from pydantic import BaseModel, field_validator @@ -31,16 +32,31 @@ class RoutingActivationRequestTypes(IntEnum): @unique class RoutingActivationResponseCodes(IntEnum): + UNDEFINED = -0x01 UnknownSourceAddress = 0x00 - NoRessources = 0x01 + NoResources = 0x01 InvalidConnectionEntry = 0x02 - AlreadyActived = 0x03 + AlreadyActive = 0x03 AuthenticationMissing = 0x04 ConfirmationRejected = 0x05 UnsupportedActivationType = 0x06 Success = 0x10 SuccessConfirmationRequired = 0x11 + @classmethod + def _missing_(cls, value: Any) -> RoutingActivationResponseCodes: + return cls.UNDEFINED + + +class DoIPRoutingActivationDeniedError(ConnectionAbortedError): + rac_code: RoutingActivationResponseCodes + + def __init__(self, rac_code: int): + self.rac_code = RoutingActivationResponseCodes(rac_code) + super().__init__( + f"DoIP routing activation denied: {self.rac_code.name} ({rac_code})" + ) + @unique class PayloadTypes(IntEnum): @@ -66,6 +82,7 @@ class DiagnosticMessagePositiveAckCodes(IntEnum): @unique class DiagnosticMessageNegativeAckCodes(IntEnum): + UNDEFINED = -0x01 InvalidSourceAddress = 0x02 UnknownTargetAddress = 0x03 DiagnosticMessageTooLarge = 0x04 @@ -74,6 +91,20 @@ class DiagnosticMessageNegativeAckCodes(IntEnum): UnknownNetwork = 0x07 TransportProtocolError = 0x08 + @classmethod + def _missing_(cls, value: Any) -> DiagnosticMessageNegativeAckCodes: + return cls.UNDEFINED + + +class DoIPNegativeAckError(BrokenPipeError): + nack_code: DiagnosticMessageNegativeAckCodes + + def __init__(self, negative_ack_code: int): + self.nack_code = DiagnosticMessageNegativeAckCodes(negative_ack_code) + super().__init__( + f"DoIP negative ACK received: {self.nack_code.name} ({negative_ack_code})" + ) + @unique class GenericHeaderNACKCodes(IntEnum): @@ -101,10 +132,9 @@ class TimingAndCommunicationParameters(IntEnum): @dataclass class GenericHeader: - ProtocolVersion: ProtocolVersions - PayloadType: PayloadTypes + ProtocolVersion: int + PayloadType: int PayloadLength: int - PayloadTypeSpecificMessageContent: bytes def pack(self) -> bytes: return struct.pack( @@ -127,9 +157,8 @@ def unpack(cls, data: bytes) -> GenericHeader: raise ValueError("inverse protocol_version is invalid") return cls( protocol_version, - PayloadTypes(payload_type), + payload_type, payload_length, - b"", ) @@ -155,7 +184,7 @@ def pack(self) -> bytes: class RoutingActivationResponse: SourceAddress: int TargetAddress: int - RoutingActivationResponseCode: RoutingActivationResponseCodes + RoutingActivationResponseCode: int Reserved: int = 0x00000000 # Not used, default value. # OEMReserved uint32 @@ -348,6 +377,10 @@ async def _read_worker(self) -> None: await self._read_queue.put((hdr, data)) except asyncio.CancelledError: self.logger.debug("read worker cancelled") + except asyncio.IncompleteReadError as e: + self.logger.debug(f"read worker received EOF: {e}") + except Exception as e: + self.logger.critical(f"read worker died with {type(e)}: {e}") async def read_frame_unsafe(self) -> DoIPFrame: # Avoid waiting on the queue forever when @@ -361,20 +394,29 @@ async def read_frame(self) -> DoIPFrame: return await self.read_frame_unsafe() async def read_diag_request_raw(self) -> DoIPDiagFrame: + unexpected_packets: list[tuple[Any, Any]] = [] while True: hdr, payload = await self.read_frame() if not isinstance(payload, DiagnosticMessage): - raise BrokenPipeError(f"unexpected DoIP message: {hdr} {payload}") - if payload.SourceAddress != self.target_addr: self.logger.warning( - f"unexpected DoIP src address: {payload.SourceAddress:#04x}" + f"expected DoIP DiagnosticMessage, instead got: {hdr} {payload}" ) + unexpected_packets.append((hdr, payload)) continue - if payload.TargetAddress != self.src_addr: + if ( + payload.SourceAddress != self.target_addr + or payload.TargetAddress != self.src_addr + ): self.logger.warning( - f"unexpected DoIP target address: {payload.TargetAddress:#04x}" + f"DoIP-DiagnosticMessage: unexpected addresses (src:dst); expected {self.src_addr}:{self.target_addr} but got: {payload.SourceAddress:#04x}:{payload.TargetAddress:#04x}" ) + unexpected_packets.append((hdr, payload)) continue + + # Do not consume unexpected packets, but re-add them to the queue for other consumers + for item in unexpected_packets: + await self._read_queue.put(item) + return hdr, payload async def read_diag_request(self) -> bytes: @@ -382,36 +424,26 @@ async def read_diag_request(self) -> bytes: return payload.UserData async def _read_ack(self, prev_data: bytes) -> None: + unexpected_packets: list[tuple[Any, Any]] = [] while True: hdr, payload = await self.read_frame_unsafe() - if isinstance(payload, DiagnosticMessageNegativeAcknowledgement): - if ( - payload.ACKCode - == DiagnosticMessageNegativeAckCodes.TargetUnreachable - ): - # TargetUnreachable can be just a temporary issue. Thus, we do not raise - # BrokenPipeError but instead ignore it here and let upper layers handle - # missing responses - self.logger.warning("DoIP message was ACKed with TargetUnreachable") - else: - raise BrokenPipeError( - f"request denied: {hdr} {payload} {payload.ACKCode.value}" - ) - elif not isinstance(payload, DiagnosticMessagePositiveAcknowledgement): + if not isinstance( + payload, DiagnosticMessagePositiveAcknowledgement + ) and not isinstance(payload, DiagnosticMessageNegativeAcknowledgement): self.logger.warning( - f"unexpected DoIP message: {hdr} {payload}, expected positive ACK" + f"expected DoIP positive/negative ACK, instead got: {hdr} {payload}" ) + unexpected_packets.append((hdr, payload)) continue - if payload.SourceAddress != self.target_addr: - self.logger.warning( - f"ack: unexpected src_addr: {payload.SourceAddress:#04x}" - ) - continue - if payload.TargetAddress != self.src_addr: + if ( + payload.SourceAddress != self.target_addr + or payload.TargetAddress != self.src_addr + ): self.logger.warning( - f"ack: unexpected dst_addr: {payload.TargetAddress:#04x}" + f"DoIP-ACK: unexpected addresses (src:dst); expected {self.src_addr}:{self.target_addr} but got: {payload.SourceAddress:#04x}:{payload.TargetAddress:#04x}" ) + unexpected_packets.append((hdr, payload)) continue if ( len(payload.PreviousDiagnosticMessageData) > 0 @@ -419,31 +451,42 @@ async def _read_ack(self, prev_data: bytes) -> None: ): self.logger.warning("ack: previous data differs from request") self.logger.warning( - f"ack: got: {payload.PreviousDiagnosticMessageData.hex()} expected {prev_data.hex()}" + f"DoIP-ACK: got: {payload.PreviousDiagnosticMessageData.hex()} expected {prev_data.hex()}" ) + unexpected_packets.append((hdr, payload)) continue + + # Do not consume unexpected packets, but re-add them to the queue for other consumers + for item in unexpected_packets: + await self._read_queue.put(item) + + if isinstance(payload, DiagnosticMessageNegativeAcknowledgement): + raise DoIPNegativeAckError(payload.ACKCode) return async def _read_routing_activation_response(self) -> None: - hdr, payload = await self.read_frame_unsafe() - if hdr.PayloadType != PayloadTypes.RoutingActivationResponse or not isinstance( - payload, RoutingActivationResponse - ): - raise BrokenPipeError(f"unexpected DoIP message: {hdr} {payload}") - - if ( - payload.RoutingActivationResponseCode - != RoutingActivationResponseCodes.Success - ): - try: - code = RoutingActivationResponseCodes( + unexpected_packets: list[tuple[Any, Any]] = [] + while True: + hdr, payload = await self.read_frame_unsafe() + if not isinstance(payload, RoutingActivationResponse): + self.logger.warning( + f"expected DoIP RoutingActivationResponse, instead got: {hdr} {payload}" + ) + unexpected_packets.append((hdr, payload)) + continue + + # Do not consume unexpected packets, but re-add them to the queue for other consumers + for item in unexpected_packets: + await self._read_queue.put(item) + + if ( + payload.RoutingActivationResponseCode + != RoutingActivationResponseCodes.Success + ): + raise DoIPRoutingActivationDeniedError( payload.RoutingActivationResponseCode ) - except ValueError as e: - raise ConnectionAbortedError( - f"unknown routing_activation_response_code: {payload.RoutingActivationResponseCode}" - ) from e - raise ConnectionAbortedError(f"routing activation denied: {code.name}") + return async def write_request_raw(self, hdr: GenericHeader, payload: DoIPOutData) -> None: async with self._mutex: @@ -481,7 +524,6 @@ async def write_diag_request(self, data: bytes) -> None: ProtocolVersion=ProtocolVersions.ISO_13400_2_2012, PayloadType=PayloadTypes.DiagnosticMessage, PayloadLength=len(data) + 4, - PayloadTypeSpecificMessageContent=b"", ) payload = DiagnosticMessage( SourceAddress=self.src_addr, @@ -498,7 +540,6 @@ async def write_routing_activation_request( ProtocolVersion=ProtocolVersions.ISO_13400_2_2012, PayloadType=PayloadTypes.RoutingActivationRequest, PayloadLength=7, - PayloadTypeSpecificMessageContent=b"", ) payload = RoutingActivationRequest( SourceAddress=self.src_addr, @@ -512,7 +553,6 @@ async def write_alive_check_response(self) -> None: ProtocolVersion=ProtocolVersions.ISO_13400_2_2012, PayloadType=PayloadTypes.AliveCheckResponse, PayloadLength=2, - PayloadTypeSpecificMessageContent=b"", ) payload = AliveCheckResponse( SourceAddress=self.src_addr, @@ -625,8 +665,18 @@ async def write( timeout: float | None = None, tags: list[str] | None = None, ) -> int: - await asyncio.wait_for(self._conn.write_diag_request(data), timeout) - t = tags + ["write"] if tags is not None else ["write"] self.logger.trace(data.hex(), extra={"tags": t}) + + try: + await asyncio.wait_for(self._conn.write_diag_request(data), timeout) + except DoIPNegativeAckError as e: + if e.nack_code != DiagnosticMessageNegativeAckCodes.TargetUnreachable: + raise e + # TargetUnreachable can be just a temporary issue. Thus, we do not raise + # BrokenPipeError but instead ignore it here and let upper layers handle + # missing responses (i.e. raise a TimeoutError instead) + self.logger.debug("DoIP message was ACKed with TargetUnreachable") + raise asyncio.TimeoutError from e + return len(data)