DF/091: improve error handling #320

Merged: 13 commits, Apr 13, 2020
Changes from 6 commits
2 changes: 2 additions & 0 deletions Utils/Dataflow/019_esFormat/esFormat.php
@@ -45,6 +45,8 @@ function getAction($row) {

if (isset($row['_update']) and $row['_update'] === true) {
$action = 'update';
} elseif (isset($row['_incomplete']) and $row['_incomplete'] === true) {
$action = 'update';
} else {
$action = $DEFAULT_ACTION;
}
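
For context, the same action-selection rule as a Python sketch (a hypothetical helper, not part of the PR): messages flagged '_incomplete' go to Elasticsearch with the 'update' action, which merges fields into an existing document instead of replacing it, so a partial record cannot wipe data indexed earlier.

# Hypothetical Python rendering of getAction() above; 'index' is assumed
# to be the default action, as in the ES Bulk API.
DEFAULT_ACTION = 'index'

def get_action(row):
    if row.get('_update') is True or row.get('_incomplete') is True:
        # 'update' performs a partial-document update in ES, so an
        # incomplete record only adds/overwrites the fields it carries.
        return 'update'
    return DEFAULT_ACTION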
107 changes: 77 additions & 30 deletions Utils/Dataflow/091_datasetsRucio/datasets_processing.py
@@ -24,7 +24,8 @@
sys.stderr.write("(TRACE) Set VIRTUAL_ENV: %s\n"
% os.environ["VIRTUAL_ENV"])
import rucio.client
from rucio.common.exception import RucioException
from rucio.common.exception import (RucioException,
DataIdentifierNotFound)
except ImportError, err:
sys.stderr.write("(ERROR) Failed to import Rucio module: %s\n" % err)
except Exception, err:
@@ -43,6 +44,7 @@
import pyDKB
from pyDKB.dataflow import messageType
from pyDKB.dataflow.exceptions import DataflowException
from pyDKB.common.types import logLevel
except Exception, err:
sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
sys.exit(1)
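
A note on the widened import: DataIdentifierNotFound is a subclass of RucioException in the Rucio client library, which is what lets get_ds_info below treat a vanished dataset as a normal outcome while any other Rucio error still reaches the caller. A minimal sketch of the distinction (the handling shown is a simplified model, not the stage's actual code):

from rucio.common.exception import RucioException, DataIdentifierNotFound

def lookup(client, scope, name):
    # client.get_metadata() is a real rucio.client call; the surrounding
    # logic is illustrative only.
    try:
        return client.get_metadata(scope, name), False
    except DataIdentifierNotFound:
        return {}, True   # expected: the DID no longer exists in Rucio
    # any other RucioException (e.g. server unreachable) propagates, and
    # the caller marks the message incomplete instead of dropping it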
@@ -140,15 +142,33 @@ def process_output_ds(stage, message):
if type(datasets) != list:
datasets = [datasets]

for dataset in datasets:
ds = get_output_ds_info(dataset)
mfields = META_FIELDS[OUTPUT]
for ds_name in datasets:
incompl = None
try:
ds = get_ds_info(ds_name, mfields)
except RucioException, err:
stage.log(["Mark message as incomplete (failed to get information"
" from Rucio for: %s)." % ds_name,
"Reason: %s." % str(err)],
logLevel.WARN)
incompl = True
ds = {}

ds['datasetname'] = ds_name
ds['taskid'] = json_str.get('taskid')
if not add_es_index_info(ds):
sys.stderr.write("(WARN) Skip message (not enough info"
" for ES indexing).\n")
return True
continue
del(ds['taskid'])
stage.output(pyDKB.dataflow.communication.messages.JSONMessage(ds))

if not is_data_complete(ds, mfields.values()):
incompl = True

msg = pyDKB.dataflow.communication.messages.JSONMessage(ds)
msg.incomplete(incompl)
stage.output(msg)

return True
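
The incomplete flag set here is what the esFormat.php change above reacts to. A toy stand-in for pyDKB's JSONMessage, showing one plausible way incomplete(True) could surface as the '_incomplete' field (the real pyDKB implementation may differ):

import json

class JSONMessage(object):
    # Illustrative model only, not pyDKB's actual class.
    def __init__(self, content):
        self._content = dict(content)

    def incomplete(self, value):
        # incompl stays None when all fields were retrieved; only a
        # truthy value marks the record for the 'update' action downstream.
        if value:
            self._content['_incomplete'] = True

    def encode(self):
        return json.dumps(self._content)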

@@ -181,7 +201,7 @@ def skip_process_output_ds(stage, message):
if not add_es_index_info(ds):
sys.stderr.write("(WARN) Skip message (not enough info"
" for ES indexing).\n")
return True
continue
del(ds['taskid'])
out_msg = pyDKB.dataflow.communication.messages.JSONMessage(ds)
out_msg.incomplete(True)
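
The return-to-continue change in these loops matters: previously a single dataset lacking ES index info dropped all remaining datasets of the same message. Schematically (invented minimal loop):

for ds in datasets:
    if not add_es_index_info(ds):
        continue   # skip only this dataset; 'return True' dropped the rest too
    emit(ds)       # hypothetical output step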
@@ -200,43 +220,54 @@ def process_input_ds(stage, message):
data = message.content()
mfields = META_FIELDS[INPUT]
ds_name = data.get(SRC_FIELD[INPUT])
incompl = None
if ds_name:
try:
mdata = get_metadata(ds_name, mfields.keys())
adjust_metadata(mdata)
for mkey in mdata:
data[mfields[mkey]] = mdata[mkey]
except RucioException:
data[mfields['bytes']] = -1
data[mfields['deleted']] = True
stage.output(pyDKB.dataflow.communication.messages.JSONMessage(data))
ds = get_ds_info(ds_name, mfields)
except RucioException, err:
stage.log(["Mark message as incomplete (failed to get information"
" from Rucio for: %s)." % ds_name,
"Reason: %s." % str(err)],
logLevel.WARN)
incompl = True
data.update(ds)
if not is_data_complete(data, mfields.values()):
incompl = True

msg = pyDKB.dataflow.communication.messages.JSONMessage(data)
msg.incomplete(incompl)
stage.output(msg)

return True
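
Assuming the flag is serialized as in the JSONMessage sketch above, an input-DS message for which Rucio is unreachable would come out roughly like this (field names are illustrative, not the exact SRC_FIELD/META_FIELDS values):

# before:  {"primary_input": "data15_13TeV.ds", "taskid": 123}
# after:   {"primary_input": "data15_13TeV.ds", "taskid": 123,
#           "_incomplete": true}
# On a successful lookup the Rucio metadata fields are merged in and,
# with all required fields present, the flag is never set.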


def get_output_ds_info(dataset):
def get_ds_info(dataset, mfields):
""" Construct dictionary with dataset info.

Dict format:
{"deleted": true | false,
"datasetname": "<DS_NAME>",
"bytes": <BYTES>}
{<deleted>: true | false,
<bytes>: <BYTES>,
<events>: <EVENTS>,
...}

:param dataset: dataset name
:return: dict
:type dataset: str
:param mfields: fields to get from Rucio metadata
with aliases to be used instead of Rucio field names:
``{<rucio_field>: <alias>, ...}``
    :type mfields: dict

:return: dict with dataset info
:rtype: dict
"""
ds_dict = {}
ds_dict['datasetname'] = dataset
try:
mfields = META_FIELDS[OUTPUT]
mdata = get_metadata(dataset, mfields.keys())
adjust_metadata(mdata)
for mkey in mfields:
ds_dict[mfields[mkey]] = mdata[mkey]
except RucioException:
# if dataset wasn't found in Rucio, it means that it has been deleted
# from the Rucio catalog. In this case 'deleted' is set to TRUE and
# the length of file is set to -1
ds_dict[mfields['bytes']] = -1
for mkey in mdata:
if mkey in mfields:
ds_dict[mfields[mkey]] = mdata[mkey]
except DataIdentifierNotFound, err:
ds_dict[mfields['deleted']] = True
return ds_dict
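
A hypothetical call, assuming META_FIELDS[OUTPUT] maps Rucio metadata names to the ES field names used by the stage (the mapping shown is invented):

META_FIELDS = {OUTPUT: {'bytes': 'output_bytes',
                        'events': 'output_events',
                        'deleted': 'output_deleted'}}

info = get_ds_info('mc16.evgen.some.dataset', META_FIELDS[OUTPUT])
# known dataset   -> {'output_bytes': 1024, 'output_events': 500,
#                     'output_deleted': False}
# removed dataset -> {'output_deleted': True}   (DataIdentifierNotFound)
# other Rucio errors propagate, so the caller can mark the message incomplete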

@@ -298,17 +329,33 @@ def adjust_metadata(mdata):
" 'dict' (get '%s')" % mdata.__class__.__name__)
for key in mdata:
if mdata[key] == 'null':
mdata[key] = None
del(mdata[key])
if 'bytes' in mdata.keys():
val = mdata['bytes']
if val is None:
mdata['bytes'] = -1
# 'bytes' is None when, e.g., dataset is a container, and all the
# datasets within the container are already deleted
# (yet the container itself still exists)
mdata['deleted'] = True
else:
mdata['deleted'] = False
return mdata
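
Dropping unset keys (instead of storing None) is what lets is_data_complete below notice missing information. On invented inputs the adjusted behaviour is:

adjust_metadata({'bytes': 1024, 'events': 500})
# -> {'bytes': 1024, 'events': 500, 'deleted': False}
adjust_metadata({'bytes': None})
# -> {'bytes': -1, 'deleted': True}   e.g. a container whose datasets
#                                     were all removed from Rucio
# keys holding the string 'null' are removed entirely, so completeness
# checks see them as absent rather than present-but-empty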


def is_data_complete(data, fields):
""" Check if data contains all the required fields.

:param data: data to be checked
:type data: dict
:param fields: list of fields data must contain
:type fields: list

:return: True/False
:rtype: bool
"""
return set(fields).issubset(set(data.keys()))
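
Usage of the completeness check, with a hypothetical alias mapping:

required = {'bytes': 'output_bytes', 'events': 'output_events'}.values()

is_data_complete({'output_bytes': 1024}, required)
# -> False: 'output_events' is missing, so the message would be
#           marked incomplete
is_data_complete({'output_bytes': 1024, 'output_events': 500}, required)
# -> True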


def add_es_index_info(data):
""" Update data with required for ES indexing info.
