forked from apluslms/grade-c
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
317 lines (265 loc) · 12.6 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
#!/usr/bin/env python3
import os
import traceback
import argparse
from time import perf_counter
from pathlib import Path
import signal
from contextlib import contextmanager
from importlib.util import spec_from_file_location, module_from_spec
from typing import Type
from util import read_env
from beautify import Beautify
from report_parser import Report
from util import grading_script_error, Failed, run, process_output, read_env, RunnerBase, load_list
def give_points(points, max_points):
    """Write the final score to /feedback/points in "<points>/<max>" form.

    A missing maximum (None) is reported as a maximum of 1.
    """
    denominator = 1 if max_points is None else max_points
    Path("/feedback/points").write_text(f"{round(points)}/{denominator}")
class Grader:
    """Accumulates points and penalties for a submission.

    Used as a context manager: on exit it renders the feedback page to
    /feedback/out and writes the final score via give_points(), including
    when an exception escaped the managed block.

    Penalties combine either multiplicatively (each penalty scales the
    remaining fraction) or cumulatively (penalties are subtracted); the
    resulting fraction is always clamped to [0, 1].
    """

    # Accepted spellings for the two penalty modes.
    multiplicative_types = ["multiplicative", "mult", "m"]
    cumulative_types = ["cumulative", "cum", "c"]

    def __init__(self, max_points, penalty_type="multiplicative"):
        self.max_points = max_points
        self.fraction = 1  # fraction of points kept after penalties (0..1)
        self.points = 0
        if penalty_type in Grader.multiplicative_types:
            self.penalty_type = "m"
        elif penalty_type in Grader.cumulative_types:
            self.penalty_type = "c"
        else:
            raise Exception(f"Unknown penalty_type. Allowed types: {', '.join(Grader.multiplicative_types + Grader.cumulative_types)}")
        self.compile_output = ""     # accumulated compiler/test output text
        self.valgrind_output = None  # contents of the valgrind log, if run
        self.render = None           # callable producing the feedback page
        self.penalties = {}          # human-readable name -> penalty amount

    def setPoints(self, points, max_points=None):
        """Set the score, optionally replacing max_points; clamps to the maximum."""
        if max_points is not None:
            self.max_points = max_points
        self.points = min(self.max_points, points)

    def addPoints(self, points, max_points=None):
        """Add to the score; max_points, if given, is added to the current maximum."""
        if max_points is None:
            max_points = self.max_points
        else:
            max_points = self.max_points + max_points
        self.setPoints(self.points + points, max_points)

    def addPenalty(self, name, penalty):
        """Apply a penalty and record it under a display name."""
        if self.penalty_type == "m":
            self.fraction = max(0, min(1, self.fraction*(1-penalty)))
        else:
            self.fraction = max(0, min(1, self.fraction-penalty))
        if name == "warning":
            self.penalties["Warnings in compilation"] = penalty
        elif name == "valgrind":
            self.penalties["Valgrind warnings/errors"] = penalty
        else:
            self.penalties[name] = penalty

    def __enter__(self):
        return self

    def __exit__(self, typ, value, tb):
        # BUG FIX: the body previously referenced the global instance
        # `grader` instead of `self`, and the third parameter was named
        # `traceback`, shadowing the module (so traceback.format_exc()
        # would have failed). Both fixed; behavior otherwise unchanged.
        # NOTE(review): reads the module-level `config` for the valgrind
        # penalty, as the original did.
        if self.valgrind_output and len(self.valgrind_output) > 0 and "valgrind" in config["penalties"]:
            self.addPenalty("valgrind", config["penalties"]["valgrind"])
        if typ is None:
            if self.render is not None:
                try:
                    with open("/feedback/out", "w") as f:
                        # NUL bytes are replaced so the feedback survives
                        # downstream text handling.
                        f.write(self.render(compile_output=self.compile_output, valgrind_output=self.valgrind_output, penalties=self.penalties).replace("\0", "\1"))
                except Exception as e:
                    value = Failed("Error rendering test output. Please contact course staff if this persists.", f"Error rendering test output. (rendering Beautify)\n{str(e)}:\n{traceback.format_exc()}")
                    typ = Failed
            else:
                value = Failed("Error rendering test output. Please contact course staff if this persists.", f"Error rendering test output. (beautify is None)")
                typ = Failed
        if typ is not None:
            if typ != Failed:
                # Unexpected exception: log the full formatted traceback
                # (the original interpolated the raw traceback object).
                grading_script_error(f"{str(value)}:\n{''.join(traceback.format_exception(typ, value, tb))}")
                with open("/feedback/out", "w") as f:
                    f.write("Error in grading script. Please contact course staff if this persists.")
            else:
                # Failed carries a student-facing msg and a staff-facing error.
                grading_script_error(value.error)
                with open("/feedback/out", "w") as f:
                    f.write(value.msg)
            self.points = 0
        elif self.max_points is None:
            grading_script_error("max_points is None")
            self.points = 0
        give_points(self.points*self.fraction, self.max_points)
# the subprocess timeout doesn't seem to work correctly for long timeouts
# so we use signals
def timeout_handler(signum, frame):
    # SIGALRM handler: raising Failed here aborts the managed block inside
    # `with Grader(...)`, which then reports the timeout as a graded failure.
    raise Failed("Submission timed out. Check that there are no infinite loops", f"Submission timed out.\ntimeout: {config['timeout']}")
# Install the handler once at module load; Timeout() arms/disarms the alarm.
signal.signal(signal.SIGALRM, timeout_handler)
@contextmanager
def Timeout(timeout):
    """Arm SIGALRM for `timeout` seconds around the managed block.

    None cancels any pending alarm, 0 fires the timeout handler immediately,
    and any other value schedules the alarm. The alarm is always cleared
    when the block exits.
    """
    if timeout == 0:
        timeout_handler(None, None)
    signal.alarm(0 if timeout is None else timeout)
    try:
        yield
    finally:
        signal.alarm(0)
def has_warning(process):
    """Return True if a gcc/g++-style ": warning:" marker appears in the
    captured stdout or stderr (bytes) of `process`."""
    for stream in (process.stdout, process.stderr):
        if stream is not None and ": warning:" in stream.decode('utf-8'):
            return True
    return False
# Default configuration; overridden by /exercise/gcheck.yaml or gcheck.json.
config = {
    "runner": "/gcheck/run.py",        # module file providing the Runner class
    "penalty_type": "multiplicative",  # how penalties combine (see Grader)
    "max_points": None,                # None -> taken from the runner's report
    "penalties": {
    },                                 # e.g. {"warning": 0.1, "valgrind": 0.2}
    "valgrind": False,                 # run the test binary under valgrind
    "valgrind_options": [
        "--track-origins=yes",
        "--leak-check=full",
    ],
    "timeout": 180,                    # seconds before the submission is aborted
}
# Load exercise-specific configuration overrides (YAML preferred, JSON as a
# fallback). A broken config file is a course-side error: log it, emit a
# generic student message, award zero points and exit cleanly.
try:
    if Path("/exercise/gcheck.yaml").exists():
        import yaml
        with open("/exercise/gcheck.yaml") as f:
            config.update(yaml.safe_load(f))
    elif Path("/exercise/gcheck.json").exists():
        import json
        with open("/exercise/gcheck.json") as f:
            config.update(json.load(f))
except Exception as e:
    # BUG FIX: the f-string previously interpolated the `traceback` module
    # object itself; format the actual exception traceback instead.
    grading_script_error(f"Failed to read gcheck config file:\n{traceback.format_exc()}")
    with open("/feedback/out", "w") as f:
        f.write("Error in grading script. Please contact course staff if this persists.")
    give_points(0, 1)
    exit(0)
def absolute_path(path, default_root):
    """Return `path` as a Path, rooted at `default_root` when it is relative."""
    candidate = Path(path)
    return candidate if candidate.is_absolute() else default_root / candidate
def import_runner(path) -> Type[RunnerBase]:
    """Load the module at `path` dynamically and return its `Runner` class.

    Raises Failed (reported as an exercise-configuration problem) when no
    import spec can be built for the file.
    """
    source = Path(path)
    spec = spec_from_file_location(source.stem, source)
    if not spec:
        raise Failed("Problem with exercise configuration. Please contact course staff.", "Runner source file not found")
    module = module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, "Runner")
# Main grading flow: compile the submission together with the exercise's test
# sources, link everything into ./test, run it (optionally under valgrind) and
# record the runner's score. Grader.__exit__ renders the feedback page and
# writes the points file, including when an exception escapes this block.
with Grader(config["max_points"], config["penalty_type"]) as grader:
    # Load the configured Runner class and build the compile environment
    # from /compile.env, letting the runner adjust it.
    runner_cls = import_runner(config["runner"])
    runner = runner_cls()
    env = read_env("/compile.env")
    env = runner.get_env(env)

    # Collect test sources: explicit "testsource" entries first; otherwise
    # (or additionally, when "testsourcedir" is set) every .c/.cpp file
    # found in the configured directories (default /exercise).
    testsources = []
    if "testsource" in config:
        paths = load_list(config, "testsource")
        testsources = [str(absolute_path(f, "/exercise")) for f in paths]
    if len(testsources) == 0 or "testsourcedir" in config:
        paths = load_list(config, "testsourcedir", ["/exercise"])
        for path in paths:
            path = absolute_path(path, "/exercise")
            for file in os.listdir(path):
                if file.endswith(".cpp") or file.endswith(".c"):
                    file = path / file
                    if file.is_file():
                        testsources.append(str(file))
    if not runner.allow_zero_test_sources and not testsources:
        raise Failed("Problem with exercise configuration. Please contact course staff.", "No test sources found. Make sure the exercise config is correct.")
    testsources_c = [f for f in testsources if f.endswith(".c")]
    testsources_cpp = [f for f in testsources if f.endswith(".cpp")]

    # Extra -I include directories; relative entries are rooted at /exercise.
    includedirs = load_list(config, "includedirs")
    includedirs = [d if Path(d).is_absolute() else "/exercise/" + d for d in includedirs]
    includedirs = ["-I" + d for d in includedirs]

    # Every file the student submitted, split by source language.
    submission_files = []
    for dirpath, dirnames, filenames in os.walk("/submission/user"):
        path = Path(dirpath)
        submission_files.extend(str(path / f) for f in filenames)
    submission_files_c = [f for f in submission_files if f.endswith(".c")]
    submission_files_cpp = [f for f in submission_files if f.endswith(".cpp")]

    # Assemble flag sets: config values take precedence, then runner-supplied
    # additions, then the matching variables from /compile.env.
    add_flags = runner.additional_flags(env, config)
    TESTCPPFLAGS = load_list(config, "TESTCPPFLAGS", ["-c"] + add_flags.get("TESTCPPFLAGS", []) + includedirs)
    TESTCFLAGS = load_list(config, "TESTCFLAGS", add_flags.get("TESTCFLAGS", []), env['TESTCFLAGS'])
    TESTCXXFLAGS = load_list(config, "TESTCXXFLAGS", add_flags.get("TESTCXXFLAGS", []))
    CPPFLAGS = load_list(config, "CPPFLAGS", ["-c"] + add_flags.get("CPPFLAGS", []) + includedirs, env['CPPFLAGS'])
    CFLAGS = load_list(config, "CFLAGS", add_flags.get("CFLAGS", []) + [], env['CFLAGS'])
    CXXFLAGS = load_list(config, "CXXFLAGS", add_flags.get("CXXFLAGS", []), env['CXXFLAGS'])
    LDFLAGS = load_list(config, "LDFLAGS", add_flags.get("LDFLAGS", []), env['LDFLAGS'])
    LDLIBS = load_list(config, "LDLIBS", add_flags.get("LDLIBS", []), env['LDLIBS'])
    # Default language standards, unless the flags already choose one.
    if not any(p.strip().startswith("-std=") for p in TESTCXXFLAGS):
        TESTCXXFLAGS = TESTCXXFLAGS + ["-std=c++17"]
    if not any(p.strip().startswith("-std=") for p in TESTCFLAGS):
        TESTCFLAGS = TESTCFLAGS + ["-std=c99"]
    if not any(p.strip().startswith("-std=") for p in CXXFLAGS):
        CXXFLAGS = CXXFLAGS + ["-std=c++17"]
    if not any(p.strip().startswith("-std=") for p in CFLAGS):
        CFLAGS = CFLAGS + ["-std=c99"]

    compile_error = False    # any compile/link step returned non-zero
    compile_warning = False  # any step emitted a ": warning:" diagnostic

    # Compile the submission's C files; each .o is placed next to its source.
    # Command lines and compiler output are appended to the feedback.
    COBJECTS = []
    for cfile in submission_files_c:
        cmd, process = run(["gcc", *CPPFLAGS, *CFLAGS, cfile, "-o", cfile[:-2] + ".o"])
        compile_error = compile_error or process.returncode != 0
        compile_warning = compile_warning or has_warning(process)
        grader.compile_output += cmd + "\n"
        grader.compile_output += process_output(process)
        COBJECTS.append(cfile[:-2] + ".o")
    # Compile the submission's C++ files the same way.
    CPPOBJECTS = []
    for cppfile in submission_files_cpp:
        cmd, process = run(["g++", *CPPFLAGS, *CXXFLAGS, cppfile, "-o", cppfile[:-4] + ".o"])
        compile_error = compile_error or process.returncode != 0
        compile_warning = compile_warning or has_warning(process)
        grader.compile_output += cmd + "\n"
        grader.compile_output += process_output(process)
        CPPOBJECTS.append(cppfile[:-4] + ".o")
    # Compile the exercise's C test sources; their objects are written under
    # /submission/user (the exercise directory is left untouched).
    TESTCOBJECTS = []
    for cfile in testsources_c:
        outfile = cfile[:-2] + ".o"
        outfile = "/submission/user/" + str(Path(outfile).name)
        cmd, process = run(["gcc", *TESTCPPFLAGS, *TESTCFLAGS, cfile, "-o", outfile])
        compile_error = compile_error or process.returncode != 0
        compile_warning = compile_warning or has_warning(process)
        grader.compile_output += cmd + "\n"
        grader.compile_output += process_output(process)
        TESTCOBJECTS.append(outfile)
    # Compile the exercise's C++ test sources the same way.
    TESTCPPOBJECTS = []
    for cppfile in testsources_cpp:
        outfile = cppfile[:-4] + ".o"
        outfile = "/submission/user/" + str(Path(outfile).name)
        cmd, process = run(["g++", *TESTCPPFLAGS, *TESTCXXFLAGS, cppfile, "-o", outfile])
        compile_error = compile_error or process.returncode != 0
        compile_warning = compile_warning or has_warning(process)
        grader.compile_output += cmd + "\n"
        grader.compile_output += process_output(process)
        TESTCPPOBJECTS.append(outfile)

    # Link every object file into the test binary ./test.
    cmd, process = run(["g++", *TESTCPPOBJECTS, *TESTCOBJECTS, *CPPOBJECTS, *COBJECTS, *LDFLAGS, *LDLIBS, "-o", "test"])
    compile_error = compile_error or process.returncode != 0
    compile_warning = compile_warning or has_warning(process)
    grader.compile_output += cmd + "\n"
    grader.compile_output += process_output(process)

    # Apply the warning penalty only when one is configured.
    if compile_warning and "warning" in config["penalties"]:
        grader.addPenalty('warning', config['penalties']['warning'])

    if not compile_error:
        # Successful build: the runner renders the report, and the binary is
        # executed under Timeout(), optionally wrapped in valgrind.
        grader.render = runner.render
        if config["valgrind"]:
            valgrind_filename = "/valgrind_out.txt"
            with Timeout(config["timeout"]):
                output = runner.run(["valgrind", "-q", "--trace-children=yes", "--log-file=" + valgrind_filename] + config["valgrind_options"] + ["./test"], config, grader.max_points)
            try:
                with open(valgrind_filename, 'r') as f:
                    grader.valgrind_output = f.read()
            except Exception as e:
                raise Failed("Error opening valgrind output. Please contact course staff if this persists.", f"Error opening valgrind output.\n{str(e)}:\n{traceback.format_exc()}")
        else:
            with Timeout(config["timeout"]):
                output = runner.run(["./test"], config, grader.max_points)
        grader.compile_output += output + "\n"
        grader.setPoints(runner.points(), runner.max_points())
    else:
        # Build failed: fall back to a renderer that shows the compiler
        # output through the default template.
        def default_renderer(**kwargs):
            beautify = Beautify(Report(), "/gcheck/templates")
            return beautify.render("all.html", beautify=beautify, **kwargs)
        grader.render = default_renderer