-
Notifications
You must be signed in to change notification settings - Fork 5
/
visDQMReceiveDaemon
executable file
·481 lines (425 loc) · 16.7 KB
/
visDQMReceiveDaemon
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
#!/usr/bin/env python3
import os, os.path, time, sys, re, hashlib
import logging
from traceback import print_exc
from Monitoring.Core.Utils.Common import logme
from tempfile import mkstemp
from stat import *
from Monitoring.DQM import visDQMUtils
from functools import cmp_to_key
# Command-line configuration.
DROPBOX = sys.argv[1]  # Directory where we receive input ("drop box").
FILEREPO = sys.argv[2]  # Final file repository of original DQM files.
NEXT = sys.argv[3:]  # Directories for the next agent in chain.
WAITTIME = 5  # Daemon cycle time (passed to time.sleep, i.e. seconds).

# Regexp for safe file name paths.  We don't process anything else.
RXSAFEPATH = re.compile(r"^[-A-Za-z0-9_/]+\.root$")

# Regexp for .origin file contents: "md5:<hex digest> <size> <path>".
RXORIGIN = re.compile(r"^md5:([0-9a-f]+) (\d+) (\S+)$")

# Regexp for acquisition era part of the processed dataset name.
RXERA = re.compile(r"^([A-Za-z]+\d+|CMSSW(?:_[0-9]+)+(?:_pre[0-9]+)?)")

# Location of the ROOT verification scripts: two directory levels above
# this script, under "xdata/root" when the loose verifier is readable
# there, otherwise under "data/root".
CHECKDIR = os.path.normcase(os.path.abspath(__file__)).rsplit("/", 2)[0]
CHECKDIR = "%s/%s/root" % (
    CHECKDIR,
    "xdata"
    if os.access("%s/xdata/root/visDQMVerifyLoose.C" % CHECKDIR, os.R_OK)
    else "data",
)
# --------------------------------------------------------------------
def cmp(a, b):
    """Three-way comparison: return 1 if a > b, -1 if a < b, otherwise 0.

    Stand-in for the Python 2 builtin of the same name.
    """
    if a > b:
        return 1
    if a < b:
        return -1
    return 0
def warnPath(info, path, msg):
    """Log a problem with an uploaded file and flag its .origin file as bad.

    info: either the path of the .origin file (str), or a file info
          dictionary whose "origin" entry holds that path.
    path: path of the offending file, used only in the log message.
    msg:  description of the problem.

    The .origin file is renamed to "<origin>.bad".  The rename is best
    effort: a failure is logged but never raised to the caller.
    """
    logme("%s: %s", path, msg)

    if isinstance(info, str):
        origin = info
    elif isinstance(info, dict):
        origin = info.get("origin")
    else:
        origin = None

    # Nothing to flag if we were not given a usable origin path.
    if not isinstance(origin, str):
        return

    try:
        os.rename(origin, "%s.bad" % origin)
    except (OSError, ValueError) as e:
        # Deliberately non-fatal; only filesystem/path errors are ignored
        # so that KeyboardInterrupt and SystemExit still propagate.
        logme("%s: failed to mark as bad: %s", origin, e)
def current_umask():
    """Return the process umask without leaving it changed.

    os.umask() can only report the mask by setting a new one, so the mask
    is set to 0 momentarily and then put back.
    """
    mask = os.umask(0)
    os.umask(mask)
    return mask
def verifyDQMFile(path, md5sum, size, xpath, origin):
    """Run the initial checks on one uploaded DQM file.

    These checks are only enough to classify the file and to make a
    tentative ordering for file registration; the expensive content
    checks are not done here.

    path:   path of the ROOT file, as built from the drop box walk
    md5sum: MD5 checksum as a string (from the .origin file)
    size:   size in bytes as an int (from the .origin file)
    xpath:  full path of the ROOT file (from the .origin file)
    origin: path of the .origin file

    Returns the classification result of visDQMUtils.classifyDQMFile()
    extended with the "origin", "import", "size", "md5sum", "xpath" and
    "time" entries, or None if any check fails.  On failure the problem
    is reported through warnPath().
    """
    # The path name must be completely safe.
    if RXSAFEPATH.match(path) is None:
        warnPath(origin, path, "unsafe file path")
        return None

    # We must be able to read the file.
    if not os.access(path, os.R_OK):
        warnPath(origin, path, "no such file")
        return None

    # It must be a regular file (lstat: symlinks do not qualify).
    st = os.lstat(path)
    if not S_ISREG(st[ST_MODE]):
        warnPath(origin, path, "not a file")
        return None

    # The size on disk must be the size announced in the origin file.
    found = st[ST_SIZE]
    if size != found:
        warnPath(
            origin,
            path,
            "size mismatch, expected %d, found %d bytes" % (size, found),
        )
        return None

    # Ask for file classification, and fail if it fails.
    succeeded, result = visDQMUtils.classifyDQMFile(path)
    if not succeeded:
        warnPath(origin, path, result)
        return None

    # OK for now: attach the upload details to the classification.
    for key, value in (
        ("origin", origin),
        ("import", path),
        ("size", size),
        ("md5sum", md5sum),
        ("xpath", xpath),
        ("time", st[ST_MTIME]),
    ):
        result[key] = value
    return result
# --------------------------------------------------------------------
def findNewFiles():
    """Find new, completely uploaded files in the drop box.

    Walks DROPBOX looking for "*.root" files that have a matching
    "<file>.origin" upload info file whose contents match RXORIGIN, and
    passes each of them through verifyDQMFile().

    Returns the list of file info dictionaries for the files which
    passed verification.  Files without a readable, well-formed .origin
    file are skipped silently.
    """
    new = []
    for dirpath, _subdirs, files in os.walk(DROPBOX):
        for name in files:
            # We are only interested in ROOT files.
            if not name.endswith(".root"):
                continue

            # Locate the file and its upload info, and read the latter in.
            # If we fail to do so, skip the file.
            #   path:  as found by the walk, i.e. below the drop box
            #   xpath: the path recorded in the origin file
            path = "%s/%s" % (dirpath, name)
            origin = "%s.origin" % path
            try:
                with open(origin) as origin_file:
                    m = RXORIGIN.match(origin_file.read())
                if not m:
                    continue
                md5sum = m.group(1)
                size = int(m.group(2))
                xpath = m.group(3)
            except (OSError, ValueError):
                # Missing/unreadable origin file, undecodable contents or
                # an unusable size field.  Anything else (in particular
                # KeyboardInterrupt) is not ours to swallow.
                continue

            # If the file is ok, append it to the list of new files.
            c = verifyDQMFile(path, md5sum, size, xpath, origin)
            if c:
                new.append(c)
    return new
def splitDataset(dataset):
    """Split a dataset name "/PRIMDS/PROCDS/TIER" into its parts.

    Returns the tuple (primds, procds, tier, era), where era is the
    acquisition era matched by RXERA at the beginning of PROCDS, or None
    when PROCDS does not start with a recognisable era.  Raises
    ValueError if the name does not split into exactly four "/" fields.
    """
    _leading, primds, procds, tier = dataset.split("/")
    found = RXERA.match(procds)
    era = found.group(1) if found else None
    return (primds, procds, tier, era)
def checkDataset(info):
    """Check the dataset of a file info dictionary for validity.

    The era derived from info["dataset"] must exist; for classes starting
    with "relval_" it must start with "CMSSW_", and for every other class
    it must not contain "CMSSW" at all.

    On success stores "era", "primds", "procds" and "tier" in info and
    returns True.  On failure reports through warnPath() and returns
    False, leaving info unchanged.
    """
    primds, procds, tier, era = splitDataset(info["dataset"])
    if not era:
        warnPath(info, info["import"], "failed to determine era")
        return False

    is_relval = info["class"].startswith("relval_")
    if is_relval and not era.startswith("CMSSW_"):
        warnPath(
            info,
            info["import"],
            "relval dataset era '%s' is not CMSSW release" % era,
        )
        return False
    if not is_relval and "CMSSW" in era:
        warnPath(info, info["import"], "CMSSW era '%s' but data is not relval" % era)
        return False

    info.update(era=era, primds=primds, procds=procds, tier=tier)
    return True
# --------------------------------------------------------------------
def assignUniqueVersion(info):
    """Create a uniquely versioned file name.

    Increments info["version"] until info["filepat"] % info["version"]
    names a file with no "<name>.dqminfo" in FILEREPO yet, then stores
    that name in info["path"].
    """
    candidate = info["filepat"] % info["version"]
    while os.path.exists("%s/%s.dqminfo" % (FILEREPO, candidate)):
        info["version"] += 1
        candidate = info["filepat"] % info["version"]
    info["path"] = candidate
def orderFiles(a, b):
    """Comparison function giving input files a sane processing order.

    Criteria, in order of precedence:
      - descending by run
      - ascending by version
      - descending by release (if set)
      - descending by dataset
      - ascending by file name (= original version)

    Returns a negative, zero or positive number in the usual cmp style.
    """
    # Each term is evaluated only if all previous ones compared equal (0).
    return (
        b["runnr"] - a["runnr"]
        or a["version"] - b["version"]
        or cmp(b.get("release", ""), a.get("release", ""))
        or cmp(b["dataset"], a["dataset"])
        or cmp(a["import"], b["import"])
    )
# --------------------------------------------------------------------
def _fileMD5(path, chunksize=1024 * 1024):
    """Return the hex MD5 digest of the file at path, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as _f:
        for chunk in iter(lambda: _f.read(chunksize), b""):
            digest.update(chunk)
    return digest.hexdigest()


def finaliseOneFile(info):
    """Complete checking and other processing for one input file.

    This is the slowest code, mainly because we have to actually look
    into the file to make sure it's ok.

    The routine first completes the checks to verify the file is safe for
    importing into the DQM GUI.  Files which this routine rejects should
    be treated as a potential hazard to the system, such as malicious
    content or a badly written file.  The file is passed to subsequent
    processing only if all checks pass:

      1. the MD5 checksum matches info["md5sum"];
      2. the file starts with the ROOT magic bytes;
      3. the ROOT verification macro reports "VERIFY: Good to go";
      4. the dataset/era is consistent with the file class.

    On success the file is moved to its versioned place under FILEREPO,
    a "<file>.dqminfo" holding the info dictionary is written next to it
    and hard-linked into every directory in NEXT, and the .origin file
    is removed.

    info: file info dictionary; it is updated in place with "check",
          "filepat", "zippat", "version" and "path" (plus whatever
          checkDataset() adds).

    Returns True if the file was registered, False if it was rejected
    (the reason is reported through warnPath()).  Raises AssertionError
    if the classification is internally inconsistent or of an unknown
    class, and OSError if moving the files into place fails.
    """
    path = info["import"]

    # Verify the MD5 checksum matches.  Hash in chunks: the files can be
    # large and there is no need to hold one in memory as a whole.
    curmd5 = _fileMD5(path)
    if curmd5 != info["md5sum"]:
        warnPath(
            info,
            path,
            f"md5 checksum mismatch, expected [{info['md5sum']}], found [{curmd5}]",
        )
        return False

    # Verify it's a ROOT file.
    with open(path, "rb") as _f:
        magic = _f.read(5)
    if magic != b"root\x00":
        warnPath(info, path, "not a root file")
        return False

    # Verify it's a properly structured ROOT file.
    checklevel = "Loose"
    if "online_data" in info["class"] and not info["subsystem"]:
        checklevel = "Strict"
    checkprog = "%s/visDQMVerify%s.C" % (CHECKDIR, checklevel)

    # Run the ROOT verifier and get only stderr, discard stdout.  The
    # path is interpolated into a shell command; the caller is expected
    # to have vetted it (verifyDQMFile checks it against RXSAFEPATH).
    cmd = (
        "exec perl -e 'alarm(30); exec qw(root -n -l -b -q %s %s)' 2>&1 >/dev/null"
        % (path, checkprog)
    )
    logme(f"Running: {cmd}", level=logging.DEBUG)
    with os.popen(cmd) as pipe:
        check = pipe.read().rstrip()
    if not check.startswith("VERIFY: Good to go"):
        warnPath(info, path, check)
        return False

    # OK, update classification.
    info["check"] = check

    # Assign file paths, including versioning.  Run numbers are bucketed
    # with integer division.
    ok = False
    assert "version" in info
    assert "runnr" in info
    assert info["version"] == 1, "starting version should be one"
    if info["class"] == "online_data":
        assert "subsystem" in info
        assert info["runnr"] > 1
        subsys = info["subsystem"]
        if subsys:
            info["filepat"] = "OnlineData/%05dxxxx/%07dxx/DQM_V%%04d_%s_R%09d.root" % (
                info["runnr"] // 10000,
                info["runnr"] // 100,
                subsys,
                info["runnr"],
            )
        else:
            info["filepat"] = "OnlineData/%05dxxxx/%07dxx/DQM_V%%04d_R%09d.root" % (
                info["runnr"] // 10000,
                info["runnr"] // 100,
                info["runnr"],
            )
        info["zippat"] = "OnlineData/%05dxxxx/DQM_Online_R%07dxx_S%%04d.zip" % (
            info["runnr"] // 10000,
            info["runnr"] // 100,
        )
        assignUniqueVersion(info)
        ok = True

    elif info["class"] == "offline_data":
        assert "dataset" in info
        assert re.match(visDQMUtils.RXDATASET, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALMC, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALDATA, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALRUNDEPMC, info["dataset"])
        assert not re.match(visDQMUtils.RXRUNDEPMC, info["dataset"])
        assert info["runnr"] > 1
        if checkDataset(info):
            info["filepat"] = "OfflineData/%s/%s/%07dxx/DQM_V%%04d_R%09d%s.root" % (
                info["era"],
                info["primds"],
                info["runnr"] // 100,
                info["runnr"],
                info["dataset"].replace("/", "__"),
            )
            info["zippat"] = (
                "OfflineData/%s/%s/DQM_Offline_%s_%s_R%07dxx_S%%04d.zip"
                % (
                    info["era"],
                    info["primds"],
                    info["era"],
                    info["primds"],
                    info["runnr"] // 100,
                )
            )
            assignUniqueVersion(info)
            ok = True

    elif info["class"] == "relval_data":
        assert "dataset" in info
        assert "release" in info
        assert re.match(visDQMUtils.RXDATASET, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALMC, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALRUNDEPMC, info["dataset"])
        assert not re.match(visDQMUtils.RXRUNDEPMC, info["dataset"])
        assert re.match(visDQMUtils.RXRELVALDATA, info["dataset"])
        assert info["runnr"] > 1
        assert info["release"].startswith("CMSSW")
        if checkDataset(info):
            info["filepat"] = "RelValData/%s_x/DQM_V%%04d_R%09d%s.root" % (
                "_".join(info["release"].split("_")[0:3]),
                info["runnr"],
                info["dataset"].replace("/", "__"),
            )
            info["zippat"] = "RelValData/%s_x_x/DQM_RelValData_%s_%s_S%%04d.zip" % (
                "_".join(info["release"].split("_")[0:2]),
                info["release"],
                info["primds"],
            )
            assignUniqueVersion(info)
            ok = True

    elif info["class"] == "relval_mc":
        assert "dataset" in info
        assert "release" in info
        assert re.match(visDQMUtils.RXDATASET, info["dataset"])
        assert re.match(visDQMUtils.RXRELVALMC, info["dataset"])
        assert info["runnr"] == 1
        assert info["release"].startswith("CMSSW")
        if checkDataset(info):
            info["filepat"] = "RelVal/%s_x/DQM_V%%04d_R%09d%s.root" % (
                "_".join(info["release"].split("_")[0:3]),
                info["runnr"],
                info["dataset"].replace("/", "__"),
            )
            info["zippat"] = "RelVal/%s_x_x/DQM_RelVal_%s_S%%04d.zip" % (
                "_".join(info["release"].split("_")[0:2]),
                info["release"],
            )
            assignUniqueVersion(info)
            ok = True

    elif info["class"] == "relval_rundepmc":
        assert "dataset" in info
        assert "release" in info
        assert re.match(visDQMUtils.RXDATASET, info["dataset"])
        assert re.match(visDQMUtils.RXRELVALRUNDEPMC, info["dataset"])
        assert info["runnr"] > 1
        assert info["release"].startswith("CMSSW")
        if checkDataset(info):
            info["filepat"] = "RelVal/%s_x/DQM_V%%04d_R%09d%s.root" % (
                "_".join(info["release"].split("_")[0:3]),
                info["runnr"],
                info["dataset"].replace("/", "__"),
            )
            info["zippat"] = "RelVal/%s_x_x/DQM_RelVal_%s_S%%04d.zip" % (
                "_".join(info["release"].split("_")[0:2]),
                info["release"],
            )
            assignUniqueVersion(info)
            ok = True

    elif info["class"] == "simulated":
        assert "dataset" in info
        assert re.match(visDQMUtils.RXDATASET, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALMC, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALDATA, info["dataset"])
        assert info["runnr"] == 1
        if checkDataset(info):
            filedate = time.strftime("%Y%m", time.gmtime(info["time"]))
            info["filepat"] = "MonteCarlo/%s/DQM_V%%04d_R%09d%s.root" % (
                info["era"],
                info["runnr"],
                info["dataset"].replace("/", "__"),
            )
            info["zippat"] = "MonteCarlo/%s/DQM_MC_%s_%s_%s_S%%04d.zip" % (
                info["era"],
                info["era"],
                info["primds"],
                filedate,
            )
            assignUniqueVersion(info)
            ok = True

    elif info["class"] == "simulated_rundep":
        assert "dataset" in info
        assert re.match(visDQMUtils.RXDATASET, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALMC, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALRUNDEPMC, info["dataset"])
        assert not re.match(visDQMUtils.RXRELVALDATA, info["dataset"])
        assert info["runnr"] > 1
        if checkDataset(info):
            filedate = time.strftime("%Y%m", time.gmtime(info["time"]))
            info["filepat"] = "MonteCarlo_RunDep/%s/DQM_V%%04d_R%09d%s.root" % (
                info["era"],
                info["runnr"],
                info["dataset"].replace("/", "__"),
            )
            info["zippat"] = "MonteCarlo_RunDep/%s/DQM_MC_%s_%s_%s_S%%04d.zip" % (
                info["era"],
                info["era"],
                info["primds"],
                filedate,
            )
            assignUniqueVersion(info)
            ok = True

    else:
        # Raised explicitly so the check survives running under -O.
        raise AssertionError("don't know how to handle data %s" % info)

    # If the final checks didn't pass, bail out now.
    if not ok:
        return False

    # Move the files into place.  Output a new info file with all
    # the info we have created, and remove the .origin file.
    fname = "%s/%s" % (FILEREPO, info["path"])
    finfo = "%s.dqminfo" % fname
    dname = fname.rsplit("/", 1)[0]
    os.makedirs(dname, exist_ok=True)

    # Write the info file under a temporary name in the destination
    # directory, then rename it into place.  mkstemp files are created
    # private, so open the mode up to what the umask would have allowed.
    (fd, tmp) = mkstemp(dir=dname)
    with os.fdopen(fd, "wb") as _f:
        _f.write(f"{info}\n".encode())
    os.chmod(tmp, 0o666 & ~myumask)
    os.rename(tmp, finfo)
    os.rename(info["import"], fname)
    os.remove("%s.origin" % info["import"])

    # Hand the info file over to the next agents in the chain.
    for n in NEXT:
        os.makedirs(n, exist_ok=True)
        ninfo = "%s/%s" % (n, finfo.rsplit("/", 1)[-1])
        if not os.path.exists(ninfo):
            os.link(finfo, ninfo)
    return True
# --------------------------------------------------------------------
# Process files forever.
myumask = current_umask()
try:
    while True:
        try:
            # Find new complete files.  Compute repository destination for
            # each file.  Reversion files where an older one already exists.
            for info in sorted(findNewFiles(), key=cmp_to_key(orderFiles)):
                logme("receiving %s" % info["import"])
                try:
                    finaliseOneFile(info)
                except Exception as e:
                    # One broken file must not keep the files sorted after
                    # it from being processed, in this cycle or later ones.
                    logme("error: %s: %s", info["import"], e)
                    print_exc()

        # If anything bad happened, barf but keep going.
        except Exception as e:
            logme("error: %s", e)
            print_exc()

        time.sleep(WAITTIME)

# Exit quietly on interrupt, whether we were working or sleeping.
except KeyboardInterrupt:
    sys.exit(0)