-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdashboard.py
executable file
·131 lines (97 loc) · 4.03 KB
/
dashboard.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
import argparse
import logging
import time
from threading import Thread
from typing import Container
from asciimatics.screen import Screen
from serial import Serial
from crsf_parser import CRSFParser, PacketValidationStatus
from crsf_parser.payloads import PacketsTypes
def _str_to_bool(value):
    """Convert a command-line token into a bool for argparse.

    ``type=bool`` is a trap: ``bool("False")`` is ``True`` because every
    non-empty string is truthy, so the flag could never be switched off
    explicitly. This converter accepts the usual spellings instead.

    Raises:
        argparse.ArgumentTypeError: if the token is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    token = value.strip().lower()
    if token in ("1", "true", "t", "yes", "y", "on"):
        return True
    # The empty string stays False, as it was with ``bool("")``.
    if token in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


# Command-line options; parsed once at import time into the module-level `args`.
parser = argparse.ArgumentParser()
parser.add_argument('-b', '--baud', default=425000, type=int)
parser.add_argument('-ss', '--serialsize', default=100, type=int)
parser.add_argument('-be', '--bridgeenabled', default=False, type=_str_to_bool)
parser.add_argument('-v', '--verbose', default=False, type=_str_to_bool)
parser.add_argument('-p', '--port', default="/dev/ttyS0", type=str)
parser.add_argument('-ll', '--loglevel', default=20, type=int,
                    help="default INFO = 20, ERROR = 40, WARN = 30, DEBUG = 10")
parser.add_argument('-lf', '--logfile', default="dashboard.log", type=str)
args = parser.parse_args()

# All log records go to the configured file, tagged with the emitting thread's name.
logging.basicConfig(level=args.loglevel, filename=args.logfile,
                    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s [%(threadName)s]')
# Module-level state shared between the dashboard loop and the serial thread.

# Channel values shown by the dashboard; starts as 16 zeros.
channels_state = [0] * 16
# Cleared to ask both loops to stop.
running = True
# Number of dashboard redraw passes / number of frames accepted by update_state.
refresh_count = serial_count = 0
# Raw bytes appended by monitor_serial and handed to crsf_parser.parse_stream.
buffer = bytearray()
def update_state(frame: Container, status: PacketValidationStatus) -> None:
    """Record the channel values carried by a parsed frame.

    Frames whose status is not ``PacketValidationStatus.VALID`` and frames
    whose header type is not ``PacketsTypes.RC_CHANNELS_PACKED`` are ignored.
    For every other frame the global ``channels_state`` is replaced with
    ``frame.payload.channels`` and the global ``serial_count`` is incremented.
    """
    global channels_state, serial_count

    # TODO update valid/invalid counters
    # TODO update types counters
    # The header is only inspected once the status check has passed.
    if (status != PacketValidationStatus.VALID
            or frame.header.type != PacketsTypes.RC_CHANNELS_PACKED):
        return

    serial_count += 1
    channels_state = frame.payload.channels
# Shared parser instance used by dashboard(); update_state is passed to its
# constructor (presumably as the per-frame callback — confirm against the
# crsf_parser documentation).
crsf_parser = CRSFParser(update_state)
def dashboard(screen):
    """Draw the live channel view on ``screen`` until asked to stop.

    Each pass hands the shared ``buffer`` to ``crsf_parser.parse_stream``,
    increments ``refresh_count``, prints three rows (the 16 channel values,
    the refresh counter and the serial counter), polls the keyboard,
    refreshes the screen and sleeps for 0.1 seconds.

    Pressing ``q`` or ``Q`` clears the global ``running`` flag and returns.
    The function also returns when ``running`` is already cleared.
    """
    global refresh_count, running

    quit_keys = (ord('Q'), ord('q'))
    while True:
        crsf_parser.parse_stream(buffer)
        refresh_count += 1

        # One "CHnn:vvvvv" cell per channel, separated by single spaces.
        channel_row = ' '.join(
            f'CH{slot + 1:02d}:{channels_state[slot]:05d}' for slot in range(16)
        )
        screen.print_at(channel_row, 0, 0)
        screen.print_at(f'refresh count: {refresh_count}', 0, 1)
        screen.print_at(f'serial read iterations: {serial_count}', 0, 2)

        if screen.get_key() in quit_keys:
            running = False
        if not running:
            return

        screen.refresh()
        time.sleep(0.1)
def monitor_serial():
    """Read from the serial port into the shared ``buffer`` until told to stop.

    Thread target. Opens ``args.port`` at ``args.baud`` and, while the global
    ``running`` flag is set, reads up to ``args.serialsize`` bytes at a time,
    writes them back to the port when ``args.bridgeenabled`` is truthy, and
    appends them to ``buffer``.

    Any exception is logged with its traceback and clears ``running``.
    """
    global running

    # Without a read timeout, read() blocks until `serialsize` bytes arrive;
    # on a silent link the loop would never re-check `running` and the
    # thread could not be shut down.
    read_timeout = 0.5
    try:
        logging.info("Start serial thread")
        with Serial(args.port, args.baud, timeout=read_timeout) as ser:
            logging.info(f'Opened serial {args.port} at baud {args.baud}')
            while running:
                values = ser.read(args.serialsize)
                if not values:
                    # Timed out with nothing received; go back and re-check `running`.
                    continue
                if args.bridgeenabled:
                    ser.write(values)
                buffer.extend(values)
                # TODO: when args.verbose is set, log parser statistics here.
    except Exception:
        running = False
        logging.exception("error in monitor serial thread")
# Script entry: run the serial reader in the background and the dashboard in
# the main thread, then shut both down.
logging.info("Starting...")
# daemon=True so a reader stuck in a blocking call cannot keep the process
# alive once the join timeout below has expired.
serial_thread = Thread(target=monitor_serial, name="serial", daemon=True)
serial_thread.start()
try:
    Screen.wrapper(dashboard)
finally:
    # Clear the flag on every exit path (including an exception raised by
    # the dashboard) so the serial thread is always asked to stop.
    running = False
    logging.info("Shutdown...")
    serial_thread.join(10)
    logging.info("Bye-bye")
# `exit` is injected by the `site` module and is not guaranteed to exist;
# raising SystemExit is the equivalent that always works.
raise SystemExit(0)