DF/091: improve error handling ('_incomplete' messages).
Changes in error handling (a condensed sketch of the combined logic follows
the list below):
1. If Rucio raises `DataIdentifierNotFound`, set `deleted` to
   `True` (previously it was treated like any other `RucioException`).
2. In case of a `RucioException`, mark the message as "incomplete" -- such a
   record will be written to ES in "update" mode, not "insert"
   (previously `deleted` and `bytes` would be set to `True` and `-1`).
3. If some of the required fields cannot be extracted from Rucio (or have
   the value `null`), they are left unset (previously they would be set to
   `None`).
4. If the result message is missing some of the fields that were supposed
   to be added at this stage, it is marked as `_incomplete`.
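
The four rules combine into one flow per dataset. The sketch below is a
condensed illustration, not the stage's actual code: `REQUIRED`, `fetch` and
`build_record` are hypothetical stand-ins (the real implementation is in the
diff below).

```python
# Condensed sketch of rules 1--4; names are hypothetical, simplified
# from the real stage code shown in the diff below.
from rucio.common.exception import RucioException, DataIdentifierNotFound

REQUIRED = ('bytes', 'deleted')  # assumption: the fields added at this stage


def build_record(ds_name, fetch):
    """ Build an ES record; return (record, incomplete_flag).

    `fetch` is any callable returning a Rucio metadata dict.
    """
    record = {'datasetname': ds_name}
    incomplete = False
    try:
        mdata = fetch(ds_name)
        # Rule 3: fields missing in Rucio (or unset) are left out.
        record.update({k: v for k, v in mdata.items()
                       if k in REQUIRED and v is not None})
    except DataIdentifierNotFound:
        record['deleted'] = True   # rule 1: dataset is gone from Rucio
    except RucioException:
        incomplete = True          # rule 2: Rucio failed -> "update" mode
    if not all(f in record for f in REQUIRED):
        incomplete = True          # rule 4: a required field is missing
    return record, incomplete
```

Note how rule 1 feeds rule 4: a dataset deleted from Rucio never gets
`bytes`, so its record always ends up `_incomplete` -- which is exactly the
drawback discussed next.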

What is not good about this logic: if a dataset was removed from Rucio, it
will always be marked as "requiring update". But for now this is the only
option: to use "update" instead of "insert", we need to know that the
message is incomplete; and when a message coming to Stage 019 is
incomplete, it should be marked as "requiring update" for further
investigation.
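
For illustration, the "update"-vs-"insert" decision on the ES side could
look roughly like this. A sketch assuming the elasticsearch-py client; the
index name, document id and the way the marker is passed around are made up,
and the real Stage 019 may work differently:

```python
# Sketch: route a record to ES "update" or "insert" by its marker.
# Assumes elasticsearch-py; the 'datasets' index and id choice are made up.
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])


def store(record, incomplete):
    doc_id = record['datasetname']
    if incomplete:
        # "Update" mode: merge the partial record into the existing
        # document ('doc_as_upsert' creates it if it does not exist yet).
        es.update(index='datasets', id=doc_id,
                  body={'doc': record, 'doc_as_upsert': True})
    else:
        # "Insert" mode: write the complete document as-is.
        es.index(index='datasets', id=doc_id, body=record)
```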

Maybe the logic could be extended and `_incomplete` could be split into two
different markers: "update, since the original source removed useful data"
and "update, since we failed to connect to the original source" -- but not
now.
mgolosova committed Feb 17, 2020
1 parent bfc096f commit 5be0448
Showing 1 changed file with 58 additions and 14 deletions.
Utils/Dataflow/091_datasetsRucio/datasets_processing.py
@@ -24,7 +24,8 @@
         sys.stderr.write("(TRACE) Set VIRTUAL_ENV: %s\n"
                          % os.environ["VIRTUAL_ENV"])
     import rucio.client
-    from rucio.common.exception import RucioException
+    from rucio.common.exception import (RucioException,
+                                        DataIdentifierNotFound)
 except ImportError, err:
     sys.stderr.write("(ERROR) Failed to import Rucio module: %s\n" % err)
 except Exception, err:
@@ -43,6 +44,7 @@
     import pyDKB
     from pyDKB.dataflow import messageType
     from pyDKB.dataflow.exceptions import DataflowException
+    from pyDKB.common.types import logLevel
 except Exception, err:
     sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
     sys.exit(1)
@@ -142,15 +144,31 @@ def process_output_ds(stage, message):
 
     mfields = META_FIELDS[OUTPUT]
     for ds_name in datasets:
-        ds = get_ds_info(ds_name, mfields)
+        incompl = None
+        try:
+            ds = get_ds_info(ds_name, mfields)
+        except RucioException, err:
+            stage.log(["Mark message as incomplete (failed to get information"
+                       " from Rucio for: %s)." % ds_name,
+                       "Reason: %s." % str(err)],
+                      logLevel.WARN)
+            incompl = True
+            ds = {}
+
         ds['datasetname'] = ds_name
         ds['taskid'] = json_str.get('taskid')
         if not add_es_index_info(ds):
             sys.stderr.write("(WARN) Skip message (not enough info"
                              " for ES indexing).\n")
             continue
         del(ds['taskid'])
-        stage.output(pyDKB.dataflow.communication.messages.JSONMessage(ds))
+
+        if not is_data_complete(ds, mfields.values()):
+            incompl = True
+
+        msg = pyDKB.dataflow.communication.messages.JSONMessage(ds)
+        msg.incomplete(incompl)
+        stage.output(msg)
 
     return True
 
@@ -202,10 +220,24 @@ def process_input_ds(stage, message):
     data = message.content()
     mfields = META_FIELDS[INPUT]
     ds_name = data.get(SRC_FIELD[INPUT])
+    incompl = None
     if ds_name:
-        ds = get_ds_info(ds_name, mfields)
+        try:
+            ds = get_ds_info(ds_name, mfields)
+        except RucioException, err:
+            stage.log(["Mark message as incomplete (failed to get information"
+                       " from Rucio for: %s)." % ds_name,
+                       "Reason: %s." % str(err)],
+                      logLevel.WARN)
+            incompl = True
+            ds = {}  # ensure 'ds' exists for data.update() below
         data.update(ds)
-        stage.output(pyDKB.dataflow.communication.messages.JSONMessage(data))
+        if not is_data_complete(data, mfields.values()):
+            incompl = True
+
+    msg = pyDKB.dataflow.communication.messages.JSONMessage(data)
+    msg.incomplete(incompl)
+    stage.output(msg)
 
     return True
 
@@ -233,13 +264,10 @@ def get_ds_info(dataset, mfields):
     try:
         mdata = get_metadata(dataset, mfields.keys())
         adjust_metadata(mdata)
-        for mkey in mfields:
-            ds_dict[mfields[mkey]] = mdata[mkey]
-    except RucioException:
-        # if dataset wasn't found in Rucio, it means that it has been deleted
-        # from the Rucio catalog. In this case 'deleted' is set to TRUE and
-        # the length of file is set to -1
-        ds_dict[mfields['bytes']] = -1
+        for mkey in mdata:
+            if mkey in mfields:
+                ds_dict[mfields[mkey]] = mdata[mkey]
+    except DataIdentifierNotFound, err:
         ds_dict[mfields['deleted']] = True
     return ds_dict
 
@@ -301,17 +329,33 @@ def adjust_metadata(mdata):
                         " 'dict' (get '%s')" % mdata.__class__.__name__)
-    for key in mdata:
+    for key in list(mdata):  # iterate over a copy: keys may be removed
         if mdata[key] == 'null':
-            mdata[key] = None
+            del(mdata[key])
     if 'bytes' in mdata.keys():
         val = mdata['bytes']
         if val is None:
+            # 'bytes' is None when, e.g., dataset is a container, and all the
+            # datasets within the container are already deleted
+            # (yet the container itself still exists)
             mdata['deleted'] = True
         else:
            mdata['deleted'] = False
     return mdata
 
 
+def is_data_complete(data, fields):
+    """ Check if data contains all the required fields.
+
+    :param data: data to be checked
+    :type data: dict
+    :param fields: list of fields data must contain
+    :type fields: list
+
+    :return: True/False
+    :rtype: bool
+    """
+    return set(fields).issubset(set(data.keys()))
+
+
 def add_es_index_info(data):
     """ Update data with required for ES indexing info.
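A quick worked example of the two helpers changed and added above
(`adjust_metadata` and `is_data_complete`); the field name `campaign` is
made up:

```python
# Hypothetical REPL-style walk through the helpers from the diff above.
mdata = {'bytes': None, 'campaign': 'null'}
adjust_metadata(mdata)
# 'campaign' is dropped (string 'null'), and since 'bytes' is None the
# dataset counts as deleted: mdata == {'bytes': None, 'deleted': True}

is_data_complete(mdata, ['bytes', 'deleted'])                 # True
is_data_complete(mdata, ['bytes', 'deleted', 'datasetname'])  # False ->
# the resulting message would be marked '_incomplete'
```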
