This repository has been archived by the owner on Jan 30, 2024. It is now read-only.

Merge pull request #198 from PanDAWMS/next
2.1.22
PalNilsson authored Aug 22, 2019
2 parents 9b53de0 + d45c81e commit 7f1b3d3
Showing 16 changed files with 271 additions and 114 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
-2.1.21.17
+2.1.22.8
25 changes: 1 addition & 24 deletions pilot.py
@@ -21,37 +21,14 @@

from pilot.common.exception import PilotException
from pilot.info import infosys
-from pilot.util.auxiliary import shell_exit_code
+from pilot.util.auxiliary import pilot_version_banner, shell_exit_code
from pilot.util.config import config
from pilot.util.constants import SUCCESS, FAILURE, ERRNO_NOJOBS, PILOT_START_TIME, PILOT_END_TIME, get_pilot_version, \
SERVER_UPDATE_NOT_DONE
from pilot.util.filehandling import get_pilot_work_dir, mkdirs, establish_logging
from pilot.util.harvester import is_harvester_mode
from pilot.util.https import https_setup
from pilot.util.timing import add_to_pilot_timing
-from pilot.util.workernode import is_virtual_machine, display_architecture_info


-def pilot_version_banner():
-    """
-    Print a pilot version banner.
-    :return:
-    """
-
-    logger = logging.getLogger(__name__)
-
-    version = '*** PanDA Pilot version %s ***' % get_pilot_version()
-    logger.info('*' * len(version))
-    logger.info(version)
-    logger.info('*' * len(version))
-    logger.info('')
-
-    if is_virtual_machine():
-        logger.info('pilot is running in a VM')
-
-    display_architecture_info()
-    logger.info('*' * len(version))
-
-
def main():
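The banner function itself now lives in pilot/util/auxiliary.py (see the import change above), so pilot.py and pilot/control/job.py share one definition instead of a local copy. A minimal usage sketch, assuming a configured pilot environment:

    from pilot.util.auxiliary import pilot_version_banner

    # logs the framed '*** PanDA Pilot version ... ***' banner, a VM notice when
    # applicable, and the architecture info, as the removed local copy did
    pilot_version_banner()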
25 changes: 17 additions & 8 deletions pilot/common/errorcodes.py
@@ -127,6 +127,12 @@ class ErrorCodes:
MIDDLEWAREIMPORTFAILURE = 1342
NOOUTPUTINJOBREPORT = 1343
RESOURCEUNAVAILABLE = 1344
+    SINGULARITYFAILEDUSERNAMESPACE = 1345
+    TRANSFORMNOTFOUND = 1346
+    UNSUPPORTEDSL5OS = 1347
+    SINGULARITYRESOURCEUNAVAILABLE = 1348
+    UNRECOGNIZEDTRFARGUMENTS = 1349
+    EMPTYOUTPUTFILE = 1350

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
@@ -232,8 +238,13 @@ class ErrorCodes:
BADQUEUECONFIGURATION: "Bad queue configuration detected",
MIDDLEWAREIMPORTFAILURE: "Failed to import middleware (consult Pilot log)",
NOOUTPUTINJOBREPORT: "Found no output in job report",
-        RESOURCEUNAVAILABLE: "Resource temporarily unavailable"
+        RESOURCEUNAVAILABLE: "Resource temporarily unavailable",
+        SINGULARITYFAILEDUSERNAMESPACE: "Singularity: Failed to create user namespace",
+        TRANSFORMNOTFOUND: "Transform not found",
+        UNSUPPORTEDSL5OS: "Unsupported SL5 OS",
+        SINGULARITYRESOURCEUNAVAILABLE: "Singularity: Resource temporarily unavailable",  # not the same as RESOURCEUNAVAILABLE
+        UNRECOGNIZEDTRFARGUMENTS: "Unrecognized transform arguments",
+        EMPTYOUTPUTFILE: "Empty output file detected"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
@@ -352,15 +363,13 @@ def extract_stderr_msg(self, stderr):
"""

msg = ""
pattern = r"ERROR +\: (.+)"
found = re.findall(pattern, stderr)
if len(found) > 0:
msg = found[0]
else:
pattern = r"WARNING\: (.+)"
patterns = [r"ERROR +\: (.+)", r"Warning\: (.+)", r"WARNING\: (.+)"]

for pattern in patterns:
found = re.findall(pattern, stderr)
if len(found) > 0:
msg = found[0]
break

return msg

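A standalone sketch of the refactored extraction: the patterns are now tried in order and the first match wins (the sample stderr line is made up):

    import re

    def extract_stderr_msg(stderr):
        msg = ""
        patterns = [r"ERROR +\: (.+)", r"Warning\: (.+)", r"WARNING\: (.+)"]
        for pattern in patterns:
            found = re.findall(pattern, stderr)
            if len(found) > 0:
                msg = found[0]
                break
        return msg

    print(extract_stderr_msg("WARNING: Failed to create user namespace"))
    # -> Failed to create user namespace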
4 changes: 2 additions & 2 deletions pilot/control/data.py
@@ -139,7 +139,7 @@ def _stage_in(args, job):
event_type += "_a"
rse = get_rse(job.indata)
localsite = remotesite = rse
-    trace_report = TraceReport(pq='', localSite=localsite, remoteSite=remotesite, dataset="", eventType=event_type)
+    trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=localsite, remoteSite=remotesite, dataset="", eventType=event_type)
trace_report.init(job)

# now that the trace report has been created, remove any files that are not to be transferred (DBRelease files) from the indata list
@@ -647,7 +647,7 @@ def _do_stageout(job, xdata, activity, title):
event_type += "_a"
rse = get_rse(job.outdata)
localsite = remotesite = rse
-    trace_report = TraceReport(pq='', localSite=localsite, remoteSite=remotesite, dataset="", eventType=event_type)
+    trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=localsite, remoteSite=remotesite, dataset="", eventType=event_type)
trace_report.init(job)

try:
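Both the stage-in and stage-out trace reports now seed the PanDA queue field ('pq') from the PILOT_SITENAME environment variable rather than an empty string. A sketch of the lookup; 'CERN-PROD' is a made-up site name:

    import os

    os.environ['PILOT_SITENAME'] = 'CERN-PROD'  # hypothetical value, set externally in production
    pq = os.environ.get('PILOT_SITENAME', '')   # still falls back to '' when unset
    print(pq)  # -> CERN-PROD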
19 changes: 6 additions & 13 deletions pilot/control/job.py
@@ -28,7 +28,7 @@
from pilot.info import infosys, JobData, InfoService, JobInfoProvider
from pilot.util import https
from pilot.util.auxiliary import get_batchsystem_jobid, get_job_scheduler_id, get_pilot_id, get_logger, \
-    set_pilot_state, get_pilot_state, check_for_final_server_update
+    set_pilot_state, get_pilot_state, check_for_final_server_update, pilot_version_banner, is_virtual_machine
from pilot.util.config import config
from pilot.util.common import should_abort
from pilot.util.constants import PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_KILL_SIGNAL, LOG_TRANSFER_NOT_DONE, \
@@ -45,8 +45,7 @@
from pilot.util.proxy import get_distinguished_name
from pilot.util.queuehandling import scan_for_jobs, put_in_queue
from pilot.util.timing import add_to_pilot_timing, timing_report, get_postgetjob_time, get_time_since, time_stamp
-from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, is_virtual_machine, \
-    get_cpu_model
+from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, get_cpu_model

import logging
logger = logging.getLogger(__name__)
@@ -876,8 +875,6 @@ def proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, harvester, verify_proxy, traces):

currenttime = time.time()

-    logger.debug('proceed_with_getjob called with getjob_requests=%d' % getjob_requests)
-
# should the proxy be verified?
if verify_proxy:
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
@@ -897,11 +894,7 @@
traces.pilot['error_code'] = errors.NOLOCALSPACE
return False

-    if harvester:
-        maximum_getjob_requests = 60  # 1 s apart
-    else:
-        maximum_getjob_requests = config.Pilot.maximum_getjob_requests
-
+    maximum_getjob_requests = 60 if harvester else config.Pilot.maximum_getjob_requests  # 1 s apart (if harvester)
if getjob_requests > int(maximum_getjob_requests):
logger.warning('reached maximum number of getjob requests (%s) -- will abort pilot' %
config.Pilot.maximum_getjob_requests)
@@ -1280,7 +1273,6 @@ def retrieve(queues, traces, args):
time.sleep(0.5)
getjob_requests += 1

-        logger.debug('getjob_requests=%d' % getjob_requests)
if not proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, args.harvester, args.verify_proxy, traces):
# do not set graceful stop if pilot has not finished sending the final job update
# i.e. wait until SERVER_UPDATE is DONE_FINAL
@@ -1356,7 +1348,7 @@ def retrieve(queues, traces, args):
logging.handlers = []
logging.shutdown()
establish_logging(args)

+            pilot_version_banner()
getjob_requests = 0
break
time.sleep(0.5)
@@ -1427,7 +1419,8 @@ def has_job_completed(queues):
log.info("job %s has completed" % job.jobid)

# cleanup of any remaining processes
-        job.zombies.append(job.pid)
+        if job.pid:
+            job.zombies.append(job.pid)
cleanup(job)

return True
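Two behavioural details in this file are easy to miss: the getjob cap collapses to a one-liner (60 retries, 1 s apart, under Harvester), and has_job_completed() now only registers a zombie pid when one actually exists. A condensed, runnable sketch of the cap check, with config.Pilot.maximum_getjob_requests stubbed as a plain number:

    def reached_getjob_limit(getjob_requests, harvester, config_maximum=10):
        # stand-in for the check in proceed_with_getjob(); config_maximum
        # replaces config.Pilot.maximum_getjob_requests
        maximum_getjob_requests = 60 if harvester else int(config_maximum)  # 1 s apart (if harvester)
        return getjob_requests > maximum_getjob_requests

    print(reached_getjob_limit(61, harvester=True))   # -> True: pilot aborts
    print(reached_getjob_limit(5, harvester=False))   # -> False: keep polling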
10 changes: 10 additions & 0 deletions pilot/control/payload.py
@@ -294,6 +294,16 @@ def perform_initial_payload_error_analysis(job, exit_code):
log.warning("extracted message from stderr:\n%s" % msg)
if "Failed invoking the NEWUSER namespace runtime" in msg:
ec = errors.SINGULARITYNEWUSERNAMESPACE
elif "Failed to create user namespace" in msg:
ec = errors.SINGULARITYFAILEDUSERNAMESPACE
elif "command not found" in msg:
ec = errors.TRANSFORMNOTFOUND
elif "SL5 is unsupported" in msg:
ec = errors.UNSUPPORTEDSL5OS
elif "resource temporarily unavailable" in msg:
ec = errors.SINGULARITYRESOURCEUNAVAILABLE
elif "unrecognized arguments" in msg:
ec = errors.UNRECOGNIZEDTRFARGUMENTS

if not ec:
ec = errors.resolve_transform_error(exit_code, stderr)
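A condensed sketch of the extended substring chain; it returns the name of the matching error constant, where the real code assigns errors.<NAME> directly, and order matters since the first hit wins (the sample message is made up):

    def map_stderr_to_error(msg):
        checks = [
            ("Failed invoking the NEWUSER namespace runtime", "SINGULARITYNEWUSERNAMESPACE"),
            ("Failed to create user namespace", "SINGULARITYFAILEDUSERNAMESPACE"),
            ("command not found", "TRANSFORMNOTFOUND"),
            ("SL5 is unsupported", "UNSUPPORTEDSL5OS"),
            ("resource temporarily unavailable", "SINGULARITYRESOURCEUNAVAILABLE"),
            ("unrecognized arguments", "UNRECOGNIZEDTRFARGUMENTS"),
        ]
        for needle, name in checks:
            if needle in msg:
                return name
        return None  # fall through to errors.resolve_transform_error()

    print(map_stderr_to_error("sh: ./runGen.py: command not found"))
    # -> TRANSFORMNOTFOUND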
6 changes: 3 additions & 3 deletions pilot/info/jobdata.py
@@ -62,7 +62,7 @@ class JobData(BaseData):
accessmode = "" # direct access instruction from jobparams
processingtype = "" # e.g. nightlies
maxcpucount = 0 # defines what is a looping job (seconds)
-    allownooutput = []  # used to disregard empty files from job report
+    allownooutput = ""  # used to disregard empty files from job report

# set by the pilot (not from job definition)
workdir = "" # working directoty for this job
@@ -143,8 +143,8 @@ class JobData(BaseData):
'cpuconsumptionunit', 'homepackage', 'jobsetid', 'payload', 'processingtype',
'swrelease', 'zipmap', 'imagename', 'accessmode', 'transfertype',
'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata)
-                  'infilesguids', 'memorymonitor'],
-        list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'allownooutput', 'zombies'],
+                  'infilesguids', 'memorymonitor', 'allownooutput'],
+        list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies'],
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes'],
bool: ['is_eventservice', 'is_eventservicemerge', 'noexecstrcnv', 'debug', 'usecontainer']
}
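Retyping allownooutput from list to string means membership tests such as `lfn in job.allownooutput` become substring checks on the raw value rather than exact list lookups. A sketch, assuming the server delivers a comma-separated string:

    allownooutput = "hist.root,metadata.xml"  # hypothetical raw job definition value

    print('hist.root' in allownooutput)       # -> True (substring match on the string)
    print(allownooutput.split(',') if allownooutput else [])
    # -> ['hist.root', 'metadata.xml'] (explicit split, if list semantics are needed)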
124 changes: 122 additions & 2 deletions pilot/user/atlas/common.py
@@ -676,9 +676,10 @@ def update_job_data(job):
# extract output files from the job report if required, in case the trf has created additional (overflow) files
# also make sure all guids are assigned (use job report value if present, otherwise generate the guid)
if job.metadata and not job.is_eventservice and not job.is_analysis():
-        extract_output_files(job)
+        extract_output_files(job)  # keep this for now, complicated to merge with verify_output_files?
+        verify_output_files(job)
    else:
-        if not job.allownooutput:  # i.e. if it's an empty list, do nothing
+        if not job.allownooutput:  # i.e. if it's an empty list/string, do nothing
log.debug("will not try to extract output files from jobReport for user job (and allowNoOut list is empty)")
else:
# remove the files listed in allowNoOutput if they don't exist
@@ -743,6 +744,125 @@ def extract_output_files(job):
job.outdata = extra


+def verify_output_files(job):  # noqa: C901
+    """
+    Make sure that the known output files from the job definition are listed in the job report and number of processed events
+    is greater than zero. If the output file is not listed in the job report, then if the file is listed in allowNoOutput
+    remove it from stage-out, otherwise fail the job.
+    Note from Rod: fail scenario: The output file is not in output:[] or is there with zero events. Then if allownooutput is not
+    set - fail the job. If it is set, then do not store the output, and finish ok.
+    :param job: job object.
+    :return: Boolean (and potentially updated job.outdata list)
+    """
+
+    failed = False
+    log = get_logger(job.jobid)
+
+    # get list of output files from the job definition
+    lfns_jobdef = []
+    for fspec in job.outdata:
+        lfns_jobdef.append(fspec.lfn)
+    if not lfns_jobdef:
+        log.debug('empty output file list from job definition (nothing to verify)')
+        return True
+
+    # get list of output files from job report
+    # (if None is returned, it means the job report is from an old release and does not contain an output list)
+    output = job.metadata.get('files', {}).get('output', None)
+    if not output and output is not None:
+        # ie empty list, output=[] - are all known output files in allowNoOutput?
+        log.warning('encountered an empty output file list in job report, consulting allowNoOutput list')
+        failed = False
+        for lfn in lfns_jobdef:
+            if lfn not in job.allownooutput:
+                failed = True
+                log.warning('lfn %s is not in allowNoOutput list - job will fail' % lfn)
+                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGOUTPUTFILE)
+                break
+            else:
+                log.info('lfn %s listed in allowNoOutput - will be removed from stage-out' % lfn)
+                remove_from_stageout(lfn, job)
+
+    elif output is None:
+        # ie job report is ancient / output could not be extracted
+        log.warning('output file list could not be extracted from job report (nothing to verify)')
+    else:
+        output_jobrep = {}  # {lfn: nentries, ..}
+        log.debug('extracted output file list from job report - make sure all known output files are listed')
+        failed = False
+        # first collect the output files from the job report
+        for dat in output:
+            for fdat in dat.get('subFiles', []):
+                # get the lfn
+                name = fdat.get('name', None)
+
+                # get the number of processed events
+                nentries = fdat.get('nentries', None)
+
+                # add the output file info to the dictionary
+                output_jobrep[name] = nentries
+
+        # now make sure that the known output files are in the job report dictionary
+        for lfn in lfns_jobdef:
+            if lfn not in output_jobrep and lfn not in job.allownooutput:
+                log.warning('output file %s from job definition is not present in job report and is not listed in allowNoOutput - job will fail' % lfn)
+                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGOUTPUTFILE)
+                failed = True
+                break
+            if lfn not in output_jobrep and lfn in job.allownooutput:
+                log.warning('output file %s from job definition is not present in job report but is listed in allowNoOutput - remove from stage-out' % lfn)
+                remove_from_stageout(lfn, job)
+            else:
+                nentries = output_jobrep[lfn]
+                if nentries is not None and nentries == 0 and lfn not in job.allownooutput:
+                    log.warning('output file %s is listed in job report, has zero events and is not listed in allowNoOutput - job will fail' % lfn)
+                    job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.EMPTYOUTPUTFILE)
+                    failed = True
+                    break
+                if nentries == 0 and lfn in job.allownooutput:
+                    log.warning('output file %s is listed in job report, nentries=0 and is listed in allowNoOutput - remove from stage-out' % lfn)
+                    remove_from_stageout(lfn, job)
+                elif not nentries and lfn not in job.allownooutput:
+                    log.warning('output file %s is listed in job report, nentries is not set and is not listed in allowNoOutput - ignore' % lfn)
+                elif not nentries and lfn in job.allownooutput:
+                    log.warning('output file %s is listed in job report, nentries is None but is listed in allowNoOutput - remove from stage-out' % lfn)
+                    remove_from_stageout(lfn, job)
+                elif nentries:
+                    log.info('output file %s has %d events' % (lfn, nentries))
+                else:  # should not reach this step
+                    log.warning('case not handled for output file %s with %s events (ignore)' % (lfn, str(nentries)))
+
+    status = True if not failed else False
+
+    if status:
+        log.info('output file verification succeeded')
+    else:
+        log.warning('output file verification failed')
+
+    return status


+def remove_from_stageout(lfn, job):
+    """
+    Remove the given lfn from the stage-out list.
+    :param lfn: local file name (string).
+    :param job: job object
+    :return: [updated job object]
+    """
+
+    log = get_logger(job.jobid)
+    outdata = []
+    for fspec in job.outdata:
+        if fspec.lfn == lfn:
+            log.info('removing %s from stage-out list' % lfn)
+        else:
+            outdata.append(fspec)
+    job.outdata = outdata


def remove_no_output_files(job):
"""
Remove files from output file list if they are listed in allowNoOutput and do not exist.
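The verifier walks the job report structure shown below; the first pass flattens it into an {lfn: nentries} dictionary before the per-file checks. A runnable sketch with made-up file names:

    # job.metadata fragment in the shape verify_output_files() expects
    metadata = {
        'files': {
            'output': [
                {'subFiles': [
                    {'name': 'AOD.pool.root', 'nentries': 2500},  # kept: non-zero events
                    {'name': 'hist.root', 'nentries': 0}          # fails the job unless
                                                                  # listed in allowNoOutput
                ]}
            ]
        }
    }

    output_jobrep = {}  # {lfn: nentries, ..}
    for dat in metadata['files']['output']:
        for fdat in dat.get('subFiles', []):
            output_jobrep[fdat.get('name', None)] = fdat.get('nentries', None)

    print(output_jobrep)  # -> {'AOD.pool.root': 2500, 'hist.root': 0}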
3 changes: 0 additions & 3 deletions pilot/user/atlas/diagnose.py
@@ -159,7 +159,6 @@ def is_user_code_missing(job):

return scan_file(stdout,
error_messages,
-                     job.jobid,
warning_message="identified an \'%s\' message in %s" % (error_messages[0], os.path.basename(stdout)))


@@ -176,7 +175,6 @@ def is_out_of_space(job):

return scan_file(stderr,
error_messages,
-                     job.jobid,
warning_message="identified a \'%s\' message in %s" % (error_messages[0], os.path.basename(stderr)))


@@ -229,7 +227,6 @@ def is_nfssqlite_locking_problem(job):

return scan_file(stdout,
error_messages,
-                     job.jobid,
warning_message="identified an NFS/Sqlite locking problem in %s" % os.path.basename(stdout))


Expand Down
3 changes: 3 additions & 0 deletions pilot/user/atlas/metadata.py
@@ -60,6 +60,9 @@ def create_input_file_metadata(file_dictionary, workdir, filename="PoolFileCatalog.xml"):
if '&' in xml:
xml = xml.replace('&', '&')

+    # stitch in the DOCTYPE
+    xml = xml.replace('<POOLFILECATALOG>', '<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">\n<POOLFILECATALOG>')

write_file(os.path.join(workdir, filename), xml, mute=False)

return xml
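The stitch applied to a minimal catalog string, showing where the DOCTYPE lands:

    xml = '<?xml version="1.0" ?>\n<POOLFILECATALOG>\n</POOLFILECATALOG>'
    xml = xml.replace('<POOLFILECATALOG>',
                      '<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">\n<POOLFILECATALOG>')
    print(xml)
    # <?xml version="1.0" ?>
    # <!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">
    # <POOLFILECATALOG>
    # </POOLFILECATALOG>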
2 changes: 1 addition & 1 deletion pilot/user/atlas/proxy.py
@@ -43,7 +43,7 @@ def verify_proxy(limit=None):
else:
envsetup = ''
#envsetup += ". %s;" % (arcproxy_setup)
envsetup += ". %s/atlas.cern.ch/repo/ATLASLocalRootBase/user/atlasLocalSetup.sh;" % get_file_system_root_path()
envsetup += ". %s/atlas.cern.ch/repo/ATLASLocalRootBase/user/atlasLocalSetup.sh --quiet;" % get_file_system_root_path()
if os.environ.get('ALRB_noGridMW', '').lower() != "yes":
envsetup += "lsetup emi;"
else:
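With --quiet appended, the ALRB setup is sourced without its usual banner output, presumably to keep chatter out of the proxy-check output that verify_proxy parses. Assuming get_file_system_root_path() returns '/cvmfs' (typical, but site-dependent), the fragment becomes:

    envsetup = ". /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/user/atlasLocalSetup.sh --quiet;"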
