-
Notifications
You must be signed in to change notification settings - Fork 9
/
runner.py
73 lines (64 loc) · 2.68 KB
/
runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import typer
import pathlib
import datetime
from typing_extensions import Annotated
from experiment import get_results
from workloads import WORKLOAD_GROUPS
from prov_collectors import PROV_COLLECTOR_GROUPS
from stats import STATS
from util import flatten1
import enum
# Enums generated from the project's registries so that typer can offer their
# keys as the allowed choices for the command-line options of `main`.
# Each enum is created with the same name it is bound to, so `repr()` output
# and lookup by qualified name (e.g. pickling) refer to a name that exists.

# One member per key of PROV_COLLECTOR_GROUPS; name and value are both the key.
CollectorGroup = enum.Enum("CollectorGroup", {key: key for key in PROV_COLLECTOR_GROUPS})  # type: ignore

# One member per non-empty key of WORKLOAD_GROUPS, lower-cased.
# NOTE(review): `main` indexes WORKLOAD_GROUPS with the lower-cased value, so
# this presumably relies on the registry keys already being lower-case -- confirm.
WorkloadGroup = enum.Enum("WorkloadGroup", {key.lower(): key.lower() for key in WORKLOAD_GROUPS if key})  # type: ignore

# One member per key of STATS; name and value are both the key.
# The enum was previously created under the misspelled class name "Sats".
StatsNames = enum.Enum("StatsNames", {key: key for key in STATS})  # type: ignore
def main(
    # Collector groups to run; each member's value is a key of PROV_COLLECTOR_GROUPS.
    # NOTE: the list defaults are only read (never mutated) by this function.
    collector_groups: Annotated[list[CollectorGroup], typer.Option("--collectors", "-c")] = [CollectorGroup.noprov],
    # Workload groups to run; each member's value is a key of WORKLOAD_GROUPS.
    workload_groups: Annotated[list[WorkloadGroup], typer.Option("--workloads", "-w")] = [WorkloadGroup["fast"]],
    # Entries of STATS to call on the results once the run finishes.
    stats_names: Annotated[list[StatsNames], typer.Option("--stats", "-s")] = [],
    # The remaining options are forwarded unchanged to get_results.
    iterations: int = 1,
    seed: int = 0,
    rerun: Annotated[bool, typer.Option("--rerun")] = False,
    ignore_failures: Annotated[bool, typer.Option("--keep-going")] = False,
    parallelism: int = 1,
) -> None:
    """
    Run a full matrix of these workloads in those provenance collectors.

    This program writes intermediate results to disk, so if it gets interrupted part
    way through, it can pick back up where it left off last time. However, if you really
    want to ignore prior runs, pass `--rerun`.
    """
    # NOTE: the docstring above is user-facing help text, so parameter notes
    # live in comments instead.
    # Raises ValueError if the selected groups yield no collectors or no workloads.

    # Expand each selected group into its members and merge them into one list.
    collectors = list(flatten1([
        PROV_COLLECTOR_GROUPS[collector_name.value]
        for collector_name in collector_groups
    ]))
    workloads = list(flatten1([
        WORKLOAD_GROUPS[workload_name.value]
        for workload_name in workload_groups
    ]))
    stats = [
        STATS[stats_name.value]
        for stats_name in stats_names
    ]

    if not collectors:
        raise ValueError("Must select some collectors")
    if not workloads:
        raise ValueError("Must select some workloads")

    # Append a start record (timestamp plus the selected groups) to the run log.
    with pathlib.Path("runner.log").open("a+") as file:
        print("Starting", datetime.datetime.now().isoformat(), file=file)
        print(", ".join(collector_group.value for collector_group in collector_groups), file=file)
        print(", ".join(workload_group.value for workload_group in workload_groups), file=file)

    df = get_results(
        collectors,
        workloads,
        iterations=iterations,
        # Forward the caller's seed; this used to be hard-coded to 0, which
        # made the `--seed` option a silent no-op.
        seed=seed,
        ignore_failures=ignore_failures,
        rerun=rerun,
        parallelism=parallelism,
    )

    # Append a completion record; the log is reopened so it is not held open
    # for the duration of get_results.
    with pathlib.Path("runner.log").open("a+") as file:
        print("Done", datetime.datetime.now().isoformat(), file=file)

    # Hand the results to each requested stat, in the order given.
    for stat in stats:
        stat(df)
# Script entry point: when this file is executed directly (not imported),
# hand `main` to typer, which runs it as a command-line program.
if __name__ == "__main__":
    typer.run(main)