Skip to content
This repository has been archived by the owner on Jan 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #142 from PanDAWMS/next
Browse files Browse the repository at this point in the history
2.1.7
  • Loading branch information
PalNilsson authored May 7, 2019
2 parents 79caead + 521610e commit 0625c8c
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 26 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.6.7
2.1.7.4
98 changes: 76 additions & 22 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
from pilot.util.proxy import get_distinguished_name
from pilot.util.queuehandling import scan_for_jobs, put_in_queue
from pilot.util.timing import add_to_pilot_timing, timing_report, get_postgetjob_time, get_time_since, time_stamp
from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, is_virtual_machine
from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, is_virtual_machine, \
get_cpu_model

import logging
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -337,27 +338,6 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
'siteName': args.site,
'node': get_node_name()}

# error codes
pilot_error_code = job.piloterrorcode
pilot_error_codes = job.piloterrorcodes
if pilot_error_codes != []:
log.warning('pilotErrorCodes = %s (will report primary/first error code)' % str(pilot_error_codes))
data['pilotErrorCode'] = pilot_error_codes[0]
else:
data['pilotErrorCode'] = pilot_error_code

# add error info
pilot_error_diag = job.piloterrordiag
pilot_error_diags = job.piloterrordiags
if pilot_error_diags != []:
log.warning('pilotErrorDiags = %s (will report primary/first error diag)' % str(pilot_error_diags))
data['pilotErrorDiag'] = pilot_error_diags[0]
else:
data['pilotErrorDiag'] = pilot_error_diag
data['transExitCode'] = job.transexitcode
data['exeErrorCode'] = job.exeerrorcode
data['exeErrorDiag'] = job.exeerrordiag

data['attemptNr'] = job.attemptnr

schedulerid = get_job_scheduler_id()
Expand Down Expand Up @@ -397,15 +377,89 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
if stdout_tail:
data['stdout'] = stdout_tail

# add the core count
if job.corecount and job.corecount != 'null' and job.corecount != 'NULL':
data['coreCount'] = job.corecount

# get the number of events, should report in heartbeat in case of preempted.
if job.nevents != 0:
data['nEvents'] = job.nevents
log.info("total number of processed events: %d (read)" % job.nevents)
else:
log.info("payload/TRF did not report the number of read events")

# get the CU consumption time
constime = get_cpu_consumption_time(job.cpuconsumptiontime)
if constime and constime != -1:
data['cpuConsumptionTime'] = constime
data['cpuConsumptionUnit'] = job.cpuconsumptionunit + "+" + get_cpu_model()
data['cpuConversionFactor'] = job.cpuconversionfactor

# add memory information if available
add_memory_info(data, job.workdir)

if state == 'finished' or state == 'failed':
add_timing_and_extracts(data, job, state, args)
add_error_codes(data, job)

return data


def add_error_codes(data, job):
"""
Add error codes to data structure.
:param data: data dictionary.
:param job: job object.
:return:
"""

log = get_logger(job.jobid, logger)

# error codes
pilot_error_code = job.piloterrorcode
pilot_error_codes = job.piloterrorcodes
if pilot_error_codes != []:
log.warning('pilotErrorCodes = %s (will report primary/first error code)' % str(pilot_error_codes))
data['pilotErrorCode'] = pilot_error_codes[0]
else:
data['pilotErrorCode'] = pilot_error_code

# add error info
pilot_error_diag = job.piloterrordiag
pilot_error_diags = job.piloterrordiags
if pilot_error_diags != []:
log.warning('pilotErrorDiags = %s (will report primary/first error diag)' % str(pilot_error_diags))
data['pilotErrorDiag'] = pilot_error_diags[0]
else:
data['pilotErrorDiag'] = pilot_error_diag
data['transExitCode'] = job.transexitcode
data['exeErrorCode'] = job.exeerrorcode
data['exeErrorDiag'] = job.exeerrordiag


def get_cpu_consumption_time(cpuconsumptiontime):
"""
Get the CPU consumption time.
The function makes sure that the value exists and is within allowed limits (< 10^9).
:param cpuconsumptiontime: CPU consumption time (int/None).
:return: properly set CPU consumption time (int/None).
"""

constime = None

try:
constime = int(cpuconsumptiontime)
except Exception:
constime = None
if constime and constime > 10 ** 9:
logger.warning("unrealistic cpuconsumptiontime: %d (reset to -1)" % constime)
constime = -1

return constime


def add_timing_and_extracts(data, job, state, args):
"""
Add timing info and log extracts to data structure for a completed job (finished or failed) to be sent to server.
Expand Down
3 changes: 2 additions & 1 deletion pilot/info/queuedata.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class QueueData(BaseData):

direct_access_lan = False
direct_access_wan = False
use_pcache = False

maxwdir = 0 # in MB
maxrss = 0
Expand All @@ -86,7 +87,7 @@ class QueueData(BaseData):
str: ['name', 'appdir', 'catchall', 'platform', 'container_options', 'container_type',
'resource', 'state', 'status', 'site'],
dict: ['copytools', 'acopytools', 'astorages', 'aprotocols', 'acopytools_schemas'],
bool: ['direct_access_lan', 'direct_access_wan']
bool: ['direct_access_lan', 'direct_access_wan', 'use_pcache']
}

def __init__(self, data):
Expand Down
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# Pilot version
RELEASE = '2' # released number should be fixed at 2 for Pilot 2
VERSION = '1' # version number is '1' for first real Pilot 2 release, '0' until then, increased for bigger updates
REVISION = '6' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '7' # build number should be reset to '1' for every new development cycle
REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '5' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
50 changes: 50 additions & 0 deletions pilot/util/workernode.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# - Paul Nilsson, paul.nilsson@cern.ch, 2017

import os
import re
from string import find

from pilot.util.container import execute
Expand Down Expand Up @@ -208,3 +209,52 @@ def display_architecture_info():
dump("$MACHTYPE", cmd="echo")
else:
logger.info("\n%s" % stdout)


def get_cpu_model():
"""
Get cpu model and cache size from /proc/cpuinfo.
Example.
model name : Intel(R) Xeon(TM) CPU 2.40GHz
cache size : 512 KB
gives the return string "Intel(R) Xeon(TM) CPU 2.40GHz 512 KB".
:return: cpu model (string).
"""

cpumodel = ""
cpucache = ""
modelstring = ""

re_model = re.compile('^model name\s+:\s+(\w.+)')
re_cache = re.compile('^cache size\s+:\s+(\d+ KB)')

with open("/proc/cpuinfo", "r") as f:

# loop over all lines in cpuinfo
for line in f.readlines():
# try to grab cpumodel from current line
model = re_model.search(line)
if model:
# found cpu model
cpumodel = model.group(1)

# try to grab cache size from current line
cache = re_cache.search(line)
if cache:
# found cache size
cpucache = cache.group(1)

# stop after 1st pair found - can be multiple cpus
if cpumodel and cpucache:
# create return string
modelstring = cpumodel + " " + cpucache
break

# default return string if no info was found
if not modelstring:
modelstring = "UNKNOWN"

return modelstring

0 comments on commit 0625c8c

Please sign in to comment.