diff --git a/PILOTVERSION b/PILOTVERSION index cec8c58b7..76c7bc1b0 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -2.1.25.11 \ No newline at end of file +2.2.0.25 \ No newline at end of file diff --git a/pilot/api/analytics.py b/pilot/api/analytics.py index 9f2d05303..e2bf347c2 100644 --- a/pilot/api/analytics.py +++ b/pilot/api/analytics.py @@ -114,7 +114,7 @@ def get_table(self, filename, header=None, separator="\t", convert_to_float=True return get_table_from_file(filename, header=header, separator=separator, convert_to_float=convert_to_float) - def get_fitted_data(self, filename, x_name='Time', y_name='PSS+Swap', precision=2, tails=True): + def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=2, tails=True): """ Return a properly formatted job metrics string with analytics data. Currently the function returns a fit for PSS+Swap vs time, whose slope measures memory leaks. diff --git a/pilot/api/data.py b/pilot/api/data.py index 44cef134e..7813ac325 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -18,7 +18,6 @@ import time from pilot.info import infosys -from pilot.info.storageactivitymaps import get_ddm_activity from pilot.common.exception import PilotException, ErrorCodes, SizeTooLarge, NoLocalSpace, ReplicasNotFound from pilot.util.filehandling import calculate_checksum from pilot.util.math import convert_mb_to_b @@ -109,6 +108,48 @@ def get_preferred_replica(self, replicas, allowed_schemas): if replica and (not schema or replica.startswith('%s://' % schema)): return replica + def prepare_sources(self, files, activities=None): + """ + Customize/prepare source data for each entry in `files` optionally checking data for requested `activities` + (custom StageClient could extend the logic if need) + :param files: list of `FileSpec` objects to be processed + :param activities: string or ordered list of activities to resolve `astorages` (optional) + :return: None + """ + + return + + def prepare_inputddms(self, files, activities=None): + """ + Populates filespec.inputddms for each entry from `files` list + :param files: list of `FileSpec` objects + :param activities: sting or ordered list of activities to resolve astorages (optional) + :return: None + """ + + activities = activities or 'read_lan' + if isinstance(activities, basestring): + activities = [activities] + + astorages = self.infosys.queuedata.astorages if self.infosys and self.infosys.queuedata else {} + + storages = [] + for a in activities: + storages = astorages.get(a, []) + if storages: + break + + #activity = activities[0] + #if not storages: ## ignore empty astorages + # raise PilotException("Failed to resolve input sources: no associated storages defined for activity=%s (%s)" + # % (activity, ','.join(activities)), code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED') + + for fdat in files: + if not fdat.inputddms: + fdat.inputddms = storages + if not fdat.inputddms and fdat.ddmendpoint: + fdat.inputddms = [fdat.ddmendpoint] + def resolve_replicas(self, files): # noqa: C901 """ Populates filespec.replicas for each entry from `files` list @@ -117,8 +158,7 @@ def resolve_replicas(self, files): # noqa: C901 :return: `files` """ - logger = self.logger ## the function could be static if logger will be moved outside - + logger = self.logger xfiles = [] #ddmconf = self.infosys.resolve_storage_data() @@ -131,11 +171,6 @@ def resolve_replicas(self, files): # noqa: C901 #fdat.accessmode = 'copy' ### quick hack to avoid changing logic below for DIRECT access handling ## REVIEW AND FIX ME LATER #fdat.allowremoteinputs = False ### quick hack to avoid changing logic below for DIRECT access handling ## REVIEW AND FIX ME LATER - - if not fdat.inputddms and self.infosys.queuedata: - fdat.inputddms = self.infosys.queuedata.astorages.get('pr', {}) ## FIX ME LATER: change to proper activity=read_lan - if not fdat.inputddms and fdat.ddmendpoint: - fdat.inputddms = [fdat.ddmendpoint] xfiles.append(fdat) if not xfiles: # no files for replica look-up @@ -319,9 +354,6 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901 if 'default' not in activity: activity.append('default') - # stage-in/out? - #mode = kwargs.get('mode', '') - copytools = None for aname in activity: copytools = self.acopytools.get(aname) @@ -332,6 +364,18 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901 raise PilotException('failed to resolve copytool by preferred activities=%s, acopytools=%s' % (activity, self.acopytools)) + # populate inputddms if need + self.prepare_inputddms(files) + + # initialize ddm_activity name for requested files if not set + for fspec in files: + if fspec.ddm_activity: # skip already initialized data + continue + if self.mode == 'stage-in': + fspec.ddm_activity = filter(None, ['read_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'read_wan']) + else: + fspec.ddm_activity = filter(None, ['write_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'write_wan']) + result, caught_errors = None, [] for name in copytools: @@ -379,24 +423,116 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901 break if not result: + # Propagate message from first error back up + errmsg = str(caught_errors[0]) if caught_errors else '' if caught_errors and "Cannot authenticate" in str(caught_errors): code = ErrorCodes.STAGEINAUTHENTICATIONFAILURE elif caught_errors and "bad queue configuration" in str(caught_errors): code = ErrorCodes.BADQUEUECONFIGURATION - elif caught_errors and isinstance(caught_errors[-1], PilotException): + elif caught_errors and isinstance(caught_errors[0], PilotException): code = caught_errors[0].get_error_code() - elif caught_errors and isinstance(caught_errors[-1], TimeoutException): + errmsg = caught_errors[0].get_last_error() + elif caught_errors and isinstance(caught_errors[0], TimeoutException): code = errors.STAGEINTIMEOUT if self.mode == 'stage-in' else errors.STAGEOUTTIMEOUT # is it stage-in/out? self.logger.warning('caught time-out exception: %s' % caught_errors[0]) else: - code = ErrorCodes.STAGEINFAILED - self.logger.fatal('caught_errors=%s' % str(caught_errors)) - self.logger.fatal('code=%s' % str(code)) - raise PilotException('failed to transfer files using copytools=%s, error=%s' % (copytools, caught_errors), code=code) + code = errors.STAGEINFAILED if self.mode == 'stage-in' else errors.STAGEOUTFAILED # is it stage-in/out? + details = str(caught_errors) + ":" + 'failed to transfer files using copytools=%s' % copytools + self.logger.fatal(details) + raise PilotException(details, code=code) self.logger.debug('result=%s' % str(result)) return result + def require_protocols(self, files, copytool, activity): + """ + Populates fspec.protocols and fspec.turl for each entry in `files` according to preferred fspec.ddm_activity + :param files: list of `FileSpec` objects + :param activity: str or ordered list of transfer activity names to resolve acopytools related data + :return: None + """ + + allowed_schemas = getattr(copytool, 'allowed_schemas', None) + + if self.infosys and self.infosys.queuedata: + copytool_name = copytool.__name__.rsplit('.', 1)[-1] + allowed_schemas = self.infosys.queuedata.resolve_allowed_schemas(activity, copytool_name) or allowed_schemas + + files = self.resolve_protocols(files) + ddmconf = self.infosys.resolve_storage_data() + + for fspec in files: + + protocols = self.resolve_protocol(fspec, allowed_schemas) + if not protocols: # no protocols found + error = 'Failed to resolve protocol for file=%s, allowed_schemas=%s, fspec=%s' % (fspec.lfn, allowed_schemas, fspec) + self.logger.error("resolve_protocol: %s" % error) + raise PilotException(error, code=ErrorCodes.NOSTORAGEPROTOCOL) + + # take first available protocol for copytool: FIX ME LATER if need (do iterate over all allowed protocols?) + protocol = protocols[0] + + self.logger.info("Resolved protocol to be used for transfer lfn=%s: data=%s" % (protocol, fspec.lfn)) + + resolve_surl = getattr(copytool, 'resolve_surl', None) + if not callable(resolve_surl): + resolve_surl = self.resolve_surl + + r = resolve_surl(fspec, protocol, ddmconf) ## pass ddmconf for possible custom look up at the level of copytool + if r.get('surl'): + fspec.turl = r['surl'] + if r.get('ddmendpoint'): + fspec.ddmendpoint = r['ddmendpoint'] + + def resolve_protocols(self, files): + """ + Populates filespec.protocols for each entry from `files` according to preferred `fspec.ddm_activity` value + :param files: list of `FileSpec` objects + fdat.protocols = [dict(endpoint, path, flavour), ..] + :return: `files` + """ + + ddmconf = self.infosys.resolve_storage_data() + + for fdat in files: + ddm = ddmconf.get(fdat.ddmendpoint) + if not ddm: + error = 'Failed to resolve output ddmendpoint by name=%s (from PanDA), please check configuration.' % fdat.ddmendpoint + self.logger.error("resolve_protocols: %s, fspec=%s" % (error, fdat)) + raise PilotException(error, code=ErrorCodes.NOSTORAGE) + + protocols = [] + for aname in fdat.ddm_activity: + protocols = ddm.arprotocols.get(aname) + if protocols: + break + + fdat.protocols = protocols + + return files + + @classmethod + def resolve_protocol(self, fspec, allowed_schemas=None): + """ + Resolve protocols according to allowed schema + :param fspec: `FileSpec` instance + :param allowed_schemas: list of allowed schemas or any if None + :return: list of dict(endpoint, path, flavour) + """ + + if not fspec.protocols: + return [] + + protocols = [] + + allowed_schemas = allowed_schemas or [None] + for schema in allowed_schemas: + for pdat in fspec.protocols: + if schema is None or pdat.get('endpoint', '').startswith("%s://" % schema): + protocols.append(pdat) + + return protocols + class StageInClient(StagingClient): @@ -413,7 +549,7 @@ def resolve_replica(self, fspec, primary_schemas=None, allowed_schemas=None): """ if not fspec.replicas: - self.logger.warning('resolve_replicas() recevied no fspec.replicas') + self.logger.warning('resolve_replicas() received no fspec.replicas') return allowed_schemas = allowed_schemas or [None] @@ -521,8 +657,10 @@ def transfer_files(self, copytool, files, activity=None, **kwargs): if allow_direct_access: self.set_accessmodes_for_direct_access(files, direct_access_type) - if getattr(copytool, 'require_replicas', False) and files and files[0].replicas is None: - files = self.resolve_replicas(files) + if getattr(copytool, 'require_replicas', False) and files: + if files[0].replicas is None: ## look up replicas only once + files = self.resolve_replicas(files) + allowed_schemas = getattr(copytool, 'allowed_schemas', None) if self.infosys and self.infosys.queuedata: @@ -531,8 +669,7 @@ def transfer_files(self, copytool, files, activity=None, **kwargs): for fspec in files: resolve_replica = getattr(copytool, 'resolve_replica', None) - if not callable(resolve_replica): - resolve_replica = self.resolve_replica + resolve_replica = self.resolve_replica if not callable(resolve_replica) else resolve_replica ## prepare schemas which will be used to look up first the replicas allowed for direct access mode primary_schemas = self.direct_localinput_allowed_schemas if fspec.accessmode == 'direct' else None @@ -550,6 +687,10 @@ def transfer_files(self, copytool, files, activity=None, **kwargs): self.logger.info("[stage-in] found replica to be used for lfn=%s: ddmendpoint=%s, pfn=%s" % (fspec.lfn, fspec.ddmendpoint, fspec.turl)) + # prepare files (resolve protocol/transfer url) + if getattr(copytool, 'require_input_protocols', False) and files: + self.require_protocols(files, copytool, activity) + if not copytool.is_valid_for_copy_in(files): msg = 'input is not valid for transfers using copytool=%s' % copytool self.logger.warning(msg) @@ -628,57 +769,6 @@ class StageOutClient(StagingClient): mode = "stage-out" - def resolve_protocols(self, files, activity): - """ - Populates filespec.protocols for each entry from `files` according to requested `activity` - :param files: list of `FileSpec` objects - :param activity: ordered list of preferred activity names to resolve SE protocols - fdat.protocols = [dict(endpoint, path, flavour), ..] - :return: `files` - """ - - ddmconf = self.infosys.resolve_storage_data() - - if isinstance(activity, basestring): - activity = [activity] - - for fdat in files: - ddm = ddmconf.get(fdat.ddmendpoint) - if not ddm: - raise Exception("Failed to resolve output ddmendpoint by name=%s (from PanDA), please check configuration. fdat=%s" % (fdat.ddmendpoint, fdat)) - - protocols = [] - for aname in activity: - aname = get_ddm_activity(aname) - protocols = ddm.arprotocols.get(aname) - if protocols: - break - - fdat.protocols = protocols - - return files - - def resolve_protocol(self, fspec, allowed_schemas=None): - """ - Resolve protocols according to allowed schema - :param fspec: `FileSpec` instance - :param allowed_schemas: list of allowed schemas or any if None - :return: list of dict(endpoint, path, flavour) - """ - - if not fspec.protocols: - return [] - - protocols = [] - - allowed_schemas = allowed_schemas or [None] - for schema in allowed_schemas: - for pdat in fspec.protocols: - if schema is None or pdat.get('endpoint', '').startswith("%s://" % schema): - protocols.append(pdat) - - return protocols - def prepare_destinations(self, files, activities): """ Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities` @@ -772,7 +862,7 @@ def resolve_surl(self, fspec, protocol, ddmconf, **kwargs): surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), self.get_path(fspec.scope, fspec.lfn)) return {'surl': surl} - def transfer_files(self, copytool, files, activity, **kwargs): # noqa: C901 + def transfer_files(self, copytool, files, activity, **kwargs): """ Automatically stage out files using the selected copy tool module. @@ -792,7 +882,7 @@ def transfer_files(self, copytool, files, activity, **kwargs): # noqa: C901 if not os.path.isfile(pfn) or not os.access(pfn, os.R_OK): msg = "Error: output pfn file does not exist: %s" % pfn self.logger.error(msg) - self.trace_report.update(clientState='NO_REPLICA', stateReason=msg) + self.trace_report.update(clientState='MISSINGOUTPUTFILE', stateReason=msg) self.trace_report.send() raise PilotException(msg, code=ErrorCodes.MISSINGOUTPUTFILE, state="FILE_INFO_FAIL") if not fspec.filesize: @@ -810,38 +900,7 @@ def transfer_files(self, copytool, files, activity, **kwargs): # noqa: C901 # prepare files (resolve protocol/transfer url) if getattr(copytool, 'require_protocols', True) and files: - - ddmconf = self.infosys.resolve_storage_data() - allowed_schemas = getattr(copytool, 'allowed_schemas', None) - - if self.infosys and self.infosys.queuedata: - copytool_name = copytool.__name__.rsplit('.', 1)[-1] - allowed_schemas = self.infosys.queuedata.resolve_allowed_schemas(activity, copytool_name) or allowed_schemas - - files = self.resolve_protocols(files, activity) - - for fspec in files: - - protocols = self.resolve_protocol(fspec, allowed_schemas) - if not protocols: # no protocols found - error = 'Failed to resolve protocol for file=%s, allowed_schemas=%s, fspec=%s' % (fspec.lfn, allowed_schemas, fspec) - self.logger.error("resolve_protocol: %s" % error) - raise PilotException(error, code=ErrorCodes.NOSTORAGEPROTOCOL) - - # take first available protocol for copytool: FIX ME LATER if need (do iterate over all allowed protocols?) - protocol = protocols[0] - - self.logger.info("resolved protocol to be used for transfer: data=%s" % protocol) - - resolve_surl = getattr(copytool, 'resolve_surl', None) - if not callable(resolve_surl): - resolve_surl = self.resolve_surl - - r = resolve_surl(fspec, protocol, ddmconf, activity=activity) ## pass ddmconf & activity for possible custom look up at the level of copytool - if r.get('surl'): - fspec.turl = r['surl'] - if r.get('ddmendpoint'): - fspec.ddmendpoint = r['ddmendpoint'] + self.require_protocols(files, copytool, activity) if not copytool.is_valid_for_copy_out(files): self.logger.warning('Input is not valid for transfers using copytool=%s' % copytool) diff --git a/pilot/api/es_data.py b/pilot/api/es_data.py index d21944f08..44e034814 100644 --- a/pilot/api/es_data.py +++ b/pilot/api/es_data.py @@ -6,6 +6,7 @@ # # Authors: # - Wen Guan, wen.guan@cern,ch, 2018 +# - Alexey Anisenkov, anisyonk@cern.ch, 2019 import traceback import logging @@ -17,6 +18,7 @@ logger = logging.getLogger(__name__) +## THIS CLASS CAN BE DEPREATED AND REMOVED (anisyonk) class StagingESClient(StagingClient): """ Base ES Staging Client @@ -125,7 +127,8 @@ def transfer(self, files, activity=['pw'], **kwargs): # noqa: C901 return result -class StageInESClient(StagingESClient, StageInClient): +## THIS CLASS CAN BE DEPREATED AND REMOVED (anisyonk) +class StageInESClientDeprecateME(StagingESClient, StageInClient): def process_storage_id(self, files): """ @@ -157,7 +160,8 @@ def transfer_files(self, copytool, files, activity, ddmendpoint, **kwargs): return super(StageInESClient, self).transfer_files(copytool, files, activity=activity, ddmendpoint=ddmendpoint, **kwargs) -class StageOutESClient(StagingESClient, StageOutClient): +## THIS CLASS CAN BE DEPREATED AND REMOVED (anisyonk) +class StageOutESClientDeprecateME(StagingESClient, StageOutClient): def transfer_files(self, copytool, files, activity, ddmendpoint, **kwargs): """ @@ -179,3 +183,45 @@ def transfer_files(self, copytool, files, activity, ddmendpoint, **kwargs): fspec.ddmendpoint = ddmendpoint return super(StageOutESClient, self).transfer_files(copytool, files, activity, **kwargs) + + +class StageInESClient(StageInClient): + + def __init__(self, *argc, **kwargs): + super(StageInESClient, self).__init__(*argc, **kwargs) + + self.copytool_modules.setdefault('objectstore', {'module_name': 'objectstore'}) + self.acopytools.setdefault('es_events_read', ['objectstore']) + + def prepare_sources(self, files, activities=None): + """ + Customize/prepare source data for each entry in `files` optionally checking data for requested `activities` + (custom StageClient could extend this logic if need) + :param files: list of `FileSpec` objects to be processed + :param activities: string or ordered list of activities to resolve `astorages` (optional) + :return: None + + If storage_id is specified, replace ddmendpoint by parsing storage_id + """ + + if not self.infosys: + self.logger.warning('infosys instance is not initialized: skip calling prepare_sources()') + return + + for fspec in files: + if fspec.storage_token: ## FIX ME LATER: no need to parse each time storage_id, all this staff should be applied in FileSpec clean method + storage_id, path_convention = fspec.get_storage_id_and_path_convention() + if path_convention and path_convention == 1000: + fspec.scope = 'transient' + if storage_id: + fspec.ddmendpoint = self.infosys.get_ddmendpoint(storage_id) + logger.info("Processed file with storage id: %s" % fspec) + + +class StageOutESClient(StageOutClient): + + def __init__(self, *argc, **kwargs): + super(StageOutESClient, self).__init__(*argc, **kwargs) + + self.copytool_modules.setdefault('objectstore', {'module_name': 'objectstore'}) + self.acopytools.setdefault('es_events', ['objectstore']) diff --git a/pilot/common/errorcodes.py b/pilot/common/errorcodes.py index 38784eac4..dbe5e079b 100644 --- a/pilot/common/errorcodes.py +++ b/pilot/common/errorcodes.py @@ -134,6 +134,7 @@ class ErrorCodes: UNRECOGNIZEDTRFARGUMENTS = 1349 EMPTYOUTPUTFILE = 1350 UNRECOGNIZEDTRFSTDERR = 1351 + STATFILEPROBLEM = 1352 _error_messages = { GENERALERROR: "General pilot error, consult batch log", @@ -246,7 +247,8 @@ class ErrorCodes: SINGULARITYRESOURCEUNAVAILABLE: "Singularity: Resource temporarily unavailable", # not the same as RESOURCEUNAVAILABLE UNRECOGNIZEDTRFARGUMENTS: "Unrecognized transform arguments", EMPTYOUTPUTFILE: "Empty output file detected", - UNRECOGNIZEDTRFSTDERR: "Unrecognized fatal error in transform stderr" + UNRECOGNIZEDTRFSTDERR: "Unrecognized fatal error in transform stderr", + STATFILEPROBLEM: "Failed to stat proc file for CPU consumption calculation" } put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181] @@ -402,16 +404,37 @@ def format_diagnostics(self, code, diag): :return: formatted error diagnostics (string). """ + max_message_length = 256 try: standard_message = self._error_messages[code] + ":" except Exception: standard_message = "" + + # extract the relevant info for reporting exceptions + if "Traceback" in diag: + pattern = 'details:(.+)' + found = re.findall(pattern, diag) + if found: + diag = found[0] + diag = diag.replace("[PilotException(\'", '') + diag = diag.replace('[PilotException(\"', '') + diag = diag.replace(' ', ' ') + try: if diag: - if len(diag) >= len(standard_message): - error_message = standard_message + diag[-(len(diag) - len(standard_message)):] + # ensure that the message to be displayed on the PanDA monitor is not longer than max_message_length + # if it is, then reformat it so that the standard message is always displayed first. + # e.g. "Failed to stage-in file:abcdefghijklmnopqrstuvwxyz0123456789" + if standard_message in diag: + if len(diag) > max_message_length: + error_message = standard_message + diag[-(max_message_length - len(standard_message)):] + else: + error_message = standard_message + diag[len(standard_message):][-max_message_length:] else: - error_message = standard_message + diag[-(len(standard_message) - len(diag)):] + if len(diag) + len(standard_message) > max_message_length: + error_message = standard_message + diag[-(max_message_length - len(standard_message)):] + else: + error_message = standard_message + diag else: error_message = standard_message except Exception: diff --git a/pilot/common/exception.py b/pilot/common/exception.py index 0f64de849..ea10afcca 100644 --- a/pilot/common/exception.py +++ b/pilot/common/exception.py @@ -69,6 +69,11 @@ def get_detail(self): def get_error_code(self): return self._errorCode + def get_last_error(self): + if self.args: + return self.args[-1] + return self._message + class NotImplemented(PilotException): """ diff --git a/pilot/control/data.py b/pilot/control/data.py index 99688e156..d239dccf5 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -159,7 +159,7 @@ def _stage_in(args, job): client = StageInClient(job.infosys, logger=log, trace_report=trace_report) activity = 'pr' kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job) #, mode='stage-in') - + client.prepare_sources(job.indata) client.transfer(job.indata, activity=activity, **kwargs) except PilotException as error: import traceback diff --git a/pilot/control/job.py b/pilot/control/job.py index 6bdb8c4c0..9b40a8fa1 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -832,11 +832,8 @@ def get_dispatcher_dictionary(args): # override for RC dev pilots job_label = get_job_label(args) - logger.debug('infosys.queuedata.resource=%s' % infosys.queuedata.resource) - logger.debug('args.resource=%s' % args.resource) - data = { - 'siteName': args.resource, # replace it with `infosys.queuedata.resource` to remove redundant '-r' option of pilot.py + 'siteName': infosys.queuedata.resource, # next: remove redundant '-r' option of pilot.py 'computingElement': args.queue, 'prodSourceLabel': job_label, 'diskSpace': _diskspace, diff --git a/pilot/copytool/common.py b/pilot/copytool/common.py index 81f6e2ac5..135fb7e07 100644 --- a/pilot/copytool/common.py +++ b/pilot/copytool/common.py @@ -18,18 +18,19 @@ logger = logging.getLogger(__name__) -def get_timeout(filesize): +def get_timeout(filesize, add=0): """ Get a proper time-out limit based on the file size. :param filesize: file size (int). - :return: + :param add: optional additional time to be added [s] (int) + :return: time-out in seconds (int). """ timeout_max = 3 * 3600 # 3 hours timeout_min = 300 # self.timeout - timeout = timeout_min + int(filesize / 0.5e6) # approx < 0.5 Mb/sec + timeout = timeout_min + int(filesize / 0.5e6) + add # approx < 0.5 Mb/sec return min(timeout, timeout_max) @@ -57,13 +58,15 @@ def verify_catalog_checksum(fspec, path): state = 'UNKNOWN_CHECKSUM_TYPE' else: checksum_local = calculate_checksum(path, algorithm=checksum_type) + if checksum_type == 'ad32': + checksum_type = 'adler32' logger.info('checksum (catalog): %s (type: %s)' % (checksum_catalog, checksum_type)) logger.info('checksum (local): %s' % checksum_local) if checksum_local and checksum_local != '' and checksum_local != checksum_catalog: - diagnostics = 'checksum verification failed: checksum (catalog)=%s != checksum (local)=%s' % \ - (checksum_catalog, checksum_local) + diagnostics = 'checksum verification failed for LFN=%s: checksum (catalog)=%s != checksum (local)=%s' % \ + (fspec.lfn, checksum_catalog, checksum_local) logger.warning(diagnostics) - fspec.status_code = ErrorCodes.GETADMISMATCH if checksum_type == 'ad32' else ErrorCodes.GETMD5MISMATCH + fspec.status_code = ErrorCodes.GETADMISMATCH if checksum_type == 'adler32' else ErrorCodes.GETMD5MISMATCH fspec.status = 'failed' state = 'AD_MISMATCH' if checksum_type == 'ad32' else 'MD_MISMATCH' else: @@ -143,7 +146,7 @@ def output_line_scan(ret, output): """ for line in output.split('\n'): - m = re.search("Details\s*:\s*(?P.*)", line) + m = re.search("[Dd]etails\s*:\s*(?P.*)", line) if m: ret['error'] = m.group('error') elif 'service_unavailable' in line: @@ -197,8 +200,8 @@ def resolve_common_transfer_errors(output, is_stagein=True): ret = get_error_info(ErrorCodes.SERVICENOTAVAILABLE, 'SERVICE_ERROR', output) elif "Network is unreachable" in output: ret = get_error_info(ErrorCodes.UNREACHABLENETWORK, 'NETWORK_UNREACHABLE', output) - else: - # reg exp the output - ret = output_line_scan(ret, output) + + # reg exp the output to get real error message + ret = output_line_scan(ret, output) return ret diff --git a/pilot/copytool/gfal.py b/pilot/copytool/gfal.py index 8c213e596..0b95f4be6 100644 --- a/pilot/copytool/gfal.py +++ b/pilot/copytool/gfal.py @@ -24,7 +24,7 @@ require_replicas = True ## indicate if given copytool requires input replicas to be resolved -allowed_schemas = ['srm', 'gsiftp', 'https', 'davs'] # prioritized list of supported schemas for transfers by given copytool +allowed_schemas = ['srm', 'gsiftp', 'https', 'davs', 'root'] # prioritized list of supported schemas for transfers by given copytool def is_valid_for_copy_in(files): @@ -57,7 +57,7 @@ def copy_in(files, **kwargs): if not check_for_gfal(): raise StageInFailure("No GFAL2 tools found") - localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None) + localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None)) for fspec in files: # update the trace report localsite = localsite if localsite else fspec.ddmendpoint diff --git a/pilot/copytool/lsm.py b/pilot/copytool/lsm.py index 22cdecc87..73b11fb28 100644 --- a/pilot/copytool/lsm.py +++ b/pilot/copytool/lsm.py @@ -75,7 +75,7 @@ def copy_in(files, **kwargs): copysetup = get_copysetup(copytools, 'lsm') trace_report = kwargs.get('trace_report') allow_direct_access = kwargs.get('allow_direct_access') - localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None) + localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None)) for fspec in files: # update the trace report @@ -315,7 +315,7 @@ def move(source, destination, dst_in=True, copysetup="", options=None): cmd += "lsm-put %s" % args try: - exit_code, stdout, stderr = execute(cmd) #, timeout=get_timeout(fspec.filesize)) + exit_code, stdout, stderr = execute(cmd, usecontainer=False, copytool=True) #, timeout=get_timeout(fspec.filesize)) except Exception as e: if dst_in: exit_code = ErrorCodes.STAGEINFAILED diff --git a/pilot/copytool/objectstore.py b/pilot/copytool/objectstore.py index d3e95e68b..a42c9575e 100644 --- a/pilot/copytool/objectstore.py +++ b/pilot/copytool/objectstore.py @@ -6,6 +6,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2018 +# - Alexey Anisenkov, anisyonk@cern.ch, 2019 import os import json @@ -13,7 +14,7 @@ from .common import resolve_common_transfer_errors from pilot.common.exception import PilotException, ErrorCodes -from pilot.info.storageactivitymaps import get_ddm_activity +#from pilot.info.storageactivitymaps import get_ddm_activity from pilot.util.container import execute from pilot.util.ruciopath import get_rucio_path @@ -21,6 +22,8 @@ # can be disable for Rucio if allowed to use all RSE for input require_replicas = False ## indicates if given copytool requires input replicas to be resolved + +require_input_protocols = True ## indicates if given copytool requires input protocols and manual generation of input replicas require_protocols = True ## indicates if given copytool requires protocols to be resolved first for stage-out allowed_schemas = ['srm', 'gsiftp', 'https', 'davs', 'root', 's3', 's3+rucio'] @@ -52,39 +55,39 @@ def resolve_surl(fspec, protocol, ddmconf, **kwargs): surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), get_rucio_path(fspec.scope, fspec.lfn)) elif ddm.type in ['OS_ES', 'OS_LOGS']: surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), fspec.lfn) - fspec.protocol_id = protocol.get('id', None) + fspec.protocol_id = protocol.get('id') else: raise PilotException('resolve_surl(): Failed to construct SURL for non deterministic ddm=%s: NOT IMPLEMENTED', fspec.ddmendpoint) return {'surl': surl} - -def resolve_protocol(fspec, activity, ddm): - """ - Rosolve protocols to be used to transfer the file with corressponding activity - - :param fspec: file spec data - :param activity: actvitiy name as string - :param ddm: ddm storage data - :return: protocol as dictionary - """ - - logger.info("Resolving protocol for file(lfn: %s, ddmendpoint: %s) with activity(%s)" % (fspec.lfn, fspec.ddmendpoint, activity)) - - activity = get_ddm_activity(activity) - protocols = ddm.arprotocols.get(activity) - protocols_allow = [] - for schema in allowed_schemas: - for protocol in protocols: - if schema is None or protocol.get('endpoint', '').startswith("%s://" % schema): - protocols_allow.append(protocol) - if not protocols_allow: - err = "No available allowed protocols for file(lfn: %s, ddmendpoint: %s) with activity(%s)" % (fspec.lfn, fspec.ddmendpoint, activity) - logger.error(err) - raise PilotException(err) - protocol = protocols_allow[0] - logger.info("Resolved protocol for file(lfn: %s, ddmendpoint: %s) with activity(%s): %s" % (fspec.lfn, fspec.ddmendpoint, activity, protocol)) - return protocol +## redundant logic, can be removed (anisyonk) +#def resolve_protocol(fspec, activity, ddm): +# """ +# Rosolve protocols to be used to transfer the file with corressponding activity +# +# :param fspec: file spec data +# :param activity: actvitiy name as string +# :param ddm: ddm storage data +# :return: protocol as dictionary +# """ +# +# logger.info("Resolving protocol for file(lfn: %s, ddmendpoint: %s) with activity(%s)" % (fspec.lfn, fspec.ddmendpoint, activity)) +# +# activity = get_ddm_activity(activity) +# protocols = ddm.arprotocols.get(activity) +# protocols_allow = [] +# for schema in allowed_schemas: +# for protocol in protocols: +# if schema is None or protocol.get('endpoint', '').startswith("%s://" % schema): +# protocols_allow.append(protocol) +# if not protocols_allow: +# err = "No available allowed protocols for file(lfn: %s, ddmendpoint: %s) with activity(%s)" % (fspec.lfn, fspec.ddmendpoint, activity) +# logger.error(err) +# raise PilotException(err) +# protocol = protocols_allow[0] +# logger.info("Resolved protocol for file(lfn: %s, ddmendpoint: %s) with activity(%s): %s" % (fspec.lfn, fspec.ddmendpoint, activity, protocol)) +# return protocol def copy_in(files, **kwargs): @@ -99,31 +102,43 @@ def copy_in(files, **kwargs): os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]' ddmconf = kwargs.pop('ddmconf', {}) - activity = kwargs.pop('activity', None) + #activity = kwargs.pop('activity', None) # trace_report = kwargs.get('trace_report') for fspec in files: cmd = [] logger.info("To transfer file: %s" % fspec) - ddm = ddmconf.get(fspec.ddmendpoint) - if ddm: - protocol = resolve_protocol(fspec, activity, ddm) - surls = resolve_surl(fspec, protocol, ddmconf) - if 'surl' in surls: - fspec.surl = surls['surl'] - ddm_special_setup = ddm.get_special_setup(protocol.get('id', None)) - if ddm_special_setup: - cmd += [ddm_special_setup] + if fspec.protocol_id: + ddm = ddmconf.get(fspec.ddmendpoint) + if ddm: + ddm_special_setup = ddm.get_special_setup(fspec.protocol_id) + if ddm_special_setup: + cmd = [ddm_special_setup] + + # redundant logic: to be cleaned (anisyonk) + #ddm = ddmconf.get(fspec.ddmendpoint) + #if ddm: + # protocol = resolve_protocol(fspec, activity, ddm) + # surls = resolve_surl(fspec, protocol, ddmconf) + # if 'surl' in surls: + # fspec.surl = surls['surl'] + # ddm_special_setup = ddm.get_special_setup(fspec.protocol_id) + # if ddm_special_setup: + # cmd += [ddm_special_setup] dst = fspec.workdir or kwargs.get('workdir') or '.' cmd += ['/usr/bin/env', 'rucio', '-v', 'download', '--no-subdir', '--dir', dst] if require_replicas: cmd += ['--rse', fspec.replicas[0][0]] - if fspec.surl: + + # a copytool module should consider fspec.turl for transfers, and could failback to fspec.surl, + # but normally fspec.turl (transfer url) is mandatory and already populated by the top workflow + turl = fspec.turl or fspec.surl + if turl: if fspec.ddmendpoint: cmd.extend(['--rse', fspec.ddmendpoint]) - cmd.extend(['--pfn', fspec.surl]) + cmd.extend(['--pfn', turl]) cmd += ['%s:%s' % (fspec.scope, fspec.lfn)] rcode, stdout, stderr = execute(" ".join(cmd), **kwargs) diff --git a/pilot/copytool/rucio.py b/pilot/copytool/rucio.py index 3cba9f14a..3feda3f96 100644 --- a/pilot/copytool/rucio.py +++ b/pilot/copytool/rucio.py @@ -9,6 +9,7 @@ # - Alexey Anisenkov, anisyonk@cern.ch, 2018 # - Paul Nilsson, paul.nilsson@cern.ch, 2018 # - Tomas Javurek, tomas.javurek@cern.ch, 2019 +# - David Cameron, david.cameron@cern.ch, 2019 from __future__ import absolute_import @@ -18,7 +19,8 @@ from time import time from .common import resolve_common_transfer_errors, verify_catalog_checksum, get_timeout -from pilot.common.exception import PilotException, ErrorCodes +from pilot.common.exception import PilotException, StageOutFailure, ErrorCodes +#from pilot.util.timer import timeout logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -37,7 +39,19 @@ def is_valid_for_copy_out(files): return True ## FIX ME LATER -# @timeout(seconds=600) +def verify_stage_out(fspec): + """ + Checks that the uploaded file is physically at the destination. + :param fspec: file specifications + """ + from rucio.rse import rsemanager as rsemgr + rse_settings = rsemgr.get_rse_info(fspec.ddmendpoint) + uploaded_file = {'name': fspec.lfn, 'scope': fspec.scope} + logger.info('Checking file: %s' % str(fspec.lfn)) + return rsemgr.exists(rse_settings, [uploaded_file]) + + +#@timeout(seconds=10800) def copy_in(files, **kwargs): """ Download given files using rucio copytool. @@ -54,7 +68,7 @@ def copy_in(files, **kwargs): # don't spoil the output, we depend on stderr parsing os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]' - localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None) + localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None)) for fspec in files: logger.info('rucio copytool, downloading file with scope:%s lfn:%s' % (str(fspec.scope), str(fspec.lfn))) # update the trace report @@ -76,27 +90,27 @@ def copy_in(files, **kwargs): dst = fspec.workdir or kwargs.get('workdir') or '.' logger.info('the file will be stored in %s' % str(dst)) - error_msg = None - rucio_state = None + trace_report_out = [] try: - rucio_state = _stage_in_api(dst, fspec, trace_report) + #transfer_timeout = get_timeout(fspec.filesize, add=10) # give the API a chance to do the time-out first + #timeout(transfer_timeout)(_stage_in_api)(dst, fspec, trace_report, trace_report_out) + _stage_in_api(dst, fspec, trace_report, trace_report_out) except Exception as error: error_msg = str(error) + # Try to get a better error message from the traces + if trace_report_out and trace_report_out[0].get('stateReason'): + error_msg = trace_report_out[0].get('stateReason') + logger.info('rucio returned an error: %s' % error_msg) - if error_msg: - logger.info('stderr = %s' % error_msg) - else: - logger.info('rucio client state: %s' % str(rucio_state)) - - if error_msg: ## error occurred - error = resolve_common_transfer_errors(error_msg, is_stagein=True) + error_details = resolve_common_transfer_errors(error_msg, is_stagein=True) fspec.status = 'failed' - fspec.status_code = error.get('rcode') - trace_report.update(clientState=rucio_state or 'STAGEIN_ATTEMPT_FAILED', - stateReason=error_msg, timeEnd=time()) + fspec.status_code = error_details.get('rcode') + trace_report.update(clientState=error_details.get('state', 'STAGEIN_ATTEMPT_FAILED'), + stateReason=error_details.get('error'), timeEnd=time()) if not ignore_errors: trace_report.send() - raise PilotException(error_msg, code=error.get('rcode'), state='FAILED') + msg = ' %s:%s, %s' % (fspec.scope, fspec.lfn, error_details.get('error')) + raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state')) # verify checksum; compare local checksum with catalog value (fspec.checksum), use same checksum type destination = os.path.join(dst, fspec.lfn) @@ -120,7 +134,7 @@ def copy_in(files, **kwargs): return files -# @timeout(seconds=600) +#@timeout(seconds=10800) def copy_out(files, **kwargs): """ Upload given files using rucio copytool. @@ -137,10 +151,14 @@ def copy_out(files, **kwargs): ignore_errors = kwargs.pop('ignore_errors', False) trace_report = kwargs.get('trace_report') + localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None)) for fspec in files: logger.info('rucio copytool, uploading file with scope: %s and lfn: %s' % (str(fspec.scope), str(fspec.lfn))) + localsite = localsite if localsite else fspec.ddmendpoint + trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint) trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize) trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', '')) + fspec.status_code = 0 summary_file_path = None cwd = fspec.workdir or kwargs.get('workdir') or '.' @@ -148,30 +166,27 @@ def copy_out(files, **kwargs): summary_file_path = os.path.join(cwd, 'rucio_upload.json') logger.info('the file will be uploaded to %s' % str(fspec.ddmendpoint)) - rucio_state = None - error_msg = None + trace_report_out = [] try: - rucio_state = _stage_out_api(fspec, summary_file_path, trace_report) + #transfer_timeout = get_timeout(fspec.filesize, add=10) # give the API a chance to do the time-out first + #timeout(transfer_timeout)(_stage_out_api)(fspec, summary_file_path, trace_report, trace_report_out) + _stage_out_api(fspec, summary_file_path, trace_report, trace_report_out) except Exception as error: error_msg = str(error) + # Try to get a better error message from the traces + if trace_report_out and trace_report_out[0].get('stateReason'): + error_msg = trace_report_out[0].get('stateReason') + logger.info('rucio returned an error: %s' % error_msg) - if error_msg: - logger.info('stderr = %s' % error_msg) - else: - logger.info('rucio client state: %s' % str(rucio_state)) - - fspec.status_code = 0 - - if error_msg: ## error occurred - error = resolve_common_transfer_errors(error_msg, is_stagein=False) + error_details = resolve_common_transfer_errors(error_msg, is_stagein=False) fspec.status = 'failed' - fspec.status_code = error.get('rcode') - trace_report.update(clientState=error.get('state', None) or 'STAGEOUT_ATTEMPT_FAILED', - stateReason=error.get('error', 'unknown error'), - timeEnd=time()) + fspec.status_code = error_details.get('rcode') + trace_report.update(clientState=error_details.get('state', 'STAGEOUT_ATTEMPT_FAILED'), + stateReason=error_details.get('error'), timeEnd=time()) if not ignore_errors: trace_report.send() - raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state')) + msg = ' %s:%s, %s' % (fspec.scope, fspec.lfn, error_details.get('error')) + raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state')) if summary: # resolve final pfn (turl) from the summary JSON if not os.path.exists(summary_file_path): @@ -201,7 +216,7 @@ def copy_out(files, **kwargs): logger.info('local checksum (%s) = remote checksum (%s)' % (local_checksum, adler32)) else: logger.warning('checksum could not be verified: local checksum (%s), remote checksum (%s)' % - str(local_checksum), str(adler32)) + (str(local_checksum), str(adler32))) if not fspec.status_code: fspec.status_code = 0 fspec.status = 'transferred' @@ -213,7 +228,7 @@ def copy_out(files, **kwargs): # stageIn using rucio api. -def _stage_in_api(dst, fspec, trace_report): +def _stage_in_api(dst, fspec, trace_report, trace_report_out): # init. download client from rucio.client.downloadclient import DownloadClient @@ -242,20 +257,21 @@ def _stage_in_api(dst, fspec, trace_report): trace_pattern = {} if trace_report: trace_pattern = trace_report - result = [] + + # download client raises an exception if any file failed if fspec.turl: + # restore the following line when it is supported on the rucio client + #result = download_client.download_pfns([f], 1, trace_custom_fields=trace_pattern, traces_copy_out=trace_report_out) result = download_client.download_pfns([f], 1, trace_custom_fields=trace_pattern) else: + # restore the following line when it is supported on the rucio client + #result = download_client.download_dids([f], trace_custom_fields=trace_pattern, traces_copy_out=trace_report_out) result = download_client.download_dids([f], trace_custom_fields=trace_pattern) - client_state = 'FAILED' - if result: - client_state = result[0].get('clientState', 'FAILED') - - return client_state + logger.debug('Rucio download client returned %s' % result) -def _stage_out_api(fspec, summary_file_path, trace_report): +def _stage_out_api(fspec, summary_file_path, trace_report, trace_report_out): # init. download client from rucio.client.uploadclient import UploadClient @@ -288,14 +304,22 @@ def _stage_out_api(fspec, summary_file_path, trace_report): # process with the upload logger.info('_stage_out_api: %s' % str(f)) result = None + + # upload client raises an exception if any file failed try: - result = upload_client.upload([f], summary_file_path) + # TODO: Add traces_copy_out=trace_report_out when supported in rucio + result = upload_client.upload([f], summary_file_path=summary_file_path) except UnboundLocalError: logger.warning('rucio still needs a bug fix of the summary in the uploadclient') - result = 0 - client_state = 'FAILED' - if result == 0: - client_state = 'DONE' + logger.debug('Rucio upload client returned %s' % result) - return client_state + try: + file_exists = verify_stage_out(fspec) + logger.info('File exists at the storage: %s' % str(file_exists)) + if not file_exists: + raise StageOutFailure('stageOut: Physical check after upload failed.') + except Exception as e: + msg = 'stageOut: File existence verification failed with: %s' % str(e) + logger.info(msg) + raise StageOutFailure(msg) diff --git a/pilot/copytool/xrdcp.py b/pilot/copytool/xrdcp.py index a5ba2df5d..129f36e73 100644 --- a/pilot/copytool/xrdcp.py +++ b/pilot/copytool/xrdcp.py @@ -138,7 +138,7 @@ def copy_in(files, **kwargs): coption = _resolve_checksum_option(setup, **kwargs) trace_report = kwargs.get('trace_report') - localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None) + localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None)) for fspec in files: # update the trace report localsite = localsite if localsite else fspec.ddmendpoint diff --git a/pilot/eventservice/workexecutor/plugins/genericexecutor.py b/pilot/eventservice/workexecutor/plugins/genericexecutor.py index 5de13cbde..0cf347113 100644 --- a/pilot/eventservice/workexecutor/plugins/genericexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/genericexecutor.py @@ -6,6 +6,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2018 +# - Alexey Anisenkov, anisyonk@cern.ch, 2019 import json import os @@ -13,12 +14,14 @@ import traceback from pilot.api.es_data import StageOutESClient -from pilot.common import exception +from pilot.common.exception import PilotException, StageOutFailure, ErrorCodes + from pilot.eventservice.esprocess.esprocess import ESProcess from pilot.info.filespec import FileSpec from pilot.info import infosys from pilot.util.auxiliary import get_logger from pilot.util.container import execute + from .baseexecutor import BaseExecutor import logging @@ -165,33 +168,76 @@ def stageout_es_real(self, output_file): :param output_file: output file name. """ + job = self.get_job() log = get_logger(job.jobid, logger) log.info('prepare to stage-out eventservice files') error = None + file_data = {'scope': 'transient', + 'lfn': os.path.basename(output_file), + } + file_spec = FileSpec(filetype='output', **file_data) + xdata = [file_spec] + kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job) + + try_failover = False + activity = ['es_events', 'pw'] ## FIX ME LATER: replace `pw` with `write_lan` once AGIS is updated (acopytools) + try: - file_data = {'scope': 'transient', - 'lfn': os.path.basename(output_file), - } - file_spec = FileSpec(filetype='output', **file_data) - xdata = [file_spec] client = StageOutESClient(job.infosys, logger=log) - kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job) - client.transfer(xdata, activity=['es_events', 'pw'], **kwargs) - except exception.PilotException, error: + try_failover = True + + client.prepare_destinations(xdata, activity) ## IF ES job should be allowed to write only at `es_events` astorages, then fix activity names here + client.transfer(xdata, activity=activity, **kwargs) + except PilotException as error: log.error(error.get_detail()) - except Exception, e: - import traceback + except Exception as e: log.error(traceback.format_exc()) - error = exception.StageOutFailure("stageOut failed with error=%s" % e) + error = StageOutFailure("stageOut failed with error=%s" % e) log.info('Summary of transferred files:') log.info(" -- lfn=%s, status_code=%s, status=%s" % (file_spec.lfn, file_spec.status_code, file_spec.status)) - if error or file_spec.status != 'transferred': + if error: log.error('Failed to stage-out eventservice file(%s): error=%s' % (output_file, error.get_detail())) + elif file_spec.status != 'transferred': + msg = 'Failed to stage-out ES file(%s): logic corrupted: unknown internal error, fspec=%s' % (output_file, file_spec) + log.error(msg) + raise StageOutFailure(msg) + + failover_storage_activity = ['es_failover', 'pw'] + + if try_failover and error and error.get_error_code() not in [ErrorCodes.MISSINGOUTPUTFILE]: ## try to failover to other storage + + xdata2 = [FileSpec(filetype='output', **file_data)] + + try: + client.prepare_destinations(xdata2, failover_storage_activity) + if xdata2[0].ddmendpoint != xdata[0].ddmendpoint: ## skip transfer to same output storage + msg = 'Will try to failover ES transfer to astorage with activity=%s, rse=%s' % (failover_storage_activity, xdata2[0].ddmendpoint) + log.info(msg) + client.transfer(xdata2, activity=activity, **kwargs) + + log.info('Summary of transferred files (failover transfer):') + log.info(" -- lfn=%s, status_code=%s, status=%s" % (xdata2[0].lfn, xdata2[0].status_code, xdata2[0].status)) + + except PilotException as e: + if e.get_error_code() == ErrorCodes.NOSTORAGE: + log.info('Failover ES storage is not defined for activity=%s .. skipped' % failover_storage_activity) + else: + log.error('Transfer to failover storage=%s failed .. skipped, error=%s' % (xdata2[0].ddmendpoint, e.get_detail())) + except Exception as e: + log.error('Failover ES stageout failed .. skipped') + log.error(traceback.format_exc()) + + if xdata2[0].status == 'transferred': + error = None + file_spec = xdata2[0] + + if error: raise error + storage_id = infosys.get_storage_id(file_spec.ddmendpoint) return file_spec.ddmendpoint, storage_id, file_spec.filesize, file_spec.checksum diff --git a/pilot/info/filespec.py b/pilot/info/filespec.py index e6ddfd424..0718af09e 100644 --- a/pilot/info/filespec.py +++ b/pilot/info/filespec.py @@ -65,12 +65,13 @@ class FileSpec(BaseData): workdir = None # used to declare file-specific work dir (location of given local file when it's used for transfer by copytool) protocol_id = None # id of the protocol to be used to construct turl is_tar = False # whether it's a tar file or not + ddm_activity = None # DDM activity names (e.g. [read_lan, read_wan]) which should be used to resolve appropriate protocols from StorageData.arprotocols # specify the type of attributes for proper data validation and casting _keys = {int: ['filesize', 'mtime', 'status_code'], str: ['lfn', 'guid', 'checksum', 'scope', 'dataset', 'ddmendpoint', 'filetype', 'surl', 'turl', 'status', 'workdir', 'accessmode', 'allowremoteinputs', 'storage_token'], - list: ['replicas', 'inputddms'], + list: ['replicas', 'inputddms', 'ddm_activity'], bool: [] } diff --git a/pilot/info/jobinfo.py b/pilot/info/jobinfo.py index 2f51767bd..cd05f2af5 100644 --- a/pilot/info/jobinfo.py +++ b/pilot/info/jobinfo.py @@ -83,6 +83,7 @@ def resolve_storage_data(self, ddmendpoints=[], **kwargs): master_data = self.job.overwrite_storagedata or {} data.update((k, v) for k, v in master_data.iteritems() if k in set(ddmendpoints or master_data) & set(master_data)) - logger.info('storagedata: following data extracted from Job definition will be used: %s' % data) + if data: + logger.info('storagedata: following data extracted from Job definition will be used: %s' % data) return data diff --git a/pilot/info/queuedata.py b/pilot/info/queuedata.py index 41717d07a..30140141b 100644 --- a/pilot/info/queuedata.py +++ b/pilot/info/queuedata.py @@ -129,7 +129,7 @@ def load(self, data): def resolve_allowed_schemas(self, activity, copytool=None): """ Resolve list of allowed schemas for given activity and requested copytool based on `acopytools_schemas` settings - :param activity: ordered list of activity names to look up data + :param activity: str or ordered list of transfer activity names to resolve acopytools related data :return: list of protocol schemes """ diff --git a/pilot/info/storageactivitymaps.py b/pilot/info/storageactivitymaps.py index 8846645dd..8f0ebe8a1 100644 --- a/pilot/info/storageactivitymaps.py +++ b/pilot/info/storageactivitymaps.py @@ -6,6 +6,7 @@ # Authors: # - Wen Guan, wen.guan@cern.ch, 2018 +## THIS FILE IS DEPRECATED AND CAN BE REMOVED LATER (anisyonk) import logging logger = logging.getLogger(__name__) diff --git a/pilot/test/test_esstager.py b/pilot/test/test_esstager.py index 830bfbbfb..d8a189832 100644 --- a/pilot/test/test_esstager.py +++ b/pilot/test/test_esstager.py @@ -73,7 +73,8 @@ def test_stageout_es_events(self): workdir = os.path.dirname(output_file) client = StageOutESClient(infoservice) kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False) - client.transfer(xdata, activity=['es_events'], **kwargs) + client.prepare_destinations(xdata, activity='es_events') + client.transfer(xdata, activity='es_events', **kwargs) except exception.PilotException, error: logger.error("Pilot Exeception: %s, %s" % (error.get_detail(), traceback.format_exc())) except Exception, e: @@ -115,6 +116,7 @@ def test_stageout_es_events_pw(self): workdir = os.path.dirname(output_file) client = StageOutESClient(infoservice) kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False) + client.prepare_destinations(xdata, activity=['es_events', 'pw']) # allow to write to `es_events` and `pw` astorages client.transfer(xdata, activity=['es_events', 'pw'], **kwargs) except exception.PilotException, error: logger.error("Pilot Exeception: %s, %s" % (error.get_detail(), traceback.format_exc())) @@ -157,6 +159,7 @@ def test_stageout_es_events_non_exist_pw(self): workdir = os.path.dirname(output_file) client = StageOutESClient(infoservice) kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False) + client.prepare_destinations(xdata, activity=['es_events_non_exist', 'pw']) # allow to write to `es_events_non_exist` and `pw` astorages client.transfer(xdata, activity=['es_events_non_exist', 'pw'], **kwargs) except exception.PilotException, error: logger.error("Pilot Exeception: %s, %s" % (error.get_detail(), traceback.format_exc())) @@ -199,6 +202,7 @@ def test_stageout_stagein(self): workdir = os.path.dirname(output_file) client = StageOutESClient(infoservice) kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False) + client.prepare_destinations(xdata, activity=['es_events', 'pw']) # allow to write to `es_events` and `pw` astorages client.transfer(xdata, activity=['es_events', 'pw'], **kwargs) except exception.PilotException, error: logger.error("Pilot Exeception: %s, %s" % (error.get_detail(), traceback.format_exc())) @@ -227,6 +231,7 @@ def test_stageout_stagein(self): workdir = os.path.dirname(output_file) client = StageInESClient(infoservice) kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False) + client.prepare_sources(xdata) client.transfer(xdata, activity=['es_events_read'], **kwargs) except exception.PilotException, error: logger.error("Pilot Exeception: %s, %s" % (error.get_detail(), traceback.format_exc())) @@ -269,6 +274,7 @@ def test_stageout_noexist_activity_stagein(self): workdir = os.path.dirname(output_file) client = StageOutESClient(infoservice) kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False) + client.prepare_destinations(xdata, activity=['es_events_no_exist', 'pw']) # allow to write to `es_events_no_exist` and `pw` astorages client.transfer(xdata, activity=['es_events_no_exist', 'pw'], **kwargs) except exception.PilotException, error: logger.error("Pilot Exeception: %s, %s" % (error.get_detail(), traceback.format_exc())) @@ -297,6 +303,7 @@ def test_stageout_noexist_activity_stagein(self): workdir = os.path.dirname(output_file) client = StageInESClient(infoservice) kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False) + client.prepare_sources(xdata) client.transfer(xdata, activity=['es_events_read'], **kwargs) except exception.PilotException, error: logger.error("Pilot Exeception: %s, %s" % (error.get_detail(), traceback.format_exc())) diff --git a/pilot/test/test_esworkexecutor.py b/pilot/test/test_esworkexecutor.py index 8c81c9889..7417ed19c 100644 --- a/pilot/test/test_esworkexecutor.py +++ b/pilot/test/test_esworkexecutor.py @@ -90,6 +90,7 @@ def setUpClass(cls): # download input files client = StageInESClient(job.infosys, logger=logger) kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job) + client.prepare_sources(job.indata) client.transfer(job.indata, activity='pr', **kwargs) # get the payload command from the user specific code diff --git a/pilot/test/test_utils.py b/pilot/test/test_utils.py index ee2e88ec1..5b3ce1472 100644 --- a/pilot/test/test_utils.py +++ b/pilot/test/test_utils.py @@ -25,7 +25,7 @@ def setUp(self): self.mac = True from pilot.info import infosys - infosys.init("AGLT2_TEST-condor") + infosys.init("AGLT2_TEST") def test_collect_workernode_info(self): """ diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index 567636dfc..8c336fd8f 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -293,31 +293,45 @@ def get_generic_payload_command(cmd, job, prepareasetup, userjob): def add_athena_proc_number(cmd): """ - Add the ATHENA_PROC_NUMBER to the payload command if necessary + Add the ATHENA_PROC_NUMBER and ATHENA_CORE_NUMBER to the payload command if necessary. :param cmd: payload execution command (string). :return: updated payload execution command (string). """ + # get the values if they exist + try: + value1 = int(os.environ['ATHENA_PROC_NUMBER_JOB']) + except Exception as e: + logger.warning('failed to convert ATHENA_PROC_NUMBER_JOB to int: %s' % e) + value1 = None + try: + value2 = int(os.environ['ATHENA_CORE_NUMBER']) + except Exception as e: + logger.warning('failed to convert ATHENA_CORE_NUMBER to int: %s' % e) + value2 = None + if "ATHENA_PROC_NUMBER" not in cmd: if "ATHENA_PROC_NUMBER" in os.environ: cmd = 'export ATHENA_PROC_NUMBER=%s;' % os.environ['ATHENA_PROC_NUMBER'] + cmd - elif "ATHENA_PROC_NUMBER_JOB" in os.environ: - try: - value = int(os.environ['ATHENA_PROC_NUMBER_JOB']) - except Exception: - logger.warning("failed to convert ATHENA_PROC_NUMBER_JOB=%s to int" % - os.environ['ATHENA_PROC_NUMBER_JOB']) + elif "ATHENA_PROC_NUMBER_JOB" in os.environ and value1: + if value1 > 1: + cmd = 'export ATHENA_PROC_NUMBER=%d;' % value1 + cmd else: - if value > 1: - cmd = 'export ATHENA_PROC_NUMBER=%d;' % value + cmd - else: - logger.info("will not add ATHENA_PROC_NUMBER to cmd since the value is %d" % value) + logger.info("will not add ATHENA_PROC_NUMBER to cmd since the value is %s" % str(value1)) else: logger.warning("don't know how to set ATHENA_PROC_NUMBER (could not find it in os.environ)") else: logger.info("ATHENA_PROC_NUMBER already in job command") + if 'ATHENA_CORE_NUMBER' in os.environ and value2: + if value2 > 1: + cmd = 'export ATHENA_CORE_NUMBER=%d;' % value2 + cmd + else: + logger.info("will not add ATHENA_CORE_NUMBER to cmd since the value is %s" % str(value2)) + else: + logger.warning('there is no ATHENA_CORE_NUMBER in os.environ (cannot add it to payload command)') + return cmd @@ -1597,9 +1611,12 @@ def verify_ncores(corecount): # ATHENA_PROC_NUMBER_JOB will always be the value from the job definition) if athena_proc_number: logger.info("encountered a set ATHENA_PROC_NUMBER (%d), will not overwrite it" % athena_proc_number) + logger.info('set ATHENA_CORE_NUMBER to same value as ATHENA_PROC_NUMBER') + os.environ['ATHENA_CORE_NUMBER'] = "%s" % athena_proc_number else: os.environ['ATHENA_PROC_NUMBER_JOB'] = "%s" % corecount - logger.info("set ATHENA_PROC_NUMBER_JOB to %s (ATHENA_PROC_NUMBER will not be overwritten)" % corecount) + os.environ['ATHENA_CORE_NUMBER'] = "%s" % corecount + logger.info("set ATHENA_PROC_NUMBER_JOB and ATHENA_CORE_NUMBER to %s (ATHENA_PROC_NUMBER will not be overwritten)" % corecount) def verify_job(job): diff --git a/pilot/user/atlas/container.py b/pilot/user/atlas/container.py index dead6c354..c79be4df0 100644 --- a/pilot/user/atlas/container.py +++ b/pilot/user/atlas/container.py @@ -14,7 +14,7 @@ from pilot.user.atlas.setup import get_asetup from pilot.user.atlas.setup import get_file_system_root_path -from pilot.info import infosys +from pilot.info import InfoService, infosys from pilot.util.auxiliary import get_logger from pilot.util.config import config from pilot.util.filehandling import write_file @@ -32,13 +32,11 @@ def do_use_container(**kwargs): :return: True if function has decided that a container should be used, False otherwise (boolean). """ - use_container = False - - # return use_container - # to force no container use: return False + use_container = False - job = kwargs.get('job') + job = kwargs.get('job', False) + copytool = kwargs.get('copytool', False) if job: # for user jobs, TRF option --containerImage must have been used, ie imagename must be set if job.is_analysis() and job.imagename: @@ -48,6 +46,9 @@ def do_use_container(**kwargs): container_name = queuedata.container_type.get("pilot") if container_name == 'singularity': use_container = True + elif copytool: + # override for copytools - use a container for stage-in/out + use_container = True return use_container @@ -64,7 +65,7 @@ def wrapper(executable, **kwargs): workdir = kwargs.get('workdir', '.') pilot_home = os.environ.get('PILOT_HOME', '') - job = kwargs.get('job') + job = kwargs.get('job', None) logger.info('container wrapper called') @@ -72,11 +73,11 @@ def wrapper(executable, **kwargs): workdir = pilot_home # if job.imagename (from --containerimage ) is set, then always use raw singularity - if config.Container.setup_type == "ALRB" and not job.imagename: + if config.Container.setup_type == "ALRB" and job and not job.imagename: fctn = alrb_wrapper else: fctn = singularity_wrapper - return fctn(executable, workdir, job) + return fctn(executable, workdir, job=job) # def use_payload_container(job): @@ -127,7 +128,7 @@ def get_grid_image_for_singularity(platform): """ Return the full path to the singularity grid image - :param platform (string): E.g. "x86_64-slc6" + :param platform: E.g. "x86_64-slc6" (string). :return: full path to grid image (string). """ @@ -181,7 +182,7 @@ def get_middleware_type(): return middleware_type -def alrb_wrapper(cmd, workdir, job): +def alrb_wrapper(cmd, workdir, job=None): """ Wrap the given command with the special ALRB setup for containers E.g. cmd = /bin/bash hello_world.sh @@ -196,6 +197,10 @@ def alrb_wrapper(cmd, workdir, job): :return: prepended command with singularity execution command (string). """ + if not job: + logger.warning('the ALRB wrapper did not get a job object - cannot proceed') + return cmd + log = get_logger(job.jobid) queuedata = job.infosys.queuedata @@ -310,27 +315,32 @@ def remove_container_string(job_params): return job_params, container_path -def singularity_wrapper(cmd, workdir, job): +def singularity_wrapper(cmd, workdir, job=None): """ Prepend the given command with the singularity execution command E.g. cmd = /bin/bash hello_world.sh -> singularity_command = singularity exec -B /bin/bash hello_world.sh singularity exec -B /cvmfs/atlas.cern.ch/repo/images/singularity/x86_64-slc6.img