From d4ba0019f41ce61a5f2f4e80ab2cee1d887560d2 Mon Sep 17 00:00:00 2001 From: Funth0mas <43621609+Funth0mas@users.noreply.github.com> Date: Sun, 10 Mar 2019 20:18:06 +0100 Subject: [PATCH] Annotate and fix 'join' module. Update CanId. (#314) * Annotate and fix 'join' module. Add factory for CanId. --- examples/exampleJoin.py | 4 +- src/canmatrix/canmatrix.py | 16 ++++--- src/canmatrix/join.py | 65 +++++++++++++++------------ src/canmatrix/tests/test_canmatrix.py | 4 +- src/canmatrix/utils.py | 2 + 5 files changed, 51 insertions(+), 40 deletions(-) diff --git a/examples/exampleJoin.py b/examples/exampleJoin.py index 73e01cd3..fdd9c834 100644 --- a/examples/exampleJoin.py +++ b/examples/exampleJoin.py @@ -1,11 +1,11 @@ #!/usr/bin/env python3 import canmatrix.formats -from canmatrix.join import join_frame_by_signal_startbit +from canmatrix.join import join_frame_by_signal_start_bit files = ["../test/db_B.dbc", "../test/db_A.dbc"] -target = join_frame_by_signal_startbit(files) +target = join_frame_by_signal_start_bit(files) # # export the new (target)-Matrix for example as .dbc: diff --git a/src/canmatrix/canmatrix.py b/src/canmatrix/canmatrix.py index fd756d64..088af779 100644 --- a/src/canmatrix/canmatrix.py +++ b/src/canmatrix/canmatrix.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- # Copyright (c) 2013, Eduard Broecker # All rights reserved. @@ -564,6 +565,7 @@ def pack_bitstring(length, is_float, value, signed): return bitstring + @attr.s(cmp=False) class ArbitrationId(object): standard_id_mask = ((1 << 11) - 1) @@ -659,7 +661,7 @@ def is_multiplexed(self): # type: () -> bool @property def pgn(self): # type: () -> int - return CanId(self.arbitration_id.id).pgn + return CanId(self.arbitration_id).pgn @pgn.setter def pgn(self, value): # type: (int) -> None @@ -1802,16 +1804,16 @@ class CanId(object): """ Split Id into Global source addresses (source, destination) off ECU and PGN (SAE J1939). """ - # TODO link to BoardUnit/ECU + # TODO link to ECU source = None # type: int # Source Address destination = None # type: int # Destination Address pgn = None # type: int # PGN - def __init__(self, id, extended=True): # type: (int, bool) -> None - if extended: - self.source = id & int('0xFF', 16) - self.pgn = (id >> 8) & int('0xFFFF', 16) - self.destination = id >> 8 * 3 & int('0xFF', 16) + def __init__(self, arbitration_id): # type: (ArbitrationId) -> None + if arbitration_id.extended: + self.source = arbitration_id.id & int('0xFF', 16) + self.pgn = (arbitration_id.id >> 8) & int('0xFFFF', 16) + self.destination = arbitration_id.id >> 8 * 3 & int('0xFF', 16) else: # TODO implement for standard Id pass diff --git a/src/canmatrix/join.py b/src/canmatrix/join.py index 04169450..87828130 100644 --- a/src/canmatrix/join.py +++ b/src/canmatrix/join.py @@ -1,65 +1,72 @@ +# -*- coding: utf-8 -*- + +import typing + +import canmatrix import canmatrix.formats -from canmatrix.canmatrix import CanId def list_pgn(db): + # type: (canmatrix.CanMatrix) -> typing.Tuple[typing.List[int], typing.List[canmatrix.ArbitrationId]] """ + Get all PGN values for given frame. - :param db: - :return: pgn and id + :param db: CanMatrix database + :return: tuple of [pgn] and [arbitration_id] """ - msg_id = [x.id for x in db.frames] - r = [CanId(t).tuples() for t in msg_id] - return [t[1] for t in r], msg_id + id_list = [x.arbitration_id for x in db.frames] + pgn_list = [canmatrix.CanId(arb_id).pgn for arb_id in id_list] + return pgn_list, id_list def ids_sharing_same_pgn(id_x, pgn_x, id_y, pgn_y): - for idx, pgnx in zip(id_x, pgn_x): - for idy, pgny in zip(id_y, pgn_y): - if pgnx == pgny: - yield (idx, idy) + # type: (typing.Sequence[canmatrix.ArbitrationId], typing.Sequence[int], typing.Sequence[canmatrix.ArbitrationId], typing.Sequence[int]) -> typing.Iterable[typing.Tuple[canmatrix.ArbitrationId, canmatrix.ArbitrationId]] + """Yield arbitration ids which has the same pgn.""" + for id_a, pgn_a in zip(id_x, pgn_x): + for id_b, pgn_b in zip(id_y, pgn_y): + if pgn_a == pgn_b: + yield (id_a, id_b) -def join_frame_by_signal_startbit(files): - target_db = next(iter(canmatrix.formats.loadp(files.pop(0)).values())) +def join_frame_by_signal_start_bit(files): # type: (typing.List[str]) -> canmatrix.CanMatrix + target_db = next(iter(canmatrix.formats.loadp(files.pop(0)).values())) # type: canmatrix.CanMatrix pgn_x, id_x = list_pgn(db=target_db) for f in files: - source_db = next(iter(canmatrix.formats.loadp(f).values())) + source_db = next(iter(canmatrix.formats.loadp(f).values())) # type: canmatrix.CanMatrix pgn_y, id_y = list_pgn(db=source_db) same_pgn = ids_sharing_same_pgn(id_x, pgn_x, id_y, pgn_y) - for idx, idy in same_pgn: - # print("{0:#x} {1:#x}".format(idx, idy)) - target_fr = target_db.frame_by_id(idx) - source_fr = source_db.frame_by_id(idy) + for id_a, id_b in same_pgn: + # print("{0:#x} {1:#x}".format(id_x, id_y)) + target_fr = target_db.frame_by_id(id_a) + source_fr = source_db.frame_by_id(id_b) - to_add = [] + signal_to_add = [] # type: typing.List[canmatrix.Signal] for sig_t in target_fr.signals: for sig_s in source_fr.signals: # print(sig.name) - if sig_t.startbit == sig_s.startbit: + if sig_t.start_bit == sig_s.start_bit: # print("\t{0} {1}".format(sig_t.name, sig_s.name)) - to_add.append(sig_s) - for s in to_add: + signal_to_add.append(sig_s) + for s in signal_to_add: target_fr.add_signal(s) - return target_db -def rename_frame_with_id(source_db): +def rename_frame_with_id(source_db): # type: (canmatrix.CanMatrix) -> None for frameSc in source_db.frames: - _, pgn, sa = CanId(frameSc.id).tuples() + _, pgn, sa = canmatrix.CanId(frameSc.arbitration_id).tuples() - exten = "__{pgn:#04X}_{sa:#02X}_{sa:03d}d".format(pgn=pgn, sa=sa) - new_name = frameSc.name + exten + extension = "__{pgn:#04X}_{sa:#02X}_{sa:03d}d".format(pgn=pgn, sa=sa) + new_name = frameSc.name + extension # print(new_name) frameSc.name = new_name -def rename_frame_with_sae_acronym(source_db, target_db): +def rename_frame_with_sae_acronym(source_db, target_db): # type: (canmatrix.CanMatrix, canmatrix.CanMatrix) -> None pgn_x, id_x = list_pgn(db=target_db) pgn_y, id_y = list_pgn(db=source_db) same_pgn = ids_sharing_same_pgn(id_x, pgn_x, id_y, pgn_y) @@ -72,7 +79,7 @@ def rename_frame_with_sae_acronym(source_db, target_db): target_fr.name = new_name -def join_frame_for_manufacturer(db, files): +def join_frame_for_manufacturer(db, files): # type: (canmatrix.CanMatrix, typing.Sequence[str]) -> None # target_db = next(iter(im.importany(files.pop(0)).values())) pgn_x, id_x = list_pgn(db=db) @@ -88,7 +95,7 @@ def join_frame_for_manufacturer(db, files): target_fr = db.frame_by_id(idx) source_fr = source_db.frame_by_id(idy) - _, pgn, sa = CanId(target_fr.id).tuples() + _, pgn, sa = canmatrix.CanId(target_fr.arbitration_id).tuples() if sa < 128: print('less', target_fr.name) to_add = [] diff --git a/src/canmatrix/tests/test_canmatrix.py b/src/canmatrix/tests/test_canmatrix.py index 1c0774d9..2e668f8a 100644 --- a/src/canmatrix/tests/test_canmatrix.py +++ b/src/canmatrix/tests/test_canmatrix.py @@ -680,7 +680,7 @@ def test_define_for_float(): # CanId tests def test_canid_parse_values(): - can_id = canmatrix.canmatrix.CanId(0x01ABCD02) + can_id = canmatrix.canmatrix.CanId(canmatrix.ArbitrationId(id=0x01ABCD02, extended=True)) assert can_id.source == 0x02 assert can_id.destination == 0x01 assert can_id.pgn == 0xABCD @@ -688,7 +688,7 @@ def test_canid_parse_values(): def test_canid_repr(): - can_id = canmatrix.canmatrix.CanId(0x01ABCD02) + can_id = canmatrix.canmatrix.CanId(canmatrix.ArbitrationId(id=0x01ABCD02, extended=True)) assert str(can_id) == "DA:0x01 PGN:0xABCD SA:0x02" diff --git a/src/canmatrix/utils.py b/src/canmatrix/utils.py index 4738e57b..2d8f67a0 100644 --- a/src/canmatrix/utils.py +++ b/src/canmatrix/utils.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + import csv import shlex import sys