-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmoss
executable file
·1014 lines (809 loc) · 37.4 KB
/
moss
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Script that wraps running MOSS perl script with the file structure created by provide.
It provides a nicer interface to running MOSS by internally building a MOSS command, identifying
the proper submissions to include from Tufts grading directories, and saves the results in
a more organized manner. Saving the results should not be underrated. It allows one to access
the results after MOSS expiration or in the (surprisingly common) instance when the MOSS
results server(s) go down.
This script should have execute permissions if you wish to run it with ./moss. However,
it can just be run with python moss.
Version: 5.3.2024
See https://github.com/ChamiLamelas/moss-wrapper for the most up to date version and more information.
Author: Chami Lamelas (slamel01)
Date: Fall 2022 - Present
"""
import subprocess
import inspect
from datetime import datetime, timedelta
from pytz import timezone
import argparse
import os
from collections import defaultdict
import shutil
import logging
from pathlib import Path
import time
import importlib
import sys
import requests
from math import ceil
import grp
import magic
from dataclasses import dataclass, field, fields
from typing import List
import traceback
import stat
import pwd
# toml module (from PyPI) is used in place of tomllib (Python standard library) as tomllib
# will only be available in Python 3.11.x and Tufts server runs Python 3.9.2.
import toml
# Script descriptions ...
# Both strings are handed to argparse.ArgumentParser in get_config_file( ):
# DESCRIPTION as the help text's description and EPILOG as its epilog
DESCRIPTION = (
"This script is used for running Stanford's MOSS plagiarism detection tool."
)
EPILOG = "Please visit https://github.com/ChamiLamelas/moss-wrapper to learn how to use this script."
@dataclass
class MOSSConfig:
    """
    Configuration for one run of this wrapper.

    Every field is one configuration setting; a setting that is not supplied
    keeps the default given here.
    """

    name: str = None
    base: str = None
    # Evaluated once, when the class is defined
    output: str = os.getcwd()
    curr_sem: str = None
    assn_dirs: List[str] = field(default_factory=list)
    download: bool = False
    match_formatter: str = None
    submission_filter: str = None
    custom_file: str = None
    job: str = None
    # Passed to moss.pl as -n
    submissions_to_collect: int = 250
    # Passed to moss.pl as -m
    threshold_for_repeated_code: int = 10
    # Passed to moss.pl as -l
    language: str = "cc"
    source_extensions: List[str] = field(default_factory=lambda: [".h", ".cpp"])
    source_file_types: List[str] = field(
        default_factory=lambda: ["C source", "C++ source"]
    )
    required_groups: List[str] = field(default_factory=lambda: ["grade15", "ta15"])

    def __repr__(self):
        """
        Renders one setting per pair of lines, in field order:
        name:
            <value of name>
        base:
            <value of base>
        etc. (values are indented with a tab)
        """
        entries = []
        for spec in fields(MOSSConfig):
            entries.append(f"{spec.name}:\n\t{getattr(self, spec.name)}")
        return "\n".join(entries)

    def __str__(self):
        """Same text as __repr__"""
        return self.__repr__()
class MOSSError(Exception):
    """Exception type raised by this script's own checks (as opposed to unexpected errors)"""
class MOSSMatch:
    """One MOSS match: the two participants, their match percentages, lines matched and the match URL"""

    def __init__(self):
        """Creates a match with every attribute unset (None) -- they are filled in while parsing"""
        for attr in (
            "url",
            "person1",
            "person2",
            "person1_perc",
            "person2_perc",
            "lines_matched",
        ):
            setattr(self, attr, None)
# Nice time format -- used for the submission time and the estimated
# expiration time that appear in messages
NICE_TIME_FMT = "%m/%d/%Y %I:%M %p"
# Settings from old Tufts grade utility ...
# A submission folder that contains a subfolder with this name is treated as
# having been touched by the grade utility
GRADE_TA_EDIT_DIR = "grading"
# Suffix of the files collected from such a submission; it is stripped again
# before the extension check and when the files are copied for MOSS
GRADE_SUFFIX = ".bak"
# Relative name of temporary folder created by moss to transfer over bak files
TEMP_MOSS_DIR = ".tmp_moss_dir"
# Various MOSS script settings ...
# Number of days added to the submission time to estimate when results expire
MOSS_EXPIRATION_TIME_DAYS = 14
# Log file name
LOG_BASENAME = "moss.log"
# Name under which the user's config file is copied into the job folder
CONFIG_COPY_BASENAME = "moss.config"
# Name of the file in the job folder holding the results URL and expiration message
URL_BASENAME = "moss.url"
# Formatting settings for matches file
# Column headers, in the same order as the values written for each match
MATCHES_HEADERS = [
"Person1",
"Person2",
"Person1_Match_Perc",
"Person2_Match_Perc",
"Lines_Matched",
"URL",
"Results_Subfolder",
]
MATCHES_DELIMITER = "\t"
MATCHES_BASENAME = "matches.tsv"
# Name for main file in match
INDEX_BASENAME = "OPENME.html"
# MOSS URL search string -- the last line containing it (in moss.pl output or
# in the URL file) is taken to be the results URL
URL_SEARCH_QUERY = "moss.stanford.edu"
# Non instance method helpers -- more general functions
def get_permissions_info(path):
    """
    Describes who may access a file.

    Returns a 3-tuple (permission string such as -rw-r--r--, owner name,
    group name). When a name cannot be resolved, the numeric id is returned
    in its place.
    """
    info = os.stat(path)

    # Looking up a name raises KeyError when the UID (or GID) has no entry.
    # It looks like EECS staff may remove UTLNs from files on Halligan: in
    # super old directories the user names (UTLNs) show up as bare UIDs.
    try:
        owner = pwd.getpwuid(info.st_uid).pw_name
    except KeyError:
        owner = info.st_uid

    try:
        group = grp.getgrgid(info.st_gid).gr_name
    except KeyError:
        group = info.st_gid

    return stat.filemode(info.st_mode), owner, group
def is_importable_python_file(filepath):
    """
    Enforces that filepath looks like a Python file one can import from.

    Returns nothing; raises MOSSError if the name does not end with .py or
    if the detected file type is not some kind of text.
    """
    has_py_suffix = filepath.endswith(".py")
    if not has_py_suffix:
        raise MOSSError(f"To import from {filepath}, it must end with .py")

    detected_type = magic.from_file(filepath).lower()
    if "text" not in detected_type:
        raise MOSSError(f"{filepath} is not a text file and likely not a Python file")
def convert_user_specified_path(path):
    """
    Converts path specified by user (e.g. including ~) into a real path.

    ~ is expanded to the user's home directory and trailing path separators
    are removed, so the result can be compared with os.path.dirname( ) output.
    A path made up only of separators (the filesystem root) is returned as a
    single separator instead of being stripped down to the empty string,
    which would otherwise be read as the current directory.
    """
    expanded = os.path.expanduser(path)
    trimmed = expanded.rstrip(os.sep)
    if expanded and not trimmed:
        # Nothing but separators: this is the root, keep it addressable
        return os.sep
    return trimmed
def file_exists(filepath, errmsg=None):
    """
    Enforces that filepath is a file.

    Returns nothing when it is; otherwise raises MOSSError carrying errmsg,
    or a default message naming filepath when errmsg is not given.
    """
    if os.path.isfile(filepath):
        return
    raise MOSSError(f"{filepath} is not a file!" if errmsg is None else errmsg)
def dir_exists(dirpath):
    """Enforces that dirpath is a directory: returns nothing if so, raises MOSSError otherwise"""
    if os.path.isdir(dirpath):
        return
    raise MOSSError(f"{dirpath} is not a directory!")
def replace_path_seps(e):
    """Returns e with every path separator turned into an underscore, so it could be a folder name"""
    pieces = e.split(os.sep)
    return "_".join(pieces)
def alwaystrue(_):
    """Accepts anything: ignores its single argument and returns True"""
    accepted = True
    return accepted
def get_func_byname(module, func_name, substitute):
    """
    Gets function object by name from module object.

    substitute is returned instead when func_name is None, when module has no
    attribute with that name, or when the attribute is not a function.
    """
    candidate = None if func_name is None else getattr(module, func_name, None)

    # Note this does not check if the function takes a single parameter, but we assume that a user
    # who is comfortable enough with Python to write a custom function to pass in here could
    # figure this out from the traceback
    return candidate if inspect.isfunction(candidate) else substitute
def get_config_file():
    """
    Reads the configuration filename (a required positional argument) from the
    command line, which also sets up the help info.

    Returns the filename; raises MOSSError if it is not an existing file.
    """
    cli = argparse.ArgumentParser(description=DESCRIPTION, epilog=EPILOG)
    cli.add_argument("config", type=str, help="config file")
    config_path = cli.parse_args().config
    file_exists(config_path)
    return config_path
def check_and_make(dir_path):
    """Makes dir_path (and any missing parent folders) unless it already is a directory"""
    target = Path(dir_path)
    target.mkdir(parents=True, exist_ok=True)
def get_submissions(assn_dir):
    """
    Gets the folders inside assn_dir that should be treated as submissions.

    A folder named <utln>.<number> is a provide submission: of all folders
    sharing a <utln>, only the one with the largest number (the most recent
    submission) is returned. Folders with no period, or with a period followed
    by something other than a plain number, are returned as they are (e.g.
    this can be used with a folder of githubs). Entries that are not
    directories are ignored.

    Returns a list of paths, each being assn_dir joined with the name of a
    folder that actually exists in it.
    """

    # Maps a key to (submission number, actual folder name). Provide and
    # regular folders get differently tagged keys so that a regular folder
    # named like a UTLN can never overwrite (or be overwritten by) that
    # student's provide submissions. Keeping the actual folder name, rather
    # than rebuilding it from the parsed number, guarantees the returned path
    # exists even for names like x.0 or x.01
    latest = dict()
    for entry in os.scandir(assn_dir):
        # Skip anything that is not a directory
        if not entry.is_dir():
            continue

        utln, period, suffix = entry.name.partition(".")
        if period and suffix.isdecimal():
            # Identified a period followed by a number: keep this folder if
            # it is the highest submission number seen so far for this UTLN
            key = ("provide", utln)
            sub_num = int(suffix)
            if key not in latest or sub_num > latest[key][0]:
                latest[key] = (sub_num, entry.name)
        else:
            # Note, we save the entire name here not just the part before
            # the period because if there is no number that follows then we
            # know it's not a UTLN (e.g. could be an email as with our hitme
            # system)
            latest[("plain", entry.name)] = (0, entry.name)

    return [os.path.join(assn_dir, name) for _, name in latest.values()]
def make_nice_time(secs):
    """Rounds secs up to whole seconds and formats them the way timedelta prints, e.g. 0:01:00"""
    whole_secs = ceil(secs)
    return f"{timedelta(seconds=whole_secs)}"
def get_last_line_containing(filelines, str):
    """
    Searches the text filelines from its end for a line containing str.

    Returns that line without its line ending, or None if no line contains str.
    """
    bottom_up = reversed(filelines.splitlines())
    return next((candidate for candidate in bottom_up if str in candidate), None)
class MOSSRunner:
    """
    Main class of the script - instantiate one and call run().

    The constructor loads and validates the configuration file. run() then
    either submits a new MOSS job (when the job setting is not given) or
    downloads the results of a previous job (when the job setting names an
    existing job folder).
    """

    # Seconds to wait on a single GET before giving up -- without a timeout a
    # dead results server would make the script hang forever
    _DOWNLOAD_TIMEOUT_SECS = 120

    def __init__(self, config_file):
        """
        Initializes the runner from configuration file path.

        Raises MOSSError when the configuration is invalid or the user does
        not belong to one of the required groups.
        """

        self.config_file = config_file
        self.__parse_toml()
        self.__check_groups()

        # curr_sem must always be provided, and since its a user provided path we make sure
        # that ~ is expanded into user home directory; we also strip / from the end because
        # we compare curr_sem with match directories to see if we should download
        # them (see __should_download( )) -- match directories are identified with
        # os.path.dirname( ) which strips the ending /
        if self.config.curr_sem is None:
            raise MOSSError("curr_sem is a required configuration setting")
        self.config.curr_sem = convert_user_specified_path(self.config.curr_sem)

        # Decided before loading, because loading the non-job configuration
        # fills in config.job with the folder of the new job
        self.run_jobmode = self.config.job is not None
        if self.run_jobmode:
            self.__load_jobmode_config()
        else:
            self.__load_nonjobmode_config()

        # In both job and non-job modes, load up the custom functions
        self.__load_custom()

        self.__setup_logger()
        self.logger.debug(f"Loaded config:\n{self.config}")

    def run(self):
        """
        Actually does the bulk work. It either will run a job or download
        the results of a previous job.
        """

        if not self.run_jobmode:
            # Handy to print this so user can cd into it
            self.logger.info(f"Job folder is {self.config.job}")

            self.logger.info("Constructing moss.pl command ...")
            self.moss_cmd = ["perl", "moss.pl"]
            self.__add_html_header()
            self.__set_lang()
            self.__set_other_moss_options()
            self.__add_basefiles()
            self.__add_assignment_directory_settings()
            self.logger.info("moss.pl command constructed.")

            self.__run_nonjob_mode()
        else:
            # Load url from URL file to do the download
            urlfilepath = self.__get_url_filename()
            file_exists(
                urlfilepath,
                f"URL file does not exist -- are you sure that {self.config.job} is a job folder?",
            )
            self.url = get_last_line_containing(
                Path(urlfilepath).read_text(encoding="utf-8"), URL_SEARCH_QUERY
            )
            if self.url is None:
                raise MOSSError(f"Could not find URL in {urlfilepath}")
            self.__download_results()

    # Helper functions for loading configuration ...

    def __load_jobmode_config(self):
        """If user has specified a job folder (we only download its results), we use this to load config"""

        self.config.job = convert_user_specified_path(self.config.job)
        dir_exists(self.config.job)

    def __load_nonjobmode_config(self):
        """
        If user has not specified a job folder (we are running a new job), we use this to load config.

        All settings are validated before the job folder is created, so an
        invalid configuration does not leave an empty job folder behind.
        """

        # curr_sem is scanned for submissions later, so it has to be a folder
        dir_exists(self.config.curr_sem)

        self.config.assn_dirs = [
            convert_user_specified_path(d) for d in self.config.assn_dirs
        ]
        if self.config.curr_sem in self.config.assn_dirs:
            raise MOSSError("curr_sem should not be in assn_dirs")
        for d in self.config.assn_dirs:
            dir_exists(d)

        # base is optional (see __add_basefiles( )), only check it if given
        if self.config.base is not None:
            self.config.base = convert_user_specified_path(self.config.base)
            dir_exists(self.config.base)

        for e in self.config.source_extensions:
            if not e.startswith("."):
                raise MOSSError(f"all extensions must start with a '.', {e} does not")

        self.config.output = convert_user_specified_path(self.config.output)
        check_and_make(self.config.output)

        # Make job folder here, b/c we're going to put log into that
        # Need to get submission time before that (used in job folder name construction)
        self.sub_time = datetime.now(timezone("US/Eastern"))
        self.config.job = self.__job_folder_name()
        os.mkdir(self.config.job)

    def __parse_toml(self):
        """
        Load in config into dataclass object.

        Raises MOSSError if the file contains a setting MOSSConfig does not have.
        """

        try:
            self.config = MOSSConfig(**toml.load(self.config_file))
        except TypeError as e:
            # The offending setting is the quoted part of the message; fall
            # back on the whole message if it has no quoted part
            errmsg = str(e)
            first, last = errmsg.find("'"), errmsg.rfind("'")
            setting = errmsg[first : last + 1] if 0 <= first < last else errmsg
            raise MOSSError(f"{setting} is not a valid configuration setting") from e

    def __check_groups(self):
        """Makes sure each of required groups is in user's groups, raises MOSSError if not"""

        users_groups = set()
        for gid in os.getgroups():
            try:
                users_groups.add(grp.getgrgid(gid).gr_name)
            except KeyError:
                # A group id with no name entry cannot be one of the
                # required groups, which are given by name
                continue

        for group in self.config.required_groups:
            if group not in users_groups:
                raise MOSSError(f"you do not belong to required group {group}")

    def __load_custom(self):
        """Loads custom functions from provided file into class attributes to be used later"""

        if self.config.custom_file is not None:
            # custom_file is user provided path so we again expand ~ to home dir and make sure
            # file exists and is a Python file
            self.config.custom_file = convert_user_specified_path(
                self.config.custom_file
            )
            file_exists(self.config.custom_file)
            is_importable_python_file(self.config.custom_file)

            # Add python file to path and then import the module (assumed to be x if custom_file is PATH/TO/x.py)
            sys.path.append(os.path.dirname(self.config.custom_file))
            module_name = os.path.basename(self.config.custom_file)[: -len(".py")]
            custom_module = importlib.import_module(module_name)

            # Now, load the functions themselves using helper -- substitute defaults if the function can't be
            # found (remember this can be entered if only 1 of submission_filter, match_formatter is specified)
            self.match_formatter = get_func_byname(
                custom_module, self.config.match_formatter, replace_path_seps
            )
            self.submission_filter = get_func_byname(
                custom_module, self.config.submission_filter, alwaystrue
            )
        else:
            self.match_formatter = replace_path_seps
            self.submission_filter = alwaystrue

    # Helper functions for logging ...

    def __setup_logger(self):
        """
        Sets up logger - basically copy pasted from the Python logging cookbook.

        Everything (DEBUG and up) goes to the log file in the job folder,
        INFO and up also goes to the console.
        """

        self.log = os.path.join(self.config.job, LOG_BASENAME)
        self.logger = logging.getLogger("moss")
        self.logger.setLevel(logging.DEBUG)
        fh = logging.FileHandler(self.log)
        fh.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        file_formatter = logging.Formatter(
            "%(asctime)s | [%(levelname)s] : %(message)s"
        )
        fh.setFormatter(file_formatter)
        console_formatter = logging.Formatter("[%(levelname)s] : %(message)s")
        ch.setFormatter(console_formatter)
        self.logger.addHandler(fh)
        self.logger.addHandler(ch)

    # Helper functions for building MOSS command ...

    def __should_collect(self, filepath):
        """
        Function that determines whether a file should be collected
        into MOSS job; it goes through a series of checks. This is
        a crucial component of building the moss.pl command.

        First, if the submission_filter denies the path, then
        we don't do any further checks.

        Second, the magic number file type (think Linux file command)
        is checked to contain any of the source_file_types. If it does,
        the file is collected whatever its name is. This is
        to protect against a student naming say x.cpp to x to hide
        that it's a source file.

        Third, the extension is checked. If it belongs to the set
        of allowed extensions, the file is considered a candidate to
        add. Otherwise, the file is rejected.

        Lastly, the candidate is added if it is a text file with that extension
        as source files are always considered text. This is to protect
        against a student naming say a binary file or zip file as
        x.cpp (probably by accident) which crashes MOSS.
        """

        if not self.submission_filter(filepath):
            return False

        filetype = magic.from_file(filepath)

        # File is one of the desired types -> collect it regardless of its name
        if any(ft in filetype for ft in self.config.source_file_types):
            return True

        # Remove .bak if it's there before doing the extension check
        if filepath.endswith(GRADE_SUFFIX):
            filepath = filepath[: -len(GRADE_SUFFIX)]

        period = filepath.rfind(".")

        # File has no extension or it's extension isn't in set -> False
        if period < 0 or filepath[period:] not in self.config.source_extensions:
            return False

        # Collect file if it's text -- empty files will be ignored by
        # this as they have type 'empty'
        return "text" in filetype

    def __add_html_header(self):
        """Adds MOSS comment so we recognize resulting HTML - includes name, submission date + time"""

        self.moss_cmd.extend(
            [
                "-c",
                '"'
                + (f"{self.config.name}: " if self.config.name else "")
                + f"job submitted at {self.sub_time.strftime(NICE_TIME_FMT)} EST. "
                + self.__get_exp_time_msg()
                + '"',
            ]
        )

    def __set_lang(self):
        """Sets language option (-l) in MOSS"""

        self.moss_cmd.extend(["-l", self.config.language])

    def __set_other_moss_options(self):
        """Sets n (submissions_to_collect) and m (threshold_for_repeated_code) in MOSS"""

        self.moss_cmd.extend(
            [
                "-n",
                str(self.config.submissions_to_collect),
                "-m",
                str(self.config.threshold_for_repeated_code),
            ]
        )

    def __add_basefiles(self):
        """Adds base files (-b) from base directory, if one was configured"""

        if self.config.base is not None:
            for path in self.__collect_files(self.config.base):
                self.moss_cmd.extend(["-b", path])

    def __add_assignment_directory_settings(self):
        """
        Adds submission files from assignment directories.

        In particular, it takes the source files (see __should_collect( )) from the
        subdirectories of each assignment directory.
        If there are multiple directories for the same student as returned by provide, such as
        <name>.x where x = 1, 2, ... this only takes the latest submission. All non directories in
        the assignment directories are ignored (e.g. time and log files created by provide).
        """

        self.moss_cmd.append("-d")

        # For each assignment directory (inc. curr_sem) add files from it
        for ad in [self.config.curr_sem] + self.config.assn_dirs:
            for sub in get_submissions(ad):
                self.__add_sub_files(sub)

    # Helper functions for __add_assignment_directory_settings( ) ...

    def __add_sub_files(self, sub_dir):
        """
        Adds submission files from submission directory to MOSS command. This includes
        dealing with files that were messed up by use of grade utility.
        """

        if not os.path.isdir(os.path.join(sub_dir, GRADE_TA_EDIT_DIR)):
            # If no grading folder inside, collect source files from this directory
            # (see __collect_files for filtering) and add to command
            self.moss_cmd.extend(self.__collect_files(sub_dir))
        else:
            # Temporary directory where we copy the .bak files to, renamed
            # to be .h, .cpp instead of .h.bak, .cpp.bak respectively
            temp_dir = os.path.join(sub_dir, TEMP_MOSS_DIR)

            # It's possible temp_dir existed if there was some unexpected error from a previous run of
            # this script, so remove it before reconstructing it
            if os.path.isdir(temp_dir):
                shutil.rmtree(temp_dir)
            os.mkdir(temp_dir)

            # For each .bak submission file found in the submission directory
            for path in self.__collect_files(sub_dir, grade_mode=True):
                # Identify path that .bak will be copied to (e.g. test/x.cpp.bak -> test/temp/x.cpp),
                # then copy there and add path to copied file to MOSS command
                copied_bak = os.path.join(
                    temp_dir, os.path.basename(path)[: -len(GRADE_SUFFIX)]
                )
                shutil.copy2(path, copied_bak)
                self.moss_cmd.append(copied_bak)

    # Match helper functions ...

    def __match_toline(self, match):
        """Converts a match to a TSV line (columns as in MATCHES_HEADERS) using provided match formatter"""

        str_comps = [
            self.match_formatter(match.person1),
            self.match_formatter(match.person2),
            str(match.person1_perc),
            str(match.person2_perc),
            str(match.lines_matched),
            match.url,
            self.__make_dirname(match),
        ]
        return MATCHES_DELIMITER.join(str_comps)

    def __make_dirname(self, match):
        """Makes directory name for a match using match formatter"""

        return (
            self.match_formatter(match.person1)
            + "_and_"
            + self.match_formatter(match.person2)
        )

    def __should_download(self, match):
        """Determines whether a match should be downloaded based on 1 of match participants being from directory of current sem"""

        return (
            os.path.dirname(match.person1) == self.config.curr_sem
            or os.path.dirname(match.person2) == self.config.curr_sem
        )

    # Helper functions for running job ...

    def __run_nonjob_mode(self):
        """Runs non job mode (i.e. when we actually have to run a job)"""

        self.logger.info("Submitting MOSS job...")

        # Save a copy of the config file user provided (in case they lose it)
        shutil.copy2(
            self.config_file, os.path.join(self.config.job, CONFIG_COPY_BASENAME)
        )

        # Run MOSS job; the temporary directories are cleaned up whether or
        # not the job could be run, and before we check if URL exists
        try:
            runtime, output = self.__run_and_monitor(self.moss_cmd)
        finally:
            self.__cleanup_moss_folders()

        # Report how long it took
        self.logger.debug("MOSS job completion time: " + make_nice_time(runtime))

        # For a new job that we've just submitted, we look for it in the output
        # of the moss.pl command -- we don't want to read over the whole log file
        # for which that output was appended to by __run_and_monitor
        self.url = get_last_line_containing(output, URL_SEARCH_QUERY)

        if self.url is None:
            raise MOSSError(
                f"Error occurred with this MOSS job - see {self.log} for details. If there is nothing there that is obvious, most likely the MOSS job timed out waiting for server response, so come back and try again later."
            )

        self.logger.info("Success! Results at " + self.url)
        self.logger.debug(self.__get_exp_time_msg())

        # Save URL and expiration to a separate file in job folder for easy user access
        with open(self.__get_url_filename(), "w+", encoding="utf-8") as url_file:
            url_file.write(f"{self.url}\n{self.__get_exp_time_msg()}\n")

        # If user wanted to download -- download
        if self.config.download:
            self.__download_results()

    def __download_results(self):
        """
        Downloads MOSS results by first downloading index then parsing match links and downloading
        each of those. Match information is saved to a TSV file in the job folder.

        Raises MOSSError if the index of the results cannot be downloaded.
        """

        # Make folder where matches will be
        results_path = os.path.join(self.config.job, "results")
        check_and_make(results_path)

        # This is the path where GET downloads it to
        index_path = os.path.join(results_path, "index.html")

        # Download index with URLs of all the matches -- self.url guaranteed to be
        # set whether this is called by run( ) or by __run_nonjob_mode( )
        if not self.__download(self.url + "/index.html", index_path):
            raise MOSSError(
                f"Could not download the results index from {self.url} -- the results may have expired or the MOSS server may be down, see {self.log} for details"
            )

        # Loads matches using helper __parse_line( ) inside <TABLE></TABLE> section of index.html
        # this code is unstable to how Stanford decides to display MOSS results, but I'd be surprised
        # if they change things any time soon...
        self.matches = list()
        with open(index_path, mode="r", encoding="utf-8") as f:
            table_line = 0
            in_table = False
            for line in f:
                if "<TABLE>" in line:
                    in_table = True
                elif "</TABLE>" in line:
                    in_table = False

                # Lines Matched is the column header in the line
                # right after the <TABLE> line - we skip it hence
                # __parse_line( ) has line_number starting from
                # 2nd line after <TABLE>
                elif in_table and "Lines Matched" not in line:
                    self.__parse_line(line, table_line)
                    table_line += 1

        # Build matches.tsv and download them into their own folders
        self.__order_matches()
        self.__download_matches(results_path)

        # Don't need the original HTML anymore (its links are not relative)
        os.remove(index_path)

    # Helper functions for __run_nonjob_mode( ) ...

    def __job_folder_name(self):
        """Gets job directory path: output folder joined with [<name>_]job_<submission time>"""

        basename = (
            f"{self.config.name}_" if self.config.name is not None else ""
        ) + f'job_{self.sub_time.strftime("%Y%m%d_%H%M%S")}'
        return os.path.join(self.config.output, basename)

    def __cleanup_moss_folders(self):
        """Removes the temporary moss folders created in submissions if grade utility was used"""

        # For each of the assignment directories, go through submissions that we would
        # have looked into and remove temporary directories if they were created
        for ad in [self.config.curr_sem] + self.config.assn_dirs:
            for sub in get_submissions(ad):
                temp_dir = os.path.join(sub, TEMP_MOSS_DIR)
                if os.path.isdir(temp_dir):
                    shutil.rmtree(temp_dir)

    # Helper functions for __download_results( ) ...

    def __parse_line(self, line, line_number):
        """
        Parses a match into matches from line based on line_number

        line_number starts at 0 from second line after <TABLE> in index.html
        => line_number % 3 == 0 means current line is person 1
        => line_number % 3 == 1 means current line is person 2
        => line_number % 3 == 2 means current line has lines matched
        """

        if line_number % 3 == 0:
            # Beginning of match data, so we create a match
            self.matches.append(MOSSMatch())

            # Parse the line -- this is just hardcoded to parse the index HTML
            # files from Stanford... nothing really clever here, just trying to get
            # the data from the HTML line.. note that we strip the / here when
            # storing the submission directories of people in the match (these
            # will be the submission directories as given in assignment directories
            # so match formatting is done later; but this is why we always strip ending
            # path separators above for consistency)
            url_person_split = line.index('">')
            self.matches[-1].url = line[line.index('"') + 1 : url_person_split]
            self.matches[-1].person1 = line[
                url_person_split + 2 : line.index(" ", url_person_split)
            ].rstrip("/")
            self.matches[-1].person1_perc = int(
                line[line.index("(") + 1 : line.index("%")]
            )
        elif line_number % 3 == 1:
            # Similar to if body - just hardcoded to parse their HTML
            url_person_split = line.index('">')
            self.matches[-1].person2 = line[
                url_person_split + 2 : line.index(" ", url_person_split)
            ].rstrip("/")
            self.matches[-1].person2_perc = int(
                line[line.index("(") + 1 : line.index("%")]
            )
        else:
            # Now that we have identified both people in the match we determine if we want to download
            # it, if we don't we remove that match from our collection
            if self.__should_download(self.matches[-1]):
                self.matches[-1].lines_matched = int(
                    line[line.index(">") + 1 :].rstrip("\n")
                )
            else:
                self.matches.pop()

    def __order_matches(self):
        """Orders matches in descending order of lines matched and writes to TSV file"""

        ordered = sorted(self.matches, reverse=True, key=lambda m: m.lines_matched)

        # Header first, then one line per match -- joined this way so that
        # no blank line is written when there are no matches
        lines = [MATCHES_DELIMITER.join(MATCHES_HEADERS)]
        lines.extend(self.__match_toline(m) for m in ordered)

        with open(
            os.path.join(self.config.job, MATCHES_BASENAME),
            mode="w+",
            encoding="utf-8",
        ) as lf:
            lf.write("\n".join(lines) + "\n")

    def __download_matches(self, results_path):
        """Downloads the files of each match into its own folder inside the results folder"""

        # matches only stores the matches we wanted to download as identified in __parse_line
        self.logger.info(f"Downloading {len(self.matches)} matches...")
        for m in self.matches:
            # Makes directory for it, downloads into it (using URL collected into match by __parse_line), then renames HTMLs
            match_dir = os.path.join(results_path, self.__make_dirname(m))
            check_and_make(match_dir)
            self.__download_match(m.url, match_dir)
            self.__rename_htmls(match_dir)

    @staticmethod
    def __local_match_name(name):
        """
        Gets the local name for a file downloaded from MOSS, None if name is
        not one of matchX.html, matchX-0.html, matchX-1.html, matchX-top.html
        (X being a number) -- e.g. a file that was already renamed.
        """

        if not name.endswith(".html"):
            return None
        match_id, dash, part = name[: -len(".html")].partition("-")
        if not (match_id.startswith("match") and match_id[len("match") :].isdigit()):
            return None
        if not dash:
            return INDEX_BASENAME
        if part in ("0", "1", "top"):
            return f"match-{part}.html"
        return None

    def __rename_htmls(self, match_dir):
        """
        Modifies the HTMLS in a MOSS download folder to have more easily accessible names:

        matchX.html -> OPENME.html
        matchX-0.html -> match-0.html
        matchX-1.html -> match-1.html
        matchX-top.html -> match-top.html

        Only files that were actually downloaded are touched, so a failed
        download (or files left from an earlier download) does not stop the
        remaining files from being renamed.
        """

        # Builds map of renaming shown above, keyed by current basename
        rename_map = dict()
        for e in os.scandir(match_dir):
            new_name = self.__local_match_name(e.name)
            if new_name is not None:
                rename_map[e.name] = new_name

        if INDEX_BASENAME not in rename_map.values():
            self.logger.warning(
                f"Main page of match was not downloaded into {match_dir} -- no new {INDEX_BASENAME} for this match"
            )

        # For each file, update its contents so that any references to other files
        # are updated to be their new names - then rename the files as well
        for ko in rename_map:
            path = os.path.join(match_dir, ko)
            contents = Path(path).read_text(encoding="utf-8")
            for ki, vi in rename_map.items():
                contents = contents.replace(ki, vi)

            # for some reason, MOSS HTML files use relative paths in
            # the matchX.html, matchX-0.html, and matchX-1.html files
            # but they use (online) links in the matchX-top.html
            # so here as a precaution in every file I make them relative
            # by dropping the URL/
            contents = contents.replace(self.url + "/", "")
            Path(path).write_text(contents, encoding="utf-8")

        for k, v in rename_map.items():
            # replace (not rename) so a file from an earlier download is overwritten
            os.replace(os.path.join(match_dir, k), os.path.join(match_dir, v))

    def __download_match(self, match_url, match_dir):
        """Downloads all 4 match files into a directory"""

        # Splicing index to splice off extension (note it's negative)
        extension_idx = -len(".html")

        # Gets:
        #   http://moss.stanford.edu/results/9/3599792119088/match21
        # From:
        #   http://moss.stanford.edu/results/9/3599792119088/match21.html
        url_up_to_extension = match_url[:extension_idx]

        # Gets 'match21' from:
        #   http://moss.stanford.edu/results/9/3599792119088/match21.html
        # (URLs are always split on /, whatever the path separator is)
        match_num = match_url[match_url.rindex("/") + 1 : extension_idx]

        # MOSS breaks up the display over 4 files that need to be downloaded
        # in order to get properly working local HTML files
        for sub in ["", "-0", "-1", "-top"]:
            download_url = url_up_to_extension + sub + ".html"
            destination = os.path.join(match_dir, match_num + sub + ".html")
            self.__download(download_url, destination)

    # Additional helper functions used within in multiple other helpers ...

    def __get_exp_time_msg(self):
        """Gets a message on expiration time of a MOSS job submitted at submission time"""

        return f"Estimated expiration time: {(self.sub_time + timedelta(days=MOSS_EXPIRATION_TIME_DAYS)).strftime(NICE_TIME_FMT)} EST"

    def __warn_permission_denied(self, path, what):
        """Warns the user that path was skipped, what being the start of the message naming what was skipped"""

        self.logger.warning(f"{what} {path} -- permission denied")
        file_perms, owner, group = get_permissions_info(path)
        self.logger.warning(
            f"permissions: {file_perms} owner: {owner} group: {group}"
        )

    def __collect_files(self, folder, grade_mode=False):
        """
        Collects all files in a submission folder that should be included in a MOSS job.

        Handles possibility of folder not being accessible and when grade
        utility has been run on the submission folder (grade_mode=True, in
        which case only files ending with the grade suffix are considered).

        Returns the list of paths of the collected files.
        """

        if not os.access(folder, os.R_OK):
            # Some folders on Tufts grading server are locked to certain people, we just skip
            # them but warn the user
            self.__warn_permission_denied(folder, "Could not collect files from")
            return []

        files = list()
        for e in os.scandir(folder):
            # Skip anything that is not a file always, if collecting files in
            # a grade-affected folder, skip files that don't end with grade suffix
            if not e.is_file() or (grade_mode and not e.name.endswith(GRADE_SUFFIX)):
                continue

            if not os.access(e.path, os.R_OK):
                # Some files on Tufts grading server are locked to certain people, we just skip
                # them but warn the user
                self.__warn_permission_denied(e.path, "Could not collect file")
                continue

            # Only keep files we deemed necessary to collect
            if self.__should_collect(e.path):
                files.append(e.path)
        return files

    def __run_and_monitor(self, cmd):
        """
        Runs a command in a child process, times it, and logs its stderr + stdout to log file.

        Returns (seconds the command took, its combined output as text).
        """

        self.logger.debug(f"Running {' '.join(cmd)}")
        ti = time.time()
        result = subprocess.run(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
        tf = time.time()

        # Output may mention arbitrary file names, so never fail on bytes
        # that are not valid UTF-8
        contents = result.stdout.decode("utf-8", errors="replace")
        with open(self.log, mode="a") as file:
            file.write(contents)
        self.logger.debug("Finished")
        return tf - ti, contents

    def __get_url_filename(self):
        """Gets URL filepath in a job"""

        return os.path.join(self.config.job, URL_BASENAME)

    def __download(self, url, output_fp):
        """
        Downloads URL into output file.

        Failures are logged as warnings instead of raised; returns True if
        the response was saved to output_fp, False otherwise.
        """

        self.logger.debug(f"Running GET {url}")
        try:
            response = requests.get(url, timeout=self._DOWNLOAD_TIMEOUT_SECS)
        except requests.RequestException as e:
            self.logger.warning(f"GET {url} failed. Reason: {e}")
            return False

        if not response.ok:
            self.logger.warning(
                f"GET {url} failed. Status: {response.status_code} Reason: {response.reason}"
            )
            return False

        # Written as UTF-8 because that is how the downloaded files are read back
        Path(output_fp).write_text(response.text, encoding="utf-8")
        self.logger.debug(f"Response saved to {output_fp}")
        return True
def main():
runner = None
try:
runner = MOSSRunner(get_config_file())
except MOSSError as e:
print(f"moss error in setup: {e}", file=sys.stderr)
except Exception as e:
print(f"unexpected error in setup: {e}", file=sys.stderr)
if runner is not None:
try:
runner.run()