Skip to content
This repository has been archived by the owner on Jan 30, 2024. It is now read-only.

2.2.0 (25) #217

Merged
merged 47 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
cfc3a94
New version 2.1.26
PalNilsson Sep 9, 2019
3ce9386
Corrected repo name, pilot->panda-pilot
PalNilsson Sep 9, 2019
e259677
Testing container use in lsm stage-in/out
PalNilsson Sep 10, 2019
ece7d70
do_use_container() now supports containers in stage-in/out
PalNilsson Sep 10, 2019
d05410c
Added some protection against unset job object in container code
PalNilsson Sep 10, 2019
7b781b3
Job object is now optional in singularity_wrapper(). Middleware image…
PalNilsson Sep 11, 2019
e4625b2
Set usercontainer=False in lsm
PalNilsson Sep 11, 2019
5e1a7e6
Merge remote-tracking branch 'upstream/next' into next
PalNilsson Sep 12, 2019
0203236
Updated build number after merge with AA PR
PalNilsson Sep 12, 2019
593a9d6
Now confirming that the worker node has a proper SC_CLK_TCK (problems…
PalNilsson Sep 13, 2019
2f6a8d3
Increased sleep from 0.1 to 1 s in thread counting function
PalNilsson Sep 13, 2019
12bf682
Merge remote-tracking branch 'upstream/next' into next
PalNilsson Sep 13, 2019
1231039
Updated build number after merge with AA PR 2
PalNilsson Sep 13, 2019
0e15fb4
Correction in ExtendedConfig to resolve hasattr() difference between …
PalNilsson Sep 18, 2019
0990399
Correction in ExtendedConfig to resolve DictProxyType difference betw…
PalNilsson Sep 18, 2019
395763d
Merge remote-tracking branch 'upstream/next' into next
PalNilsson Sep 19, 2019
3fb50f6
Merge with AA PR; fix for type in ES code
PalNilsson Sep 19, 2019
6f02707
Corrected ad32->adler32 which led to MD5SUM error being reported inst…
PalNilsson Sep 23, 2019
193df85
Cleanup
PalNilsson Sep 23, 2019
4034323
Cleanup
PalNilsson Sep 24, 2019
c132631
Additional cleanup
PalNilsson Sep 24, 2019
ebb76fb
Following an update in AdeS auto-setup, now using RUCIO_LOCAL_SITE_ID…
PalNilsson Sep 24, 2019
c0636ec
Simplification of pilot arguments: now using resource name from queue…
PalNilsson Sep 24, 2019
a444fb8
Corrected syntax
PalNilsson Sep 24, 2019
07c04ec
Updated build number after merge with David's PR
PalNilsson Sep 24, 2019
0b63f3f
Updated build number, 2.1.26 -> 2.2.0
PalNilsson Sep 24, 2019
de12931
Removed debug messages
PalNilsson Sep 25, 2019
c9dd73c
Added fix for objectstore copytool + added timeout decorator to rucio…
PalNilsson Sep 26, 2019
d577e2c
Now using file size based time-outs for rucio api transfers on the pi…
PalNilsson Sep 26, 2019
8b17d7c
Now using turl = fspec.turl or fspec.surl as recommended by Alexey
PalNilsson Sep 26, 2019
75a2398
Merge with David's PR
PalNilsson Oct 1, 2019
7aaf2f4
Updated build number after merge with David's code
PalNilsson Oct 3, 2019
6347b84
Corrected usages of format_diagnostics() (wrong argument) after merge…
PalNilsson Oct 3, 2019
360e346
Changed free_space_limit from 5 GB to 2 GB as requested by H. Severini
PalNilsson Oct 4, 2019
acdc8c3
Added ATHENA_CORE_NUMBER to add_athena_proc_number()
PalNilsson Oct 4, 2019
c9c6e88
Added ATHENA_CORE_NUMBER to add_athena_proc_number()
PalNilsson Oct 4, 2019
07995b0
Corrected PSS+Swap -> pss+swap in leak calculation. Merge with David'…
PalNilsson Oct 7, 2019
7e0ff22
Corrected diagnostics
PalNilsson Oct 7, 2019
828ffa9
Improved formatting of error diagnostics in format_diagnostics()
PalNilsson Oct 8, 2019
3e8f999
Added handling for STATFILEPROBLEM, new error code 1352
PalNilsson Oct 8, 2019
008a7d8
Avoiding using timeout function since it leads to SIGTERMs. To be inv…
PalNilsson Oct 9, 2019
80c279e
Corrected spelling error
PalNilsson Oct 9, 2019
4e12971
Removed timeout usages
PalNilsson Oct 9, 2019
b5a98ae
Now updating localsite and remotesite before trace is sent (necessary…
PalNilsson Oct 10, 2019
0386148
Temporarily removed the traces_copy_out from the rucio download clien…
PalNilsson Oct 10, 2019
3508141
Updated comment
PalNilsson Oct 10, 2019
a70cf96
Clarified logic in add_athena_proc_number()
PalNilsson Oct 10, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.25.11
2.2.0.25
2 changes: 1 addition & 1 deletion pilot/api/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def get_table(self, filename, header=None, separator="\t", convert_to_float=True

return get_table_from_file(filename, header=header, separator=separator, convert_to_float=convert_to_float)

def get_fitted_data(self, filename, x_name='Time', y_name='PSS+Swap', precision=2, tails=True):
def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=2, tails=True):
"""
Return a properly formatted job metrics string with analytics data.
Currently the function returns a fit for PSS+Swap vs time, whose slope measures memory leaks.
Expand Down
6 changes: 3 additions & 3 deletions pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,9 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901
self.logger.warning('caught time-out exception: %s' % caught_errors[0])
else:
code = errors.STAGEINFAILED if self.mode == 'stage-in' else errors.STAGEOUTFAILED # is it stage-in/out?
self.logger.fatal('caught_errors=%s' % str(caught_errors))
self.logger.fatal('code=%s' % str(code))
raise PilotException('failed to transfer files using copytools=%s' % (copytools), errmsg, code=code)
details = str(caught_errors) + ":" + 'failed to transfer files using copytools=%s' % copytools
self.logger.fatal(details)
raise PilotException(details, code=code)

self.logger.debug('result=%s' % str(result))
return result
Expand Down
30 changes: 26 additions & 4 deletions pilot/common/errorcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class ErrorCodes:
UNRECOGNIZEDTRFARGUMENTS = 1349
EMPTYOUTPUTFILE = 1350
UNRECOGNIZEDTRFSTDERR = 1351
STATFILEPROBLEM = 1352

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
Expand Down Expand Up @@ -246,7 +247,8 @@ class ErrorCodes:
SINGULARITYRESOURCEUNAVAILABLE: "Singularity: Resource temporarily unavailable", # not the same as RESOURCEUNAVAILABLE
UNRECOGNIZEDTRFARGUMENTS: "Unrecognized transform arguments",
EMPTYOUTPUTFILE: "Empty output file detected",
UNRECOGNIZEDTRFSTDERR: "Unrecognized fatal error in transform stderr"
UNRECOGNIZEDTRFSTDERR: "Unrecognized fatal error in transform stderr",
STATFILEPROBLEM: "Failed to stat proc file for CPU consumption calculation"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
Expand Down Expand Up @@ -407,12 +409,32 @@ def format_diagnostics(self, code, diag):
standard_message = self._error_messages[code] + ":"
except Exception:
standard_message = ""

# extract the relevant info for reporting exceptions
if "Traceback" in diag:
pattern = 'details:(.+)'
found = re.findall(pattern, diag)
if found:
diag = found[0]
diag = diag.replace("[PilotException(\'", '')
diag = diag.replace('[PilotException(\"', '')
diag = diag.replace(' ', ' ')

try:
if diag:
if len(diag) + len(standard_message) > max_message_length:
error_message = standard_message + diag[-(max_message_length - len(standard_message)):]
# ensure that the message to be displayed on the PanDA monitor is not longer than max_message_length
# if it is, then reformat it so that the standard message is always displayed first.
# e.g. "Failed to stage-in file:abcdefghijklmnopqrstuvwxyz0123456789"
if standard_message in diag:
if len(diag) > max_message_length:
error_message = standard_message + diag[-(max_message_length - len(standard_message)):]
else:
error_message = standard_message + diag[len(standard_message):][-max_message_length:]
else:
error_message = standard_message + diag
if len(diag) + len(standard_message) > max_message_length:
error_message = standard_message + diag[-(max_message_length - len(standard_message)):]
else:
error_message = standard_message + diag
else:
error_message = standard_message
except Exception:
Expand Down
4 changes: 2 additions & 2 deletions pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def _stage_in(args, job):
import traceback
error_msg = traceback.format_exc()
log.error(error_msg)
msg = errors.format_diagnostics(error.get_error_code(), error.get_last_error())
msg = errors.format_diagnostics(error.get_error_code(), error_msg)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg)
except Exception as error:
log.error('failed to stage-in: error=%s' % error)
Expand Down Expand Up @@ -659,7 +659,7 @@ def _do_stageout(job, xdata, activity, title):
import traceback
error_msg = traceback.format_exc()
log.error(error_msg)
msg = errors.format_diagnostics(error.get_error_code(), error.get_last_error())
msg = errors.format_diagnostics(error.get_error_code(), error_msg)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg)
except Exception:
import traceback
Expand Down
5 changes: 1 addition & 4 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,11 +832,8 @@ def get_dispatcher_dictionary(args):
# override for RC dev pilots
job_label = get_job_label(args)

logger.debug('infosys.queuedata.resource=%s' % infosys.queuedata.resource)
logger.debug('args.resource=%s' % args.resource)

data = {
'siteName': args.resource, # replace it with `infosys.queuedata.resource` to remove redundant '-r' option of pilot.py
'siteName': infosys.queuedata.resource, # next: remove redundant '-r' option of pilot.py
'computingElement': args.queue,
'prodSourceLabel': job_label,
'diskSpace': _diskspace,
Expand Down
15 changes: 9 additions & 6 deletions pilot/copytool/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
logger = logging.getLogger(__name__)


def get_timeout(filesize, add=0):
    """
    Get a proper time-out limit based on the file size.

    The time-out grows linearly with file size (assuming a worst-case
    transfer rate of roughly 0.5 MB/s) on top of a fixed minimum, and is
    capped at a hard maximum of three hours.

    :param filesize: file size in bytes (int).
    :param add: optional additional time to be added [s] (int).
    :return: time-out in seconds (int).
    """

    timeout_max = 3 * 3600  # 3 hours
    timeout_min = 300  # self.timeout

    timeout = timeout_min + int(filesize / 0.5e6) + add  # approx < 0.5 Mb/sec

    return min(timeout, timeout_max)

Expand Down Expand Up @@ -57,13 +58,15 @@ def verify_catalog_checksum(fspec, path):
state = 'UNKNOWN_CHECKSUM_TYPE'
else:
checksum_local = calculate_checksum(path, algorithm=checksum_type)
if checksum_type == 'ad32':
checksum_type = 'adler32'
logger.info('checksum (catalog): %s (type: %s)' % (checksum_catalog, checksum_type))
logger.info('checksum (local): %s' % checksum_local)
if checksum_local and checksum_local != '' and checksum_local != checksum_catalog:
diagnostics = 'checksum verification failed: checksum (catalog)=%s != checksum (local)=%s' % \
(checksum_catalog, checksum_local)
diagnostics = 'checksum verification failed for LFN=%s: checksum (catalog)=%s != checksum (local)=%s' % \
(fspec.lfn, checksum_catalog, checksum_local)
logger.warning(diagnostics)
fspec.status_code = ErrorCodes.GETADMISMATCH if checksum_type == 'ad32' else ErrorCodes.GETMD5MISMATCH
fspec.status_code = ErrorCodes.GETADMISMATCH if checksum_type == 'adler32' else ErrorCodes.GETMD5MISMATCH
fspec.status = 'failed'
state = 'AD_MISMATCH' if checksum_type == 'ad32' else 'MD_MISMATCH'
else:
Expand Down
2 changes: 1 addition & 1 deletion pilot/copytool/gfal.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def copy_in(files, **kwargs):
if not check_for_gfal():
raise StageInFailure("No GFAL2 tools found")

localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None)
localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))
for fspec in files:
# update the trace report
localsite = localsite if localsite else fspec.ddmendpoint
Expand Down
4 changes: 2 additions & 2 deletions pilot/copytool/lsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def copy_in(files, **kwargs):
copysetup = get_copysetup(copytools, 'lsm')
trace_report = kwargs.get('trace_report')
allow_direct_access = kwargs.get('allow_direct_access')
localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None)
localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))

for fspec in files:
# update the trace report
Expand Down Expand Up @@ -315,7 +315,7 @@ def move(source, destination, dst_in=True, copysetup="", options=None):
cmd += "lsm-put %s" % args

try:
exit_code, stdout, stderr = execute(cmd) #, timeout=get_timeout(fspec.filesize))
exit_code, stdout, stderr = execute(cmd, usecontainer=False, copytool=True) #, timeout=get_timeout(fspec.filesize))
except Exception as e:
if dst_in:
exit_code = ErrorCodes.STAGEINFAILED
Expand Down
8 changes: 6 additions & 2 deletions pilot/copytool/objectstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,14 @@ def copy_in(files, **kwargs):
cmd += ['/usr/bin/env', 'rucio', '-v', 'download', '--no-subdir', '--dir', dst]
if require_replicas:
cmd += ['--rse', fspec.replicas[0][0]]
if fspec.surl:

# a copytool module should consider fspec.turl for transfers, and could failback to fspec.surl,
# but normally fspec.turl (transfer url) is mandatory and already populated by the top workflow
turl = fspec.turl or fspec.surl
if turl:
if fspec.ddmendpoint:
cmd.extend(['--rse', fspec.ddmendpoint])
cmd.extend(['--pfn', fspec.surl])
cmd.extend(['--pfn', turl])
cmd += ['%s:%s' % (fspec.scope, fspec.lfn)]

rcode, stdout, stderr = execute(" ".join(cmd), **kwargs)
Expand Down
30 changes: 21 additions & 9 deletions pilot/copytool/rucio.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# - Alexey Anisenkov, anisyonk@cern.ch, 2018
# - Paul Nilsson, paul.nilsson@cern.ch, 2018
# - Tomas Javurek, tomas.javurek@cern.ch, 2019
# - David Cameron, david.cameron@cern.ch, 2019

from __future__ import absolute_import

Expand All @@ -19,6 +20,7 @@

from .common import resolve_common_transfer_errors, verify_catalog_checksum, get_timeout
from pilot.common.exception import PilotException, StageOutFailure, ErrorCodes
#from pilot.util.timer import timeout

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -49,7 +51,7 @@ def verify_stage_out(fspec):
return rsemgr.exists(rse_settings, [uploaded_file])


# @timeout(seconds=600)
#@timeout(seconds=10800)
def copy_in(files, **kwargs):
"""
Download given files using rucio copytool.
Expand All @@ -66,7 +68,7 @@ def copy_in(files, **kwargs):
# don't spoil the output, we depend on stderr parsing
os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None)
localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))
for fspec in files:
logger.info('rucio copytool, downloading file with scope:%s lfn:%s' % (str(fspec.scope), str(fspec.lfn)))
# update the trace report
Expand All @@ -90,6 +92,8 @@ def copy_in(files, **kwargs):

trace_report_out = []
try:
#transfer_timeout = get_timeout(fspec.filesize, add=10) # give the API a chance to do the time-out first
#timeout(transfer_timeout)(_stage_in_api)(dst, fspec, trace_report, trace_report_out)
_stage_in_api(dst, fspec, trace_report, trace_report_out)
except Exception as error:
error_msg = str(error)
Expand Down Expand Up @@ -130,7 +134,7 @@ def copy_in(files, **kwargs):
return files


# @timeout(seconds=600)
#@timeout(seconds=10800)
def copy_out(files, **kwargs):
"""
Upload given files using rucio copytool.
Expand All @@ -147,8 +151,11 @@ def copy_out(files, **kwargs):
ignore_errors = kwargs.pop('ignore_errors', False)
trace_report = kwargs.get('trace_report')

localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))
for fspec in files:
logger.info('rucio copytool, uploading file with scope: %s and lfn: %s' % (str(fspec.scope), str(fspec.lfn)))
localsite = localsite if localsite else fspec.ddmendpoint
trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint)
trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
fspec.status_code = 0
Expand All @@ -161,6 +168,8 @@ def copy_out(files, **kwargs):
logger.info('the file will be uploaded to %s' % str(fspec.ddmendpoint))
trace_report_out = []
try:
#transfer_timeout = get_timeout(fspec.filesize, add=10) # give the API a chance to do the time-out first
#timeout(transfer_timeout)(_stage_out_api)(fspec, summary_file_path, trace_report, trace_report_out)
_stage_out_api(fspec, summary_file_path, trace_report, trace_report_out)
except Exception as error:
error_msg = str(error)
Expand Down Expand Up @@ -248,12 +257,16 @@ def _stage_in_api(dst, fspec, trace_report, trace_report_out):
trace_pattern = {}
if trace_report:
trace_pattern = trace_report
result = []

# download client raises an exception if any file failed
if fspec.turl:
result = download_client.download_pfns([f], 1, trace_custom_fields=trace_pattern, traces_copy_out=trace_report_out)
# restore the following line when it is supported on the rucio client
#result = download_client.download_pfns([f], 1, trace_custom_fields=trace_pattern, traces_copy_out=trace_report_out)
result = download_client.download_pfns([f], 1, trace_custom_fields=trace_pattern)
else:
result = download_client.download_dids([f], trace_custom_fields=trace_pattern, traces_copy_out=trace_report_out)
# restore the following line when it is supported on the rucio client
#result = download_client.download_dids([f], trace_custom_fields=trace_pattern, traces_copy_out=trace_report_out)
result = download_client.download_dids([f], trace_custom_fields=trace_pattern)

logger.debug('Rucio download client returned %s' % result)

Expand Down Expand Up @@ -295,11 +308,12 @@ def _stage_out_api(fspec, summary_file_path, trace_report, trace_report_out):
# upload client raises an exception if any file failed
try:
# TODO: Add traces_copy_out=trace_report_out when supported in rucio
upload_client.upload([f], summary_file_path=summary_file_path)
result = upload_client.upload([f], summary_file_path=summary_file_path)
except UnboundLocalError:
logger.warning('rucio still needs a bug fix of the summary in the uploadclient')

logger.debug('Rucio upload client returned %s' % result)

try:
file_exists = verify_stage_out(fspec)
logger.info('File exists at the storage: %s' % str(file_exists))
Expand All @@ -309,5 +323,3 @@ def _stage_out_api(fspec, summary_file_path, trace_report, trace_report_out):
msg = 'stageOut: File existence verification failed with: %s' % str(e)
logger.info(msg)
raise StageOutFailure(msg)

return 'DONE'
2 changes: 1 addition & 1 deletion pilot/copytool/xrdcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def copy_in(files, **kwargs):
coption = _resolve_checksum_option(setup, **kwargs)
trace_report = kwargs.get('trace_report')

localsite = os.environ.get('DQ2_LOCAL_SITE_ID', None)
localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', os.environ.get('DQ2_LOCAL_SITE_ID', None))
for fspec in files:
# update the trace report
localsite = localsite if localsite else fspec.ddmendpoint
Expand Down
41 changes: 29 additions & 12 deletions pilot/user/atlas/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,31 +293,45 @@ def get_generic_payload_command(cmd, job, prepareasetup, userjob):

def add_athena_proc_number(cmd):
    """
    Add the ATHENA_PROC_NUMBER and ATHENA_CORE_NUMBER to the payload command if necessary.

    The variables are prepended to the command as shell exports. ATHENA_PROC_NUMBER is
    only added when it is not already present in the command; both variables are only
    added when their value is greater than 1.

    :param cmd: payload execution command (string).
    :return: updated payload execution command (string).
    """

    # get the values if they exist; a missing or non-integer value yields None
    try:
        value1 = int(os.environ['ATHENA_PROC_NUMBER_JOB'])
    except Exception as e:
        logger.warning('failed to convert ATHENA_PROC_NUMBER_JOB to int: %s' % e)
        value1 = None
    try:
        value2 = int(os.environ['ATHENA_CORE_NUMBER'])
    except Exception as e:
        logger.warning('failed to convert ATHENA_CORE_NUMBER to int: %s' % e)
        value2 = None

    if "ATHENA_PROC_NUMBER" not in cmd:
        if "ATHENA_PROC_NUMBER" in os.environ:
            # an explicitly set ATHENA_PROC_NUMBER takes precedence over the job value
            cmd = 'export ATHENA_PROC_NUMBER=%s;' % os.environ['ATHENA_PROC_NUMBER'] + cmd
        elif "ATHENA_PROC_NUMBER_JOB" in os.environ and value1:
            if value1 > 1:
                cmd = 'export ATHENA_PROC_NUMBER=%d;' % value1 + cmd
            else:
                logger.info("will not add ATHENA_PROC_NUMBER to cmd since the value is %s" % str(value1))
        else:
            logger.warning("don't know how to set ATHENA_PROC_NUMBER (could not find it in os.environ)")
    else:
        logger.info("ATHENA_PROC_NUMBER already in job command")

    if 'ATHENA_CORE_NUMBER' in os.environ and value2:
        if value2 > 1:
            cmd = 'export ATHENA_CORE_NUMBER=%d;' % value2 + cmd
        else:
            logger.info("will not add ATHENA_CORE_NUMBER to cmd since the value is %s" % str(value2))
    else:
        logger.warning('there is no ATHENA_CORE_NUMBER in os.environ (cannot add it to payload command)')

    return cmd


Expand Down Expand Up @@ -1597,9 +1611,12 @@ def verify_ncores(corecount):
# ATHENA_PROC_NUMBER_JOB will always be the value from the job definition)
if athena_proc_number:
logger.info("encountered a set ATHENA_PROC_NUMBER (%d), will not overwrite it" % athena_proc_number)
logger.info('set ATHENA_CORE_NUMBER to same value as ATHENA_PROC_NUMBER')
os.environ['ATHENA_CORE_NUMBER'] = "%s" % athena_proc_number
else:
os.environ['ATHENA_PROC_NUMBER_JOB'] = "%s" % corecount
logger.info("set ATHENA_PROC_NUMBER_JOB to %s (ATHENA_PROC_NUMBER will not be overwritten)" % corecount)
os.environ['ATHENA_CORE_NUMBER'] = "%s" % corecount
logger.info("set ATHENA_PROC_NUMBER_JOB and ATHENA_CORE_NUMBER to %s (ATHENA_PROC_NUMBER will not be overwritten)" % corecount)


def verify_job(job):
Expand Down
Loading