From 4d5ffbe51113b5867ab5b3e17c4b9f05c688f1df Mon Sep 17 00:00:00 2001 From: Tobias Specht Date: Thu, 11 Jan 2024 13:13:18 +0100 Subject: [PATCH] chore(testing): Add basic unit test for Services Scanner --- src/gallia/commands/scan/uds/services.py | 9 +- tests/test_scan_services.py | 215 +++++++++++++++++++++++ 2 files changed, 220 insertions(+), 4 deletions(-) create mode 100644 tests/test_scan_services.py diff --git a/src/gallia/commands/scan/uds/services.py b/src/gallia/commands/scan/uds/services.py index da5f73c15..62a1b80fb 100644 --- a/src/gallia/commands/scan/uds/services.py +++ b/src/gallia/commands/scan/uds/services.py @@ -80,10 +80,10 @@ def configure_parser(self) -> None: async def main(self, args: Namespace) -> None: self.result: list[tuple[int, int]] = [] self.ecu.max_retry = 1 - found: dict[int, dict[int, Any]] = {} + self.found: dict[int, dict[int, Any]] = {} if args.sessions is None: - found[0] = await self.perform_scan(args) + self.found[0] = await self.perform_scan(args) else: sessions = [ s @@ -117,11 +117,11 @@ async def main(self, args: Namespace) -> None: logger.result(f"scanning in session {g_repr(session)}") - found[session] = await self.perform_scan(args, session) + self.found[session] = await self.perform_scan(args, session) await self.ecu.leave_session(session) - for key, value in found.items(): + for key, value in self.found.items(): logger.result(f"findings in session 0x{key:02X}:") for sid, data in value.items(): self.result.append((key, sid)) @@ -163,6 +163,7 @@ async def perform_scan( ) except asyncio.TimeoutError: logger.info(f"{g_repr(sid)}: timeout") + result[sid] = "TimeoutError" continue except MalformedResponse as e: logger.warning( diff --git a/tests/test_scan_services.py b/tests/test_scan_services.py new file mode 100644 index 000000000..888752e13 --- /dev/null +++ b/tests/test_scan_services.py @@ -0,0 +1,215 @@ +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from enum import Enum +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from gallia.commands.scan.uds.services import ServicesScanner +from gallia.log import get_logger +from gallia.services.uds import UDSErrorCodes, UDSIsoServices +from gallia.services.uds.server import RandomUDSServer, UDSServerTransport +from gallia.transports import TargetURI + +logger = get_logger("pytest") + + +@dataclass +class Args: + target = TargetURI("unix-lines://tmp/mock.socket") + power_supply = None + power_cycle = False + dumpcap = False + oem = "default" + timeout = 1 + max_retries = 3 + ecu_reset = None + ping = True + tester_present = True + tester_present_interval = 0.1 + properties = False + sessions = [0x01] + skip = {0x01: list(set(range(0xFF)).difference({0x27}))} + check_session = True + scan_response_ids = False + + +class Action(Enum): + PASS = 1 + RAISE = 2 + TIMEOUT = 4 + + +class ScanServicesMock: + hook_request: Callable[ + [bytes, bytes, list[str] | None], Awaitable[tuple[Action, Any]] + ] | None = None + req_pdu = b"00" + args = Args() + scanner = None + + async def init(self) -> None: + self.scanner = ServicesScanner(None) # type: ignore + self.server = RandomUDSServer(1) + self.transport = UDSServerTransport(self.server, None) # type: ignore + await self.server.setup() + + async def mock_read( + self, + timeout: float | None = None, + tags: list[str] | None = None, + ) -> bytes: + logger.debug("read out of request_unsafe") + data = bytes( + [ + UDSIsoServices.NegativeResponse, + self.req_pdu[0], + UDSErrorCodes.serviceNotSupported, + ] + ) + return await self.send_response(self.req_pdu, data, tags) + + async def send_response( + self, req: bytes, res: bytes, tags: list[str] | None + ) -> bytes: + action = Action.PASS + arg = res + if self.hook_request: + action, arg = await self.hook_request(req, res, tags) + + match action: + case Action.PASS: + logger.debug(f"forward response: {req.hex()} -> {arg.hex()}") + return arg + case Action.TIMEOUT: + logger.debug(f"simulate timeout on request: {req.hex()}") + raise TimeoutError + case Action.RAISE: + assert isinstance(arg, Exception) + logger.debug(f"simulate exception on request: {req.hex()} -> {arg!r}") + raise arg + + async def mock_write( + self, + data: bytes, + timeout: float | None = None, + tags: list[str] | None = None, + ) -> int: + logger.debug(f"write out of request_unsafe: {data.hex()}") + self.req_pdu = data + return len(data) + + async def mock_close(self) -> None: + logger.debug("mock_close") + + async def mock_reconnect(self, timeout: float | None = None) -> AsyncMock: + logger.debug("mock_reconnect") + return await self.mock_connect(self.args.target) + + async def mock_request_unsafe( + self, data: bytes, timeout: float | None = None, tags: list[str] | None = None + ) -> bytes: + res, duration = await self.transport.handle_request(data) + if not res: + raise TimeoutError + return await self.send_response(data, res, tags) + + async def mock_connect( + self, target: str | TargetURI, timeout: float | None = None + ) -> AsyncMock: + con = AsyncMock() + con.close.side_effect = self.mock_close + con.write.side_effect = self.mock_write + con.read.side_effect = self.mock_read + con.reconnect.side_effect = self.mock_reconnect + con.request_unsafe.side_effect = self.mock_request_unsafe + return con + + @pytest.mark.asyncio + @pytest.fixture + @patch("gallia.transports.unix.UnixTransport.connect") + @patch("gallia.command.uds.UDSScanner.configure_class_parser") + @patch("gallia.commands.scan.uds.services.ServicesScanner.configure_parser") + async def setup( + self, + mock_configure_parser: MagicMock, + mock_configure_class_parser: MagicMock, + mock_transport: MagicMock, + ) -> None: + await self.init() + mock_transport.side_effect = self.mock_connect + await self.scanner.setup(self.args) # type: ignore + + @pytest.mark.asyncio + @pytest.fixture + async def teardown(self) -> None: + await self.scanner.teardown(self.args) # type: ignore + + +class TestScanServices(ScanServicesMock): + @pytest.mark.asyncio + async def test_1(self, setup: None, teardown: None) -> None: + """Basic test of Service Scan against vECU. + Scanner should scan Session 1 and skip all SIDs expect 0x27. + """ + assert self.scanner is not None + await self.scanner.main(self.args) # type: ignore + + assert len(self.scanner.found) == 1 + assert len(self.scanner.found[1]) == 1 + assert 0x27 in self.scanner.found[1] + + @pytest.mark.asyncio + async def test_2(self, setup: None, teardown: None) -> None: + """Test TimeoutError on Service Scan. + On SID 0x27 a TimeoutError is simulated. + The Scanner should return this in the result. + """ + assert self.scanner is not None + + async def handle( + req: bytes, res: bytes, tags: list[str] | None + ) -> tuple[Action, Any]: + if req[0] == 0x27: + return Action.TIMEOUT, None + return Action.PASS, res + + self.hook_request = handle + + await self.scanner.main(self.args) # type: ignore + + assert self.scanner.found[1][0x27] == TimeoutError.__name__ + + @pytest.mark.asyncio + async def test_3(self, setup: None, teardown: None) -> None: + """Test NegativeResponse on Service Scan. + On SID 0x27 the NRC brakeSwitchNotClosed is simulated. + The Scanner should return this NRC in the result. + """ + assert self.scanner is not None + + async def handle( + req: bytes, res: bytes, tags: list[str] | None + ) -> tuple[Action, Any]: + if req[0] == 0x27: + return Action.PASS, bytes( + [ + UDSIsoServices.NegativeResponse, + 0x27, + UDSErrorCodes.brakeSwitchNotClosed, + ] + ) + return Action.PASS, res + + self.hook_request = handle + + await self.scanner.main(self.args) # type: ignore + + assert ( + self.scanner.found[1][0x27].response_code + == UDSErrorCodes.brakeSwitchNotClosed + )