This repository has been archived by the owner on Jan 30, 2024. It is now read-only.

Commit 092335b

Merge pull request #219 from PanDAWMS/next
2.2.0
PalNilsson authored Oct 10, 2019
2 parents 0fab4cc + e9d7bb4 commit 092335b
Showing 33 changed files with 587 additions and 294 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
2.1.25.11
2.2.0.25
2 changes: 1 addition & 1 deletion pilot/api/analytics.py
@@ -114,7 +114,7 @@ def get_table(self, filename, header=None, separator="\t", convert_to_float=True

return get_table_from_file(filename, header=header, separator=separator, convert_to_float=convert_to_float)

def get_fitted_data(self, filename, x_name='Time', y_name='PSS+Swap', precision=2, tails=True):
def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=2, tails=True):
"""
Return a properly formatted job metrics string with analytics data.
Currently the function returns a fit for PSS+Swap vs time, whose slope measures memory leaks.
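
The only functional change in analytics.py is the default y_name, which now matches the lowercase 'pss+swap' column key of the memory-monitor table. As a reminder of what the fit delivers, here is a standalone sketch (not the pilot implementation) of the kind of linear fit get_fitted_data() returns: the slope of pss+swap versus time is read as a memory-leak estimate. The sample numbers are invented.

```python
# Standalone sketch: ordinary least-squares fit of pss+swap vs. time,
# whose slope is used as a memory-leak estimate. Sample data is invented.

def fit_slope(x, y):
    """Return the least-squares slope of y versus x."""
    n = len(x)
    mean_x = sum(x) / float(n)
    mean_y = sum(y) / float(n)
    cov = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y))
    var = sum((xi - mean_x) ** 2 for xi in x)
    return cov / var

time = [0, 60, 120, 180, 240]                          # seconds
pss_swap = [2.00e6, 2.05e6, 2.11e6, 2.16e6, 2.22e6]    # kB
print('fitted slope: %.1f kB/s' % fit_slope(time, pss_swap))  # a positive slope hints at a leak
```
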
273 changes: 166 additions & 107 deletions pilot/api/data.py

Large diffs are not rendered by default.

50 changes: 48 additions & 2 deletions pilot/api/es_data.py
@@ -6,6 +6,7 @@
#
# Authors:
# - Wen Guan, wen.guan@cern.ch, 2018
# - Alexey Anisenkov, anisyonk@cern.ch, 2019

import traceback
import logging
@@ -17,6 +18,7 @@
logger = logging.getLogger(__name__)


## THIS CLASS CAN BE DEPRECATED AND REMOVED (anisyonk)
class StagingESClient(StagingClient):
"""
Base ES Staging Client
@@ -125,7 +127,8 @@ def transfer(self, files, activity=['pw'], **kwargs): # noqa: C901
return result


class StageInESClient(StagingESClient, StageInClient):
## THIS CLASS CAN BE DEPRECATED AND REMOVED (anisyonk)
class StageInESClientDeprecateME(StagingESClient, StageInClient):

def process_storage_id(self, files):
"""
@@ -157,7 +160,8 @@ def transfer_files(self, copytool, files, activity, ddmendpoint, **kwargs):
return super(StageInESClient, self).transfer_files(copytool, files, activity=activity, ddmendpoint=ddmendpoint, **kwargs)


class StageOutESClient(StagingESClient, StageOutClient):
## THIS CLASS CAN BE DEPRECATED AND REMOVED (anisyonk)
class StageOutESClientDeprecateME(StagingESClient, StageOutClient):

def transfer_files(self, copytool, files, activity, ddmendpoint, **kwargs):
"""
@@ -179,3 +183,45 @@ def transfer_files(self, copytool, files, activity, ddmendpoint, **kwargs):
fspec.ddmendpoint = ddmendpoint

return super(StageOutESClient, self).transfer_files(copytool, files, activity, **kwargs)


class StageInESClient(StageInClient):

def __init__(self, *argc, **kwargs):
super(StageInESClient, self).__init__(*argc, **kwargs)

self.copytool_modules.setdefault('objectstore', {'module_name': 'objectstore'})
self.acopytools.setdefault('es_events_read', ['objectstore'])

def prepare_sources(self, files, activities=None):
"""
Customize/prepare source data for each entry in `files` optionally checking data for requested `activities`
(custom StageClient could extend this logic if needed)
:param files: list of `FileSpec` objects to be processed
:param activities: string or ordered list of activities to resolve `astorages` (optional)
:return: None
If storage_id is specified, replace ddmendpoint by parsing storage_id
"""

if not self.infosys:
self.logger.warning('infosys instance is not initialized: skip calling prepare_sources()')
return

for fspec in files:
if fspec.storage_token: ## FIX ME LATER: no need to parse storage_id each time, all this stuff should be applied in the FileSpec clean method
storage_id, path_convention = fspec.get_storage_id_and_path_convention()
if path_convention and path_convention == 1000:
fspec.scope = 'transient'
if storage_id:
fspec.ddmendpoint = self.infosys.get_ddmendpoint(storage_id)
logger.info("Processed file with storage id: %s" % fspec)


class StageOutESClient(StageOutClient):

def __init__(self, *argc, **kwargs):
super(StageOutESClient, self).__init__(*argc, **kwargs)

self.copytool_modules.setdefault('objectstore', {'module_name': 'objectstore'})
self.acopytools.setdefault('es_events', ['objectstore'])
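
The non-deprecated replacement classes at the end of es_data.py only extend the plain stage clients: they register the objectstore copytool via setdefault (so an existing configuration is not overridden), map the event-service activities to it, and let prepare_sources() rewrite scope and ddmendpoint from the storage id encoded in storage_token. A standalone sketch of that defaulting and rewriting logic follows; the storage-id catalogue, the token format and the FileSpec stand-in are invented for illustration.

```python
# Standalone sketch of the defaulting + storage-id rewrite done by the new
# StageInESClient; the endpoint catalogue, token format and file spec are invented.

STORAGE_CATALOGUE = {42: 'CERN-PROD_ES'}   # hypothetical storage_id -> ddmendpoint mapping

class FakeFileSpec(object):
    """Minimal stand-in for pilot's FileSpec, for illustration only."""
    def __init__(self, lfn, storage_token=''):
        self.lfn = lfn
        self.storage_token = storage_token   # assumed format: "<storage_id>/<path_convention>"
        self.scope = 'data'
        self.ddmendpoint = ''

    def get_storage_id_and_path_convention(self):
        sid, _, conv = self.storage_token.partition('/')
        return int(sid or 0), int(conv or 0)

# setdefault only fills in values that are not already configured
copytool_modules = {}
acopytools = {}
copytool_modules.setdefault('objectstore', {'module_name': 'objectstore'})
acopytools.setdefault('es_events_read', ['objectstore'])

def prepare_sources(files):
    """Rewrite scope/ddmendpoint from the storage id, as the new StageInESClient does."""
    for fspec in files:
        if fspec.storage_token:
            storage_id, path_convention = fspec.get_storage_id_and_path_convention()
            if path_convention == 1000:
                fspec.scope = 'transient'
            if storage_id:
                fspec.ddmendpoint = STORAGE_CATALOGUE.get(storage_id, fspec.ddmendpoint)

files = [FakeFileSpec('EVNT.01.pool.root', storage_token='42/1000')]
prepare_sources(files)
print('%s %s' % (files[0].scope, files[0].ddmendpoint))   # transient CERN-PROD_ES
```
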
31 changes: 27 additions & 4 deletions pilot/common/errorcodes.py
@@ -134,6 +134,7 @@ class ErrorCodes:
UNRECOGNIZEDTRFARGUMENTS = 1349
EMPTYOUTPUTFILE = 1350
UNRECOGNIZEDTRFSTDERR = 1351
STATFILEPROBLEM = 1352

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
@@ -246,7 +247,8 @@ class ErrorCodes:
SINGULARITYRESOURCEUNAVAILABLE: "Singularity: Resource temporarily unavailable", # not the same as RESOURCEUNAVAILABLE
UNRECOGNIZEDTRFARGUMENTS: "Unrecognized transform arguments",
EMPTYOUTPUTFILE: "Empty output file detected",
UNRECOGNIZEDTRFSTDERR: "Unrecognized fatal error in transform stderr"
UNRECOGNIZEDTRFSTDERR: "Unrecognized fatal error in transform stderr",
STATFILEPROBLEM: "Failed to stat proc file for CPU consumption calculation"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
@@ -402,16 +404,37 @@ def format_diagnostics(self, code, diag):
:return: formatted error diagnostics (string).
"""

max_message_length = 256
try:
standard_message = self._error_messages[code] + ":"
except Exception:
standard_message = ""

# extract the relevant info for reporting exceptions
if "Traceback" in diag:
pattern = 'details:(.+)'
found = re.findall(pattern, diag)
if found:
diag = found[0]
diag = diag.replace("[PilotException(\'", '')
diag = diag.replace('[PilotException(\"', '')
diag = diag.replace(' ', ' ')

try:
if diag:
if len(diag) >= len(standard_message):
error_message = standard_message + diag[-(len(diag) - len(standard_message)):]
# ensure that the message to be displayed on the PanDA monitor is not longer than max_message_length
# if it is, then reformat it so that the standard message is always displayed first.
# e.g. "Failed to stage-in file:abcdefghijklmnopqrstuvwxyz0123456789"
if standard_message in diag:
if len(diag) > max_message_length:
error_message = standard_message + diag[-(max_message_length - len(standard_message)):]
else:
error_message = standard_message + diag[len(standard_message):][-max_message_length:]
else:
error_message = standard_message + diag[-(len(standard_message) - len(diag)):]
if len(diag) + len(standard_message) > max_message_length:
error_message = standard_message + diag[-(max_message_length - len(standard_message)):]
else:
error_message = standard_message + diag
else:
error_message = standard_message
except Exception:
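
The reworked format_diagnostics() caps the message shown on the PanDA monitor at 256 characters, strips the PilotException/traceback wrapping from the details, and always keeps the standard error text at the front, trimming the tail of the details when the combined string would be too long. A simplified standalone sketch of the truncation rule (not the exact pilot code), with an invented standard message:

```python
# Simplified sketch of the new truncation rule: keep the standard message in
# front and limit the total length to MAX_MESSAGE_LENGTH. Example message is invented.

MAX_MESSAGE_LENGTH = 256

def format_diagnostics_sketch(standard_message, diag):
    if not diag:
        return standard_message
    if len(standard_message) + len(diag) > MAX_MESSAGE_LENGTH:
        # keep the tail of the details so the combined string fits the limit
        return standard_message + diag[-(MAX_MESSAGE_LENGTH - len(standard_message)):]
    return standard_message + diag

msg = format_diagnostics_sketch("Failed to stage-in file:", "x" * 500)
print(len(msg))    # 256
print(msg[:24])    # Failed to stage-in file:
```
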
5 changes: 5 additions & 0 deletions pilot/common/exception.py
@@ -69,6 +69,11 @@ def get_detail(self):
def get_error_code(self):
return self._errorCode

def get_last_error(self):
if self.args:
return self.args[-1]
return self._message


class NotImplemented(PilotException):
"""
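
get_last_error() gives callers the most specific information attached to a PilotException: the last positional argument when extra arguments were passed, otherwise the standard message. A toy sketch with a simplified stand-in class (not pilot.common.exception):

```python
# Toy stand-in for the new PilotException.get_last_error() accessor;
# the class below is a simplified example, not the pilot exception hierarchy.

class ToyPilotException(Exception):
    def __init__(self, *args):
        super(ToyPilotException, self).__init__(*args)
        self._message = args[0] if args else 'unknown pilot exception'

    def get_last_error(self):
        if self.args:
            return self.args[-1]   # last positional argument, typically the detailed reason
        return self._message

try:
    raise ToyPilotException('stage-in failed', 'checksum mismatch for EVNT.pool.root')
except ToyPilotException as exc:
    print(exc.get_last_error())    # checksum mismatch for EVNT.pool.root
```
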
2 changes: 1 addition & 1 deletion pilot/control/data.py
@@ -159,7 +159,7 @@ def _stage_in(args, job):
client = StageInClient(job.infosys, logger=log, trace_report=trace_report)
activity = 'pr'
kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job) #, mode='stage-in')

client.prepare_sources(job.indata)
client.transfer(job.indata, activity=activity, **kwargs)
except PilotException as error:
import traceback
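
With this change the stage-in path lets the client adjust the input file specs before the transfer starts. A hedged sketch of the call order, using only the calls visible in the hunk; the job, log and trace_report objects come from the surrounding pilot code, and the error handling here is illustrative rather than a copy of _stage_in():

```python
# Hedged sketch of the new stage-in order: prepare sources, then transfer.
# Assumes the StageInClient/PilotException imports used by pilot.control.data.

from pilot.api.data import StageInClient
from pilot.common.exception import PilotException

def stage_in_sketch(job, log, trace_report):
    client = StageInClient(job.infosys, logger=log, trace_report=trace_report)
    kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job)
    try:
        client.prepare_sources(job.indata)   # new in this commit: may rewrite scope/ddmendpoint
        client.transfer(job.indata, activity='pr', **kwargs)
    except PilotException as error:
        log.warning('stage-in failed: %s' % error.get_last_error())
        raise
```
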
5 changes: 1 addition & 4 deletions pilot/control/job.py
@@ -832,11 +832,8 @@ def get_dispatcher_dictionary(args):
# override for RC dev pilots
job_label = get_job_label(args)

logger.debug('infosys.queuedata.resource=%s' % infosys.queuedata.resource)
logger.debug('args.resource=%s' % args.resource)

data = {
'siteName': args.resource, # replace it with `infosys.queuedata.resource` to remove redundant '-r' option of pilot.py
'siteName': infosys.queuedata.resource, # next: remove redundant '-r' option of pilot.py
'computingElement': args.queue,
'prodSourceLabel': job_label,
'diskSpace': _diskspace,
23 changes: 13 additions & 10 deletions pilot/copytool/common.py
@@ -18,18 +18,19 @@
logger = logging.getLogger(__name__)


def get_timeout(filesize):
def get_timeout(filesize, add=0):
"""
Get a proper time-out limit based on the file size.
:param filesize: file size (int).
:return:
:param add: optional additional time to be added [s] (int)
:return: time-out in seconds (int).
"""

timeout_max = 3 * 3600 # 3 hours
timeout_min = 300 # self.timeout

timeout = timeout_min + int(filesize / 0.5e6) # approx < 0.5 Mb/sec
timeout = timeout_min + int(filesize / 0.5e6) + add # approx < 0.5 Mb/sec

return min(timeout, timeout_max)
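
get_timeout() keeps its conservative 0.5 MB/s rate assumption and 3 hour cap; the new add argument simply pads the size-based estimate. For a feel for the numbers: a 5 GB file gives 300 + 5e9/0.5e6 = 10300 s, and anything above roughly 5.25 GB saturates at 10800 s. A sketch mirroring the formula in the hunk, with invented sample sizes:

```python
# Mirrors the get_timeout() formula from the hunk; the sample sizes are invented.

def get_timeout(filesize, add=0):
    timeout_max = 3 * 3600                                  # 3 hours
    timeout_min = 300                                       # seconds
    timeout = timeout_min + int(filesize / 0.5e6) + add     # assumes < 0.5 MB/s
    return min(timeout, timeout_max)

for size in (100e6, 5e9, 20e9):
    print('%.1f GB -> %d s' % (size / 1e9, get_timeout(size)))
# prints 500 s, 10300 s and 10800 s (capped at timeout_max) respectively
```
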

@@ -57,13 +58,15 @@ def verify_catalog_checksum(fspec, path):
state = 'UNKNOWN_CHECKSUM_TYPE'
else:
checksum_local = calculate_checksum(path, algorithm=checksum_type)
if checksum_type == 'ad32':
checksum_type = 'adler32'
logger.info('checksum (catalog): %s (type: %s)' % (checksum_catalog, checksum_type))
logger.info('checksum (local): %s' % checksum_local)
if checksum_local and checksum_local != '' and checksum_local != checksum_catalog:
diagnostics = 'checksum verification failed: checksum (catalog)=%s != checksum (local)=%s' % \
(checksum_catalog, checksum_local)
diagnostics = 'checksum verification failed for LFN=%s: checksum (catalog)=%s != checksum (local)=%s' % \
(fspec.lfn, checksum_catalog, checksum_local)
logger.warning(diagnostics)
fspec.status_code = ErrorCodes.GETADMISMATCH if checksum_type == 'ad32' else ErrorCodes.GETMD5MISMATCH
fspec.status_code = ErrorCodes.GETADMISMATCH if checksum_type == 'adler32' else ErrorCodes.GETMD5MISMATCH
fspec.status = 'failed'
state = 'AD_MISMATCH' if checksum_type == 'ad32' else 'MD_MISMATCH'
else:
@@ -143,7 +146,7 @@ def output_line_scan(ret, output):
"""

for line in output.split('\n'):
m = re.search("Details\s*:\s*(?P<error>.*)", line)
m = re.search("[Dd]etails\s*:\s*(?P<error>.*)", line)
if m:
ret['error'] = m.group('error')
elif 'service_unavailable' in line:
@@ -197,8 +200,8 @@ def resolve_common_transfer_errors(output, is_stagein=True):
ret = get_error_info(ErrorCodes.SERVICENOTAVAILABLE, 'SERVICE_ERROR', output)
elif "Network is unreachable" in output:
ret = get_error_info(ErrorCodes.UNREACHABLENETWORK, 'NETWORK_UNREACHABLE', output)
else:
# reg exp the output
ret = output_line_scan(ret, output)

# reg exp the output to get real error message
ret = output_line_scan(ret, output)

return ret
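
Two behavioural changes in the error scanning: the 'Details:' pattern now also matches a lowercase 'details:', and resolve_common_transfer_errors() runs the line scan unconditionally instead of only when no specific pattern matched, so the real error text is extracted whenever it is present. A standalone sketch of the updated regex, with invented copytool output:

```python
import re

# Standalone sketch of the updated line scan: '[Dd]etails' accepts both spellings.
# The sample copytool output is invented.

output = "Transfer failed\ndetails : Connection timed out after 300 seconds"

for line in output.split('\n'):
    m = re.search(r"[Dd]etails\s*:\s*(?P<error>.*)", line)
    if m:
        print(m.group('error'))   # Connection timed out after 300 seconds
```
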
4 changes: 2 additions & 2 deletions pilot/copytool/gfal.py
@@ -24,7 +24,7 @@

require_replicas = True ## indicate if given copytool requires input replicas to be resolved

allowed_schemas = ['srm', 'gsiftp', 'https', 'davs'] # prioritized list of supported schemas for transfers by given copytool
allowed_schemas = ['srm', 'gsiftp', 'https', 'davs', 'root'] # prioritized list of supported schemas for transfers by given copytool


def is_valid_for_copy_in(files):
@@ -57,7 +57,7 @@ def copy_in(files, **kwargs):
if not check_for_gfal():
raise StageInFailure("No GFAL2 tools found")

localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None)
localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))
for fspec in files:
# update the trace report
localsite = localsite if localsite else fspec.ddmendpoint
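
Both gfal and lsm now resolve the local site from RUCIO_LOCAL_SITE_ID and only fall back to the legacy DQ2_LOCAL_SITE_ID (and ultimately, as before, to the file's ddmendpoint). A standalone sketch of the nested lookup, with invented site names:

```python
import os

# Sketch of the new local-site lookup: prefer RUCIO_LOCAL_SITE_ID,
# fall back to the legacy DQ2_LOCAL_SITE_ID. Site names are invented.

os.environ['DQ2_LOCAL_SITE_ID'] = 'OLD-SITE'       # only the legacy variable is set
localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))
print(localsite)                                   # OLD-SITE

os.environ['RUCIO_LOCAL_SITE_ID'] = 'NEW-SITE'     # the new variable wins once defined
localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))
print(localsite)                                   # NEW-SITE
```
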
4 changes: 2 additions & 2 deletions pilot/copytool/lsm.py
@@ -75,7 +75,7 @@ def copy_in(files, **kwargs):
copysetup = get_copysetup(copytools, 'lsm')
trace_report = kwargs.get('trace_report')
allow_direct_access = kwargs.get('allow_direct_access')
localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None)
localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))

for fspec in files:
# update the trace report
@@ -315,7 +315,7 @@ def move(source, destination, dst_in=True, copysetup="", options=None):
cmd += "lsm-put %s" % args

try:
exit_code, stdout, stderr = execute(cmd) #, timeout=get_timeout(fspec.filesize))
exit_code, stdout, stderr = execute(cmd, usecontainer=False, copytool=True) #, timeout=get_timeout(fspec.filesize))
except Exception as e:
if dst_in:
exit_code = ErrorCodes.STAGEINFAILED
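
The lsm-put command is now run with usecontainer=False and copytool=True, i.e. the copy command itself is not wrapped in a container. A hedged sketch of the call; it assumes execute() is pilot.util.container.execute with the (exit_code, stdout, stderr) return used in the hunk, and the source/destination paths are invented:

```python
# Hedged sketch of the updated lsm-put invocation. Assumes pilot.util.container.execute;
# the paths below are invented examples.

from pilot.util.container import execute

cmd = 'lsm-put /srv/workdir/out.root srm://se.example.org/atlas/out.root'
exit_code, stdout, stderr = execute(cmd, usecontainer=False, copytool=True)
if exit_code != 0:
    print('lsm-put failed (%d): %s' % (exit_code, stderr))
```
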