Skip to content

Commit

Permalink
Merge pull request 5942 from hotfix/v4.3.4 into master
Browse files Browse the repository at this point in the history
  • Loading branch information
Project Collection Build Service (51degrees) authored and Project Collection Build Service (51degrees) committed Oct 1, 2021
2 parents 6ebdc0d + 07a4164 commit 413f4db
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 25 deletions.
41 changes: 25 additions & 16 deletions fiftyone_pipeline_core/fiftyone_pipeline_core/flowdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
# ********************************************************************

from .evidence import Evidence
from .flowerror import FlowError
from .messages import Messages
import traceback

import sys

class FlowData:

Expand Down Expand Up @@ -73,17 +75,24 @@ def process(self):
flow_element.process(self)

except Exception:

self.set_error(flow_element.datakey, traceback.format_exc())

flow_error = FlowError(flow_element.datakey, sys.exc_info()[1], traceback.format_exc())
self.set_error(flow_error)

# Set processed flag to true. flowdata can only be processed once

self.processed = True
return self

else:
self.setError("error", "FlowData already processed")
flow_error = FlowError("global", Exception(Messages.FLOW_DATA_PROCESSED), Messages.FLOW_DATA_PROCESSED)
self.set_error(flow_error)

if self.errors and self.pipeline.suppress_process_exceptions is False:
errored_flow_element_key = next(iter(self.errors))
flowErrorObj = self.errors[errored_flow_element_key]
raise flowErrorObj.exception_instance

return self

def get_from_element(self, flow_element):

Expand Down Expand Up @@ -169,25 +178,25 @@ def set_element_data(self, element_data):
self.data[element_data.flow_element.datakey] = element_data


def set_error(self, key, error):

def set_error(self, flow_error):
"""!
Set error (should be keyed by flowElement datakey)
@type key: string
@param key: a flowElement.datakey
@type error: string
@param error: Error message
@type error: FlowError
@param error: Flow Error
"""

if key not in self.errors:
self.errors[key] = list()
self.errors[flow_error.flow_element] = flow_error

log_message = "Error occurred during processing"

self.errors[key].append(error)
if flow_error.flow_element:

log_message = log_message + " of " + flow_error.flow_element + ". \n" + flow_error.exception_traceback

self.pipeline.log("error", error)
self.pipeline.log("error", log_message)


def get_evidence_datakey(self):
Expand Down
41 changes: 41 additions & 0 deletions fiftyone_pipeline_core/fiftyone_pipeline_core/flowerror.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# *********************************************************************
# This Original Work is copyright of 51 Degrees Mobile Experts Limited.
# Copyright 2019 51 Degrees Mobile Experts Limited, 5 Charlotte Close,
# Caversham, Reading, Berkshire, United Kingdom RG4 7BY.
#
# This Original Work is licensed under the European Union Public Licence (EUPL)
# v.1.2 and is subject to its terms as set out below.
#
# If a copy of the EUPL was not distributed with this file, You can obtain
# one at https://opensource.org/licenses/EUPL-1.2.
#
# The 'Compatible Licences' set out in the Appendix to the EUPL (as may be
# amended by the European Commission) shall be deemed incompatible for
# the purposes of the Work and the provisions of the compatibility
# clause in Article 5 of the EUPL shall not apply.
#
# If using the Work as, or as part of, a network application, by
# including the attribution notice(s) required under Article 5 of the EUPL
# in the end user terms of the application under an appropriate heading,
# such notice(s) shall fulfill the requirements of that article.
# ********************************************************************

class FlowError:

"""!
An error that occurred during the processing of an element.
"""

def __init__(self, flow_element, exception_instance, exception_traceback ):

"""!
FlowError constructor.
* @param {string} The FlowElement at which the exception occurred.
* @param {Exception} Exception Instance
* @param {String} Exception Backtrace
"""

self.flow_element = flow_element
self.exception_instance = exception_instance
self.exception_traceback = exception_traceback
5 changes: 4 additions & 1 deletion fiftyone_pipeline_core/fiftyone_pipeline_core/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ class Messages():
# Property not found in flowData. This takes the element datakey
# and property names as format arguments.
PROPERTY_NOT_FOUND = \
"Property '{}' is not present in the FlowData against '{}' ElementData. "
"Property '{}' is not present in the FlowData against '{}' ElementData. "

# FlowData already processed.
FLOW_DATA_PROCESSED = "FlowData already processed"
9 changes: 8 additions & 1 deletion fiftyone_pipeline_core/fiftyone_pipeline_core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Pipeline:
"""

def __init__(self, flow_elements, logger=Logger()):
def __init__(self, flow_elements, logger=Logger(), suppress_process_exceptions = False):

"""!
Pipeline constructor.
Expand All @@ -44,6 +44,11 @@ def __init__(self, flow_elements, logger=Logger()):
@type logger: Logger
@param logger: A logger to attach to the pipeline
@type suppress_process_exceptions: boolean
@param suppress_process_exceptions: If true then pipeline will suppress
exceptions added to FlowData.errors otherwise will throw the exception
occurred during the processing of first element.
@rtype: Pipeline
@return: Returns a Pipeline
Expand All @@ -57,6 +62,8 @@ def __init__(self, flow_elements, logger=Logger()):

self.flow_elements_display_list = []

self.suppress_process_exceptions = suppress_process_exceptions

for flow_element in flow_elements:

# Notify element that it has been registered in the pipeline
Expand Down
4 changes: 3 additions & 1 deletion fiftyone_pipeline_core/tests/classes/testpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# Test Pipeline builder for use with unit tests
class TestPipeline():

def __init__(self):
def __init__(self, suppressException = True):

logger = MemoryLogger("info")
self.flowElement1 = ExampleFlowElement1()
Expand All @@ -25,6 +25,8 @@ def __init__(self):
.add(ExampleFlowElement2())\
.add_logger(logger)\
.build()

self.pipeline.suppress_process_exceptions = suppressException
self.flowdata = self.pipeline.create_flowdata()
self.flowdata.evidence.add("header.user-agent", "test")
self.flowdata.evidence.add("some.other-evidence", "test")
Expand Down
37 changes: 32 additions & 5 deletions fiftyone_pipeline_core/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from classes.stopflowdata import StopFlowData
from classes.errorflowdata import ErrorFlowData


######################################
# The Tests

Expand Down Expand Up @@ -62,7 +61,7 @@ def testEvidenceKeyFilter(self):
self.assertTrue(nullEvidence == None)


# # Test Getter methods
# Test Getter methods
def testGet(self):

testPipeline = TestPipeline()
Expand Down Expand Up @@ -96,15 +95,44 @@ def testStopFlowData(self):
self.assertEqual(message, "There is no element data for example2 against this flow data. Available element data keys are: ['example1', 'error', 'apv', 'stop', 'example2']")


# Test exception is thrown when not suppressed.
def testErrors_dont_suppress_exception(self):

try:
testPipeline = TestPipeline(False)
self.assertFalse("Exception is expected.")
except Exception as e:
self.assertTrue(str(e) is not None)

# Test errors are returned
def testErrors(self):

testPipeline = TestPipeline()
getValue = testPipeline.flowdata.errors["error"]
self.assertTrue(getValue is not None)
self.assertTrue(getValue.flow_element == "error")
self.assertTrue(getValue.exception_instance is not None)
self.assertTrue(getValue.exception_traceback is not None)

# Test Already Processed FlowData
def testErrors_already_processed(self):

flowElement1 = ExampleFlowElement1()
logger = MemoryLogger("info")
pipeline = (PipelineBuilder())\
.add(flowElement1)\
.add_logger(logger)\
.build()
flowdata = pipeline.create_flowdata()
flowdata.process()

# Test aspectPropertyValue wrapper
try:
flowdata.process()
self.assertFalse("Exception is expected.")
except Exception as e:
self.assertEqual("FlowData already processed", str(e))


# Test aspectPropertyValue wrapper
def testAPV(self):

testPipeline = TestPipeline()
Expand Down Expand Up @@ -144,5 +172,4 @@ def test_build_from_config(self):

getValue = fd.get("example1").get("integer")
self.assertTrue(getValue == 5)


0 comments on commit 413f4db

Please sign in to comment.