Skip to content

Commit

Permalink
test xcp: revive xcp test script with CAN support
Browse files Browse the repository at this point in the history
  • Loading branch information
fkglr committed Jun 6, 2023
1 parent 6c76893 commit 2a6364e
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/gallia/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from gallia.commands.primitive.uds.rmba import RMBAPrimitive
from gallia.commands.primitive.uds.rtcl import RTCLPrimitive
from gallia.commands.primitive.uds.send_pdu import SendPDUPrimitive
from gallia.commands.primitive.uds.test_xcp import SimpleTestXCP
from gallia.commands.primitive.uds.vin import VINPrimitive
from gallia.commands.primitive.uds.wmba import WMBAPrimitive
from gallia.commands.primitive.uds.write_by_identifier import WriteByIdentifierPrimitive
Expand Down Expand Up @@ -54,6 +55,7 @@
WMBAPrimitive,
VirtualECU,
WriteByIdentifierPrimitive,
SimpleTestXCP,
]

__all__ = [x.__name__ for x in registry]
31 changes: 28 additions & 3 deletions src/gallia/commands/primitive/uds/test_xcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,46 @@
#
# SPDX-License-Identifier: Apache-2.0

import sys
from argparse import Namespace

from gallia.command import Scanner
from gallia.plugins import load_transport
from gallia.services.xcp import XCPService
from gallia.utils import catch_and_log_exception
from gallia.services.xcp import CANXCPSerivce, XCPService
from gallia.transports.can import ISOTPTransport, RawCANTransport
from gallia.utils import auto_int, catch_and_log_exception


class SimpleTestXCP(Scanner):
"""Test XCP Slave"""

GROUP = "primitive"
COMMAND = "xcp"
SHORT_HELP = "XCP tester"
HAS_ARTIFACTS_DIR = True

def configure_parser(self) -> None:
self.parser.add_argument("--can-master", type=auto_int, default=None)
self.parser.add_argument("--can-slave", type=auto_int, default=None)

async def main(self, args: Namespace) -> None:
transport_type = load_transport(args.target)
transport = await transport_type.connect(args.target)
service = XCPService(transport)
service: XCPService

if isinstance(transport, RawCANTransport):
if args.can_master is None or args.can_slave is None:
self.logger.error(
"For CAN interfaces, master and slave address are required!"
)
sys.exit(1)

service = CANXCPSerivce(transport, args.can_master, args.can_slave)
elif isinstance(transport, ISOTPTransport):
self.logger.error("Use can-raw for CAN interfaces!")
sys.exit(1)
else:
service = XCPService(transport)

await catch_and_log_exception(service.connect)
await catch_and_log_exception(service.get_status)
Expand Down
36 changes: 36 additions & 0 deletions src/gallia/services/xcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from gallia.log import get_logger
from gallia.services.xcp import types
from gallia.transports import BaseTransport
from gallia.transports.can import RawCANTransport


class XCPService:
Expand Down Expand Up @@ -74,3 +75,38 @@ async def upload(self, length: int) -> None:
resp = await self.request(bytes([0xF5, length]))
self.logger.info(resp)
self.logger.result(f"XCP GET_UPLOAD({length} -> OK")


class CANXCPSerivce(XCPService):
def __init__(
self,
transport: RawCANTransport,
master_id: int,
slave_id: int,
timeout: float = 1.0,
):
self.master_id = master_id
self.slave_id = slave_id

super().__init__(transport, timeout)

async def request(self, data: bytes, timeout: float | None = None) -> bytes:
t = timeout if timeout else self.timeout

assert isinstance(self.transport, RawCANTransport)

await self.transport.sendto(data, self.slave_id, t)
while True:
dst_, resp = await self.transport.recvfrom(t)

if dst_ == self.master_id:
break

header = types.Response.parse(resp)
self.logger.info(header)
if int(header.type) != 255:
raise ValueError(
f"Unknown response type: {header.type}, maybe no XCP packet?"
)
# strip header byte
return resp[1:]

0 comments on commit 2a6364e

Please sign in to comment.