Skip to content

Commit

Permalink
scripts: imgtool: compression
Browse files Browse the repository at this point in the history
incomplete compression draft

Signed-off-by: Mateusz Michalek <mateusz.michalek@nordicsemi.no>
  • Loading branch information
michalek-no committed Aug 27, 2024
1 parent 15b63e0 commit 8f5f189
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 10 deletions.
36 changes: 29 additions & 7 deletions scripts/imgtool/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"""

import hashlib
import array
import os.path
import struct
from enum import Enum
Expand Down Expand Up @@ -58,6 +59,8 @@
'NON_BOOTABLE': 0x0000010,
'RAM_LOAD': 0x0000020,
'ROM_FIXED': 0x0000100,
'COMPRESSED_LZMA1': 0x0000200,
'COMPRESSED_LZMA2': 0x0000400,
}

TLV_VALUES = {
Expand All @@ -76,6 +79,9 @@
'DEPENDENCY': 0x40,
'SEC_CNT': 0x50,
'BOOT_RECORD': 0x60,
'COMP_SIZE': 0x70,
'COMP_SHA': 0x71,
'COMP_SIGNATURE': 0x72,
}

TLV_SIZE = 4
Expand Down Expand Up @@ -164,6 +170,8 @@ def __init__(self, version=None, header_size=IMAGE_HEADER_SIZE,
if load_addr and rom_fixed:
raise click.UsageError("Can not set rom_fixed and load_addr at the same time")

self.image_hash = None
self.image_size = None
self.version = version or versmod.decode_version("0")
self.header_size = header_size
self.pad_header = pad_header
Expand Down Expand Up @@ -239,6 +247,7 @@ def load(self, path):
self.payload = f.read()
except FileNotFoundError:
raise click.UsageError("Input file not found")
self.image_size = len(self.payload)

# Add the image header if needed.
if self.pad_header and self.header_size > 0:
Expand Down Expand Up @@ -333,8 +342,8 @@ def ecies_hkdf(self, enckey, plainkey):
return cipherkey, ciphermac, pubk

def create(self, key, public_key_format, enckey, dependencies=None,
sw_type=None, custom_tlvs=None, encrypt_keylen=128, clear=False,
fixed_sig=None, pub_key=None, vector_to_sign=None):
sw_type=None, custom_tlvs=None, compression_tlvs=None, compression_type=None, encrypt_keylen=128,
clear=False, fixed_sig=None, pub_key=None, vector_to_sign=None):
self.enckey = enckey

# Check what hashing algorithm should be used
Expand Down Expand Up @@ -400,6 +409,9 @@ def create(self, key, public_key_format, enckey, dependencies=None,
dependencies_num = len(dependencies[DEP_IMAGES_KEY])
protected_tlv_size += (dependencies_num * 16)

if compression_tlvs is not None:
for value in compression_tlvs.values():
protected_tlv_size += TLV_SIZE + len(value)
if custom_tlvs is not None:
for value in custom_tlvs.values():
protected_tlv_size += TLV_SIZE + len(value)
Expand All @@ -421,11 +433,15 @@ def create(self, key, public_key_format, enckey, dependencies=None,
else:
self.payload.extend(pad)

compression_flags = 0x0
if compression_tlvs is not None:
if compression_type == "lzma2":
compression_flags = IMAGE_F['COMPRESSED_LZMA2']
# This adds the header to the payload as well
if encrypt_keylen == 256:
self.add_header(enckey, protected_tlv_size, 256)
self.add_header(enckey, protected_tlv_size, compression_flags, 256)
else:
self.add_header(enckey, protected_tlv_size)
self.add_header(enckey, protected_tlv_size, compression_flags)

prot_tlv = TLV(self.endian, TLV_PROT_INFO_MAGIC)

Expand Down Expand Up @@ -455,10 +471,12 @@ def create(self, key, public_key_format, enckey, dependencies=None,
)
prot_tlv.add('DEPENDENCY', payload)

if compression_tlvs is not None:
for tag, value in compression_tlvs.items():
prot_tlv.add(tag, value)
if custom_tlvs is not None:
for tag, value in custom_tlvs.items():
prot_tlv.add(tag, value)

protected_tlv_off = len(self.payload)
self.payload += prot_tlv.get()

Expand All @@ -470,6 +488,7 @@ def create(self, key, public_key_format, enckey, dependencies=None,
sha.update(self.payload)
digest = sha.digest()
tlv.add(hash_tlv, digest)
self.image_hash = digest

if vector_to_sign == 'payload':
# Stop amending data to the image
Expand Down Expand Up @@ -549,10 +568,13 @@ def create(self, key, public_key_format, enckey, dependencies=None,

self.check_trailer()

def get_struct_endian(self):
return STRUCT_ENDIAN_DICT[self.endian]

def get_signature(self):
return self.signature

def add_header(self, enckey, protected_tlv_size, aes_length=128):
def add_header(self, enckey, protected_tlv_size, compression_flags, aes_length=128):
"""Install the image header."""

flags = 0
Expand Down Expand Up @@ -588,7 +610,7 @@ def add_header(self, enckey, protected_tlv_size, aes_length=128):
protected_tlv_size, # TLV Info header +
# Protected TLVs
len(self.payload) - self.header_size, # ImageSz
flags,
flags | compression_flags,
self.version.major,
self.version.minor or 0,
self.version.revision or 0,
Expand Down
52 changes: 49 additions & 3 deletions scripts/imgtool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import getpass
import imgtool.keys as keys
import sys
import subprocess
import struct
import os
import lzma
import hashlib
import base64
from imgtool import image, imgtool_version
from imgtool.version import decode_version
Expand Down Expand Up @@ -340,6 +345,10 @@ def convert(self, value, param, ctx):
type=click.Choice(['128', '256']),
help='When encrypting the image using AES, select a 128 bit or '
'256 bit key len.')
@click.option('--compression', default='lzma2',
type=click.Choice(['disabled', 'lzma1', 'lzma2']),
help='Try image compression. If it deflates image, use '
'compressed one.')
@click.option('-c', '--clear', required=False, is_flag=True, default=False,
help='Output a non-encrypted image with encryption capabilities,'
'so it can be installed in the primary slot, and encrypted '
Expand Down Expand Up @@ -408,7 +417,7 @@ def convert(self, value, param, ctx):
.hex extension, otherwise binary format is used''')
def sign(key, public_key_format, align, version, pad_sig, header_size,
pad_header, slot_size, pad, confirm, max_sectors, overwrite_only,
endian, encrypt_keylen, encrypt, infile, outfile, dependencies,
endian, encrypt_keylen, encrypt, compression, infile, outfile, dependencies,
load_addr, hex_addr, erased_val, save_enctlv, security_counter,
boot_record, custom_tlv, rom_fixed, max_align, clear, fix_sig,
fix_sig_pubkey, sig_out, vector_to_sign):
Expand All @@ -424,6 +433,7 @@ def sign(key, public_key_format, align, version, pad_sig, header_size,
endian=endian, load_addr=load_addr, rom_fixed=rom_fixed,
erased_val=erased_val, save_enctlv=save_enctlv,
security_counter=security_counter, max_align=max_align)
compression_tlvs = {}
img.load(infile)
key = load_key(key) if key else None
enckey = load_key(encrypt) if encrypt else None
Expand Down Expand Up @@ -477,10 +487,46 @@ def sign(key, public_key_format, align, version, pad_sig, header_size,
}

img.create(key, public_key_format, enckey, dependencies, boot_record,
custom_tlvs, int(encrypt_keylen), clear, baked_signature,
custom_tlvs, compression_tlvs, int(encrypt_keylen), clear, baked_signature,
pub_key, vector_to_sign)
img.save(outfile, hex_addr)

if re.match('.*\.bin$', infile) is not None :
if compression == "lzma2" :
compressed_img = image.Image(version=decode_version(version), header_size=header_size,
pad_header=pad_header, pad=pad, confirm=confirm,
align=int(align), slot_size=slot_size,
max_sectors=max_sectors, overwrite_only=overwrite_only,
endian=endian, load_addr=load_addr, rom_fixed=rom_fixed,
erased_val=erased_val, save_enctlv=save_enctlv,
security_counter=security_counter, max_align=max_align)
compressed_file = re.sub('zephyr\.bin$','zephyr.compressed.bin',infile)
compression_filters = [
{"id": lzma.FILTER_LZMA1, "dict_size": 131072},
{"id": lzma.FILTER_LZMA1, "lp": 1},
{"id": lzma.FILTER_LZMA1, "lc": 3},
{"id": lzma.FILTER_LZMA1, "preset": 9},
]
in_fd=open(infile, "rb")
with lzma.open(compressed_file, "w", format=lzma.FORMAT_ALONE) as out_fd:
out_fd.write(in_fd.read())
in_fd.close()
uncompressed_size = os.path.getsize(infile)
compressed_size = os.path.getsize(compressed_file)
print("deflated ", compressed_size, "/", uncompressed_size)
if compressed_size < uncompressed_size:
print("loading " + compressed_file)
compressed_img.load(compressed_file)
compression_tlvs["COMP_SIZE"] = struct.pack(img.get_struct_endian() + 'L', img.image_size)
compression_tlvs["COMP_SHA"] = img.image_hash
compression_tlvs["COMP_SIGNATURE"] = img.signature
compressed_img.create(key, public_key_format, enckey, dependencies, boot_record,
custom_tlvs, compression_tlvs, compression, int(encrypt_keylen), clear, baked_signature,
pub_key, vector_to_sign)
img = compressed_img
else:
print("ommiting compression because it did nothing")

img.save(outfile, hex_addr)
if sig_out is not None:
new_signature = img.get_signature()
save_signature(sig_out, new_signature)
Expand Down

0 comments on commit 8f5f189

Please sign in to comment.