forked from u-blox/ubxlib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathautomation.py
303 lines (265 loc) · 12.4 KB
/
automation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
from os import environ
from invoke import task, Exit
from pathlib import PurePath
from scripts import u_utils, u_data, u_connection, u_select, u_report
from scripts import u_run_log, u_run_windows, u_run_lint, u_run_doxygen, u_run_astyle
from scripts import u_run_pylint, u_run_static_size, u_run_no_floating_point
from scripts.packages import u_package
from scripts.u_logging import ULog
from . import nrf5, esp_idf, nrfconnect, stm32cubef4, arduino
from enum import Enum
import sys
import json
# Path to the DATABASE.md file in the automation directory, expressed
# as a POSIX-style string; this is what gets passed to u_data.get()
# in order to read the instance definitions
DATABASE = PurePath(u_utils.AUTOMATION_DIR, "DATABASE.md").as_posix()
# The name of the environment variable that may contain some
# additional defines we should use, separated with semicolons
UBXLIB_DEFINES_VAR = "U_UBXLIB_DEFINES"
class Command(Enum):
    """The operations that can be requested for an automation instance"""
    # Build the firmware for the instance
    BUILD = 1
    # Flash the built firmware onto the target
    FLASH = 2
    # Show the log output of the target
    LOG = 3
    # Run the tests (or the checker, for the instances below 10)
    TEST = 4
def eprint(*args, **kwargs):
    """Print to stderr instead of stdout.

    Takes the same positional and keyword arguments as the built-in
    print(), with the exception of "file", which is always sys.stderr.
    """
    print(*args, file=sys.stderr, **kwargs)
def parse_instance(str):
    """Convert an instance string into a list of integers.

    A string like "13.0.0" becomes the Python int list [13, 0, 0].

    Args:
        str: the instance, as integers separated with dots.  The name
            shadows the built-in but is kept so that existing callers
            are unaffected.

    Returns:
        A list with one int for each dot-separated field.

    Raises:
        Exit: if str is not a string or one of its fields is not
            an integer.
    """
    try:
        return [int(field) for field in str.split('.')]
    except (AttributeError, TypeError, ValueError) as error:
        # Only conversion problems mean "invalid format"; anything else
        # (e.g. KeyboardInterrupt) must not be swallowed here
        raise Exit(f"Invalid instance format: '{str}'") from error
def check_return_code(ret):
    """Raise an Exit exception unless the given return code is 0"""
    if ret == 0:
        # Success, nothing to report
        return
    raise Exit(f"Return code: '{ret}'")
def _serial_port(connection, instance_str):
    """Return the serial port of a connection, raising Exit if there is none"""
    if not connection or "serial_port" not in connection:
        raise Exit(f"No serial port available for: '{instance_str}'")
    return connection["serial_port"]


def instance_command(ctx, instance_str, cmd):
    """Execute a command for the given automation instance.

    The instance is looked up in DATABASE.md to find its platform,
    board, defines, MCU, toolchain and description; the command is
    then dispatched to the tools of that platform.  Instances below 10
    have no platform tools, they are the checkers (Lint, AStyle and
    so on) and support only Command.TEST.

    Args:
        ctx: the invoke context; depending on the command this function
            reads ctx.build_dir, ctx.filter, ctx.reporter and
            ctx.test_report, which the calling task must have set.
        instance_str: the instance as a string, e.g. "13.0.0".
        cmd: the Command to execute.

    Raises:
        Exit: if the instance string is invalid, the platform is unknown
            or unsupported, the command is not supported for the
            instance, a required serial port is missing or the
            test/checker run returns a non-zero code.
    """
    db_data = u_data.get(DATABASE)
    instance = parse_instance(instance_str)

    # It is sometimes useful for the platform tools to be able
    # to detect that they are running under automation (e.g. this
    # is used to switch ESP-IDF to using u_runner rather than the
    # usual ESP-IDF unit test menu system).
    # For this purpose we add ENV_UBXLIB_AUTO to the environment
    environ[u_utils.ENV_UBXLIB_AUTO] = "1"

    # Read out instance info from DATABASE.md
    platform = u_data.get_platform_for_instance(db_data, instance)
    if not platform and instance[0] > 9:
        raise Exit(f"Unknown platform for: '{instance_str}'")
    board = u_data.get_board_for_instance(db_data, instance)
    defines = u_data.get_defines_for_instance(db_data, instance)
    mcu = u_data.get_mcu_for_instance(db_data, instance)
    toolchain = u_data.get_toolchain_for_instance(db_data, instance)
    description = u_data.get_description_for_instance(db_data, instance)

    # Defines may be provided via an environment
    # variable, in a list separated with semicolons, e.g.:
    # set U_UBXLIB_DEFINES=THING_1;ANOTHER_THING=123;ONE_MORE=boo
    # Add these in, ignoring any blank entries that a stray
    # semicolon would otherwise create.
    env_defines = environ.get(UBXLIB_DEFINES_VAR, "").strip()
    if env_defines:
        defines.extend([item for item in env_defines.split(";") if item.strip()])

    # Merge in any filter string we might have
    if cmd in (Command.BUILD, Command.TEST) and ctx.filter:
        defines = u_utils.merge_filter(defines, ctx.filter)

    # For ESP targets: Check if RTS and DTR should be set when opening log UART
    monitor_dtr_rts_on = None
    if u_utils.MONITOR_DTR_RTS_OFF_MARKER in defines:
        monitor_dtr_rts_on = False

    # Get debugger serial
    connection = u_connection.get_connection(instance)
    serial = ""
    if connection and "debugger" in connection and connection["debugger"]:
        serial = connection["debugger"]

    # The instances below 10 may have no platform at all, so don't
    # assume that there is a string to convert here
    platform = platform.lower() if platform else ""

    if platform == "nrf5sdk":
        nrf5.check_installation(ctx)
        if cmd == Command.BUILD:
            nrf5.build(ctx, output_name="", build_dir=ctx.build_dir, u_flags=defines)
        elif cmd == Command.FLASH:
            nrf5.flash(ctx, output_name="", build_dir=ctx.build_dir, debugger_serial=serial)
        elif cmd == Command.LOG:
            nrf5.log(ctx, debugger_serial=serial)
        elif cmd == Command.TEST:
            check_return_code(u_run_log.run(instance, ctx.reporter, ctx.test_report))

    elif platform == "zephyr":
        nrfconnect.check_installation(ctx)
        if cmd == Command.BUILD:
            nrfconnect.build(ctx, board_name=board, output_name="",
                             build_dir=ctx.build_dir, u_flags=defines)
        elif cmd == Command.FLASH:
            hex_file = None
            if mcu and mcu.lower() == "nrf5340":
                # The hex file to flash is specified in zephyr/runners.yml for zephyr and
                # usually set to "zephyr.hex".  For MCUs with multiple cores such as nRF5340
                # merged_domains.hex is used instead, which includes multiple firmwares.
                # The problem in this case is that when merged_domains.hex is used it is
                # specified as a full absolute path.  This becomes a problem in the Jenkins
                # case where the firmware is built on one machine and flashed on another.
                # For this reason we will pass the hex path manually here.
                hex_file = f"{ctx.build_dir}/zephyr/merged_domains.hex"
            nrfconnect.flash(ctx, output_name="", build_dir=ctx.build_dir,
                             debugger_serial=serial, hex_file=hex_file)
        elif cmd == Command.LOG:
            nrfconnect.log(ctx, debugger_serial=serial)
        elif cmd == Command.TEST:
            check_return_code(u_run_log.run(instance, ctx.reporter, ctx.test_report))

    elif platform == "esp-idf":
        esp_idf.check_installation(ctx)
        if cmd == Command.BUILD:
            esp_idf.build(ctx, output_name="", build_dir=ctx.build_dir, u_flags=defines)
        elif cmd == Command.FLASH:
            esp_idf.flash(ctx, serial_port=_serial_port(connection, instance_str),
                          output_name="", build_dir=ctx.build_dir,
                          use_flasher_json=True)
        elif cmd == Command.LOG:
            esp_idf.log(ctx, serial_port=_serial_port(connection, instance_str),
                        dtr_state=monitor_dtr_rts_on,
                        rts_state=monitor_dtr_rts_on)
        elif cmd == Command.TEST:
            check_return_code(u_run_log.run(instance, ctx.reporter, ctx.test_report))

    elif platform == "stm32cube":
        stm32cubef4.check_installation(ctx)
        if cmd == Command.BUILD:
            stm32cubef4.build(ctx, output_name="", build_dir=ctx.build_dir, u_flags=defines)
        elif cmd == Command.FLASH:
            stm32cubef4.flash(ctx, output_name="", build_dir=ctx.build_dir,
                              debugger_serial=serial)
        elif cmd == Command.LOG:
            # The port number is offset by the first number of the instance
            port = instance[0] + 40404
            stm32cubef4.log(ctx, debugger_serial=serial, port=port)
        elif cmd == Command.TEST:
            check_return_code(u_run_log.run(instance, ctx.reporter, ctx.test_report))

    elif platform == "arduino":
        arduino.check_installation(ctx)
        if cmd == Command.BUILD:
            arduino.build(ctx, libraries_dir=f"{ctx.build_dir}/libraries", board=board,
                          output_name="", build_dir=ctx.build_dir, u_flags=defines)
        elif cmd == Command.FLASH:
            arduino.flash(ctx, serial_port=_serial_port(connection, instance_str),
                          board=board, output_name="app", build_dir=ctx.build_dir)
        elif cmd == Command.LOG:
            arduino.log(ctx, serial_port=_serial_port(connection, instance_str),
                        dtr_state=monitor_dtr_rts_on,
                        rts_state=monitor_dtr_rts_on)
        elif cmd == Command.TEST:
            check_return_code(u_run_log.run(instance, ctx.reporter, ctx.test_report))

    elif platform == "windows":
        if cmd == Command.TEST:
            check_return_code(u_run_windows.run(instance, toolchain, connection,
                                                None, False, defines,
                                                ctx.reporter, ctx.test_report,
                                                None))
        else:
            raise Exit(f"Unsupported command for platform: '{platform}'")

    elif instance[0] < 10:
        # Handle Lint, AStyle and so on...
        if cmd != Command.TEST:
            raise Exit(f"'{description}' only supports 'test' command")

        checker = instance[0]
        if checker == 0:
            return_code = u_run_lint.run(defines, u_utils.UBXLIB_DIR, ctx.reporter, None)
        elif checker == 1:
            return_code = u_run_doxygen.run(u_utils.UBXLIB_DIR, ctx.reporter)
        elif checker == 2:
            return_code = u_run_astyle.run(u_utils.UBXLIB_DIR, ctx.reporter)
        elif checker == 3:
            return_code = u_run_pylint.run(u_utils.UBXLIB_DIR, ctx.reporter)
        elif checker == 4:
            return_code = u_run_static_size.run(defines, u_utils.UBXLIB_DIR, ctx.reporter)
        elif checker == 5:
            return_code = u_run_no_floating_point.run(defines, u_utils.UBXLIB_DIR,
                                                      ctx.reporter)
        elif 6 <= checker <= 9:
            raise Exit(f"Instance {instance_str} reserved, nothing to do.")
        else:
            # A negative number: without this there would be no
            # return code to check below
            raise Exit(f"Invalid instance: '{instance_str}'")

        check_return_code(return_code)

    else:
        raise Exit(f"Unsupported platform: '{platform}'")
@task()
def export(ctx):
    """Print an "export U_PKG_<NAME>=<package_dir>" line for each u_package"""
    packages = u_package.get_u_packages_config(ctx)
    for name in packages:
        package_dir = packages[name]["package_dir"]
        print(f'export U_PKG_{name.upper()}={package_dir}')
@task()
def install_all(ctx):
    """Make sure that every package in the u_package configuration is installed"""
    # Iterating over the package configuration gives the package names
    every_package = list(u_package.get_u_packages_config(ctx))
    u_package.load(ctx, every_package)
@task()
def build(ctx, instance, build_dir=None, filter=None):
    """Build the firmware for an automation instance.

    If no build_dir is given then "_build/automation/<instance>" is
    used.  The build directory and the filter are stored on the context
    for instance_command() to pick up.
    """
    ctx.build_dir = build_dir or f'_build/automation/{instance}'
    ctx.filter = filter
    instance_command(ctx, instance, Command.BUILD)
@task()
def flash(ctx, instance, build_dir=None):
    """Flash the firmware for an automation instance.

    If no build_dir is given then "_build/automation/<instance>" is
    used.  The build directory is stored on the context for
    instance_command() to pick up.
    """
    ctx.build_dir = build_dir or f'_build/automation/{instance}'
    instance_command(ctx, instance, Command.FLASH)
@task()
def log(ctx, instance):
    """Show the real-time log output of an automation instance"""
    instance_command(ctx, instance, cmd=Command.LOG)
@task()
def test(ctx, instance, summary_file="summary.txt", debug_file="debug.log",
         test_report=None, filter=None):
    """Start the tests for an automation instance.

    Logging is set up with debug_file, summary_file is opened for
    writing and handed to the reporter; the reporter, the filter and
    test_report are stored on the context for instance_command() to
    pick up.
    """
    # The testing phase uses the logging facility
    ULog.setup_logging(debug_file=debug_file)
    instance_id = parse_instance(instance)

    # Run with a reporter that has the summary file to write to
    with open(summary_file, 'w') as summary_handle, \
         u_report.ReportToQueue(None, instance_id, summary_handle) as reporter:
        ctx.filter = filter
        ctx.reporter = reporter
        ctx.test_report = test_report
        instance_command(ctx, instance, Command.TEST)
@task()
def run(ctx, instance, build_dir=None, summary_file="summary.txt",
        debug_file="debug.log", test_report=None, filter=None):
    """Build, flash and then test an automation instance in one command"""
    # The three steps, in order, each being the task of the same name
    build(ctx, instance, build_dir=build_dir, filter=filter)
    flash(ctx, instance, build_dir=build_dir)
    test(ctx, instance,
         summary_file=summary_file,
         debug_file=debug_file,
         test_report=test_report,
         filter=filter)
@task()
def get_test_selection(ctx, message="", files="", run_everything=False):
    """Work out which instances to run and print the selection as JSON.

    All instances are selected if run_everything is set or if the first
    instance parsed from the message begins with "*"; if the message
    contains no instruction then u_select.select() makes the choice
    based on files.  The result is printed as a single line that starts
    with "JSON_DATA: " followed by a JSON object with the keys "filter"
    and "instances".

    Args:
        ctx: the invoke context (not used here).
        message: the text that is handed to u_utils.commit_message_parse().
        files: the file names, separated with single spaces.
        run_everything: set this to select all instances regardless.
    """
    # Get the instance DATABASE by parsing the data file
    db_data = u_data.get(DATABASE)
    instances = []
    filter_string = ""
    # NOTE(review): with the default of "" this is [""] rather than an
    # empty list - confirm that u_select.select() is happy with that
    files = files.split(" ")

    if run_everything:
        # Safety switch has been thrown, run the lot
        print("run_everything flag set - running everything")
        instances = u_data.get_instances_all(db_data)
    else:
        # Parse the message; presumably this adds the instances it
        # finds to the list that is passed in
        found, filter_string = u_utils.commit_message_parse(message, instances)
        if not found:
            # No instance specified by the user, decide what to run;
            # the selection is expected to end up in the instances list
            filter_string = u_select.select(db_data, instances, files)
        elif instances and instances[0][0] == "*":
            # If there is a user instance, do what we're told
            print("running everything ", end="")
            if filter_string:
                print(f"on API \"{filter_string}\" ", end="")
            print("at user request.")
            instances.clear()
            instances = u_data.get_instances_all(db_data)

    # Describe each of the selected instances
    instance_entries = [
        {
            "id": instance,
            "platform": u_data.get_platform_for_instance(db_data, instance),
            "description": u_data.get_description_for_instance(db_data, instance),
            "mcu": u_data.get_mcu_for_instance(db_data, instance),
        }
        for instance in instances
    ]
    json_data = json.dumps({"filter": filter_string, "instances": instance_entries})
    print("JSON_DATA: " + json_data)