-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmcp2515_testing.py
110 lines (91 loc) · 4.04 KB
/
mcp2515_testing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import time
from can_data_structures import *
from mcp2515 import *
def _dump_tx_diagnostics(mcp2515_instance):
    """Read and print the four controller registers used to debug a transmission.

    Each register is read as a single byte via ``read_bytes(address, 1)`` and
    printed with the label used throughout this script.
    """
    # (register address, printed label) pairs, in the order they are reported.
    # NOTE(review): the labels suggest TXB0CTRL / CANINTF / CANSTAT / EFLG;
    # confirm the addresses against the MCP2515 register map.
    registers = (
        (0x30, "TX CNTRL"),
        (0x2C, "CANINTF"),
        (0x0E, "CAN Status Register"),
        (0x2D, "Error Status Register"),
    )
    for address, label in registers:
        read_byte = mcp2515_instance.read_bytes(address, 1)
        print(f"{label} = {read_byte}")


def send_can_message(mcp2515_instance, diagnostic_polls=1, poll_interval=0.1):
    """Load one test frame into TX buffer 0, request transmission and wait for success.

    The frame uses ID 0x201, has the ID-extension and remote-transmission flags
    cleared, and carries the data bytes ``[0x01, 0x04, 0x09, 0x10]``.

    The previous version dumped the diagnostic registers inside an endless
    ``while True`` loop, so the function never returned and never reported a
    successful transmission. The dump is now bounded.

    Args:
        mcp2515_instance: Controller object providing ``load_tx_buffer``,
            ``send_tx_buffer_with_priority``, ``read_bytes`` and
            ``wait_until_tx_message_success``.
        diagnostic_polls: How many times to print the diagnostic registers
            after requesting transmission. Use 0 to skip the dump.
        poll_interval: Seconds to sleep after each diagnostic dump.
    """
    TX_BUFFER_NUMBER = 0

    tx_buffer = CANMessageBuffer()
    tx_buffer.set_id(0x201)
    tx_buffer.set_id_extension(False)
    tx_buffer.set_remote_transmission(False)
    tx_buffer.set_data_bytes([0x01, 0x04, 0x09, 0x10])

    mcp2515_instance.load_tx_buffer(tx_buffer, can_message_buffer_number=TX_BUFFER_NUMBER)
    mcp2515_instance.send_tx_buffer_with_priority(buffer_number=TX_BUFFER_NUMBER, priority=3)

    # Bounded register dump: enough to observe the controller state without
    # preventing the success wait below from ever running.
    for _ in range(diagnostic_polls):
        _dump_tx_diagnostics(mcp2515_instance)
        time.sleep(poll_interval)

    mcp2515_instance.wait_until_tx_message_success(TX_BUFFER_NUMBER)
    print("Message Transmission Success")
def receive_can_message(mcp2515_instance):
    """Configure RX buffer 0 and print every message that arrives in it.

    The buffer is configured with filters enabled and rollover disabled.
    The function then loops forever: it blocks until a message is received,
    reads it expecting four data bytes, interprets it and prints its fields.
    It never returns on its own.

    Args:
        mcp2515_instance: Controller object providing ``configure_rx_buffer``,
            ``wait_until_message_received``, ``read_rx_buffer`` and
            ``interpret_rx_buffer``.
    """
    buffer_index = 0
    payload_length = 4

    # Configure RX Control Register
    mcp2515_instance.configure_rx_buffer(
        buffer_index,
        is_filters_enabled=True,
        buffer_rollover=False,
    )

    # Continuously read messages from the RX buffer
    while True:
        print("Waiting on Message Reception...")
        mcp2515_instance.wait_until_message_received(buffer_index)
        print("Message Received!")

        # Fetch the raw buffer contents, then decode them into a message object
        raw_frame = mcp2515_instance.read_rx_buffer(
            buffer_index,
            expected_num_data_bytes=payload_length,
        )
        message = mcp2515_instance.interpret_rx_buffer(raw_frame)

        can_id = message.get_id()
        print(f"CAN ID = {can_id}")
        id_extension = message.get_id_extension()
        print(f"IDE = {id_extension}")
        remote_transmission = message.get_remote_transmission()
        print(f"RTR = {remote_transmission}")
        data_length = message.get_num_data_bytes()
        print(f"Number of Data Bytes = {data_length}")
        payload = message.get_data_bytes()
        print(f"Data = {payload}")
def initialize_mcp2515(mcp2515_instance, rx):
    """Reset the controller, apply bit timing (and RX filtering), then enter Normal mode.

    Both mode transitions are retried every 0.1 s, without a timeout, until
    the controller reports the requested operation mode.

    Args:
        mcp2515_instance: Controller object providing ``reset``,
            ``get_operation_mode`` and ``switch_operation_modes``.
        rx: Truthy to also configure RX mask 0 and RX filter 0 (standard
            ID 0x212); falsy to configure bit timing only.
    """
    retry_delay = .1

    def still_waiting_for(target_mode):
        # True while the controller has not yet reached target_mode.
        return mcp2515_instance.get_operation_mode() != target_mode

    # Switch to Configuration Mode by resetting the device; reset at least
    # once and keep resetting until the mode is confirmed.
    while True:
        mcp2515_instance.reset()
        time.sleep(retry_delay)
        if not still_waiting_for(OperationModes.CONFIGURATION):
            break

    # Can only configure Bit Timing and RX Filters in Configuration Mode
    configure_bit_timing(mcp2515_instance)
    if rx:
        configure_rx_message_mask(mcp2515_instance, mask_number=0)
        configure_rx_message_filter(mcp2515_instance, can_stdid=0x212, filter_number=0)

    # Switch to Normal Mode
    while still_waiting_for(OperationModes.NORMAL):
        mcp2515_instance.switch_operation_modes(OperationModes.NORMAL)
        time.sleep(retry_delay)
def configure_bit_timing(mcp2515_instance):
    """Build the fixed bit-timing configuration for this test and apply it.

    Args:
        mcp2515_instance: Controller object providing ``configure_bit_timing``.
    """
    # Segment lengths; each setter receives (length - 1).
    # NOTE(review): presumably lengths are in time quanta and the setters
    # expect the register encoding - confirm against BitTimingConfiguration.
    propagation_segment = 2
    phase_segment1 = 8
    phase_segment2 = 5

    timing = BitTimingConfiguration()
    timing.set_sjw(1)
    timing.set_baud_rate_prescalar(1)
    timing.set_propagation_segment_length(propagation_segment - 1)
    timing.set_phase_segment1_length(phase_segment1 - 1)
    timing.set_phase_segment2_length(phase_segment2 - 1)
    timing.set_num_samples_per_bit(1)

    mcp2515_instance.configure_bit_timing(timing)
def configure_rx_message_filter(mcp2515_instance, can_stdid, filter_number):
    """Program one RX acceptance filter with a standard CAN identifier.

    Args:
        mcp2515_instance: Controller object providing ``set_can_rx_filter``.
        can_stdid: Identifier to filter on; only its low 11 bits are used.
        filter_number: Index of the filter to program.
    """
    standard_id_bits = 0x7FF  # keeps the low 11 bits of the identifier

    rx_filter = CANMessageFilter()
    rx_filter.set_id(can_stdid & standard_id_bits)
    rx_filter.set_id_extension(False)
    # Both data-byte fields of the filter are zeroed.
    rx_filter.set_data_byte0(0x00)
    rx_filter.set_data_byte1(0x00)

    mcp2515_instance.set_can_rx_filter(filter_number, rx_filter)
def configure_rx_message_mask(mcp2515_instance, mask_number):
    """Program one RX acceptance mask.

    The mask sets all 11 standard-ID bits (0x7FF), clears the extended-ID
    field, sets the ID-extension flag and zeroes both data-byte fields.

    Args:
        mcp2515_instance: Controller object providing ``set_can_rx_mask``.
        mask_number: Index of the mask to program.
    """
    rx_mask = CANMessageMask()

    # Identifier portion of the mask
    rx_mask.set_standard_id(0x7FF)
    rx_mask.set_extended_id(0)
    rx_mask.set_id_extension(True)

    # Data-byte portion of the mask
    rx_mask.set_data_byte0(0x00)
    rx_mask.set_data_byte1(0x00)

    mcp2515_instance.set_can_rx_mask(mask_number, rx_mask)
# Script entry point: bring up the controller with RX filtering enabled, then
# print received CAN messages in an endless loop.
if __name__ == "__main__":
    INTERRUPT_ENABLE_REGISTER = 0x2B

    controller = MCP2515(bus_num=0, device_num=0)
    controller.open_can_connection()

    initialize_mcp2515(controller, rx=True)
    print("Initialization Finished")

    # Clear Interrupt Enable Register
    controller.write_bytes(INTERRUPT_ENABLE_REGISTER, 0x00)

    receive_can_message(controller)