diff --git a/ci/common-ci b/ci/common-ci index 2a7f877..6ca7c8e 160000 --- a/ci/common-ci +++ b/ci/common-ci @@ -1 +1 @@ -Subproject commit 2a7f8772af6146b408885e4e40c0a99ee25e1f7b +Subproject commit 6ca7c8e15ac6d3460b8bb1eb031b0b622398d677 diff --git a/fiftyone_pipeline_core/fiftyone_pipeline_core/flowdata.py b/fiftyone_pipeline_core/fiftyone_pipeline_core/flowdata.py index 9db5f66..293def9 100755 --- a/fiftyone_pipeline_core/fiftyone_pipeline_core/flowdata.py +++ b/fiftyone_pipeline_core/fiftyone_pipeline_core/flowdata.py @@ -21,8 +21,10 @@ # ******************************************************************** from .evidence import Evidence +from .flowerror import FlowError +from .messages import Messages import traceback - +import sys class FlowData: @@ -73,17 +75,24 @@ def process(self): flow_element.process(self) except Exception: - - self.set_error(flow_element.datakey, traceback.format_exc()) + + flow_error = FlowError(flow_element.datakey, sys.exc_info()[1], traceback.format_exc()) + self.set_error(flow_error) # Set processed flag to true. flowdata can only be processed once self.processed = True - return self else: - self.setError("error", "FlowData already processed") + flow_error = FlowError("global", Exception(Messages.FLOW_DATA_PROCESSED), Messages.FLOW_DATA_PROCESSED) + self.set_error(flow_error) + + if self.errors and self.pipeline.suppress_process_exceptions is False: + errored_flow_element_key = next(iter(self.errors)) + flowErrorObj = self.errors[errored_flow_element_key] + raise flowErrorObj.exception_instance + return self def get_from_element(self, flow_element): @@ -169,25 +178,25 @@ def set_element_data(self, element_data): self.data[element_data.flow_element.datakey] = element_data - def set_error(self, key, error): - + def set_error(self, flow_error): + """! Set error (should be keyed by flowElement datakey) - @type key: string - @param key: a flowElement.datakey - - @type error: string - @param error: Error message + @type error: FlowError + @param error: Flow Error """ - if key not in self.errors: - self.errors[key] = list() + self.errors[flow_error.flow_element] = flow_error + + log_message = "Error occurred during processing" - self.errors[key].append(error) + if flow_error.flow_element: + + log_message = log_message + " of " + flow_error.flow_element + ". \n" + flow_error.exception_traceback - self.pipeline.log("error", error) + self.pipeline.log("error", log_message) def get_evidence_datakey(self): diff --git a/fiftyone_pipeline_core/fiftyone_pipeline_core/flowerror.py b/fiftyone_pipeline_core/fiftyone_pipeline_core/flowerror.py new file mode 100644 index 0000000..7c76da9 --- /dev/null +++ b/fiftyone_pipeline_core/fiftyone_pipeline_core/flowerror.py @@ -0,0 +1,41 @@ +# ********************************************************************* +# This Original Work is copyright of 51 Degrees Mobile Experts Limited. +# Copyright 2019 51 Degrees Mobile Experts Limited, 5 Charlotte Close, +# Caversham, Reading, Berkshire, United Kingdom RG4 7BY. +# +# This Original Work is licensed under the European Union Public Licence (EUPL) +# v.1.2 and is subject to its terms as set out below. +# +# If a copy of the EUPL was not distributed with this file, You can obtain +# one at https://opensource.org/licenses/EUPL-1.2. +# +# The 'Compatible Licences' set out in the Appendix to the EUPL (as may be +# amended by the European Commission) shall be deemed incompatible for +# the purposes of the Work and the provisions of the compatibility +# clause in Article 5 of the EUPL shall not apply. +# +# If using the Work as, or as part of, a network application, by +# including the attribution notice(s) required under Article 5 of the EUPL +# in the end user terms of the application under an appropriate heading, +# such notice(s) shall fulfill the requirements of that article. +# ******************************************************************** + +class FlowError: + + """! + An error that occurred during the processing of an element. + """ + + def __init__(self, flow_element, exception_instance, exception_traceback ): + + """! + FlowError constructor. + + * @param {string} The FlowElement at which the exception occurred. + * @param {Exception} Exception Instance + * @param {String} Exception Backtrace + """ + + self.flow_element = flow_element + self.exception_instance = exception_instance + self.exception_traceback = exception_traceback diff --git a/fiftyone_pipeline_core/fiftyone_pipeline_core/messages.py b/fiftyone_pipeline_core/fiftyone_pipeline_core/messages.py index 5976809..c210c4d 100644 --- a/fiftyone_pipeline_core/fiftyone_pipeline_core/messages.py +++ b/fiftyone_pipeline_core/fiftyone_pipeline_core/messages.py @@ -43,4 +43,7 @@ class Messages(): # Property not found in flowData. This takes the element datakey # and property names as format arguments. PROPERTY_NOT_FOUND = \ - "Property '{}' is not present in the FlowData against '{}' ElementData. " \ No newline at end of file + "Property '{}' is not present in the FlowData against '{}' ElementData. " + + # FlowData already processed. + FLOW_DATA_PROCESSED = "FlowData already processed" \ No newline at end of file diff --git a/fiftyone_pipeline_core/fiftyone_pipeline_core/pipeline.py b/fiftyone_pipeline_core/fiftyone_pipeline_core/pipeline.py index f40b76a..a0711dc 100755 --- a/fiftyone_pipeline_core/fiftyone_pipeline_core/pipeline.py +++ b/fiftyone_pipeline_core/fiftyone_pipeline_core/pipeline.py @@ -33,7 +33,7 @@ class Pipeline: """ - def __init__(self, flow_elements, logger=Logger()): + def __init__(self, flow_elements, logger=Logger(), suppress_process_exceptions = False): """! Pipeline constructor. @@ -44,6 +44,11 @@ def __init__(self, flow_elements, logger=Logger()): @type logger: Logger @param logger: A logger to attach to the pipeline + @type suppress_process_exceptions: boolean + @param suppress_process_exceptions: If true then pipeline will suppress + exceptions added to FlowData.errors otherwise will throw the exception + occurred during the processing of first element. + @rtype: Pipeline @return: Returns a Pipeline @@ -57,6 +62,8 @@ def __init__(self, flow_elements, logger=Logger()): self.flow_elements_display_list = [] + self.suppress_process_exceptions = suppress_process_exceptions + for flow_element in flow_elements: # Notify element that it has been registered in the pipeline diff --git a/fiftyone_pipeline_core/tests/classes/testpipeline.py b/fiftyone_pipeline_core/tests/classes/testpipeline.py index b90d2ea..a58e22a 100644 --- a/fiftyone_pipeline_core/tests/classes/testpipeline.py +++ b/fiftyone_pipeline_core/tests/classes/testpipeline.py @@ -13,7 +13,7 @@ # Test Pipeline builder for use with unit tests class TestPipeline(): - def __init__(self): + def __init__(self, suppressException = True): logger = MemoryLogger("info") self.flowElement1 = ExampleFlowElement1() @@ -25,6 +25,8 @@ def __init__(self): .add(ExampleFlowElement2())\ .add_logger(logger)\ .build() + + self.pipeline.suppress_process_exceptions = suppressException self.flowdata = self.pipeline.create_flowdata() self.flowdata.evidence.add("header.user-agent", "test") self.flowdata.evidence.add("some.other-evidence", "test") diff --git a/fiftyone_pipeline_core/tests/test_core.py b/fiftyone_pipeline_core/tests/test_core.py index 1a868d6..060265c 100644 --- a/fiftyone_pipeline_core/tests/test_core.py +++ b/fiftyone_pipeline_core/tests/test_core.py @@ -31,7 +31,6 @@ from classes.stopflowdata import StopFlowData from classes.errorflowdata import ErrorFlowData - ###################################### # The Tests @@ -62,7 +61,7 @@ def testEvidenceKeyFilter(self): self.assertTrue(nullEvidence == None) - # # Test Getter methods + # Test Getter methods def testGet(self): testPipeline = TestPipeline() @@ -96,15 +95,44 @@ def testStopFlowData(self): self.assertEqual(message, "There is no element data for example2 against this flow data. Available element data keys are: ['example1', 'error', 'apv', 'stop', 'example2']") + # Test exception is thrown when not suppressed. + def testErrors_dont_suppress_exception(self): + + try: + testPipeline = TestPipeline(False) + self.assertFalse("Exception is expected.") + except Exception as e: + self.assertTrue(str(e) is not None) + # Test errors are returned def testErrors(self): testPipeline = TestPipeline() getValue = testPipeline.flowdata.errors["error"] - self.assertTrue(getValue is not None) + self.assertTrue(getValue.flow_element == "error") + self.assertTrue(getValue.exception_instance is not None) + self.assertTrue(getValue.exception_traceback is not None) + + # Test Already Processed FlowData + def testErrors_already_processed(self): + + flowElement1 = ExampleFlowElement1() + logger = MemoryLogger("info") + pipeline = (PipelineBuilder())\ + .add(flowElement1)\ + .add_logger(logger)\ + .build() + flowdata = pipeline.create_flowdata() + flowdata.process() - # Test aspectPropertyValue wrapper + try: + flowdata.process() + self.assertFalse("Exception is expected.") + except Exception as e: + self.assertEqual("FlowData already processed", str(e)) + + # Test aspectPropertyValue wrapper def testAPV(self): testPipeline = TestPipeline() @@ -144,5 +172,4 @@ def test_build_from_config(self): getValue = fd.get("example1").get("integer") self.assertTrue(getValue == 5) - \ No newline at end of file