Skip to content

Commit

Permalink
examples/suit_update: improve test script
Browse files Browse the repository at this point in the history
- Verify smaller image sequence numbers are rejected
- Verify invalid signatures are rejected
  • Loading branch information
fjmolinas committed Dec 2, 2019
1 parent b88e2ec commit 09a5282
Showing 1 changed file with 61 additions and 19 deletions.
80 changes: 61 additions & 19 deletions examples/suit_update/tests/01-run.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ def notify(coap_server, client_url, version=None):
assert not subprocess.call(cmd)


def publish(server_dir, server_url, app_ver, latest_name=None):
def publish(server_dir, server_url, app_ver, keys='default', latest_name=None):
cmd = [
"make",
"suit/publish",
"SUIT_COAP_FSROOT={}".format(server_dir),
"SUIT_COAP_SERVER={}".format(server_url),
"APP_VER={}".format(app_ver),
"RIOTBOOT_SKIP_COMPILE=1",
"SUIT_KEY={}".format(keys),
]
if latest_name is not None:
cmd.append("SUIT_MANIFEST_SIGNED_LATEST={}".format(latest_name))
Expand All @@ -73,6 +74,7 @@ def wait_for_update(child):


def get_ipv6_addr(child):
child.expect_exact('>')
child.sendline('ifconfig')
if USE_ETHOS == 0:
# Get device global address
Expand Down Expand Up @@ -111,28 +113,45 @@ def ping6(client):
return ping_ok


def testfunc(child):
"""For one board test if specified application is updatable"""
def get_reachable_addr(child):
# Wait for suit_coap thread to start
child.expect_exact("suit_coap: started.")
child.expect_exact("Starting the shell")
# give some time for the network interface to be configured
time.sleep(1)
# Get address
client_addr = get_ipv6_addr(child)
# Verify address is reachable
ping6(client_addr)
return "[{}]".format(client_addr)

# Initial Setup and wait for address configuration
child.expect_exact("main(): This is RIOT!")

def app_version(child):
# get version of currently running image
# "Image Version: 0x00000000"
child.expect(r"Image Version: (?P<app_ver>0x[0-9a-fA-F:]+)\r\n")
current_app_ver = int(child.match.group("app_ver"), 16)

for version in [current_app_ver + 1, current_app_ver + 2]:
child.expect_exact("suit_coap: started.")
child.expect_exact("Starting the shell")
# give some time for the network interface to be configured
time.sleep(1)
# Get address, if using ethos it will change on each reboot
client_addr = get_ipv6_addr(child)
client = "[{}]".format(client_addr)
# Wait for suit_coap thread to start
# Ping6
ping6(client_addr)
app_ver = int(child.match.group("app_ver"), 16)
return app_ver


def _test_invalid_version(child, client, app_ver):
publish(TMPDIR.name, COAP_HOST, app_ver - 1)
notify(COAP_HOST, client, app_ver - 1)
child.expect_exact("suit_coap: trigger received")
child.expect_exact("suit: verifying manifest signature...")
child.expect_exact("seq_nr <= running image")


def _test_invalid_signature(child, client, app_ver):
publish(TMPDIR.name, COAP_HOST, app_ver + 1, 'invalid_keys')
notify(COAP_HOST, client, app_ver + 1)
child.expect_exact("suit_coap: trigger received")
child.expect_exact("suit: verifying manifest signature...")
child.expect_exact("Unable to validate signature")


def _test_successful_update(child, client, app_ver):
for version in [app_ver + 1, app_ver + 2]:
# Trigger update process, verify it validates manifest correctly
publish(TMPDIR.name, COAP_HOST, version)
notify(COAP_HOST, client, version)
Expand All @@ -150,6 +169,30 @@ def testfunc(child):
# Verify running slot
child.expect(r"running from slot (\d+)\r\n")
assert target_slot == int(child.match.group(1)), "BOOTED FROM SAME SLOT"
# Verify client is reachable and get address
client = get_reachable_addr(child)


def testfunc(child):
# Get current app_ver
current_app_ver = app_version(child)
# Verify client is reachable and get address
client = get_reachable_addr(child)

def run(func):
if child.logfile == sys.stdout:
func(child, client, current_app_ver)
else:
try:
func(child, client, current_app_ver)
print(".", end="", flush=True)
except Exception as e:
print("FAILED")
raise e

run(_test_invalid_signature)
run(_test_invalid_version)
run(_test_successful_update)

print("TEST PASSED")

Expand All @@ -159,7 +202,6 @@ def testfunc(child):
res = 1
aiocoap_process = start_aiocoap_fileserver()
# TODO: wait for coap port to be available

res = run(testfunc, echo=True)

except Exception as e:
Expand Down

0 comments on commit 09a5282

Please sign in to comment.