diff --git a/README.md b/README.md new file mode 100644 index 0000000..0ae2906 --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +## Installation + +You just need to install required python packages running `pip3 install -r +requirements.txt` and set following environment variables in `~/.bashrc`: +``` +export RUNNER_PATH= +export PATH="$PATH:$RUNNER_PATH/bin/" +``` + +## Setup + +1. To start new validation scope create empty directory and inside this +directory run `jogger init`. +2. Fill `configs/duts_config.yml` with your DUT configuration. +3. Edit `configs/scope_config.yml`. Set path to OCL tests directory +and provide list of tests to be included in scope. +4. Start runner daemon by running `runnerd`. +5. At this point you should be able to run selected tests and check scope +progress using `jogger` utility. + +## Usage + +See `jogger --help`. diff --git a/bin/jogger b/bin/jogger new file mode 120000 index 0000000..8b2bd35 --- /dev/null +++ b/bin/jogger @@ -0,0 +1 @@ +../jogger \ No newline at end of file diff --git a/bin/runnerd b/bin/runnerd new file mode 120000 index 0000000..cb2d6b7 --- /dev/null +++ b/bin/runnerd @@ -0,0 +1 @@ +../runnerd \ No newline at end of file diff --git a/common.py b/common.py new file mode 100644 index 0000000..488fca2 --- /dev/null +++ b/common.py @@ -0,0 +1,211 @@ +# +# Copyright(c) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# + +from contextlib import contextmanager +from datetime import timedelta +from filelock import FileLock +from json.decoder import JSONDecodeError +import hashlib +import json +import os +import random +import re +import sys +import time +import yaml + + +meta_lock = FileLock("meta/runner.lock") + + +class ConfigFile: + def __init__(self, path): + self.path = os.path.abspath(path) + self.last_modify = 0 + + def __access(self): + self.last_modify = os.path.getmtime(self.path) + + def need_reload(self): + return self.last_modify != os.path.getmtime(self.path) + + def load(self): + with meta_lock: + self.__access() + with open(self.path, 'r') as conf: + return yaml.safe_load(conf) + + def save(self, data): + with meta_lock: + self.__access() + with open(self.path, 'w') as conf: + return yaml.dump(data, conf) + + +class JournalFile: + def __init__(self, path): + self.path = os.path.abspath(path) + self.last_modify = 0 + + def __access(self): + self.last_modify = os.path.getmtime(self.path) + + def create(self): + with self.record(): + pass + + def need_reload(self): + if not os.path.isfile(self.path): + return False + return self.last_modify != os.path.getmtime(self.path) + + def load(self): + with meta_lock: + if not os.path.isfile(self.path): + return [] + self.__access() + with open(self.path, 'r') as journal_file: + return json.load(journal_file) + + @contextmanager + def record(self): + with meta_lock: + with open(self.path, 'a+') as journal_file: + self.__access() + try: + journal_file.seek(0) + journal = json.load(journal_file) + except JSONDecodeError: + journal = [] + new_entries = [] + yield new_entries + journal.extend(new_entries) + journal_file.truncate(0) + json.dump(journal, journal_file) + + +class StatusFile: + def __init__(self, path): + self.path = os.path.abspath(path) + + def create(self): + with self.edit(): + pass + + def load(self): + with meta_lock: + if not os.path.isfile(self.path): + return {} + with open(self.path, 'r') as status_file: + return json.load(status_file) + + @contextmanager + def edit(self): + with meta_lock: + with open(self.path, 'a+') as status_file: + try: + status_file.seek(0) + status = json.load(status_file) + except JSONDecodeError: + status = {} + yield status + status_file.truncate(0) + json.dump(status, status_file) + + +class TestCase(dict): + def __init__(self, data): + super().__init__(data) + if 'sha' not in self: + signature = self.signature().encode("UTF-8") + self['sha'] = hashlib.sha1(signature).hexdigest().upper() + + def signature(self): + return f"{self['dir']}|{self}|{self['seed']}" + + def function(self): + if self['params']: + return f"{self['name']}[{self['params']}]" + else: + return f"{self['name']}" + + def test(self): + return f"{self['path']}::{self.function()}" + + def __hash__(self): + return hash(self.signature()) + + def __eq__(self, other): + return self.signature() == other.signature() + + def __repr__(self): + return self.test() + + def __str__(self): + return self.test() + + @classmethod + def from_canonical_name(cls, directory, canon_name, seed, pytest_options): + m = re.fullmatch(r'(\S+)::([^\[]+)\[?([^\]]+)?\]?', canon_name) + path, name, params = m.groups() + return cls({ + 'dir': directory, + 'path': path, + 'name': name, + 'params': params, + 'seed': seed, + 'pytest-options': pytest_options + }) + + +class TestEvent(dict): + def __init__(self, data): + super().__init__(data) + self['test-case'] = TestCase(self['test-case']) + + def signature(self): + return f"{self['test-case'].signature()}|{self['sha']}" + + def duration(self): + try: + start_time = self['start-timestamp'] + end_time = self.get('end-timestamp', time.time()) + return timedelta(seconds=int(end_time-start_time)) + except: + return timedelta(0) + + def __eq__(self, other): + return self.signature() == other.signature() + + def __repr__(self): + return self.__str__() + + def __str__(self): + return f"<{self['sha']}>{self['test-case']}" + + @classmethod + def new(cls, test_case, data={}): + signature = f"{test_case.signature()}{time.time()}".encode("UTF-8") + return cls({ + **{ + 'test-case': test_case, + 'sha': hashlib.sha1(signature).hexdigest() + }, + **data + }) + +class JournalParser: + def __init__(self, journal_file): + self.journal_file = journal_file + + def parse(self): + journal_dict = {} + for entry in self.journal_file.load(): + test_event = TestEvent(entry['test-event']) + if entry['type'] == "add": + journal_dict[test_event['sha']] = test_event + elif entry['type'] == "delete": + del journal_dict[test_event['sha']] + return list(journal_dict.values()) diff --git a/configs/base_dut_config.yml b/configs/base_dut_config.yml new file mode 100644 index 0000000..7ebe02f --- /dev/null +++ b/configs/base_dut_config.yml @@ -0,0 +1,11 @@ +type: "ssh" + +allow_disk_autoselect: False +working_dir: "/tmp/open-cas-linux/" + +plugins: + example_plugin: + provided_by: "internal_plugins.example_plugin" + config: + property1: "value1" + property2: "value2" diff --git a/configs/duts_config.yml b/configs/duts_config.yml new file mode 100644 index 0000000..9876aef --- /dev/null +++ b/configs/duts_config.yml @@ -0,0 +1,41 @@ +duts: + - ip: "192.168.0.15" + user: "root" + password: "PasswordInPlainText!SomeonePleaseCallThePolice!" + disks: + - path: "/dev/nvme0n1" + serial: "000000000001" + type: "optane" + blocksize: 512 + - path: "/dev/nvme1n1" + serial: "000000000002" + type: "optane" + blocksize: 512 + - path: "/dev/sda" + serial: "420420420" + type: "hdd" + blocksize: 512 + - path: "/dev/sdb" + serial: "123456789" + type: "hdd" + blocksize: 512 + - ip: "192.168.0.42" + user: "root" + password: "AnotherPasswordInPlainText!DontTellMyMom!" + disks: + - path: "/dev/nvme0n1" + serial: "0000000000f1" + type: "optane" + blocksize: 512 + - path: "/dev/nvme1n1" + serial: "00000000000f2" + type: "optane" + blocksize: 512 + - path: "/dev/sda" + serial: "420420421" + type: "hdd" + blocksize: 512 + - path: "/dev/sdb" + serial: "12345678a" + type: "hdd" + blocksize: 512 diff --git a/configs/scope_config.yml b/configs/scope_config.yml new file mode 100644 index 0000000..f3114f3 --- /dev/null +++ b/configs/scope_config.yml @@ -0,0 +1,13 @@ +scope: + tag: "EXAMPLE_TEST_SCOPE" + +global_pytest_options: + example_option: "abc" + +tests_path: "/home/user/open-cas-linux/test/functional" +tests: + - path: tests/cache_ops/test_seq_cutoff.py + - path: tests/cli/test_cli_help_and_version.py::test_cli_version + pytest_options: + example_option: "ABC" # Takes precedence + - path: tests/cli/test_cli_help_and_version.py::test_cli_help[True] diff --git a/jogger b/jogger new file mode 100755 index 0000000..06a503e --- /dev/null +++ b/jogger @@ -0,0 +1,722 @@ +#!/usr/bin/env python3 +# +# Copyright(c) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# + +from common import ConfigFile, JournalFile, StatusFile, TestCase, TestEvent, JournalParser + +from datetime import datetime +from functools import reduce +from tabulate import tabulate +from tempfile import NamedTemporaryFile +import argparse +import daemon +import hashlib +import json +import os +import shutil +import sys +import webbrowser + + +def error(*args, **kwargs): + print(*args, *kwargs, file=sys.stderr) + + +class Printer: + @staticmethod + def red(string): + return "\033[0;31m"+string+"\033[0m" + + @staticmethod + def green(string): + return "\033[0;32m"+string+"\033[0m" + + @staticmethod + def yellow(string): + return "\033[0;33m"+string+"\033[0m" + + @staticmethod + def blue(string): + return "\033[0;34m"+string+"\033[0m" + + +class DataPrinter: + def __init__(self, output_format='table'): + if output_format not in ['table', 'json']: + raise ValueError(f"Invalid output format '{output_format}'") + self.output_format = output_format + self.caption = None + self.data = None + + def setCaptions(self, captions): + self.captions = captions + + def setData(self, data): + self.data = data + + def print_json(self): + print(json.dumps(self.data, indent=2)) + + def print_table(self): + data = [[caption, self.data[field]] for caption, field in self.captions] + print(tabulate(data)) + + def print(self): + if self.output_format == 'json': + self.print_json() + else: + self.print_table() + + +class ListPrinter: + def __init__(self, output_format='table'): + if output_format not in ['table', 'json']: + raise ValueError(f"Invalid output format '{output_format}'") + self.output_format = output_format + self.header = None + self.entries = [] + + def setHeader(self, header): + self.header = header + + def addEntry(self, entry): + self.entries.append(entry) + + def print_json(self): + print(json.dumps(self.entries, indent=2)) + + def print_table(self): + headers = [title for title, _ in self.header] + data = [] + for entry in self.entries: + data.append([entry[field] for _, field in self.header]) + print(tabulate(data, headers)) + + def print(self): + if self.output_format == 'json': + self.print_json() + else: + self.print_table() + + +class ScopeHandler: + def __init__(self): + self.journal_file = JournalFile("meta/journal.json") + self.progress_file = StatusFile("meta/progress.json") + self.scope_file = StatusFile("meta/scope.json") + self.results_file = StatusFile("meta/results.json") + + @staticmethod + def test2canon(test): + return f"{test['path']}::{test['name']}[{test['params']}]" + + @staticmethod + def event2canon(event): + return f"{event['path']}::{event['name']}[{event['params']}]" + + @staticmethod + def result2canon(result): + return f"{result['module']}::{result['function']}" + + def __get_results(self): + results = self.results_file.load().get('results', []) + test_events = [] + for res in results: + test_event = TestEvent(res) + test_event['logs'] = os.path.abspath(test_event['logs']) + test_events.append(test_event) + return test_events + + def __get_scope(self): + return self.scope_file.load() + + def __get_journal(self): + return JournalParser(self.journal_file).parse() + + def __get_queue(self): + journal = self.__get_journal() + progress = [] + for entry in self.progress_file.load().get('test-events', []): + progress.append(TestEvent(entry)) + progress_dict = {} + for test_event in progress: + progress_dict[test_event['sha']] = test_event + for test_event in journal: + test_event.update(progress_dict.get(test_event['sha'], {})) + return journal + + def __get_tests(self, full=False, collapsed=False): + scope = self.__get_scope() + tests_dict = {} + for entry in scope.get("tests", []): + test_case = TestCase(entry) + del entry['sha'] + entry['params'] = None + collapsed_test_case = TestCase(entry) + collapsed_test_case['children'] = [] + if (full or collapsed) and test_case == collapsed_test_case: + tests_dict[test_case['sha']] = test_case + continue + if full: + tests_dict[test_case['sha']] = test_case + if collapsed: + tests_dict.setdefault( + collapsed_test_case['sha'], + collapsed_test_case + )['children'].append(test_case) + return list(tests_dict.values()) + + def __tests_by_sha(self, sha): + test_cases = self.__get_tests(full=True, collapsed=True) + entry = next( + filter( + lambda test_case: test_case['sha'].startswith(sha), + test_cases + ) + ) + return entry.get('children', [entry]) + + def scope(self): + return self.__get_scope() + + def tests(self, req): + return self.__get_tests( + full=req.get('full', False), + collapsed=req.get('collapsed', False) + ) + + def run(self, req): + tests = reduce(lambda acc, sha: acc+self.__tests_by_sha(sha), req, []) + test_events = [] + with self.journal_file.record() as journal: + for test_case in tests: + test_event = TestEvent.new(test_case, {'status': "queued"}) + test_events.append(test_event) + journal.append({ + 'type': "add", + 'test-event': test_event + }) + return test_events + + def delete(self, req): + test_event = req['test-event'] + if test_event['status'] == "complete": + return None + with self.journal_file.record() as journal: + journal.append({ + 'type': "delete", + 'test-event': test_event + }) + return test_event + + def queue(self): + results = self.__get_results() + queue = self.__get_queue() + + results_dict = {} + for res in results: + results_dict[res['sha']] = res + + for test_event in filter(lambda te: te['status'] == "complete", queue): + try: + result_event = results_dict[test_event['sha']] + del result_event['status'] + test_event.update(result_event) + except: + test_event['status'] = "error" + + return queue + + def status(self): + results = self.__get_results() + tests = self.__get_tests(full=True) + queue = self.queue() + + results_dict = {} + for res in results: + results_dict[res['sha']] = res + + scope_status = [] + for test_case in tests: + queued_events = filter( + lambda e: e['test-case'] == test_case, + reversed(queue) + ) + last_event = next(queued_events, {}) + if last_event.get('status') in ['complete', 'error']: + completed_event = last_event + else: + completed_event = {} + for test_event in queued_events: + if test_event['status'] in ['complete', 'error']: + completed_event = test_event + break + + test_case['queued-event'] = last_event + test_case['last-event'] = completed_event + + return tests + + def results(self, req): + def in_tests(result, tests): + return any([result['test-case'] == test_case for test_case in tests]) + + results = [ev for ev in self.queue() if ev['status'] == "complete"] + result_dict = {} + for test_event in results: + test_case = test_event['test-case'] + result_dict[test_case] = test_event + + if req.get('filter', {}).get('last'): + results = result_dict.values() + + if req.get('filter', {}).get('passed'): + results = [res for res in results if res['result'] == "PASSED"] + + if req.get('filter', {}).get('failed'): + results = [res for res in results if res['result'] == "FAILED"] + + if req.get('filter', {}).get('test-sha'): + tests = self.__tests_by_sha(req['filter']['test-sha']) + results = [res for res in results if in_tests(res, tests)] + + return results + + def result_by_sha(self, req): + results = self.results({}) + return next( + filter( + lambda res: res['sha'].startswith(req['sha']), + results), + None + ) + + def test_event_by_sha(self, req): + test_events = self.queue() + return next( + filter( + lambda te: te['sha'].startswith(req['sha']), + test_events), + None + ) + + + +class TestSelector: + def __init__(self, tests): + self.tests = tests + + def select(self): + with NamedTemporaryFile(mode="r+") as tmpf: + data = [] + for test_case in self.tests: + data.append([ + test_case['sha'][:16], + test_case.function(), + test_case.get('last-event', {}).get('result', "") + ]) + tmpf.write(tabulate(data, tablefmt="plain")) + tmpf.flush() + + os.system(f"vim {tmpf.name}") + + tmpf.seek(0) + return [line.split()[0] for line in tmpf.readlines()] + +usage = """%(prog)s command [args] + +Supported commands: + init Initialize new test scope + tests Print list of test cases + run Run specified tests or select them interactively + delete Delete test event from the queue + queue Print test event queue + status Print scope status + results Print list of test results + show Print details of test result + log Open log(s) for given test case in default browser + test-log Open log for given test event in default browser + stdout Show pytest standard output on selected DUT +""" + + +class SuperRunnerCli: + def __init__(self, argv): + parser = argparse.ArgumentParser(description="Super Runner CLI", usage=usage) + parser.add_argument('command') + args = parser.parse_args(argv[0:1]) + command = args.command.replace("-", "_") + if not hasattr(self, command): + error(f"Unrecognized command '{args.command}'") + parser.print_help() + exit(1) + self.scope_handler = ScopeHandler() + getattr(self, command)(f"{parser.prog} {args.command}", argv[1:]) + + def __color_result(self, string): + return { + 'PASSED': Printer.green, + 'FAILED': Printer.red, + }.get(string, Printer.yellow)(string) + + def __print_test_events(self, args, test_events): + p = ListPrinter(args.format) + id_field = ('id', 'sha')[args.long] + test_field = ('function', 'test')[args.long] + test_event_field = ('event-id', 'event-sha')[args.long] + p.setHeader([ + ('Id', id_field), + ('Test', test_field), + ('Event', test_event_field), + ]) + for test_event in test_events: + test_case = test_event['test-case'] + p.addEntry({ + 'id': test_case['sha'][:16], + 'sha': test_case['sha'], + 'function': test_case.function(), + 'test': test_case.test(), + 'event-id': test_event['sha'][:16], + 'event-sha': test_event['sha'], + }) + p.print() + + def init(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Initialize new test scope" + ) + args = parser.parse_args(argv) + + runner_path = os.getenv('RUNNER_PATH') + if not runner_path: + error("Enrvironment variable 'RUNNER_PATH' is not set!") + exit(1) + if os.path.isdir("meta"): + error("Existing runner scope found in this directory!") + exit(1) + os.mkdir("meta") + os.mkdir("results") + shutil.copytree(os.path.join(runner_path, "configs"), "configs") + + def tests(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Print list of test cases" + ) + parser.add_argument('--long', action='store_true') + parser.add_argument('--collapse', action='store_true') + parser.add_argument('--format', choices=['table', 'json'], default='table') + args = parser.parse_args(argv) + + tests = self.scope_handler.tests({ + 'full': not args.collapse, + 'collapsed': args.collapse + }) + + p = ListPrinter(args.format) + id_field = ('id', 'sha')[args.long] + test_field = ('function', 'test')[args.long] + p.setHeader([('Id', id_field), ('Test', test_field)]) + for test_case in tests: + p.addEntry({ + 'id': test_case['sha'][:16], + 'sha': test_case['sha'], + 'function': test_case.function(), + 'test': test_case.test() + }) + p.print() + + def run(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Run specified tests or select them interactively" + ) + parser.add_argument('ids', nargs='*') + parser.add_argument('--failed', action='store_true') + parser.add_argument('--not-passed', action='store_true') + parser.add_argument('--missing', action='store_true') + parser.add_argument('--include-queued', action='store_true') + parser.add_argument('--long', action='store_true') + parser.add_argument('--format', choices=['table', 'json'], default='table') + args = parser.parse_args(argv) + + if args.ids: + test_events = self.scope_handler.run(args.ids) + self.__print_test_events(args, test_events) + return + + test_cases = self.scope_handler.status() + + run_all = not any((args.failed, args.not_passed, args.missing)) + + if not args.include_queued and not run_all: + test_cases = list(filter( + lambda tc: tc.get('queued-event', {}).get('status') == "complete", + test_cases + )) + + if args.failed: + test_cases = list(filter( + lambda tc: tc.get('last-event', {}).get('result') == "FAILED", + test_cases + )) + + if args.not_passed: + test_cases = list(filter( + lambda tc: tc.get('last-event', {}).get('result') not in [None, "PASSED"], + test_cases + )) + + if args.missing: + test_cases = [tc for tc in test_cases if not tc.get('queued-event')] + + to_run = TestSelector(test_cases).select() + + test_events = self.scope_handler.run(to_run) + self.__print_test_events(args, test_events) + + def delete(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Delete test event from the queue" + ) + parser.add_argument('id') + args = parser.parse_args(argv) + + test_event = self.scope_handler.test_event_by_sha({'sha': args.id}) + deleted_event = self.scope_handler.delete({'test-event': test_event}) + + def queue(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Print test event queue" + ) + parser.add_argument('--all', action='store_true') + parser.add_argument('--long', action='store_true') + parser.add_argument('--format', choices=['table', 'json'], default='table') + args = parser.parse_args(argv) + + queue = self.scope_handler.queue() + + p = ListPrinter(args.format) + id_field = ('id', 'sha')[args.long] + test_field = ('function', 'test')[args.long] + p.setHeader([ + ('Id', id_field), + ('Test', test_field), + ('Status', 'status'), + ('DUT', 'dut'), + ('Duration', 'duration') + ]) + for test_event in queue: + if not args.all and test_event['status'] in ["complete", "error"]: + continue + test_case = test_event['test-case'] + p.addEntry({ + 'id': test_event['sha'][:16], + 'sha': test_event['sha'], + 'function': test_case.function(), + 'test': test_case.test(), + 'status': test_event['status'], + 'dut': test_event.get('ip', ""), + 'duration': test_event.duration() + }) + p.print() + + def status(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Print scope status" + ) + parser.add_argument('--long', action='store_true') + parser.add_argument('--format', choices=['table', 'json'], default='table') + args = parser.parse_args(argv) + + status = self.scope_handler.status() + + p = ListPrinter(args.format) + id_field = ('id', 'sha')[args.long] + result_id_field = ('last-result-id', 'last-result-sha')[args.long] + test_field = ('function', 'test')[args.long] + p.setHeader([ + ('Id', id_field), + ('Test', test_field), + ('Status', 'status'), + ('Last result id', result_id_field), + ('Last result', 'last-result'), + ('Last result date', 'last-result-date'), + ]) + for test_case in status: + last_event = test_case.get('last-event', {}) + queued_event = test_case.get('queued-event', {}) + try: + last_event_result = self.__color_result(last_event['result']) + last_event_id = last_event['sha'][:16] + last_event_sha = last_event['sha'] + last_event_date = datetime.fromtimestamp(last_event['end-timestamp']) \ + .strftime("%Y-%m-%d %H:%M:%S") + except: + last_event_result = last_event_id = last_event_sha = last_event_date = "" + p.addEntry({ + 'id': test_case['sha'][:16], + 'sha': test_case['sha'], + 'function': test_case.function(), + 'test': test_case.test(), + 'status': queued_event.get('status', "none"), + 'dut': queued_event.get('ip', ""), + 'last-result': last_event_result, + 'last-result-id': last_event_id, + 'last-result-sha': last_event_sha, + 'last-result-date': last_event_date + }) + p.print() + + def results(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Print list of test results" + ) + parser.add_argument('id', nargs='?') + parser.add_argument('--last', action='store_true') + parser.add_argument('--passed', action='store_true') + parser.add_argument('--failed', action='store_true') + parser.add_argument('--long', action='store_true') + parser.add_argument('--format', choices=['table', 'json'], default='table') + args = parser.parse_args(argv) + + if args.passed and args.failed: + error("Options --passed and --failed cannot be used together!") + exit(1) + + results = self.scope_handler.results({ + 'filter': { + 'last': args.last, + 'passed': args.passed, + 'failed': args.failed, + 'test-sha': args.id + } + }) + + p = ListPrinter(args.format) + id_field = ('id', 'sha')[args.long] + test_field = ('function', 'test')[args.long] + p.setHeader([ + ('Id', id_field), + ('Test', test_field), + ('Result', 'result'), + ('DUT', 'dut'), + ('Duration', 'duration') + ]) + for test_event in results: + test_case = test_event['test-case'] + p.addEntry({ + 'id': test_event['sha'][:16], + 'sha': test_event['sha'], + 'dut': test_event['ip'], + 'function': test_case.function(), + 'test': test_case.test(), + 'result': self.__color_result(test_event['result']), + 'duration': test_event.duration() + }) + p.print() + + def show(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Print details of test result" + ) + parser.add_argument('sha') + parser.add_argument('--format', choices=['table', 'json'], default='table') + args = parser.parse_args(argv) + + test_event = self.scope_handler.result_by_sha({'sha': args.sha}) + if not test_event: + error(f"Result with id '{args.sha}' not found") + exit(1) + + test_case = test_event['test-case'] + + p = DataPrinter(args.format) + p.setCaptions([ + ('SHA', 'sha'), + ('Test', 'test'), + ('DUT', 'dut'), + ('Logs', 'logs'), + ('Result', 'result'), + ('Duration', 'duration') + ]) + p.setData({ + 'sha': test_event['sha'], + 'test': f"{test_case}", + 'dut': test_event['ip'], + 'logs': test_event['logs'], + 'result': self.__color_result(test_event['result']), + 'duration': test_event.duration() + }) + p.print() + + def log(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Open log for given test event in default browser" + ) + parser.add_argument('sha') + args = parser.parse_args(argv) + + res = self.scope_handler.result_by_sha({'sha': args.sha}) + if not res: + error(f"Result with id '{args.sha}' not found") + exit(1) + + with daemon.DaemonContext(): + webbrowser.open_new_tab(os.path.join(res['logs'], "main.html")) + self.scope_handler.log(prog, argv) + + def test_log(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Open log(s) for given test case in default browser" + ) + parser.add_argument('id') + parser.add_argument('--passed', action='store_true') + parser.add_argument('--failed', action='store_true') + args = parser.parse_args(argv) + + results = self.scope_handler.results({ + 'filter': { + 'last': True, + 'passed': args.passed, + 'failed': args.failed, + 'test-sha': args.id + } + }) + if not results: + error(f"Result with id '{args.sha}' not found") + exit(1) + + for test_case in results: + with daemon.DaemonContext(): + webbrowser.open_new_tab(os.path.join(test_case['logs'], "main.html")) + + def stdout(self, prog, argv): + parser = argparse.ArgumentParser( + prog=prog, + description="Show pytest standard output on selected DUT" + ) + parser.add_argument('ip') + args = parser.parse_args(argv) + + stdout_path = os.path.join("results", args.ip, "stdout") + + if not os.path.isfile(stdout_path): + error(f"DUT with ip address '{args.ip}' not found") + exit(1) + + os.system(f"tail -f results/{args.ip}/stdout") + + +if __name__ == '__main__': + SuperRunnerCli(sys.argv[1:]) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3992629 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +filelock>=3.0.12 +python-daemon>=2.2.4 +setproctitle>=1.1.10 +tabulate>=0.8.7 +watchdog>=0.10.3 diff --git a/runnerd b/runnerd new file mode 100755 index 0000000..a0bb92b --- /dev/null +++ b/runnerd @@ -0,0 +1,370 @@ +#!/usr/bin/env python3 +# +# Copyright(c) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# + +from common import ConfigFile, JournalFile, StatusFile, TestCase, TestEvent, JournalParser + +from pathlib import Path +from queue import Queue +from setproctitle import setproctitle +from subprocess import Popen, PIPE +from tempfile import NamedTemporaryFile +from threading import Thread, Lock, Event +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +import argparse +import contextlib +import random +import daemon +import filelock +import json +import logging +import os +import sys +import time + + +logging.basicConfig( + filename="runnerd.log", + level=logging.ERROR, + format='%(asctime)s %(levelname)-8s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') +log = logging.getLogger("runnerd") + +scriptdir = os.path.dirname(__file__) + + +class EventHandler(FileSystemEventHandler): + def __init__(self, callback): + self.callback = callback + def on_modified(self, event): + self.callback() + + +class PytestRunner: + def __init__(self, test_dir=None, log_dir=None, stdout_path=None): + self.test_dir = test_dir + self.log_dir = log_dir + self.stdout_path = stdout_path + + def run(self, dut_config_path, test_case): + cmd = "" + if self.test_dir: + cmd += f"cd {self.test_dir}; " + cmd += f"pytest " + cmd += f"--dut-config={dut_config_path} " + if self.log_dir: + cmd += f"--log-path={self.log_dir} " + cmd += f"--random-seed={test_case['seed']} " + if test_case['pytest-options'] is not None: + for option, value in test_case['pytest-options'].items(): + cmd += f"--{option}={value} " + cmd += f"\"{test_case}\"" + + out = open(self.stdout_path, "w") if self.stdout_path else PIPE + process = Popen(cmd, shell=True, stdout=out, stderr=out) + process.wait() + if self.stdout_path: + out.close() + + def collect(self, test_path, seed): + cmd = "" + if self.test_dir: + cmd += f"cd {self.test_dir}; " + cmd += f"pytest --collect-only --random-seed={seed} -q \"{test_path}\"" + process = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE) + process.wait() + stdout = process.stdout.read().decode('ascii') + return stdout.splitlines()[:-2] + + +class Dut: + def __init__(self, config): + self.config = config + self.ip = config['ip'] + log_path = Path('results').joinpath(self.ip) + log_path.mkdir(parents=True, exist_ok=True) + self.log_path = log_path.absolute() + stdout_path = log_path.joinpath("stdout") + stdout_path.touch() + self.stdout_path = stdout_path.absolute() + + def run_test(self, test_event, on_complete): + def run_in_thread(self, test_event, on_complete): + self.config['meta'] = { + **test_event, + 'test-case' : {**test_event['test-case']} + } + tmp_conf_file = NamedTemporaryFile(prefix="dut_config_", suffix=".yml") + ConfigFile(tmp_conf_file.name).save(self.config) + log_path = Path('results').joinpath(self.ip) + log_path.mkdir(parents=True, exist_ok=True) + test_case = test_event['test-case'] + log.info(f"Start {test_case} @ {self.ip}") + runner = PytestRunner( + test_dir = test_case['dir'], + log_dir = self.log_path, + stdout_path = self.stdout_path + ) + runner.run(tmp_conf_file.name, test_case) + log.info(f"Complete {test_case} @ {self.ip}") + on_complete(self, test_event) + + thread = Thread(target=run_in_thread, args=(self, test_event, on_complete)) + thread.start() + return thread + + +class DutsManager: + base_dut_config_path = "configs/base_dut_config.yml" + + def __init__(self, duts_config_file): + self.duts_config_file = duts_config_file + self.base_dut_config = ConfigFile(self.base_dut_config_path).load() + self.duts_queue = Queue() + self.duts = [] + + def collect_duts(self): + if not self.duts_config_file.need_reload(): + return + + self.duts_config = self.duts_config_file.load() + self.duts = [] + for config in self.duts_config['duts']: + dut = Dut({**self.base_dut_config, **config}) + self.duts.append(dut) + self.duts_queue.put(dut) + + def get_free_dut(self, dut_filter=None): + filtered_duts = [] + found = False + for _ in self.duts: + dut = self.duts_queue.get() + if not dut_filter or dut_filter(dut): + found = True + break + filtered_duts.append(dut) + map(self.duts_queue.put, filtered_duts) + if not found: + raise Exception("No DUT matching to filter!") + return dut + + def mark_free(self, dut): + self.duts_queue.put(dut) + + +class TestManager: + def __init__(self): + self.lock = Lock() + self.test_events_todo = [] + self.test_events_in_progress = [] + self.test_events_complete = [] + self.progress_file = StatusFile("meta/progress.json") + self.__load_progress() + self.journal_file = JournalFile("meta/journal.json") + self.journal_file.create() + self.journal_event = Event() + self.journal_observer = Observer() + self.journal_observer.schedule(EventHandler(self.__collect_tests), + self.journal_file.path, recursive=True) + self.journal_observer.start() + self.__collect_tests() + + def __load_progress(self): + self.test_events_complete.clear() + with self.progress_file.edit() as progress: + if not 'test-events' in progress: + return + progress['test-events'] = list(filter( + lambda entry: entry['status'] == "complete", + progress['test-events'] + )) + for entry in progress['test-events']: + self.test_events_complete.append(TestEvent(entry)) + + def __append_progress(self, test_event): + with self.progress_file.edit() as progress: + progress.setdefault('test-events', []).append(test_event) + + def __load_journal(self): + self.test_events_todo.clear() + self.test_events_todo = JournalParser(self.journal_file).parse() + + def __collect_tests(self): + if not self.journal_file.need_reload(): + return + log.debug("Collecting tests") + with self.lock: + self.__load_journal() + test_events = [] + for test_event in self.test_events_todo: + if test_event in self.test_events_complete: + continue + if test_event in self.test_events_in_progress: + continue + test_events.append(test_event) + self.test_events_todo = test_events + if self.test_events_todo: + self.journal_event.set() + + def get_next_test(self): + self.journal_event.clear() + if not self.test_events_todo: + self.__collect_tests() + self.journal_event.wait() + with self.lock: + return self.test_events_todo.pop(0) + + def mark_started(self, test_event): + with self.lock: + test_event['status'] = "started" + self.test_events_in_progress.append(test_event) + self.__append_progress(test_event) + + def mark_complete(self, test_event): + with self.lock: + test_event['status'] = "complete" + self.__append_progress(test_event) + self.test_events_complete.append(test_event) + self.test_events_in_progress.remove(test_event) + + +class ResultsCollector: + def __init__(self): + self.results_file = StatusFile("meta/results.json") + + def collect(self): + result_list = [] + for root, _, files in os.walk("results/"): + if not (root.count(os.sep) == 3 and 'info.json' in files): + continue + with open(os.path.join(root, 'info.json')) as info_file: + info = json.load(info_file) + result_list.append(TestEvent({ + **info['meta'], + 'logs': root, + 'result': info['status'] + })) + + result_list.sort(key=lambda e: e['start-timestamp']) + with self.results_file.edit() as results: + results['results'] = result_list + + +class MasterRunner: + duts_config_path = "configs/duts_config.yml" + + def __init__(self): + if not os.path.isfile(self.duts_config_path): + raise FileNotFoundError(self.duts_config_path) + self.duts_manager = DutsManager(ConfigFile(self.duts_config_path)) + self.test_manager = TestManager() + self.results_collector = ResultsCollector() + self.results_collector.collect() + + def run(self): + def test_run_complete(dut, test_event): + self.duts_manager.mark_free(dut) + test_event['end-timestamp'] = time.time() + self.test_manager.mark_complete(test_event) + self.results_collector.collect() + while True: + self.duts_manager.collect_duts() + log.debug("Looking for free DUT... ") + dut = self.duts_manager.get_free_dut() + log.debug(f"Found DUT {dut.ip}") + log.debug("Looking for next test... ") + test_event = self.test_manager.get_next_test() + log.debug(f"Found test {test_event}") + test_event['ip'] = dut.ip + test_event['start-timestamp'] = time.time() + self.test_manager.mark_started(test_event) + dut.run_test(test_event, test_run_complete) + + +class ScopeParser: + scope_config_path = "configs/scope_config.yml" + + def __init__(self): + self.scope_config_file = ConfigFile(self.scope_config_path) + self.scope_file = StatusFile("meta/scope.json") + + def __parse_config(self): + log.debug("Reloading scope config") + scope_config = self.scope_config_file.load() + scope = self.scope_file.load() + pytest_runner = PytestRunner(test_dir=scope_config['tests_path']) + if 'seed' in scope_config: + seed = scope_config['seed'] + else: + seed = scope.get('seed', random.randrange(sys.maxsize)) + + global_pytest_options = scope_config.get('global_pytest_options') or {} + + test_cases = [] + for test_info in scope_config['tests']: + items = pytest_runner.collect(test_info['path'], seed) + local_pytest_options = test_info.get('pytest_options') or {} + pytest_options = (global_pytest_options | local_pytest_options) or None + + for item in items: + test_cases.append(TestCase.from_canonical_name( + scope_config['tests_path'], + item, + seed, + pytest_options + )) + test_cases = list(dict.fromkeys(test_cases)) + + with self.scope_file.edit() as scope: + scope['scope'] = scope_config['scope'] + scope['seed'] = seed + scope['tests'] = test_cases + + def start(self): + self.__parse_config() + self.observer = Observer() + self.observer.schedule(EventHandler(self.__parse_config), + self.scope_config_path, recursive=True) + self.observer.start() + + def stop(self): + self.observer.join() + +def daemonize(enabled): + if enabled: + print("Starting daemon") + logger_files = [handler.stream.fileno() for handler in logging.root.handlers] + return daemon.DaemonContext(working_directory=os.getcwd(), files_preserve=logger_files) + else: + print("Starting in non-daemon mode") + return contextlib.suppress() + +if __name__ == '__main__': + setproctitle('runnerd') + parser = argparse.ArgumentParser() + parser.add_argument('--debug', action='store_true') + parser.add_argument('--no-daemon', action='store_true') + parser.add_argument('--version', action='version', version='Superrunner4000 v0.2') + args = parser.parse_args() + + log.setLevel((logging.INFO, logging.DEBUG)[args.debug]) + + if not os.path.isdir("meta"): + print("Scope is not initialized in this directory!") + exit(1) + + try: + lock = filelock.FileLock("meta/daemon.lock").acquire(timeout=0.01) + except filelock.Timeout: + print("Another instance of 'runnerd' demon is active!") + exit(1) + + print("Preparing scope") + scope_parser = ScopeParser() + scope_parser.start() + with daemonize(not args.no_daemon): + runner = MasterRunner() + runner.run()