-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcheck.py
118 lines (110 loc) · 4.32 KB
/
check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import common
import sqlite3
import os
import sys
import shutil
# Collect paths of runs
def collect_runs():
runs = set()
api_dirs = os.scandir('./results')
for api_dir in api_dirs:
if api_dir.is_dir():
tool_dirs = os.scandir(api_dir)
for tool_dir in tool_dirs:
if tool_dir.is_dir():
run_dirs = os.scandir(tool_dir)
for run_dir in run_dirs:
runs.add(run_dir.path)
return runs
# Count code coverage samples
def count_coverage_samples(run):
files = os.listdir(f"{run}{common.CODE_COVERAGE_PATH}")
exec_count = 0
csv_count = 0
for file in files:
if file.endswith('.exec'):
exec_count += 1
elif file.endswith('.csv'):
csv_count += 1
return exec_count, csv_count
# Perform integrity analysis
def analyze():
runs = collect_runs()
print(f"Analyzing {len(runs)} runs.")
analized = {}
for run in runs:
# Init dictionary to store results
result = {}
# Verify started.txt exists
result['started'] = os.path.exists(f"{run}/started.txt")
# Verify completed.txt exists
result['completed'] = os.path.exists(f"{run}/completed.txt")
# Verify database exists
result['db'] = os.path.exists(f"{run}/{common.DB_FILENAME}")
# Verify interactions table exists
if result['db']:
conn = sqlite3.connect(f"{run}/{common.DB_FILENAME}")
cursor = conn.cursor()
result['interactions_table'] = int(cursor.execute("SELECT COUNT(1) FROM sqlite_master WHERE type='table' AND name = 'interactions'").fetchone()[0]) > 0
else:
result['interactions_table'] = False
# Verify at least 10_000 requests have been sent
if result['interactions_table']:
if "genome-nexus" in run and "schemathesis" in run:
result['requests'] = cursor.execute('SELECT COUNT(1) FROM interactions').fetchone()[0] >= 3_000
else:
result['requests'] = cursor.execute('SELECT COUNT(1) FROM interactions').fetchone()[0] >= 8_000
else:
result['requests'] = False
# Verify requests span is at least 3500 seconds (almost 1h)
if result['interactions_table'] and result['requests']:
result['time_span'] = cursor.execute('SELECT MAX(request_timestamp) - MIN(request_timestamp) FROM interactions').fetchone()[0] >= 3200
else:
result['time_span'] = False
# Verify code coverage dir exists
result['coverage_dir'] = os.path.exists(f"{run}{common.CODE_COVERAGE_PATH}")
# Verify all code coverage samples exist
if result['coverage_dir']:
exec_count, csv_count = count_coverage_samples(run)
result['exec_count'] = exec_count >= 730
result['csv_count'] = csv_count >= 730
result['exec_csv_match'] = exec_count == csv_count
else:
result['exec_count'] = False
result['csv_count'] = False
result['exec_csv_match'] = False
# Add result to retured dictionary
analized[run] = result
# Print if somethind is wrong
for key in result:
if result[key] == False:
print(f" => Something wrong in {run}: {result}")
break
return analized
# Removes runs with problems
def clean(runs):
for run in runs:
shutil.rmtree(run, ignore_errors=True)
print("Removed.")
if __name__ == '__main__':
common.welcome()
print("This is the verify_runs module. It will check the integrity of runs.")
print("[1] Analyze runs")
print("[2] Analyze runs and remove wrong ones")
choice = input("Your choice: ")
if choice != '1' and choice != '2':
print("Invalid choice!")
sys.exit(1)
analyzed = analyze()
with_problems = []
for run in analyzed:
for key in analyzed[run]:
if analyzed[run][key] == False:
with_problems.append(run)
break
if len(with_problems) == 0:
print("No runs with problems found.")
else:
if choice == '2':
input(f"Are you sure you want to delete {len(with_problems)}/{len(analyzed)} runs with problems? Press ENTER to continue, or CTRL+C to cancel...")
clean(with_problems)