Skip to content

Commit

Permalink
add nixos-test runner
Browse files Browse the repository at this point in the history
  • Loading branch information
Mic92 committed Nov 8, 2024
1 parent 394f171 commit bec9b3d
Showing 1 changed file with 168 additions and 0 deletions.
168 changes: 168 additions & 0 deletions home/bin/nixos-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python -p python3 tigervnc

import argparse
import subprocess
from dataclasses import dataclass
import difflib
import json
import re
import time
from pathlib import Path
import shlex
import os
import signal
import socket
from typing import Iterator
from contextlib import contextmanager, ExitStack


@dataclass
class Options:
checkname: str
interactive: bool
system: str


def current_system() -> str:
return run(["nix", "config", "show", "system"]).stdout.strip()


def parse_args() -> Options:
parser = argparse.ArgumentParser()
parser.add_argument(
"-i", "--interactive", action="store_true", help="Run in interactive mode"
)
parser.add_argument(
"--system",
help="System to run the check on. Defaults to the current system",
default=None,
)
parser.add_argument("checkname", help="Name of the check to run")
args = parser.parse_args()
return Options(
checkname=args.checkname,
interactive=args.interactive,
system=args.system or current_system(),
)


class Error(Exception):
pass


def run(
cmd: list[str | Path], stdout: int | None = subprocess.PIPE
) -> subprocess.CompletedProcess:
print("$", " ".join(map(lambda p: shlex.quote(str(p)), cmd)))
return subprocess.run(cmd, check=True, stdout=stdout, text=True)


def nix(args: list[str]) -> subprocess.CompletedProcess:
return run(["nix", "--extra-experimental-features", "nix-command flakes", *args])


def get_number_of_vms(exe_path: Path) -> int:
with exe_path.open() as exe:
for line in exe:
if m := re.match(r"export startScripts='(.*)'", line):
return len(m.group(1).split())
msg = f"Could not find number of VMs in {exe_path}"
raise Error(msg)


@contextmanager
def terminate_on_exit(proc: subprocess.Popen) -> Iterator[subprocess.Popen]:
try:
yield proc
finally:
try:
pgid = os.getpgid(proc.pid)
if pgid == os.getpgid(os.getpid()):
msg = "Trying to kill the current process group. This is a bug. Processes should be start with start_new_session"
raise Error(msg)
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
except ProcessLookupError:
return
proc.wait()

def wait_tcp_port(port: int) -> None:
for _ in range(200):
try:
socket.create_connection(("localhost", port), timeout=1)
except ConnectionRefusedError:
time.sleep(0.1)


def run_interactive_nixos_test(checkname: str, system: str) -> None:
driver_path = nix(
["build", "-L", "--print-out-paths", f".#checks.{system}.{checkname}.driver"]
).stdout.strip()
exe_path = Path(driver_path) / "bin" / "nixos-test-driver"
if not exe_path.exists():
msg = f"Driver {exe_path} not found. Is this a nixos test?"
raise Error(msg)
num_vms = get_number_of_vms(exe_path)

for i in range(num_vms):
port = 5900 + i
try:
socket.create_connection(("localhost", port), timeout=1)
except ConnectionRefusedError:
continue
else:
msg = f"VNC Port {port} is already in use. Is another test running?"
raise Error(msg)

process = subprocess.Popen([str(exe_path)], start_new_session=True)
with ExitStack() as stack:
proc = stack.enter_context(terminate_on_exit(process))
for i in range(num_vms):
port = 5900 + i
wait_tcp_port(port)
vnc_proc = subprocess.Popen(["vncviewer", f"localhost:{port}"], start_new_session=True)
stack.enter_context(terminate_on_exit(vnc_proc))
proc.wait()


def inner_main() -> None:
opts = parse_args()
all_tests = json.loads(
nix(
[
"eval",
"--json",
"--apply",
"builtins.attrNames",
f".#checks.{opts.system}",
]
).stdout.strip()
)

if difflib.get_close_matches(opts.checkname, all_tests):
if opts.interactive:
run_interactive_nixos_test(opts.checkname, opts.system)
else:
nix(
[
"build",
"-L",
f".#checks.{opts.system}.{opts.checkname}",
],
)
else:
msg = f"Check {opts.checkname} not found. Available checks are:\n"
msg += "\n".join(all_tests)
raise Error(msg)


def main() -> None:
try:
inner_main()
except Error as e:
print(e)
exit(1)


if __name__ == "__main__":
main()

0 comments on commit bec9b3d

Please sign in to comment.