diff --git a/PILOTVERSION b/PILOTVERSION
index cec8c58b7..76c7bc1b0 100644
--- a/PILOTVERSION
+++ b/PILOTVERSION
@@ -1 +1 @@
-2.1.25.11
\ No newline at end of file
+2.2.0.25
\ No newline at end of file
diff --git a/pilot/api/analytics.py b/pilot/api/analytics.py
index 9f2d05303..e2bf347c2 100644
--- a/pilot/api/analytics.py
+++ b/pilot/api/analytics.py
@@ -114,7 +114,7 @@ def get_table(self, filename, header=None, separator="\t", convert_to_float=True
 
         return get_table_from_file(filename, header=header, separator=separator, convert_to_float=convert_to_float)
 
-    def get_fitted_data(self, filename, x_name='Time', y_name='PSS+Swap', precision=2, tails=True):
+    def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=2, tails=True):
        """
        Return a properly formatted job metrics string with analytics data.
        Currently the function returns a fit for PSS+Swap vs time, whose slope measures memory leaks.
diff --git a/pilot/api/data.py b/pilot/api/data.py
index 5d6bba36c..7813ac325 100644
--- a/pilot/api/data.py
+++ b/pilot/api/data.py
@@ -437,9 +437,9 @@ def transfer(self, files, activity='default', **kwargs):  # noqa: C901
                 self.logger.warning('caught time-out exception: %s' % caught_errors[0])
             else:
                 code = errors.STAGEINFAILED if self.mode == 'stage-in' else errors.STAGEOUTFAILED  # is it stage-in/out?
-                self.logger.fatal('caught_errors=%s' % str(caught_errors))
-                self.logger.fatal('code=%s' % str(code))
-                raise PilotException('failed to transfer files using copytools=%s' % (copytools), errmsg, code=code)
+                details = str(caught_errors) + ":" + 'failed to transfer files using copytools=%s' % copytools
+                self.logger.fatal(details)
+                raise PilotException(details, code=code)
 
         self.logger.debug('result=%s' % str(result))
         return result
diff --git a/pilot/common/errorcodes.py b/pilot/common/errorcodes.py
index 4c430c560..dbe5e079b 100644
--- a/pilot/common/errorcodes.py
+++ b/pilot/common/errorcodes.py
@@ -134,6 +134,7 @@ class ErrorCodes:
     UNRECOGNIZEDTRFARGUMENTS = 1349
     EMPTYOUTPUTFILE = 1350
     UNRECOGNIZEDTRFSTDERR = 1351
+    STATFILEPROBLEM = 1352
 
     _error_messages = {
         GENERALERROR: "General pilot error, consult batch log",
@@ -246,7 +247,8 @@ class ErrorCodes:
         SINGULARITYRESOURCEUNAVAILABLE: "Singularity: Resource temporarily unavailable",  # not the same as RESOURCEUNAVAILABLE
         UNRECOGNIZEDTRFARGUMENTS: "Unrecognized transform arguments",
         EMPTYOUTPUTFILE: "Empty output file detected",
-        UNRECOGNIZEDTRFSTDERR: "Unrecognized fatal error in transform stderr"
+        UNRECOGNIZEDTRFSTDERR: "Unrecognized fatal error in transform stderr",
+        STATFILEPROBLEM: "Failed to stat proc file for CPU consumption calculation"
     }
 
     put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
@@ -407,12 +409,32 @@ def format_diagnostics(self, code, diag):
             standard_message = self._error_messages[code] + ":"
         except Exception:
             standard_message = ""
+
+        # extract the relevant info for reporting exceptions
+        if "Traceback" in diag:
+            pattern = 'details:(.+)'
+            found = re.findall(pattern, diag)
+            if found:
+                diag = found[0]
+                diag = diag.replace("[PilotException(\'", '')
+                diag = diag.replace('[PilotException(\"', '')
+                diag = diag.replace('  ', ' ')
+
         try:
             if diag:
-                if len(diag) + len(standard_message) > max_message_length:
-                    error_message = standard_message + diag[-(max_message_length - len(standard_message)):]
+                # ensure that the message to be displayed on the PanDA monitor is not longer than max_message_length
+                # if it is, then reformat it so that the standard message is always displayed first.
+                # e.g. "Failed to stage-in file:abcdefghijklmnopqrstuvwxyz0123456789"
+                if standard_message in diag:
+                    if len(diag) > max_message_length:
+                        error_message = standard_message + diag[-(max_message_length - len(standard_message)):]
+                    else:
+                        error_message = standard_message + diag[len(standard_message):][-max_message_length:]
                 else:
-                    error_message = standard_message + diag
+                    if len(diag) + len(standard_message) > max_message_length:
+                        error_message = standard_message + diag[-(max_message_length - len(standard_message)):]
+                    else:
+                        error_message = standard_message + diag
             else:
                 error_message = standard_message
         except Exception:
diff --git a/pilot/control/data.py b/pilot/control/data.py
index 8a601279a..d239dccf5 100644
--- a/pilot/control/data.py
+++ b/pilot/control/data.py
@@ -165,7 +165,7 @@ def _stage_in(args, job):
         import traceback
         error_msg = traceback.format_exc()
         log.error(error_msg)
-        msg = errors.format_diagnostics(error.get_error_code(), error.get_last_error())
+        msg = errors.format_diagnostics(error.get_error_code(), error_msg)
         job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg)
     except Exception as error:
         log.error('failed to stage-in: error=%s' % error)
@@ -659,7 +659,7 @@ def _do_stageout(job, xdata, activity, title):
         import traceback
         error_msg = traceback.format_exc()
         log.error(error_msg)
-        msg = errors.format_diagnostics(error.get_error_code(), error.get_last_error())
+        msg = errors.format_diagnostics(error.get_error_code(), error_msg)
         job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg)
     except Exception:
         import traceback
diff --git a/pilot/control/job.py b/pilot/control/job.py
index 6bdb8c4c0..9b40a8fa1 100644
--- a/pilot/control/job.py
+++ b/pilot/control/job.py
@@ -832,11 +832,8 @@ def get_dispatcher_dictionary(args):
     # override for RC dev pilots
     job_label = get_job_label(args)
 
-    logger.debug('infosys.queuedata.resource=%s' % infosys.queuedata.resource)
-    logger.debug('args.resource=%s' % args.resource)
-
     data = {
-        'siteName': args.resource,  # replace it with `infosys.queuedata.resource` to remove redundant '-r' option of pilot.py
+        'siteName': infosys.queuedata.resource,  # next: remove redundant '-r' option of pilot.py
         'computingElement': args.queue,
         'prodSourceLabel': job_label,
         'diskSpace': _diskspace,
""" timeout_max = 3 * 3600 # 3 hours timeout_min = 300 # self.timeout - timeout = timeout_min + int(filesize / 0.5e6) # approx < 0.5 Mb/sec + timeout = timeout_min + int(filesize / 0.5e6) + add # approx < 0.5 Mb/sec return min(timeout, timeout_max) @@ -57,13 +58,15 @@ def verify_catalog_checksum(fspec, path): state = 'UNKNOWN_CHECKSUM_TYPE' else: checksum_local = calculate_checksum(path, algorithm=checksum_type) + if checksum_type == 'ad32': + checksum_type = 'adler32' logger.info('checksum (catalog): %s (type: %s)' % (checksum_catalog, checksum_type)) logger.info('checksum (local): %s' % checksum_local) if checksum_local and checksum_local != '' and checksum_local != checksum_catalog: - diagnostics = 'checksum verification failed: checksum (catalog)=%s != checksum (local)=%s' % \ - (checksum_catalog, checksum_local) + diagnostics = 'checksum verification failed for LFN=%s: checksum (catalog)=%s != checksum (local)=%s' % \ + (fspec.lfn, checksum_catalog, checksum_local) logger.warning(diagnostics) - fspec.status_code = ErrorCodes.GETADMISMATCH if checksum_type == 'ad32' else ErrorCodes.GETMD5MISMATCH + fspec.status_code = ErrorCodes.GETADMISMATCH if checksum_type == 'adler32' else ErrorCodes.GETMD5MISMATCH fspec.status = 'failed' state = 'AD_MISMATCH' if checksum_type == 'ad32' else 'MD_MISMATCH' else: diff --git a/pilot/copytool/gfal.py b/pilot/copytool/gfal.py index bcf3db501..0b95f4be6 100644 --- a/pilot/copytool/gfal.py +++ b/pilot/copytool/gfal.py @@ -57,7 +57,7 @@ def copy_in(files, **kwargs): if not check_for_gfal(): raise StageInFailure("No GFAL2 tools found") - localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None) + localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None)) for fspec in files: # update the trace report localsite = localsite if localsite else fspec.ddmendpoint diff --git a/pilot/copytool/lsm.py b/pilot/copytool/lsm.py index 22cdecc87..73b11fb28 100644 --- a/pilot/copytool/lsm.py +++ b/pilot/copytool/lsm.py @@ -75,7 +75,7 @@ def copy_in(files, **kwargs): copysetup = get_copysetup(copytools, 'lsm') trace_report = kwargs.get('trace_report') allow_direct_access = kwargs.get('allow_direct_access') - localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None) + localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None)) for fspec in files: # update the trace report @@ -315,7 +315,7 @@ def move(source, destination, dst_in=True, copysetup="", options=None): cmd += "lsm-put %s" % args try: - exit_code, stdout, stderr = execute(cmd) #, timeout=get_timeout(fspec.filesize)) + exit_code, stdout, stderr = execute(cmd, usecontainer=False, copytool=True) #, timeout=get_timeout(fspec.filesize)) except Exception as e: if dst_in: exit_code = ErrorCodes.STAGEINFAILED diff --git a/pilot/copytool/objectstore.py b/pilot/copytool/objectstore.py index 9f7f40685..a42c9575e 100644 --- a/pilot/copytool/objectstore.py +++ b/pilot/copytool/objectstore.py @@ -131,10 +131,14 @@ def copy_in(files, **kwargs): cmd += ['/usr/bin/env', 'rucio', '-v', 'download', '--no-subdir', '--dir', dst] if require_replicas: cmd += ['--rse', fspec.replicas[0][0]] - if fspec.surl: + + # a copytool module should consider fspec.turl for transfers, and could failback to fspec.surl, + # but normally fspec.turl (transfer url) is mandatory and already populated by the top workflow + turl = fspec.turl or fspec.surl + if turl: if fspec.ddmendpoint: cmd.extend(['--rse', fspec.ddmendpoint]) - cmd.extend(['--pfn', fspec.surl]) + cmd.extend(['--pfn', turl]) cmd += 
['%s:%s' % (fspec.scope, fspec.lfn)] rcode, stdout, stderr = execute(" ".join(cmd), **kwargs) diff --git a/pilot/copytool/rucio.py b/pilot/copytool/rucio.py index b068e7914..3feda3f96 100644 --- a/pilot/copytool/rucio.py +++ b/pilot/copytool/rucio.py @@ -9,6 +9,7 @@ # - Alexey Anisenkov, anisyonk@cern.ch, 2018 # - Paul Nilsson, paul.nilsson@cern.ch, 2018 # - Tomas Javurek, tomas.javurek@cern.ch, 2019 +# - David Cameron, david.cameron@cern.ch, 2019 from __future__ import absolute_import @@ -19,6 +20,7 @@ from .common import resolve_common_transfer_errors, verify_catalog_checksum, get_timeout from pilot.common.exception import PilotException, StageOutFailure, ErrorCodes +#from pilot.util.timer import timeout logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -49,7 +51,7 @@ def verify_stage_out(fspec): return rsemgr.exists(rse_settings, [uploaded_file]) -# @timeout(seconds=600) +#@timeout(seconds=10800) def copy_in(files, **kwargs): """ Download given files using rucio copytool. @@ -66,7 +68,7 @@ def copy_in(files, **kwargs): # don't spoil the output, we depend on stderr parsing os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]' - localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None) + localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None)) for fspec in files: logger.info('rucio copytool, downloading file with scope:%s lfn:%s' % (str(fspec.scope), str(fspec.lfn))) # update the trace report @@ -90,6 +92,8 @@ def copy_in(files, **kwargs): trace_report_out = [] try: + #transfer_timeout = get_timeout(fspec.filesize, add=10) # give the API a chance to do the time-out first + #timeout(transfer_timeout)(_stage_in_api)(dst, fspec, trace_report, trace_report_out) _stage_in_api(dst, fspec, trace_report, trace_report_out) except Exception as error: error_msg = str(error) @@ -130,7 +134,7 @@ def copy_in(files, **kwargs): return files -# @timeout(seconds=600) +#@timeout(seconds=10800) def copy_out(files, **kwargs): """ Upload given files using rucio copytool. 
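The copytools above now resolve the local site ID with a Rucio-first fallback chain. A hedged sketch of that lookup as a standalone helper (the function name is illustrative, not pilot API):

    import os

    def get_local_site_id(default=None):
        """Prefer RUCIO_LOCAL_SITE_ID over the legacy DQ2 variable."""
        return os.environ.get('RUCIO_LOCAL_SITE_ID',
                              os.environ.get('DQ2_LOCAL_SITE_ID', default))

    os.environ['DQ2_LOCAL_SITE_ID'] = 'LEGACY_SITE'
    print(get_local_site_id())  # LEGACY_SITE
    os.environ['RUCIO_LOCAL_SITE_ID'] = 'NEW_SITE'
    print(get_local_site_id())  # NEW_SITE - the new variable wins

With neither variable set, the callers fall back to fspec.ddmendpoint, as the per-file loops in the hunks above show.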
diff --git a/pilot/copytool/xrdcp.py b/pilot/copytool/xrdcp.py
index a5ba2df5d..129f36e73 100644
--- a/pilot/copytool/xrdcp.py
+++ b/pilot/copytool/xrdcp.py
@@ -138,7 +138,7 @@ def copy_in(files, **kwargs):
     coption = _resolve_checksum_option(setup, **kwargs)
     trace_report = kwargs.get('trace_report')
 
-    localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None)
+    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))
     for fspec in files:
         # update the trace report
         localsite = localsite if localsite else fspec.ddmendpoint
diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py
index 567636dfc..8c336fd8f 100644
--- a/pilot/user/atlas/common.py
+++ b/pilot/user/atlas/common.py
@@ -293,31 +293,45 @@ def get_generic_payload_command(cmd, job, prepareasetup, userjob):
 
 
 def add_athena_proc_number(cmd):
     """
-    Add the ATHENA_PROC_NUMBER to the payload command if necessary
+    Add the ATHENA_PROC_NUMBER and ATHENA_CORE_NUMBER to the payload command if necessary.
 
     :param cmd: payload execution command (string).
     :return: updated payload execution command (string).
     """
 
+    # get the values if they exist
+    try:
+        value1 = int(os.environ['ATHENA_PROC_NUMBER_JOB'])
+    except Exception as e:
+        logger.warning('failed to convert ATHENA_PROC_NUMBER_JOB to int: %s' % e)
+        value1 = None
+    try:
+        value2 = int(os.environ['ATHENA_CORE_NUMBER'])
+    except Exception as e:
+        logger.warning('failed to convert ATHENA_CORE_NUMBER to int: %s' % e)
+        value2 = None
+
     if "ATHENA_PROC_NUMBER" not in cmd:
         if "ATHENA_PROC_NUMBER" in os.environ:
             cmd = 'export ATHENA_PROC_NUMBER=%s;' % os.environ['ATHENA_PROC_NUMBER'] + cmd
-        elif "ATHENA_PROC_NUMBER_JOB" in os.environ:
-            try:
-                value = int(os.environ['ATHENA_PROC_NUMBER_JOB'])
-            except Exception:
-                logger.warning("failed to convert ATHENA_PROC_NUMBER_JOB=%s to int" %
-                               os.environ['ATHENA_PROC_NUMBER_JOB'])
+        elif "ATHENA_PROC_NUMBER_JOB" in os.environ and value1:
+            if value1 > 1:
+                cmd = 'export ATHENA_PROC_NUMBER=%d;' % value1 + cmd
             else:
-                if value > 1:
-                    cmd = 'export ATHENA_PROC_NUMBER=%d;' % value + cmd
-                else:
-                    logger.info("will not add ATHENA_PROC_NUMBER to cmd since the value is %d" % value)
+                logger.info("will not add ATHENA_PROC_NUMBER to cmd since the value is %s" % str(value1))
         else:
             logger.warning("don't know how to set ATHENA_PROC_NUMBER (could not find it in os.environ)")
     else:
         logger.info("ATHENA_PROC_NUMBER already in job command")
 
+    if 'ATHENA_CORE_NUMBER' in os.environ and value2:
+        if value2 > 1:
+            cmd = 'export ATHENA_CORE_NUMBER=%d;' % value2 + cmd
+        else:
+            logger.info("will not add ATHENA_CORE_NUMBER to cmd since the value is %s" % str(value2))
+    else:
+        logger.warning('there is no ATHENA_CORE_NUMBER in os.environ (cannot add it to payload command)')
+
     return cmd
@@ -1597,9 +1611,12 @@ def verify_ncores(corecount):
     # ATHENA_PROC_NUMBER_JOB will always be the value from the job definition)
     if athena_proc_number:
         logger.info("encountered a set ATHENA_PROC_NUMBER (%d), will not overwrite it" % athena_proc_number)
+        logger.info('set ATHENA_CORE_NUMBER to same value as ATHENA_PROC_NUMBER')
+        os.environ['ATHENA_CORE_NUMBER'] = "%s" % athena_proc_number
     else:
         os.environ['ATHENA_PROC_NUMBER_JOB'] = "%s" % corecount
-        logger.info("set ATHENA_PROC_NUMBER_JOB to %s (ATHENA_PROC_NUMBER will not be overwritten)" % corecount)
+        os.environ['ATHENA_CORE_NUMBER'] = "%s" % corecount
+        logger.info("set ATHENA_PROC_NUMBER_JOB and ATHENA_CORE_NUMBER to %s (ATHENA_PROC_NUMBER will not be overwritten)" % corecount)
 
 
 def verify_job(job):
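The add_athena_proc_number() change prepends an export statement to the payload command only when the environment value parses as an integer larger than 1. A minimal sketch of that pattern (helper name is illustrative, not pilot API):

    import os

    def prepend_env_export(cmd, name):
        """Prepend 'export NAME=value;' to cmd if NAME holds an int > 1."""
        try:
            value = int(os.environ[name])
        except (KeyError, ValueError) as exc:
            print('will not export %s: %s' % (name, exc))
            return cmd
        if value > 1:
            cmd = 'export %s=%d;' % (name, value) + cmd
        return cmd

    os.environ['ATHENA_CORE_NUMBER'] = '8'
    print(prepend_env_export('./payload.sh', 'ATHENA_CORE_NUMBER'))
    # export ATHENA_CORE_NUMBER=8;./payload.sh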
diff --git a/pilot/user/atlas/container.py b/pilot/user/atlas/container.py
index dead6c354..c79be4df0 100644
--- a/pilot/user/atlas/container.py
+++ b/pilot/user/atlas/container.py
@@ -14,7 +14,7 @@
 
 from pilot.user.atlas.setup import get_asetup
 from pilot.user.atlas.setup import get_file_system_root_path
-from pilot.info import infosys
+from pilot.info import InfoService, infosys
 from pilot.util.auxiliary import get_logger
 from pilot.util.config import config
 from pilot.util.filehandling import write_file
@@ -32,13 +32,11 @@ def do_use_container(**kwargs):
     :return: True if function has decided that a container should be used, False otherwise (boolean).
     """
 
-    use_container = False
-
-    # return use_container
-    # to force no container use: return False
+    use_container = False
 
-    job = kwargs.get('job')
+    job = kwargs.get('job', False)
+    copytool = kwargs.get('copytool', False)
     if job:
         # for user jobs, TRF option --containerImage must have been used, ie imagename must be set
         if job.is_analysis() and job.imagename:
@@ -48,6 +46,9 @@ def do_use_container(**kwargs):
             container_name = queuedata.container_type.get("pilot")
             if container_name == 'singularity':
                 use_container = True
+    elif copytool:
+        # override for copytools - use a container for stage-in/out
+        use_container = True
 
     return use_container
 
@@ -64,7 +65,7 @@ def wrapper(executable, **kwargs):
 
     workdir = kwargs.get('workdir', '.')
     pilot_home = os.environ.get('PILOT_HOME', '')
-    job = kwargs.get('job')
+    job = kwargs.get('job', None)
 
     logger.info('container wrapper called')
 
@@ -72,11 +73,11 @@ def wrapper(executable, **kwargs):
         workdir = pilot_home
 
     # if job.imagename (from --containerimage <image>) is set, then always use raw singularity
-    if config.Container.setup_type == "ALRB" and not job.imagename:
+    if config.Container.setup_type == "ALRB" and job and not job.imagename:
         fctn = alrb_wrapper
     else:
         fctn = singularity_wrapper
-    return fctn(executable, workdir, job)
+    return fctn(executable, workdir, job=job)
 
 
 # def use_payload_container(job):
@@ -127,7 +128,7 @@ def get_grid_image_for_singularity(platform):
     """
     Return the full path to the singularity grid image
 
-    :param platform (string): E.g. "x86_64-slc6"
+    :param platform: E.g. "x86_64-slc6" (string).
     :return: full path to grid image (string).
     """
 
@@ -181,7 +182,7 @@ def get_middleware_type():
     return middleware_type
 
 
-def alrb_wrapper(cmd, workdir, job):
+def alrb_wrapper(cmd, workdir, job=None):
     """
     Wrap the given command with the special ALRB setup for containers
     E.g. cmd = /bin/bash hello_world.sh
@@ -196,6 +197,10 @@ def alrb_wrapper(cmd, workdir, job):
     :return: prepended command with singularity execution command (string).
     """
 
+    if not job:
+        logger.warning('the ALRB wrapper did not get a job object - cannot proceed')
+        return cmd
+
     log = get_logger(job.jobid)
 
     queuedata = job.infosys.queuedata
@@ -310,27 +315,32 @@ def remove_container_string(job_params):
 
     return job_params, container_path
 
 
-def singularity_wrapper(cmd, workdir, job):
+def singularity_wrapper(cmd, workdir, job=None):
     """
     Prepend the given command with the singularity execution command
     E.g. cmd = /bin/bash hello_world.sh
     -> singularity_command = singularity exec -B <bindmounts> <image> /bin/bash hello_world.sh
     singularity exec -B /cvmfs/atlas.cern.ch/repo/images/singularity/x86_64-slc6.img