From 8f5f189eec02d74e3dbaa391b17a09e1117b51ce Mon Sep 17 00:00:00 2001 From: Mateusz Michalek Date: Fri, 9 Aug 2024 12:16:40 +0200 Subject: [PATCH] scripts: imgtool: compression incomplete compression draft Signed-off-by: Mateusz Michalek --- scripts/imgtool/image.py | 36 ++++++++++++++++++++++------ scripts/imgtool/main.py | 52 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 78 insertions(+), 10 deletions(-) diff --git a/scripts/imgtool/image.py b/scripts/imgtool/image.py index a30d53bf2..ab942de5e 100644 --- a/scripts/imgtool/image.py +++ b/scripts/imgtool/image.py @@ -21,6 +21,7 @@ """ import hashlib +import array import os.path import struct from enum import Enum @@ -58,6 +59,8 @@ 'NON_BOOTABLE': 0x0000010, 'RAM_LOAD': 0x0000020, 'ROM_FIXED': 0x0000100, + 'COMPRESSED_LZMA1': 0x0000200, + 'COMPRESSED_LZMA2': 0x0000400, } TLV_VALUES = { @@ -76,6 +79,9 @@ 'DEPENDENCY': 0x40, 'SEC_CNT': 0x50, 'BOOT_RECORD': 0x60, + 'COMP_SIZE': 0x70, + 'COMP_SHA': 0x71, + 'COMP_SIGNATURE': 0x72, } TLV_SIZE = 4 @@ -164,6 +170,8 @@ def __init__(self, version=None, header_size=IMAGE_HEADER_SIZE, if load_addr and rom_fixed: raise click.UsageError("Can not set rom_fixed and load_addr at the same time") + self.image_hash = None + self.image_size = None self.version = version or versmod.decode_version("0") self.header_size = header_size self.pad_header = pad_header @@ -239,6 +247,7 @@ def load(self, path): self.payload = f.read() except FileNotFoundError: raise click.UsageError("Input file not found") + self.image_size = len(self.payload) # Add the image header if needed. if self.pad_header and self.header_size > 0: @@ -333,8 +342,8 @@ def ecies_hkdf(self, enckey, plainkey): return cipherkey, ciphermac, pubk def create(self, key, public_key_format, enckey, dependencies=None, - sw_type=None, custom_tlvs=None, encrypt_keylen=128, clear=False, - fixed_sig=None, pub_key=None, vector_to_sign=None): + sw_type=None, custom_tlvs=None, compression_tlvs=None, compression_type=None, encrypt_keylen=128, + clear=False, fixed_sig=None, pub_key=None, vector_to_sign=None): self.enckey = enckey # Check what hashing algorithm should be used @@ -400,6 +409,9 @@ def create(self, key, public_key_format, enckey, dependencies=None, dependencies_num = len(dependencies[DEP_IMAGES_KEY]) protected_tlv_size += (dependencies_num * 16) + if compression_tlvs is not None: + for value in compression_tlvs.values(): + protected_tlv_size += TLV_SIZE + len(value) if custom_tlvs is not None: for value in custom_tlvs.values(): protected_tlv_size += TLV_SIZE + len(value) @@ -421,11 +433,15 @@ def create(self, key, public_key_format, enckey, dependencies=None, else: self.payload.extend(pad) + compression_flags = 0x0 + if compression_tlvs is not None: + if compression_type == "lzma2": + compression_flags = IMAGE_F['COMPRESSED_LZMA2'] # This adds the header to the payload as well if encrypt_keylen == 256: - self.add_header(enckey, protected_tlv_size, 256) + self.add_header(enckey, protected_tlv_size, compression_flags, 256) else: - self.add_header(enckey, protected_tlv_size) + self.add_header(enckey, protected_tlv_size, compression_flags) prot_tlv = TLV(self.endian, TLV_PROT_INFO_MAGIC) @@ -455,10 +471,12 @@ def create(self, key, public_key_format, enckey, dependencies=None, ) prot_tlv.add('DEPENDENCY', payload) + if compression_tlvs is not None: + for tag, value in compression_tlvs.items(): + prot_tlv.add(tag, value) if custom_tlvs is not None: for tag, value in custom_tlvs.items(): prot_tlv.add(tag, value) - protected_tlv_off = len(self.payload) self.payload += prot_tlv.get() @@ -470,6 +488,7 @@ def create(self, key, public_key_format, enckey, dependencies=None, sha.update(self.payload) digest = sha.digest() tlv.add(hash_tlv, digest) + self.image_hash = digest if vector_to_sign == 'payload': # Stop amending data to the image @@ -549,10 +568,13 @@ def create(self, key, public_key_format, enckey, dependencies=None, self.check_trailer() + def get_struct_endian(self): + return STRUCT_ENDIAN_DICT[self.endian] + def get_signature(self): return self.signature - def add_header(self, enckey, protected_tlv_size, aes_length=128): + def add_header(self, enckey, protected_tlv_size, compression_flags, aes_length=128): """Install the image header.""" flags = 0 @@ -588,7 +610,7 @@ def add_header(self, enckey, protected_tlv_size, aes_length=128): protected_tlv_size, # TLV Info header + # Protected TLVs len(self.payload) - self.header_size, # ImageSz - flags, + flags | compression_flags, self.version.major, self.version.minor or 0, self.version.revision or 0, diff --git a/scripts/imgtool/main.py b/scripts/imgtool/main.py index f70a8bf51..aa0460810 100755 --- a/scripts/imgtool/main.py +++ b/scripts/imgtool/main.py @@ -22,6 +22,11 @@ import getpass import imgtool.keys as keys import sys +import subprocess +import struct +import os +import lzma +import hashlib import base64 from imgtool import image, imgtool_version from imgtool.version import decode_version @@ -340,6 +345,10 @@ def convert(self, value, param, ctx): type=click.Choice(['128', '256']), help='When encrypting the image using AES, select a 128 bit or ' '256 bit key len.') +@click.option('--compression', default='lzma2', + type=click.Choice(['disabled', 'lzma1', 'lzma2']), + help='Try image compression. If it deflates image, use ' + 'compressed one.') @click.option('-c', '--clear', required=False, is_flag=True, default=False, help='Output a non-encrypted image with encryption capabilities,' 'so it can be installed in the primary slot, and encrypted ' @@ -408,7 +417,7 @@ def convert(self, value, param, ctx): .hex extension, otherwise binary format is used''') def sign(key, public_key_format, align, version, pad_sig, header_size, pad_header, slot_size, pad, confirm, max_sectors, overwrite_only, - endian, encrypt_keylen, encrypt, infile, outfile, dependencies, + endian, encrypt_keylen, encrypt, compression, infile, outfile, dependencies, load_addr, hex_addr, erased_val, save_enctlv, security_counter, boot_record, custom_tlv, rom_fixed, max_align, clear, fix_sig, fix_sig_pubkey, sig_out, vector_to_sign): @@ -424,6 +433,7 @@ def sign(key, public_key_format, align, version, pad_sig, header_size, endian=endian, load_addr=load_addr, rom_fixed=rom_fixed, erased_val=erased_val, save_enctlv=save_enctlv, security_counter=security_counter, max_align=max_align) + compression_tlvs = {} img.load(infile) key = load_key(key) if key else None enckey = load_key(encrypt) if encrypt else None @@ -477,10 +487,46 @@ def sign(key, public_key_format, align, version, pad_sig, header_size, } img.create(key, public_key_format, enckey, dependencies, boot_record, - custom_tlvs, int(encrypt_keylen), clear, baked_signature, + custom_tlvs, compression_tlvs, int(encrypt_keylen), clear, baked_signature, pub_key, vector_to_sign) - img.save(outfile, hex_addr) + if re.match('.*\.bin$', infile) is not None : + if compression == "lzma2" : + compressed_img = image.Image(version=decode_version(version), header_size=header_size, + pad_header=pad_header, pad=pad, confirm=confirm, + align=int(align), slot_size=slot_size, + max_sectors=max_sectors, overwrite_only=overwrite_only, + endian=endian, load_addr=load_addr, rom_fixed=rom_fixed, + erased_val=erased_val, save_enctlv=save_enctlv, + security_counter=security_counter, max_align=max_align) + compressed_file = re.sub('zephyr\.bin$','zephyr.compressed.bin',infile) + compression_filters = [ + {"id": lzma.FILTER_LZMA1, "dict_size": 131072}, + {"id": lzma.FILTER_LZMA1, "lp": 1}, + {"id": lzma.FILTER_LZMA1, "lc": 3}, + {"id": lzma.FILTER_LZMA1, "preset": 9}, + ] + in_fd=open(infile, "rb") + with lzma.open(compressed_file, "w", format=lzma.FORMAT_ALONE) as out_fd: + out_fd.write(in_fd.read()) + in_fd.close() + uncompressed_size = os.path.getsize(infile) + compressed_size = os.path.getsize(compressed_file) + print("deflated ", compressed_size, "/", uncompressed_size) + if compressed_size < uncompressed_size: + print("loading " + compressed_file) + compressed_img.load(compressed_file) + compression_tlvs["COMP_SIZE"] = struct.pack(img.get_struct_endian() + 'L', img.image_size) + compression_tlvs["COMP_SHA"] = img.image_hash + compression_tlvs["COMP_SIGNATURE"] = img.signature + compressed_img.create(key, public_key_format, enckey, dependencies, boot_record, + custom_tlvs, compression_tlvs, compression, int(encrypt_keylen), clear, baked_signature, + pub_key, vector_to_sign) + img = compressed_img + else: + print("ommiting compression because it did nothing") + + img.save(outfile, hex_addr) if sig_out is not None: new_signature = img.get_signature() save_signature(sig_out, new_signature)