-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
executable file
·71 lines (57 loc) · 2.42 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#!/usr/bin/env python3
import argparse
import time
import socket
import sys
from threading import Thread
import threading
from tcp_server import tcp_server
from button import button
from imu.mock_imu import poll_imu_sensor_data as mock_imu_poll_imu_sensor_data
from imu.MPU9250 import poll_imu_sensor_data as MPU9250_poll_imu_sensor_data
parser = argparse.ArgumentParser(description='Start the IMU polling and TCP server.')
parser.add_argument('--imu', action='store', type=str, default='MPU9250')
parser.add_argument('--filter_type', action='store', type=str, default='kalman')
parser.add_argument('--ip', action='store', type=str, default='127.0.0.1')
parser.add_argument('--port', action='store', type=int, default=5005)
parser.add_argument('--button_1_gpio', action='store', type=int, default=18)
parser.add_argument('--button_2_gpio', action='store', type=int, default=23)
parser.add_argument('--poll_frequency', action='store', type=float, default=0.01666)
parser.add_argument('--is_verbose', action='store', type=bool, default=False)
args = parser.parse_args()
print('=== IMU Server ===')
print('IMU: {}'.format(args.imu))
print('Filter: {}'.format(args.filter_type))
print('')
run_event = threading.Event()
run_event.set()
tcp_server_thread = Thread(target=tcp_server, args=(run_event, args.is_verbose, args.poll_frequency, args.ip, args.port))
tcp_server_thread.start()
button_1_thread = Thread(target=button, args=(run_event, args.poll_frequency, args.button_1_gpio, 1))
button_1_thread.start()
button_2_thread = Thread(target=button, args=(run_event, args.poll_frequency, args.button_2_gpio, 2))
button_2_thread.start()
print('')
poll_imu_sensor_data = None
if args.imu == 'MPU9250':
poll_imu_sensor_data = MPU9250_poll_imu_sensor_data
elif args.imu == 'mock_imu':
poll_imu_sensor_data = mock_imu_poll_imu_sensor_data
if poll_imu_sensor_data != None:
poll_imu_sensor_data_thread = Thread(target=poll_imu_sensor_data, args=(run_event, args.poll_frequency, False, args.filter_type))
poll_imu_sensor_data_thread.start()
if poll_imu_sensor_data_thread == None:
print('No IMU module found for: ' + args.imu)
sys.exit()
try:
while 1:
time.sleep(.1)
except KeyboardInterrupt:
print('Attempting to close threads.')
run_event.clear()
tcp_server_thread.join()
button_1_thread.join()
button_2_thread.join()
poll_imu_sensor_data_thread.join()
print('Threads successfully closed.')
sys.exit()