-
Notifications
You must be signed in to change notification settings - Fork 4
/
_rpi_boot.py
563 lines (477 loc) · 21.9 KB
/
_rpi_boot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
# Copyright 2022 TIER IV, INC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Boot control support for Raspberry pi 4 Model B."""
from __future__ import annotations
import contextlib
import logging
import os
import subprocess
from pathlib import Path
from string import Template
from typing import Any, Generator, Literal
from typing_extensions import Self
import otaclient.app.errors as ota_errors
from otaclient.app.boot_control._common import (
CMDHelperFuncs,
OTAStatusFilesControl,
SlotMountHelper,
write_str_to_file_sync,
)
from otaclient.app.boot_control.configs import rpi_boot_cfg as cfg
from otaclient.app.boot_control.protocol import BootControllerProtocol
from otaclient_api.v2 import types as api_types
from otaclient_common.common import replace_atomic
from otaclient_common.linux import subprocess_run_wrapper
from otaclient_common.typing import StrOrPath
logger = logging.getLogger(__name__)
# ------ types ------ #
class SlotID(str):
"""slot_id for A/B slots."""
VALID_SLOTS = ["slot_a", "slot_b"]
def __new__(cls, _in: str | Self) -> Self:
if isinstance(_in, cls):
return _in
if _in in cls.VALID_SLOTS:
return str.__new__(cls, _in)
raise ValueError(f"{_in=} is not valid slot num, should be {cls.VALID_SLOTS=}")
class _RPIBootControllerError(Exception):
"""rpi_boot module internal used exception."""
# ------ consts ------ #
CONFIG_TXT = "config.txt" # primary boot cfg
TRYBOOT_TXT = "tryboot.txt" # tryboot boot cfg
VMLINUZ = "vmlinuz"
INITRD_IMG = "initrd.img"
CMDLINE_TXT = "cmdline.txt"
SYSTEM_BOOT_FSLABEL = "system-boot"
SLOT_A = SlotID("slot_a")
SLOT_B = SlotID("slot_b")
AB_FLIPS = {SLOT_A: SLOT_B, SLOT_B: SLOT_A}
SEP_CHAR = "_"
"""separator between boot files name and slot suffix."""
_FSTAB_TEMPLATE_STR = (
"LABEL=${rootfs_fslabel}\t/\text4\tdiscard,x-systemd.growfs\t0\t1\n"
"LABEL=system-boot\t/boot/firmware\tvfat\tdefaults\t0\t1\n"
)
# ------ helper functions ------ #
BOOTFILES = Literal["vmlinuz", "initrd.img", "config.txt", "tryboot.txt", "cmdline.txt"]
def get_sysboot_files_fpath(boot_fname: BOOTFILES, slot: SlotID) -> Path:
"""Get the boot files fpath for specific slot from /boot/firmware.
For example, for vmlinuz for slot_a, we get /boot/firmware/vmlinuz_slot_a
"""
fname = f"{boot_fname}{SEP_CHAR}{slot}"
return Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / fname
class _RPIBootControl:
"""Boot control helper for rpi4 support.
Supported partition layout:
/dev/sd<x>:
- sd<x>1: fat32, fslabel=systemb-boot
- sd<x>2: ext4, fslabel=slot_a
- sd<x>3: ext4, fslabel=slot_b
slot_id is also the fslabel for each AB rootfs.
NOTE that we allow extra partitions with ID after 3.
Boot files for each slot have the following naming format:
<boot_file_fname><sep_char><slot>
For example, config.txt for slot_a will be config.txt_slot_a
"""
def __init__(self) -> None:
self.system_boot_mp = Path(cfg.SYSTEM_BOOT_MOUNT_POINT)
self.system_boot_mp.mkdir(exist_ok=True)
# sanity check, ensure we are running at raspberry pi device
model_fpath = Path(cfg.RPI_MODEL_FILE)
err_not_rpi_device = f"{cfg.RPI_MODEL_FILE} doesn't exist! Are we running at raspberry pi device?"
if not model_fpath.is_file():
logger.error(err_not_rpi_device)
raise _RPIBootControllerError(err_not_rpi_device)
model_info = model_fpath.read_text()
if model_info.find("Pi") == -1:
raise _RPIBootControllerError(err_not_rpi_device)
logger.info(f"{model_info=}")
try:
# ------ detect active slot ------ #
active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev()
assert active_slot_dev
self.active_slot_dev = active_slot_dev
except Exception as e:
_err_msg = f"failed to detect current rootfs device: {e!r}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg) from e
try:
# detect the parent device of boot device
# i.e., for /dev/sda2 here we get /dev/sda
parent_dev = CMDHelperFuncs.get_parent_dev(str(self.active_slot_dev))
# get device tree, for /dev/sda device, we will get:
# ["/dev/sda", "/dev/sda1", "/dev/sda2", "/dev/sda3"]
device_tree = CMDHelperFuncs.get_device_tree(parent_dev)
logger.info(device_tree)
# NOTE that we allow extra partitions presented after sd<x>3.
assert len(device_tree) >= 4, "need at least 3 partitions"
except Exception as e:
_err_msg = f"failed to detect partition layout: {e!r}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg)
# check system-boot partition mount
system_boot_partition = device_tree[1]
if not CMDHelperFuncs.is_target_mounted(
self.system_boot_mp, raise_exception=False
):
_err_msg = f"system-boot is not mounted at {self.system_boot_mp}, try to mount it..."
logger.warning(_err_msg)
try:
CMDHelperFuncs.mount(
system_boot_partition,
self.system_boot_mp,
options=["defaults"],
)
except subprocess.CalledProcessError as e:
_err_msg = (
f"failed to mount system-boot partition: {e!r}, {e.stderr.decode()}"
)
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg) from e
# check slots
# sd<x>2 and sd<x>3
rootfs_partitions = device_tree[2:4]
# get the active slot ID by its position in the disk
try:
idx = rootfs_partitions.index(active_slot_dev)
except ValueError:
raise _RPIBootControllerError(
f"active slot dev not found: {active_slot_dev=}, {rootfs_partitions=}"
)
if idx == 0: # slot_a
self.active_slot = SLOT_A
self.standby_slot = SLOT_B
self.standby_slot_dev = rootfs_partitions[1]
elif idx == 1: # slot_b
self.active_slot = SLOT_B
self.standby_slot = SLOT_A
self.standby_slot_dev = rootfs_partitions[0]
logger.info(
f"rpi_boot: active_slot: {self.active_slot}({self.active_slot_dev}), "
f"standby_slot: {self.standby_slot}({self.standby_slot_dev})"
)
# ------ continue rpi_boot starts up ------ #
self._check_boot_files()
self._check_active_slot_id()
# NOTE(20240604): for backward compatibility, always remove flag file
flag_file = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.SWITCH_BOOT_FLAG_FILE
flag_file.unlink(missing_ok=True)
def _check_active_slot_id(self):
"""Check whether the active slot fslabel is matching the slot id.
If mismatched, try to correct the problem.
"""
fslabel = self.active_slot
actual_fslabel = CMDHelperFuncs.get_attrs_by_dev(
"LABEL", self.active_slot_dev, raise_exception=False
)
if actual_fslabel == fslabel:
return
logger.warning(
(
f"current active slot is {fslabel}, but its fslabel is {actual_fslabel}, "
f"try to correct the fslabel with slot id {fslabel}..."
)
)
try:
CMDHelperFuncs.set_ext4_fslabel(self.active_slot_dev, fslabel)
os.sync()
except subprocess.CalledProcessError as e:
logger.error(
f"failed to correct the fslabel mismatched: {e!r}, {e.stderr.decode()}"
)
logger.error("this might cause problem on future OTA update!")
def _check_boot_files(self):
"""Check the availability of boot files.
The following boot files will be checked:
1. config.txt_<slot_suffix> (required)
2. cmdline.txt_<slot_suffix> (required)
If any of the required files are missing, BootControlInitError will be raised.
In such case, a reinstall/setup of AB partition boot files is required.
"""
logger.debug("checking boot files...")
active_slot, standby_slot = self.active_slot, self.standby_slot
# ------ check active slot boot files ------ #
config_txt_active_slot = get_sysboot_files_fpath(CONFIG_TXT, active_slot)
if not config_txt_active_slot.is_file():
_err_msg = f"missing {config_txt_active_slot=}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg)
cmdline_txt_active_slot = get_sysboot_files_fpath(CMDLINE_TXT, active_slot)
if not cmdline_txt_active_slot.is_file():
_err_msg = f"missing {cmdline_txt_active_slot=}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg)
# ------ check standby slot boot files ------ #
config_txt_standby_slot = get_sysboot_files_fpath(CONFIG_TXT, standby_slot)
if not config_txt_standby_slot.is_file():
_err_msg = f"missing {config_txt_standby_slot=}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg)
cmdline_txt_standby_slot = get_sysboot_files_fpath(CMDLINE_TXT, standby_slot)
if not cmdline_txt_standby_slot.is_file():
_err_msg = f"missing {cmdline_txt_standby_slot=}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg)
@staticmethod
@contextlib.contextmanager
def _prepare_flash_kernel(target_slot_mp: StrOrPath) -> Generator[None, Any, None]:
"""Do a bind mount of /boot/firmware, /proc and /sys to the standby slot,
preparing for calling flash-kernel with chroot.
flash-kernel requires at least these mounts to work properly.
"""
target_slot_mp = Path(target_slot_mp)
mounts: dict[str, str] = {}
# we need to mount /proc, /sys and /boot/firmware to make flash-kernel works
system_boot_mp = target_slot_mp / Path(cfg.SYSTEM_BOOT_MOUNT_POINT).relative_to(
"/"
)
mounts[str(system_boot_mp)] = cfg.SYSTEM_BOOT_MOUNT_POINT
proc_mp = target_slot_mp / "proc"
mounts[str(proc_mp)] = "/proc"
sys_mp = target_slot_mp / "sys"
mounts[str(sys_mp)] = "/sys"
try:
for _mp, _src in mounts.items():
CMDHelperFuncs.mount(
_src,
_mp,
options=["bind"],
params=["--make-unbindable"],
)
yield
# NOTE: passthrough the mount failure to caller
finally:
for _mp in mounts:
CMDHelperFuncs.umount(_mp, raise_exception=False)
def update_firmware(self, *, target_slot: SlotID, target_slot_mp: StrOrPath):
"""Call flash-kernel to install new dtb files, boot firmwares and kernel, initrd.img
from target slot.
The following things will be done:
1. bind mount the /boot/firmware and /proc into the target slot.
2. chroot into the target slot's rootfs, execute flash-kernel
"""
logger.info(f"try to flash-kernel from {target_slot}...")
try:
with self._prepare_flash_kernel(target_slot_mp):
subprocess_run_wrapper(
["/usr/sbin/flash-kernel"],
check=True,
check_output=True,
chroot=target_slot_mp,
# must set this env variable to make flash-kernel work under chroot
env={"FK_FORCE": "yes"},
)
os.sync()
except subprocess.CalledProcessError as e:
_err_msg = f"flash-kernel failed: {e!r}\nstderr: {e.stderr.decode()}\nstdout: {e.stdout.decode()}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg)
try:
# flash-kernel will install the kernel and initrd.img files from /boot to /boot/firmware
if (vmlinuz := self.system_boot_mp / VMLINUZ).is_file():
os.replace(
vmlinuz,
get_sysboot_files_fpath(VMLINUZ, target_slot),
)
if (initrd_img := self.system_boot_mp / INITRD_IMG).is_file():
os.replace(
initrd_img,
get_sysboot_files_fpath(INITRD_IMG, target_slot),
)
# NOTE(20240603): for backward compatibility(downgrade), still create the flag file.
# The present of flag files means the firmware is updated.
flag_file = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.SWITCH_BOOT_FLAG_FILE
flag_file.write_text("")
os.sync()
except Exception as e:
_err_msg = f"failed to apply new kernel,initrd.img for {target_slot}: {e!r}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg)
# exposed API methods/properties
def finalize_switching_boot(self) -> bool:
"""Finalize switching boot by swapping config.txt and tryboot.txt if we should.
Finalize switch boot:
1. atomically replace tryboot.txt with tryboot.txt_<standby_slot_id>
2. atomically replace config.txt with config.txt_<active_slot_id>
Returns:
A bool indicates whether the switch boot succeeded or not. Note that no exception
will be raised if finalizing failed.
"""
logger.info("finalizing switch boot...")
current_slot, standby_slot = self.active_slot, self.standby_slot
config_txt_current = get_sysboot_files_fpath(CONFIG_TXT, current_slot)
config_txt_standby = get_sysboot_files_fpath(CONFIG_TXT, standby_slot)
try:
replace_atomic(config_txt_current, self.system_boot_mp / CONFIG_TXT)
replace_atomic(config_txt_standby, self.system_boot_mp / TRYBOOT_TXT)
logger.info(
"finalizing boot configuration,"
f"replace {CONFIG_TXT} with {config_txt_current=}, "
f"replace {TRYBOOT_TXT} with {config_txt_standby=}"
)
# on success switching boot, cleanup the bak files created by flash-kernel
for _bak_file in self.system_boot_mp.glob("**/*.bak"):
_bak_file.unlink(missing_ok=True)
return True
except Exception as e:
_err_msg = f"failed to finalize boot switching: {e!r}"
logger.error(_err_msg)
return False
def prepare_tryboot_txt(self):
"""Copy the standby slot's config.txt as tryboot.txt."""
try:
replace_atomic(
get_sysboot_files_fpath(CONFIG_TXT, self.standby_slot),
self.system_boot_mp / TRYBOOT_TXT,
)
logger.info(f"set {TRYBOOT_TXT} as {self.standby_slot}'s one")
except Exception as e:
_err_msg = f"failed to prepare tryboot.txt for {self.standby_slot}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg) from e
def reboot_tryboot(self):
"""Reboot with tryboot flag."""
logger.info(f"tryboot reboot to standby slot({self.standby_slot})...")
try:
# NOTE: "0 tryboot" is a single param.
CMDHelperFuncs.reboot(args=["0 tryboot"])
except Exception as e:
_err_msg = "failed to reboot"
logger.exception(_err_msg)
raise _RPIBootControllerError(_err_msg) from e
class RPIBootController(BootControllerProtocol):
"""BootControllerProtocol implementation for rpi4 support."""
def __init__(self) -> None:
try:
self._rpiboot_control = _RPIBootControl()
# mount point prepare
self._mp_control = SlotMountHelper(
standby_slot_dev=self._rpiboot_control.standby_slot_dev,
standby_slot_mount_point=cfg.MOUNT_POINT,
active_slot_dev=self._rpiboot_control.active_slot_dev,
active_slot_mount_point=cfg.ACTIVE_ROOT_MOUNT_POINT,
)
# init ota-status files
self._ota_status_control = OTAStatusFilesControl(
active_slot=self._rpiboot_control.active_slot,
standby_slot=self._rpiboot_control.standby_slot,
current_ota_status_dir=Path(cfg.ACTIVE_ROOTFS_PATH)
/ Path(cfg.OTA_STATUS_DIR).relative_to("/"),
# NOTE: might not yet be populated before OTA update applied!
standby_ota_status_dir=Path(cfg.MOUNT_POINT)
/ Path(cfg.OTA_STATUS_DIR).relative_to("/"),
finalize_switching_boot=self._rpiboot_control.finalize_switching_boot,
)
logger.info("rpi_boot starting finished")
except Exception as e:
_err_msg = f"failed to start rpi boot controller: {e!r}"
logger.error(_err_msg)
raise ota_errors.BootControlStartupFailed(_err_msg, module=__name__) from e
def _write_standby_fstab(self):
"""Override the standby's fstab file.
The fstab file will contain 2 lines, one line for mounting rootfs,
another line is for mounting system-boot partition.
NOTE: slot id is the fslabel for slot's rootfs
"""
logger.debug("update standby slot fstab file...")
try:
_fstab_fpath = self._mp_control.standby_slot_mount_point / Path(
cfg.FSTAB_FPATH
).relative_to("/")
_updated_fstab_str = Template(_FSTAB_TEMPLATE_STR).substitute(
rootfs_fslabel=self._rpiboot_control.standby_slot
)
logger.debug(f"{_updated_fstab_str=}")
write_str_to_file_sync(_fstab_fpath, _updated_fstab_str)
except Exception as e:
_err_msg = f"failed to update fstab file for standby slot: {e!r}"
logger.error(_err_msg)
raise _RPIBootControllerError(_err_msg) from e
# APIs
def get_standby_slot_path(self) -> Path:
return self._mp_control.standby_slot_mount_point
def get_standby_boot_dir(self) -> Path:
return self._mp_control.standby_boot_dir
def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool):
try:
logger.info("rpi_boot: pre-update setup...")
### udpate active slot's ota_status ###
self._ota_status_control.pre_update_current()
### mount slots ###
self._mp_control.prepare_standby_dev(
erase_standby=erase_standby,
fslabel=self._rpiboot_control.standby_slot,
)
self._mp_control.mount_standby()
self._mp_control.mount_active()
### update standby slot's ota_status files ###
self._ota_status_control.pre_update_standby(version=version)
except Exception as e:
_err_msg = f"failed on pre_update: {e!r}"
logger.error(_err_msg)
raise ota_errors.BootControlPreUpdateFailed(
_err_msg, module=__name__
) from e
def pre_rollback(self):
try:
logger.info("rpi_boot: pre-rollback setup...")
self._ota_status_control.pre_rollback_current()
self._mp_control.mount_standby()
self._ota_status_control.pre_rollback_standby()
except Exception as e:
_err_msg = f"failed on pre_rollback: {e!r}"
logger.error(_err_msg)
raise ota_errors.BootControlPreRollbackFailed(
_err_msg, module=__name__
) from e
def post_rollback(self):
try:
logger.info("rpi_boot: post-rollback setup...")
self._rpiboot_control.prepare_tryboot_txt()
self._mp_control.umount_all(ignore_error=True)
self._rpiboot_control.reboot_tryboot()
except Exception as e:
_err_msg = f"failed on post_rollback: {e!r}"
logger.error(_err_msg)
raise ota_errors.BootControlPostRollbackFailed(
_err_msg, module=__name__
) from e
def post_update(self) -> Generator[None, None, None]:
try:
logger.info("rpi_boot: post-update setup...")
self._mp_control.preserve_ota_folder_to_standby()
self._write_standby_fstab()
self._rpiboot_control.update_firmware(
target_slot=self._rpiboot_control.standby_slot,
target_slot_mp=self._mp_control.standby_slot_mount_point,
)
self._rpiboot_control.prepare_tryboot_txt()
self._mp_control.umount_all(ignore_error=True)
yield # hand over control back to otaclient
self._rpiboot_control.reboot_tryboot()
except Exception as e:
_err_msg = f"failed on post_update: {e!r}"
logger.error(_err_msg)
raise ota_errors.BootControlPostUpdateFailed(
_err_msg, module=__name__
) from e
def on_operation_failure(self):
"""Failure registering and cleanup at failure."""
logger.warning("on failure try to unmounting standby slot...")
self._ota_status_control.on_failure()
self._mp_control.umount_all(ignore_error=True)
def load_version(self) -> str:
return self._ota_status_control.load_active_slot_version()
def get_booted_ota_status(self) -> api_types.StatusOta:
return self._ota_status_control.booted_ota_status