Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Commit

Permalink
Adding a rucio client wrapper class && getFileCountDataset from rucio
Browse files Browse the repository at this point in the history
Check file presence at both systems - Rucio & Phedex

Recalculate missing_phedex with corrections for files managed by Rucio.

Typo

Check filecount on block level, fetch filecount from metadata instead of len(filenames)

Adding 'account=unified' in default config && typo

Split unified config lists to relval and nonrelval
  • Loading branch information
todor-ivanov authored and tivanov committed May 15, 2020
1 parent 5cca7f2 commit 6e337dd
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 3 deletions.
96 changes: 96 additions & 0 deletions RucioClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python
"""
Encapsulates requests to Rucio API
Requires:
rucio-client
Environment:
export X509_USER_PROXY=/tmp/x509up_$UID
export RUCIO_HOME=~/.local/
${RUCIO_HOME}/rucio.cfg
"""

from rucio.client import Client

class RucioClient(Client):
    """
    A wrapper class for the Rucio client.

    Adds helpers for counting and listing the files and blocks of a dataset.
    Every helper is best effort: any exception raised while talking to Rucio
    is printed and an empty result (0 or an empty list) is returned instead.
    """
    def __init__(self, **kwargs):
        """
        Default configuration provided directly into the constructor to avoid
        the need of an external configuration file.
        All arguments passed to the constructor supersede the defaults.
        """
        defaultConfig = {
            'rucio_host': 'http://cms-rucio.cern.ch',
            'auth_host': 'https://cms-rucio-auth.cern.ch',
            'auth_type': 'x509_proxy',
            'ca_cert': '/etc/grid-security/certificates/',
            'account': 'unified'
        }

        defaultConfig.update(kwargs)

        super(RucioClient, self).__init__(**defaultConfig)
        # scope used for every lookup done by the helpers below
        self.scope = 'cms'

    def getFileCountDataset(self, dataset):
        """
        Returns the number of files registered in Rucio for a dataset,
        or 0 if the lookup fails.
        """
        try:
            # list() stays inside the try: the result of list_files is
            # consumed here, so errors raised while iterating are caught too
            files = list(self.list_files(self.scope, dataset))
        except Exception as e:
            print(str(e))
            return 0
        return len(files)

    def getFileNamesDataset(self, dataset):
        """
        Returns a list of file names in a dataset registered in Rucio,
        or an empty list if the lookup fails.
        """
        try:
            fileNames = [_file['name'] for _file in self.list_files(self.scope, dataset)]
        except Exception as e:
            print(str(e))
            return []
        return fileNames

    def getBlockNamesDataset(self, dataset):
        """
        Returns a list of block names in a dataset registered in Rucio,
        or an empty list if the lookup fails.
        """
        try:
            blockNames = [block['name'] for block in self.list_content(self.scope, dataset)]
        except Exception as e:
            print(str(e))
            return []
        return blockNames

    def getFileCountBlock(self, block):
        """
        Returns the number of files in a block registered in Rucio, taken
        from the 'length' field of the block metadata, or 0 if the lookup
        fails.
        """
        try:
            numFiles = self.get_metadata(self.scope, block)['length']
        except Exception as e:
            print(str(e))
            return 0
        return numFiles

    def getFileCountPerBlock(self, dataset):
        """
        Returns the number of files per block in a dataset registered in Rucio
        as a list of (blockName, numFiles) tuples.

        An empty list is returned on error, so that callers can always build
        a set from, or iterate over, the result.
        """
        # we need blocks to be a list of tuples so we can create a set out of this
        try:
            blocks = [(block, self.getFileCountBlock(block))
                      for block in self.getBlockNamesDataset(dataset)]
        except Exception as e:
            print(str(e))
            # must stay an iterable: returning 0 here would break set(...) in callers
            return []
        return blocks

1 change: 1 addition & 0 deletions Unified/RucioClient.py
49 changes: 46 additions & 3 deletions Unified/checkor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import time
import random
import math
from RucioClient import RucioClient
from McMClient import McMClient
from JIRAClient import JIRAClient
from htmlor import htmlor
Expand Down Expand Up @@ -1012,10 +1013,43 @@ def upward( ns ):

time_point("checked custodiality", sub_lap=True)

## presence in phedex
## presence in phedex and/or rucio
phedex_presence ={}
rucioClient = RucioClient()
for output in wfi.request['OutputDatasets']:
phedex_presence[output] = phedexClient.getFileCountDataset(url, output )
_,dsn,process_string,tier = output.split('/')
if tier in set(UC.get('tiers_to_rucio_relval')) | set(UC.get('tiers_to_rucio_nonrelval')):
# - creates lists of tuples of the type: ('blockName', numFiles)
# for all blockNames per Dataset known to both Phedex and Rucio
# - creates the union of the two sets in order to avoid any duplicates
# (files present in both systems)
# - sums the number of files for the union set
# - assigns the value to 'phedex_presence' even though the full sum
# of the files is present in both systems - this way we avoid
# changing the code for the rest of the consistency checks
phedex_filecount_pb = phedexClient.getFileCountPerBlock(url, output)
rucio_filecount_pb = rucioClient.getFileCountPerBlock(output)
all_filecount_pb = set(phedex_filecount_pb) | set(rucio_filecount_pb)
all_blocks = set(map(lambda x: x[0], phedex_filecount_pb)) | set(map(lambda x: x[0], rucio_filecount_pb))

# below we would miscount in case there are the same blocks in both
# Rucio and Phedex but with a different number of files in the two
# systems - they would enter the sum twice, because the two tuples
# would be considered as two different blocks from the two subsets
# hence the following check:
if len(all_blocks) == len(all_filecount_pb):
phedex_presence[output] = sum(map(lambda x: x[1], all_filecount_pb))
else:
# TODO: check if we need to raise a higher level of alarm here.
msg = "There are inconsistences of number of files per block"
msg += "between Phedex and Rucio for dataset: {}".format(output)
wfi.sendLog('checkor', msg)
phedex_presence[output] = 0
# we do not announce this output until the discrepancy from above is resolved
del(all_filecount_pb)
del(all_blocks)
else:
phedex_presence[output] = phedexClient.getFileCountDataset(url, output)

one_output_not_in_phedex = any([Nfiles==0 for Nfiles in phedex_presence.values()])
if one_output_not_in_phedex and 'announce' in assistance_tags:
Expand Down Expand Up @@ -1055,7 +1089,16 @@ def upward( ns ):
assistance_tags.add('filemismatch')
#print this for show and tell if no recovery on-going
for out in dbs_presence:
_,_,missing_phedex,missing_dbs = getDatasetFiles(url, out)
dbs_filenames,phedex_filenames,missing_phedex,missing_dbs = getDatasetFiles(url, out)

# Corrections to the lists of files present in Phedex for the data Tiers managed by Rucio
_,dsn,process_string,tier = output.split('/')
if tier in UC.get('tiers_to_rucio'):
# Here recalculating the filenames as a union of the phedex_files | rucio_files
all_filenames = set(phedex_filenames) | set(rucioClient.getFileNamesDataset(out))
missing_phedex = list(set(dbs_filenames) - all_filenames)
missing_dbs = list(all_filenames - set(dbs_filenames))

if missing_phedex:
wfi.sendLog('checkor',"These %d files are missing in phedex, or extra in dbs, showing %s only\n%s"%(len(missing_phedex),show_N_only,
"\n".join( missing_phedex[:show_N_only] )))
Expand Down
37 changes: 37 additions & 0 deletions phedexClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,43 @@ def getFileCountDataset(url, dataset):
for block in result['phedex']['block']:
files += block['files']
return files

def getFileCountPerBlock(url, dataset):
    """
    Returns the number of files per block in a dataset registered in phedex

    The result is a list of (block name, number of files) tuples, one per
    block found in the 'blockreplicas' response. An empty list is returned
    when the response carries no blocks, so the return type is always a list.
    """
    result = phedexGet(url, '/phedex/datasvc/json/prod/blockreplicas?dataset='+dataset, auth=False)
    phedex = result['phedex']
    # a missing 'block' key and an empty 'block' entry both mean no blocks
    if 'block' not in phedex or not phedex['block']:
        return []
    # we need blocks to be a list of tuples so we can create a set out of this
    return [(block['name'], block['files']) for block in phedex['block']]


def getFileNamesDataset(url, dataset):
    """
    Returns a set of file names in a dataset registered in phedex

    The names are collected from every block of the 'filereplicas' response;
    an empty set is returned when the response carries no blocks.
    """
    query = '/phedex/datasvc/json/prod/filereplicas?dataset=' + dataset
    reply = phedexGet(url, query, auth=False)
    payload = reply['phedex']
    # nothing to collect when the 'block' entry is missing or empty
    if 'block' not in payload or not payload['block']:
        return set()
    # walk all blocks and gather the name of each file they hold
    return {entry['name'] for blk in payload['block'] for entry in blk['file']}


def getTransferPercentage(url, dataset, site):
"""
Expand Down

0 comments on commit 6e337dd

Please sign in to comment.