Skip to content
This repository has been archived by the owner on Jan 30, 2024. It is now read-only.

2.1.22 #198

Merged
merged 43 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
1299189
New version
PalNilsson Aug 19, 2019
f107c5f
Silenced atlasLocalSetup.sh output in proxy setup
PalNilsson Aug 19, 2019
ba9429a
Preventing None values to end up in the zombies list causing pilot to…
PalNilsson Aug 19, 2019
422f72b
Added new error code 1345, "Singularity: Failed to create user namesp…
PalNilsson Aug 19, 2019
0cf94e5
Now identifying "Failed to create user namespace" singularity error
PalNilsson Aug 19, 2019
1bd9490
Simplified extract_stderr_msg() which now also finds "Warning" messag…
PalNilsson Aug 19, 2019
74acd77
Added new error codes and support; 1346 (Transform not found), 1347 (…
PalNilsson Aug 19, 2019
5f9eacc
Added new error codes and support; 1348 (Singularity: Resource tempor…
PalNilsson Aug 19, 2019
bf41f79
Added new error code and support; 1349 (Unrecognized transform argume…
PalNilsson Aug 19, 2019
27c005c
Added setup.py
fbarreir Aug 19, 2019
1b39bb7
Moved pilot version banner to auxiliary module
PalNilsson Aug 19, 2019
6890af8
Printing pilot version banner after re-establishing logging before as…
PalNilsson Aug 19, 2019
41c9378
setup.py: removed print
fbarreir Aug 20, 2019
d32fb32
Changed allownoputput from list to string. Added skeleton for verify_…
Aug 20, 2019
032b7b2
Updated comment
PalNilsson Aug 20, 2019
2c7e3cd
Simplified maximum_getjob_requests logic
PalNilsson Aug 20, 2019
5253174
Now sending pq in trace report. Requested by T. Beermann
PalNilsson Aug 20, 2019
7d2178f
Added missing DOCTYPE in PFC, needed by release 20.7.7
PalNilsson Aug 20, 2019
6d45551
Update
PalNilsson Aug 20, 2019
e14e8c2
Fixed get_logger import complications
PalNilsson Aug 20, 2019
e1f66c8
Update
Aug 21, 2019
599ccbe
Update
Aug 21, 2019
5ba1f41
Update
Aug 21, 2019
bfcc8c8
Added new error code 1350, EMPTYOUTPUTFILE: "Empty output file detected"
Aug 21, 2019
5e6dc06
Now verifying output files for production jobs
Aug 21, 2019
9c1436f
Updated build number
Aug 21, 2019
d6e8047
Merge pull request #196 from PanDAWMS/setup.py
PalNilsson Aug 21, 2019
5cd460a
Merge remote-tracking branch 'upstream/next' into next
PalNilsson Aug 21, 2019
5bafd37
Moved around some code
PalNilsson Aug 21, 2019
285dbc2
Updated build number
PalNilsson Aug 21, 2019
5435537
Update
PalNilsson Aug 21, 2019
7d88e08
Cleaned up
PalNilsson Aug 21, 2019
ede0623
Cleaned up some more
PalNilsson Aug 21, 2019
7ba540e
Updated comments
PalNilsson Aug 21, 2019
7e45687
Added another case for remove from stage-out
PalNilsson Aug 21, 2019
0ceaa05
Added another case in output file verification
PalNilsson Aug 21, 2019
8d810a0
Added another case in output file verification
PalNilsson Aug 21, 2019
d1037be
Correction
PalNilsson Aug 21, 2019
b9ee6de
Cleanup
PalNilsson Aug 21, 2019
f65c298
Updated build number
PalNilsson Aug 22, 2019
4192438
Ignoring flake8 too complex message for verify_output_files
PalNilsson Aug 22, 2019
3169d7e
Cleanup
PalNilsson Aug 22, 2019
d45c81e
Merge pull request #197 from PalNilsson/next
PalNilsson Aug 22, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.21.17
2.1.22.8
25 changes: 1 addition & 24 deletions pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,14 @@

from pilot.common.exception import PilotException
from pilot.info import infosys
from pilot.util.auxiliary import shell_exit_code
from pilot.util.auxiliary import pilot_version_banner, shell_exit_code
from pilot.util.config import config
from pilot.util.constants import SUCCESS, FAILURE, ERRNO_NOJOBS, PILOT_START_TIME, PILOT_END_TIME, get_pilot_version, \
SERVER_UPDATE_NOT_DONE
from pilot.util.filehandling import get_pilot_work_dir, mkdirs, establish_logging
from pilot.util.harvester import is_harvester_mode
from pilot.util.https import https_setup
from pilot.util.timing import add_to_pilot_timing
from pilot.util.workernode import is_virtual_machine, display_architecture_info


def pilot_version_banner():
    """
    Print a pilot version banner to the log.

    Emits a framed banner line ('*** PanDA Pilot version X ***') surrounded by
    rows of asterisks, notes when the pilot is running inside a virtual
    machine, and appends architecture information via
    display_architecture_info().

    :return: None
    """

    # use a module-local logger rather than a job-specific one: the banner is
    # printed before any job context exists
    logger = logging.getLogger(__name__)

    version = '*** PanDA Pilot version %s ***' % get_pilot_version()
    logger.info('*' * len(version))
    logger.info(version)
    logger.info('*' * len(version))
    logger.info('')

    if is_virtual_machine():
        logger.info('pilot is running in a VM')

    # NOTE(review): display_architecture_info() is a project helper; presumably
    # it logs OS/CPU details between the banner frames -- confirm in workernode.py
    display_architecture_info()
    logger.info('*' * len(version))


def main():
Expand Down
25 changes: 17 additions & 8 deletions pilot/common/errorcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ class ErrorCodes:
MIDDLEWAREIMPORTFAILURE = 1342
NOOUTPUTINJOBREPORT = 1343
RESOURCEUNAVAILABLE = 1344
SINGULARITYFAILEDUSERNAMESPACE = 1345
TRANSFORMNOTFOUND = 1346
UNSUPPORTEDSL5OS = 1347
SINGULARITYRESOURCEUNAVAILABLE = 1348
UNRECOGNIZEDTRFARGUMENTS = 1349
EMPTYOUTPUTFILE = 1350

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
Expand Down Expand Up @@ -232,8 +238,13 @@ class ErrorCodes:
BADQUEUECONFIGURATION: "Bad queue configuration detected",
MIDDLEWAREIMPORTFAILURE: "Failed to import middleware (consult Pilot log)",
NOOUTPUTINJOBREPORT: "Found no output in job report",
RESOURCEUNAVAILABLE: "Resource temporarily unavailable"

RESOURCEUNAVAILABLE: "Resource temporarily unavailable",
SINGULARITYFAILEDUSERNAMESPACE: "Singularity: Failed to create user namespace",
TRANSFORMNOTFOUND: "Transform not found",
UNSUPPORTEDSL5OS: "Unsupported SL5 OS",
SINGULARITYRESOURCEUNAVAILABLE: "Singularity: Resource temporarily unavailable", # not the same as RESOURCEUNAVAILABLE
UNRECOGNIZEDTRFARGUMENTS: "Unrecognized transform arguments",
EMPTYOUTPUTFILE: "Empty output file detected"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
Expand Down Expand Up @@ -352,15 +363,13 @@ def extract_stderr_msg(self, stderr):
"""

msg = ""
pattern = r"ERROR +\: (.+)"
found = re.findall(pattern, stderr)
if len(found) > 0:
msg = found[0]
else:
pattern = r"WARNING\: (.+)"
patterns = [r"ERROR +\: (.+)", r"Warning\: (.+)", r"WARNING\: (.+)"]

for pattern in patterns:
found = re.findall(pattern, stderr)
if len(found) > 0:
msg = found[0]
break

return msg

Expand Down
4 changes: 2 additions & 2 deletions pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def _stage_in(args, job):
event_type += "_a"
rse = get_rse(job.indata)
localsite = remotesite = rse
trace_report = TraceReport(pq='', localSite=localsite, remoteSite=remotesite, dataset="", eventType=event_type)
trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=localsite, remoteSite=remotesite, dataset="", eventType=event_type)
trace_report.init(job)

# now that the trace report has been created, remove any files that are not to be transferred (DBRelease files) from the indata list
Expand Down Expand Up @@ -647,7 +647,7 @@ def _do_stageout(job, xdata, activity, title):
event_type += "_a"
rse = get_rse(job.outdata)
localsite = remotesite = rse
trace_report = TraceReport(pq='', localSite=localsite, remoteSite=remotesite, dataset="", eventType=event_type)
trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=localsite, remoteSite=remotesite, dataset="", eventType=event_type)
trace_report.init(job)

try:
Expand Down
19 changes: 6 additions & 13 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pilot.info import infosys, JobData, InfoService, JobInfoProvider
from pilot.util import https
from pilot.util.auxiliary import get_batchsystem_jobid, get_job_scheduler_id, get_pilot_id, get_logger, \
set_pilot_state, get_pilot_state, check_for_final_server_update
set_pilot_state, get_pilot_state, check_for_final_server_update, pilot_version_banner, is_virtual_machine
from pilot.util.config import config
from pilot.util.common import should_abort
from pilot.util.constants import PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_KILL_SIGNAL, LOG_TRANSFER_NOT_DONE, \
Expand All @@ -45,8 +45,7 @@
from pilot.util.proxy import get_distinguished_name
from pilot.util.queuehandling import scan_for_jobs, put_in_queue
from pilot.util.timing import add_to_pilot_timing, timing_report, get_postgetjob_time, get_time_since, time_stamp
from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, is_virtual_machine, \
get_cpu_model
from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, get_cpu_model

import logging
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -876,8 +875,6 @@ def proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, harves

currenttime = time.time()

logger.debug('proceed_with_getjob called with getjob_requests=%d' % getjob_requests)

# should the proxy be verified?
if verify_proxy:
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
Expand All @@ -897,11 +894,7 @@ def proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, harves
traces.pilot['error_code'] = errors.NOLOCALSPACE
return False

if harvester:
maximum_getjob_requests = 60 # 1 s apart
else:
maximum_getjob_requests = config.Pilot.maximum_getjob_requests

maximum_getjob_requests = 60 if harvester else config.Pilot.maximum_getjob_requests # 1 s apart (if harvester)
if getjob_requests > int(maximum_getjob_requests):
logger.warning('reached maximum number of getjob requests (%s) -- will abort pilot' %
config.Pilot.maximum_getjob_requests)
Expand Down Expand Up @@ -1280,7 +1273,6 @@ def retrieve(queues, traces, args):
time.sleep(0.5)
getjob_requests += 1

logger.debug('getjob_requests=%d' % getjob_requests)
if not proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, args.harvester, args.verify_proxy, traces):
# do not set graceful stop if pilot has not finished sending the final job update
# i.e. wait until SERVER_UPDATE is DONE_FINAL
Expand Down Expand Up @@ -1356,7 +1348,7 @@ def retrieve(queues, traces, args):
logging.handlers = []
logging.shutdown()
establish_logging(args)

pilot_version_banner()
getjob_requests = 0
break
time.sleep(0.5)
Expand Down Expand Up @@ -1427,7 +1419,8 @@ def has_job_completed(queues):
log.info("job %s has completed" % job.jobid)

# cleanup of any remaining processes
job.zombies.append(job.pid)
if job.pid:
job.zombies.append(job.pid)
cleanup(job)

return True
Expand Down
10 changes: 10 additions & 0 deletions pilot/control/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,16 @@ def perform_initial_payload_error_analysis(job, exit_code):
log.warning("extracted message from stderr:\n%s" % msg)
if "Failed invoking the NEWUSER namespace runtime" in msg:
ec = errors.SINGULARITYNEWUSERNAMESPACE
elif "Failed to create user namespace" in msg:
ec = errors.SINGULARITYFAILEDUSERNAMESPACE
elif "command not found" in msg:
ec = errors.TRANSFORMNOTFOUND
elif "SL5 is unsupported" in msg:
ec = errors.UNSUPPORTEDSL5OS
elif "resource temporarily unavailable" in msg:
ec = errors.SINGULARITYRESOURCEUNAVAILABLE
elif "unrecognized arguments" in msg:
ec = errors.UNRECOGNIZEDTRFARGUMENTS

if not ec:
ec = errors.resolve_transform_error(exit_code, stderr)
Expand Down
6 changes: 3 additions & 3 deletions pilot/info/jobdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class JobData(BaseData):
accessmode = "" # direct access instruction from jobparams
processingtype = "" # e.g. nightlies
maxcpucount = 0 # defines what is a looping job (seconds)
allownooutput = [] # used to disregard empty files from job report
allownooutput = "" # used to disregard empty files from job report

# set by the pilot (not from job definition)
workdir = "" # working directoty for this job
Expand Down Expand Up @@ -143,8 +143,8 @@ class JobData(BaseData):
'cpuconsumptionunit', 'homepackage', 'jobsetid', 'payload', 'processingtype',
'swrelease', 'zipmap', 'imagename', 'accessmode', 'transfertype',
'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata)
'infilesguids', 'memorymonitor'],
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'allownooutput', 'zombies'],
'infilesguids', 'memorymonitor', 'allownooutput'],
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies'],
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes'],
bool: ['is_eventservice', 'is_eventservicemerge', 'noexecstrcnv', 'debug', 'usecontainer']
}
Expand Down
124 changes: 122 additions & 2 deletions pilot/user/atlas/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,10 @@ def update_job_data(job):
# extract output files from the job report if required, in case the trf has created additional (overflow) files
# also make sure all guids are assigned (use job report value if present, otherwise generate the guid)
if job.metadata and not job.is_eventservice and not job.is_analysis():
extract_output_files(job)
extract_output_files(job) # keep this for now, complicated to merge with verify_output_files?
verify_output_files(job)
else:
if not job.allownooutput: # i.e. if it's an empty list, do nothing
if not job.allownooutput: # i.e. if it's an empty list/string, do nothing
log.debug("will not try to extract output files from jobReport for user job (and allowNoOut list is empty)")
else:
# remove the files listed in allowNoOutput if they don't exist
Expand Down Expand Up @@ -743,6 +744,125 @@ def extract_output_files(job):
job.outdata = extra


def verify_output_files(job):  # noqa: C901
    """
    Make sure that the known output files from the job definition are listed in the job report and number of processed events
    is greater than zero. If the output file is not listed in the job report, then if the file is listed in allowNoOutput
    remove it from stage-out, otherwise fail the job.

    Note from Rod: fail scenario: The output file is not in output:[] or is there with zero events. Then if allownooutput is not
    set - fail the job. If it is set, then do not store the output, and finish ok.

    NOTE(review): job.allownooutput was changed from a list to a string (see
    jobdata.py); 'lfn in job.allownooutput' therefore performs SUBSTRING
    matching, so an lfn that is a prefix/substring of an allowed name will also
    match -- confirm that this is intended or that lfns cannot overlap.

    :param job: job object.
    :return: Boolean (and potentially updated job.outdata list)
    """

    failed = False
    log = get_logger(job.jobid)

    # get list of output files from the job definition
    lfns_jobdef = []
    for fspec in job.outdata:
        lfns_jobdef.append(fspec.lfn)
    if not lfns_jobdef:
        # nothing was expected, so there is nothing to verify
        log.debug('empty output file list from job definition (nothing to verify)')
        return True

    # get list of output files from job report
    # (if None is returned, it means the job report is from an old release and does not contain an output list)
    output = job.metadata.get('files', {}).get('output', None)
    if not output and output is not None:
        # ie empty list, output=[] - are all known output files in allowNoOutput?
        log.warning('encountered an empty output file list in job report, consulting allowNoOutput list')
        failed = False
        for lfn in lfns_jobdef:
            if lfn not in job.allownooutput:
                # at least one expected file is neither produced nor excusable: fail
                failed = True
                log.warning('lfn %s is not in allowNoOutput list - job will fail' % lfn)
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGOUTPUTFILE)
                break
            else:
                # the missing file is explicitly allowed to be absent; drop it from stage-out
                log.info('lfn %s listed in allowNoOutput - will be removed from stage-out' % lfn)
                remove_from_stageout(lfn, job)

    elif output is None:
        # ie job report is ancient / output could not be extracted
        log.warning('output file list could not be extracted from job report (nothing to verify)')
    else:
        output_jobrep = {}  # {lfn: nentries, ..}
        log.debug('extracted output file list from job report - make sure all known output files are listed')
        failed = False
        # first collect the output files from the job report
        for dat in output:
            for fdat in dat.get('subFiles', []):
                # get the lfn
                name = fdat.get('name', None)

                # get the number of processed events
                nentries = fdat.get('nentries', None)

                # add the output file info to the dictionary
                output_jobrep[name] = nentries

        # now make sure that the known output files are in the job report dictionary
        for lfn in lfns_jobdef:
            if lfn not in output_jobrep and lfn not in job.allownooutput:
                # missing and not excusable: fail the job immediately
                log.warning('output file %s from job definition is not present in job report and is not listed in allowNoOutput - job will fail' % lfn)
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGOUTPUTFILE)
                failed = True
                break
            if lfn not in output_jobrep and lfn in job.allownooutput:
                # missing but excusable: skip stage-out for this file
                log.warning('output file %s from job definition is not present in job report but is listed in allowNoOutput - remove from stage-out' % lfn)
                remove_from_stageout(lfn, job)
            else:
                # the file is present in the job report; inspect its event count.
                # nentries may be an int or None (job report did not set it);
                # the branch order below matters: the failing zero-event case is
                # checked first and breaks out of the loop
                nentries = output_jobrep[lfn]
                if nentries is not None and nentries == 0 and lfn not in job.allownooutput:
                    log.warning('output file %s is listed in job report, has zero events and is not listed in allowNoOutput - job will fail' % lfn)
                    job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.EMPTYOUTPUTFILE)
                    failed = True
                    break
                if nentries == 0 and lfn in job.allownooutput:
                    # empty but excusable: skip stage-out
                    log.warning('output file %s is listed in job report, nentries=0 and is listed in allowNoOutput - remove from stage-out' % lfn)
                    remove_from_stageout(lfn, job)
                elif not nentries and lfn not in job.allownooutput:
                    # nentries is None here (0 was handled above): no event info, keep the file
                    log.warning('output file %s is listed in job report, nentries is not set and is not listed in allowNoOutput - ignore' % lfn)
                elif not nentries and lfn in job.allownooutput:
                    log.warning('output file %s is listed in job report, nentries is None but is listed in allowNoOutput - remove from stage-out' % lfn)
                    remove_from_stageout(lfn, job)
                elif nentries:
                    log.info('output file %s has %d events' % (lfn, nentries))
                else:  # should not reach this step
                    log.warning('case not handled for output file %s with %s events (ignore)' % (lfn, str(nentries)))

    status = True if not failed else False

    if status:
        log.info('output file verification succeeded')
    else:
        log.warning('output file verification failed')

    return status


def remove_from_stageout(lfn, job):
    """
    Remove the given lfn from the job's stage-out list (job.outdata).

    The matching entry is logged and dropped; all other file specs are kept
    in their original order.

    :param lfn: local file name (string).
    :param job: job object
    :return: [updated job object]
    """

    log = get_logger(job.jobid)
    kept = []
    for fspec in job.outdata:
        if fspec.lfn != lfn:
            kept.append(fspec)
        else:
            log.info('removing %s from stage-out list' % lfn)
    job.outdata = kept


def remove_no_output_files(job):
"""
Remove files from output file list if they are listed in allowNoOutput and do not exist.
Expand Down
3 changes: 0 additions & 3 deletions pilot/user/atlas/diagnose.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ def is_user_code_missing(job):

return scan_file(stdout,
error_messages,
job.jobid,
warning_message="identified an \'%s\' message in %s" % (error_messages[0], os.path.basename(stdout)))


Expand All @@ -176,7 +175,6 @@ def is_out_of_space(job):

return scan_file(stderr,
error_messages,
job.jobid,
warning_message="identified a \'%s\' message in %s" % (error_messages[0], os.path.basename(stderr)))


Expand Down Expand Up @@ -229,7 +227,6 @@ def is_nfssqlite_locking_problem(job):

return scan_file(stdout,
error_messages,
job.jobid,
warning_message="identified an NFS/Sqlite locking problem in %s" % os.path.basename(stdout))


Expand Down
3 changes: 3 additions & 0 deletions pilot/user/atlas/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def create_input_file_metadata(file_dictionary, workdir, filename="PoolFileCatal
if '&' in xml:
xml = xml.replace('&', '&')

# stitch in the DOCTYPE
xml = xml.replace('<POOLFILECATALOG>', '<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">\n<POOLFILECATALOG>')

write_file(os.path.join(workdir, filename), xml, mute=False)

return xml
Expand Down
2 changes: 1 addition & 1 deletion pilot/user/atlas/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def verify_proxy(limit=None):
else:
envsetup = ''
#envsetup += ". %s;" % (arcproxy_setup)
envsetup += ". %s/atlas.cern.ch/repo/ATLASLocalRootBase/user/atlasLocalSetup.sh;" % get_file_system_root_path()
envsetup += ". %s/atlas.cern.ch/repo/ATLASLocalRootBase/user/atlasLocalSetup.sh --quiet;" % get_file_system_root_path()
if os.environ.get('ALRB_noGridMW', '').lower() != "yes":
envsetup += "lsetup emi;"
else:
Expand Down
Loading