Skip to content

Commit

Permalink
Add bluetooth 'run' tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jcapona committed Feb 6, 2024
1 parent a4a4553 commit 49ae29e
Show file tree
Hide file tree
Showing 3 changed files with 554 additions and 8 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ test =
pytest-aiohttp
pytest-mock
aioresponses
testpath

[options.entry_points]
console_scripts =
Expand Down
294 changes: 286 additions & 8 deletions tests/e2e/test_bluetooth_run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from datetime import datetime
from json import dumps
from json import dumps, loads
from time import time

import pytest

Expand Down Expand Up @@ -145,15 +145,13 @@ async def test_run_shell(bluetooth_server):
)

await send_formatted_bluetooth_message(bluetooth_server, char, start_cmd)

await wait_until(
message_received(b'{"type": "started", "data": null, "process": "1"}', messages)
)

stdin_message = create_message("stdin", {"input": code}, "1")
stdin_message = append_to_message(stdin_message, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, stdin_message)

await wait_until(
lambda: True in [output.encode() in message for message in messages]
)
Expand All @@ -162,13 +160,48 @@ async def test_run_shell(bluetooth_server):
stop_cmd = append_to_message(stop_cmd, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, stop_cmd)

await asyncio.sleep(2)
print(messages)

# really we expect a -15 exit code but this isn't necessarily a problem
# in buster, the process exits with code 0
await wait_until(lambda: len(messages) >= 3)
chunk = Chunk(messages[-1])
msg = loads(chunk.payload)
msg["type"] == "stopped"

Check notice

Code scanning / CodeQL

Statement has no effect Note test

This statement has no effect.
msg["data"]["exitCode"] in (-9, 0)

Check notice

Code scanning / CodeQL

Statement has no effect Note test

This statement has no effect.
assert 1 == 0


@pytest.mark.asyncio
async def test_run_executable(bluetooth_server):
service = bluetooth_server.get_service(PT_SERVICE_UUID)
char = service.get_characteristic(PT_RUN_WRITE_CHARACTERISTIC_UUID)

code = """\
#!/bin/bash
date +%s # unix time in seconds
"""
start_cmd = create_message("start", {"runner": "exec", "code": code}, "1")
start_cmd = append_to_message(start_cmd, {"client_uuid": "1"})

messages = []
service.get_characteristic(PT_RUN_READ_CHARACTERISTIC_UUID)._subscribe(
lambda msg: messages.append(msg)
)

await send_formatted_bluetooth_message(bluetooth_server, char, start_cmd)

await wait_until(
message_received(b'{"type": "started", "data": null, "process": "1"}', messages)
)

seconds = str(int(time()))
stdout_message = dumps(
{"type": "stdout", "data": {"output": f"{seconds}\r\n"}, "process": "1"}
)

await wait_until(message_received(stdout_message.encode(), messages))
await wait_until(
message_received(
b'{"type": "stopped", "data": {"exitCode": -9}, "process": "1"}', messages
b'{"type": "stopped", "data": {"exitCode": 0}, "process": "1"}', messages
)
)

Expand Down Expand Up @@ -227,3 +260,248 @@ async def test_two_clients(bluetooth_server):
b'{"type": "stopped", "data": {"exitCode": -15}, "process": "1"}', messages
)
)


@pytest.mark.asyncio
async def test_stop_early(bluetooth_server):
service = bluetooth_server.get_service(PT_SERVICE_UUID)
char = service.get_characteristic(PT_RUN_WRITE_CHARACTERISTIC_UUID)

code = "while True: pass"
start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
start_cmd = append_to_message(start_cmd, {"client_uuid": "1"})

messages = []
service.get_characteristic(PT_RUN_READ_CHARACTERISTIC_UUID)._subscribe(
lambda msg: messages.append(msg)
)

await send_formatted_bluetooth_message(bluetooth_server, char, start_cmd)

await wait_until(
message_received(b'{"type": "started", "data": null, "process": "1"}', messages)
)

stop_cmd = create_message("stop", None, "1")
stop_cmd = append_to_message(stop_cmd, {"client_uuid": "1"})

await send_formatted_bluetooth_message(bluetooth_server, char, stop_cmd)
await wait_until(
message_received(
b'{"type": "stopped", "data": {"exitCode": -15}, "process": "1"}', messages
)
)


@pytest.mark.asyncio
async def test_bad_code(bluetooth_server):
service = bluetooth_server.get_service(PT_SERVICE_UUID)
char = service.get_characteristic(PT_RUN_WRITE_CHARACTERISTIC_UUID)

code = "i'm not valid python"
start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
start_cmd = append_to_message(start_cmd, {"client_uuid": "1"})

messages = []
service.get_characteristic(PT_RUN_READ_CHARACTERISTIC_UUID)._subscribe(
lambda msg: messages.append(msg)
)

await send_formatted_bluetooth_message(bluetooth_server, char, start_cmd)
await wait_until(lambda: len(messages) == 3)
assert messages[0].endswith(b'{"type": "started", "data": null, "process": "1"}')
assert messages[2].endswith(
b'{"type": "stopped", "data": {"exitCode": 1}, "process": "1"}'
)

chunk = Chunk(messages[1])
content = loads(chunk.payload)
lines = content["data"]["output"].split("\n")
assert lines[0].startswith(" File")
assert lines[1] == " i'm not valid python\r"
assert lines[2][-2:] == "^\r"
assert lines[3] == "SyntaxError: EOL while scanning string literal\r"


@pytest.mark.asyncio
async def test_input(bluetooth_server):
code = """s = input()
while "BYE" != s:
print(["HUH?! SPEAK UP, SONNY!","NO, NOT SINCE 1930"][s.isupper()])
s = input()"""
service = bluetooth_server.get_service(PT_SERVICE_UUID)
char = service.get_characteristic(PT_RUN_WRITE_CHARACTERISTIC_UUID)

start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
start_cmd = append_to_message(start_cmd, {"client_uuid": "1"})

messages = []
service.get_characteristic(PT_RUN_READ_CHARACTERISTIC_UUID)._subscribe(
lambda msg: messages.append(msg)
)

await send_formatted_bluetooth_message(bluetooth_server, char, start_cmd)
await wait_until(
message_received(b'{"type": "started", "data": null, "process": "1"}', messages)
)

user_input = create_message("stdin", {"input": "hello\n"}, "1")
user_input = append_to_message(user_input, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, user_input)
await wait_until(
message_received(
b'{"type": "stdout", "data": {"output": "hello\\r\\nHUH?! SPEAK UP, SONNY!\\r\\n"}, "process": "1"}',
messages,
)
)

user_input = create_message("stdin", {"input": "HEY GRANDMA\n"}, "1")
user_input = append_to_message(user_input, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, user_input)
await wait_until(
message_received(
b'{"type": "stdout", "data": {"output": "HEY GRANDMA\\r\\nNO, NOT SINCE 1930\\r\\n"}, "process": "1"}',
messages,
)
)

user_input = create_message("stdin", {"input": "BYE\n"}, "1")
user_input = append_to_message(user_input, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, user_input)
await wait_until(
message_received(
b'{"type": "stopped", "data": {"exitCode": 0}, "process": "1"}', messages
)
)


@pytest.mark.asyncio
async def test_out_of_order_commands(bluetooth_server):
service = bluetooth_server.get_service(PT_SERVICE_UUID)
char = service.get_characteristic(PT_RUN_WRITE_CHARACTERISTIC_UUID)

messages = []
service.get_characteristic(PT_RUN_READ_CHARACTERISTIC_UUID)._subscribe(
lambda msg: messages.append(msg)
)

user_input = create_message("stdin", {"input": "hello\n"}, "1")
user_input = append_to_message(user_input, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, user_input)
await wait_until(
message_received(
b'{"type": "error", "data": {"message": "Bad message"}, "process": null}',
messages,
)
)
messages.clear()

stop_msg = create_message("stop", None, "1")
stop_msg = append_to_message(stop_msg, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, stop_msg)
await wait_until(
message_received(
b'{"type": "error", "data": {"message": "Bad message"}, "process": null}',
messages,
)
)
messages.clear()

# send start
code = "while True: pass"
start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
start_cmd = append_to_message(start_cmd, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, start_cmd)
await wait_until(
message_received(b'{"type": "started", "data": null, "process": "1"}', messages)
)
messages.clear()

# send start again
await send_formatted_bluetooth_message(bluetooth_server, char, start_cmd)
await wait_until(
message_received(
b'{"type": "error", "data": {"message": "Bad message"}, "process": null}',
messages,
)
)
messages.clear()

# send stop
stop_cmd = create_message("stop", None, "1")
stop_cmd = append_to_message(stop_cmd, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, stop_cmd)
await wait_until(
message_received(
b'{"type": "stopped", "data": {"exitCode": -15}, "process": "1"}', messages
)
)

# send stop again
await send_formatted_bluetooth_message(bluetooth_server, char, stop_cmd)
await wait_until(
message_received(
b'{"type": "error", "data": {"message": "Bad message"}, "process": null}',
messages,
)
)


@pytest.mark.asyncio
async def test_discard_old_input(bluetooth_server):
code = 'print("hello world")'
service = bluetooth_server.get_service(PT_SERVICE_UUID)
char = service.get_characteristic(PT_RUN_WRITE_CHARACTERISTIC_UUID)

messages = []
service.get_characteristic(PT_RUN_READ_CHARACTERISTIC_UUID)._subscribe(
lambda msg: messages.append(msg)
)

start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
start_cmd = append_to_message(start_cmd, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, start_cmd)
await wait_until(
message_received(b'{"type": "started", "data": null, "process": "1"}', messages)
)

unterminated_input = create_message("stdin", {"input": "unterminated input"}, "1")
unterminated_input = append_to_message(unterminated_input, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, unterminated_input)

await wait_until(
message_received(
b'{"type": "stdout", "data": {"output": "hello world\\r\\n"}, "process": "1"}',
messages,
)
)
await wait_until(
message_received(
b'{"type": "stopped", "data": {"exitCode": 0}, "process": "1"}', messages
)
)

messages.clear()

code = "print(input())"
start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
start_cmd = append_to_message(start_cmd, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, start_cmd)
await wait_until(
message_received(b'{"type": "started", "data": null, "process": "1"}', messages)
)

user_input = create_message("stdin", {"input": "hello\n"}, "1")
user_input = append_to_message(user_input, {"client_uuid": "1"})
await send_formatted_bluetooth_message(bluetooth_server, char, user_input)
await wait_until(
message_received(
b'{"type": "stdout", "data": {"output": "hello\\r\\nhello\\r\\n"}, "process": "1"}',
messages,
)
)
await wait_until(
message_received(
b'{"type": "stopped", "data": {"exitCode": 0}, "process": "1"}', messages
)
)
Loading

0 comments on commit 49ae29e

Please sign in to comment.