
Merge pull request #132 from PanDAWMS/next
2.1.4
PalNilsson authored Apr 10, 2019
2 parents 1c27e0c + 930b6ec commit cf151e8
Showing 8 changed files with 91 additions and 23 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
2.1.3.8
2.1.4.10
26 changes: 20 additions & 6 deletions pilot/control/data.py
Original file line number Diff line number Diff line change
@@ -14,15 +14,15 @@
import copy
import os
import subprocess
import tarfile
#import tarfile
import time

try:
import Queue as queue # noqa: N813
except Exception:
import queue # python 3

from contextlib import closing # for Python 2.6 compatibility - to fix a problem with tarfile
#from contextlib import closing # for Python 2.6 compatibility - to fix a problem with tarfile

from pilot.api.data import StageInClient, StageOutClient
from pilot.api.es_data import StageInESClient
@@ -34,6 +34,7 @@
from pilot.util.config import config
from pilot.util.constants import PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN, PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT,\
LOG_TRANSFER_IN_PROGRESS, LOG_TRANSFER_DONE, LOG_TRANSFER_NOT_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_RUNNING
from pilot.util.container import execute
from pilot.util.filehandling import find_executable, remove
from pilot.util.timing import add_to_pilot_timing
from pilot.util.tracereport import TraceReport
@@ -552,11 +553,17 @@ def create_log(job, logfile, tarball_name):

log.info('will create archive %s' % fullpath)
try:
with closing(tarfile.open(name=fullpath, mode='w:gz', dereference=True)) as archive:
archive.add(os.path.basename(job.workdir), recursive=True)
#newdirnm = "tarball_PandaJob_%s" % job.jobid
#tarballnm = "%s.tar.gz" % newdirnm
#os.rename(job.workdir, newdirnm)
cmd = "pwd;tar cvfz %s %s --dereference --one-file-system; echo $?" % (fullpath, tarball_name)
exit_code, stdout, stderr = execute(cmd)
#with closing(tarfile.open(name=fullpath, mode='w:gz', dereference=True)) as archive:
# archive.add(os.path.basename(job.workdir), recursive=True)
except Exception as e:
raise LogFileCreationFailure(e)

else:
log.debug('stdout = %s' % stdout)
log.debug('renaming %s back to %s' % (job.workdir, orgworkdir))
try:
os.rename(job.workdir, orgworkdir)
@@ -668,7 +675,14 @@ def _stage_out_new(job, args):

job.status['LOG_TRANSFER'] = LOG_TRANSFER_IN_PROGRESS
logfile = job.logdata[0]
create_log(job, logfile, 'tarball_PandaJob_%s_%s' % (job.jobid, job.infosys.pandaqueue))

try:
create_log(job, logfile, 'tarball_PandaJob_%s_%s' % (job.jobid, job.infosys.pandaqueue))
except LogFileCreationFailure as e:
log.warning('failed to create tar file: %s' % e)
set_pilot_state(job=job, state="failed")
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOGFILECREATIONFAILURE)
return False

if not _do_stageout(job, [logfile], ['pl', 'pw', 'w'], title='log'):
is_success = False
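The create_log() hunk above replaces the tarfile-based archiving (previously wrapped in contextlib.closing for Python 2.6 compatibility) with a shell tar command run through the pilot's execute() wrapper. A minimal standalone sketch of the same approach, assuming only the standard library; the function name and argument handling are illustrative, not the pilot's API:

import subprocess

def make_log_tarball(fullpath, tarball_name, cwd=None):
    # Build the same archive as `tar cvfz <fullpath> <tarball_name> --dereference --one-file-system`,
    # following symlinks and staying on a single file system.
    cmd = ['tar', 'cvfz', fullpath, tarball_name, '--dereference', '--one-file-system']
    process = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    return process.returncode, stdout, stderr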
68 changes: 56 additions & 12 deletions pilot/control/job.py
@@ -294,14 +294,14 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
else:
data['pilotErrorCode'] = pilot_error_code

# add error info
pilot_error_diag = job.piloterrordiag
pilot_error_diags = job.piloterrordiags
if pilot_error_diags != []:
log.warning('pilotErrorDiags = %s (will report primary/first error diag)' % str(pilot_error_diags))
data['pilotErrorDiag'] = pilot_error_diags[0]
else:
data['pilotErrorDiag'] = pilot_error_diag

data['transExitCode'] = job.transexitcode
data['exeErrorCode'] = job.exeerrorcode
data['exeErrorDiag'] = job.exeerrordiag
@@ -345,22 +345,60 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
if stdout_tail:
data['stdout'] = stdout_tail

if state == 'finished' or state == 'failed':
time_getjob, time_stagein, time_payload, time_stageout, time_total_setup = timing_report(job.jobid, args)
data['pilotTiming'] = "%s|%s|%s|%s|%s" % \
(time_getjob, time_stagein, time_payload, time_stageout, time_total_setup)
# add memory information if available
add_memory_info(data, job.workdir)

# add log extracts (for failed/holding jobs or for jobs with outbound connections)
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.diagnose' % pilot_user, globals(), locals(), [pilot_user], -1)
extracts = user.get_log_extracts(job, state)
if extracts != "":
log.warning('pilot log extracts:\n%s' % extracts)
data['pilotLog'] = extracts
if state == 'finished' or state == 'failed':
add_timing_and_extracts(data, job, state, args)

return data
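The pilotErrorDiag handling above prefers the first entry of the piloterrordiags list and falls back to the scalar piloterrordiag field. A hedged rendering of that selection as a standalone helper (the helper name is hypothetical):

def choose_error_diag(piloterrordiags, piloterrordiag=''):
    # Report the primary (first) diagnostic when a list of errors was accumulated.
    if piloterrordiags:
        return piloterrordiags[0]
    return piloterrordiag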


def add_timing_and_extracts(data, job, state, args):
"""
Add timing info and log extracts to the data structure for a completed job (finished or failed), to be sent to the server.
Note: this function updates the data dictionary.
:param data: data structure (dictionary).
:param job: job object.
:param state: state of the job (string).
:param args: pilot args.
:return:
"""

time_getjob, time_stagein, time_payload, time_stageout, time_total_setup = timing_report(job.jobid, args)
data['pilotTiming'] = "%s|%s|%s|%s|%s" % \
(time_getjob, time_stagein, time_payload, time_stageout, time_total_setup)

# add log extracts (for failed/holding jobs or for jobs with outbound connections)
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.diagnose' % pilot_user, globals(), locals(), [pilot_user], -1)
extracts = user.get_log_extracts(job, state)
if extracts != "":
logger.warning('pilot log extracts:\n%s' % extracts)
data['pilotLog'] = extracts
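The pilotTiming value packs the five measurements into a pipe-separated string. A sketch of how a consumer might unpack it; the helper and key names are assumptions, only the field order comes from the code above:

def parse_pilot_timing(timing):
    # '2|10|3600|20|5' -> {'time_getjob': '2', 'time_stagein': '10', ...}
    keys = ('time_getjob', 'time_stagein', 'time_payload', 'time_stageout', 'time_total_setup')
    return dict(zip(keys, timing.split('|')))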


def add_memory_info(data, workdir):
"""
Add memory information (if available) to the data structure that will be sent to the server with job updates.
Note: this function updates the data dictionary.
:param data: data structure (dictionary).
:param workdir: working directory of the job (string).
:return:
"""

pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
utilities = __import__('pilot.user.%s.utilities' % pilot_user, globals(), locals(), [pilot_user], -1)
try:
utility_node = utilities.get_memory_monitor_info(workdir)
data.update(utility_node)
except Exception as e:
logger.info('memory information not available: %s' % e)
pass
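Both helpers above load user-community plugins with __import__('pilot.user.<name>.<module>', ..., -1), where -1 is Python 2's relative-import level. A modern equivalent sketch using importlib, assuming nothing beyond the module path is relied upon:

import os
import importlib

def load_user_module(submodule):
    # Resolve pilot.user.<community>.<submodule>, defaulting the community to 'generic'.
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    return importlib.import_module('pilot.user.%s.%s' % (pilot_user, submodule))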


def get_list_of_log_files():
"""
Return a list of log files produced by the payload.
@@ -593,6 +631,12 @@ def get_dispatcher_dictionary(args):
if args.resource_type != "":
data['resourceType'] = args.resource_type

# add harvester fields
if 'HARVESTER_ID' in os.environ:
data['harvester_id'] = os.environ.get('HARVESTER_ID')
if 'HARVESTER_WORKER_ID' in os.environ:
data['worker_id'] = os.environ.get('HARVESTER_WORKER_ID')

return data
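The Harvester fields are attached only when Harvester exports its identifiers into the environment; for a local test one could set them explicitly (values below are examples, not real identifiers):

import os
os.environ['HARVESTER_ID'] = 'harvester_instance_1'  # example value
os.environ['HARVESTER_WORKER_ID'] = '12345'          # example value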


2 changes: 1 addition & 1 deletion pilot/info/jobdata.py
@@ -372,7 +372,7 @@ def is_local(self):
"""

for fspec in self.indata:
if fspec.storage_token == 'local':
if fspec.storage_token == 'local' and '.lib.' not in fspec.lfn:
return True

def clean(self):
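The one-line change to is_local() stops user sandbox/library files (lfn containing '.lib.') from marking a job as local. A standalone sketch of the updated predicate, with the implicit None return made explicit for clarity:

def is_local(indata):
    # True if any non-lib input file carries the 'local' storage token.
    for fspec in indata:
        if fspec.storage_token == 'local' and '.lib.' not in fspec.lfn:
            return True
    return False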
5 changes: 5 additions & 0 deletions pilot/user/atlas/common.py
@@ -1121,6 +1121,11 @@ def remove_redundant_files(workdir, outputfiles=[]):
# run a second pass to clean up any broken links
cleanup_broken_links(workdir)

# remove any present user workDir
path = os.path.join(workdir, 'workDir')
if os.path.exists(path):
remove_dir_tree(path)


def get_utility_commands_list(order=None):
"""
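The new block removes any payload-created workDir before the log tarball is built; remove_dir_tree is the pilot's own helper. An equivalent sketch using the standard library, with shutil.rmtree standing in for it:

import os
import shutil

def remove_user_workdir(workdir):
    # Delete <workdir>/workDir if the payload left one behind.
    path = os.path.join(workdir, 'workDir')
    if os.path.exists(path):
        shutil.rmtree(path, ignore_errors=True)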
2 changes: 1 addition & 1 deletion pilot/user/atlas/utilities.py
@@ -172,7 +172,7 @@ def get_memory_monitor_info(workdir, allowtxtfile=False):
node['avgSWAP'] = summary_dictionary['Avg']['avgSwap']
node['avgPSS'] = summary_dictionary['Avg']['avgPSS']
except Exception as e:
logger.warning("exception caught while parsing memory monitor file: %s" % (e))
logger.warning("exception caught while parsing memory monitor file: %s" % e)
logger.warning("will add -1 values for the memory info")
node['maxRSS'] = -1
node['maxVMEM'] = -1
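From the keys read above and the -1 fallbacks, the memory monitor summary appears to carry Avg and Max sections; a hypothetical shape for orientation (all values are placeholders, and any keys not shown in the hunk are guesses):

summary_dictionary = {
    'Avg': {'avgSwap': 0, 'avgPSS': 0},    # mapped to node['avgSWAP'], node['avgPSS']
    'Max': {'maxRSS': -1, 'maxVMEM': -1},  # the -1 fallbacks suggest these keys
}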
4 changes: 2 additions & 2 deletions pilot/util/constants.py
@@ -13,8 +13,8 @@
# Pilot version
RELEASE = '2' # release number should be fixed at 2 for Pilot 2
VERSION = '1' # version number is '1' for first real Pilot 2 release, '0' until then, increased for bigger updates
REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '8' # build number should be reset to '1' for every new development cycle
REVISION = '4' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '10' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
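The four constants compose the version string recorded in PILOTVERSION; with this commit's values the result is 2.1.4.10, matching the PILOTVERSION diff above. A sketch of the composition (the pilot's actual formatting helper is not part of this diff):

RELEASE, VERSION, REVISION, BUILD = '2', '1', '4', '10'
pilot_version = '.'.join((RELEASE, VERSION, REVISION, BUILD))
assert pilot_version == '2.1.4.10'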
5 changes: 5 additions & 0 deletions pilot/util/workernode.py
@@ -118,6 +118,11 @@ def get_disk_space(queuedata):
_maxinputsize = queuedata.maxwdir
logger.debug("resolved value: queuedata.maxwdir=%s" % _maxinputsize)

# grace margin, as discussed in https://its.cern.ch/jira/browse/ATLASPANDA-482
#margin = 10.0 # percent, read later from somewhere
#_maxinputsize = int(_maxinputsize * (1 - margin / 100.0))
#logger.info("applied a %d%% margin to maxwdir: %d" % (margin, _maxinputsize))

try:
du = disk_usage(os.path.abspath("."))
_diskspace = int(du[2] / (1024 * 1024)) # need to convert from B to MB
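If the commented-out grace margin above were enabled, the computation would reduce maxwdir by a percentage before the disk-usage check; a sketch under the assumption of a 10 percent default, pending the ATLASPANDA-482 decision:

def apply_grace_margin(maxinputsize, margin=10.0):
    # Shrink the maxwdir limit (in MB) by the given percentage and return an int.
    return int(maxinputsize * (1 - margin / 100.0))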
