From 109f81e02cd44d9f9a52e7108e5566a6ef168880 Mon Sep 17 00:00:00 2001 From: MarcusRostSAP Date: Wed, 5 Jun 2024 09:38:54 +0200 Subject: [PATCH] Clean up --- explainer/explainer.py | 369 -------------------- explainer/tutorial/explainer_tutorial.ipynb | 308 ---------------- tutorial/explainer_tutorial_1.ipynb | 22 +- 3 files changed, 10 insertions(+), 689 deletions(-) delete mode 100644 explainer/tutorial/explainer_tutorial.ipynb diff --git a/explainer/explainer.py b/explainer/explainer.py index ffb2811..8baa1f5 100644 --- a/explainer/explainer.py +++ b/explainer/explainer.py @@ -105,375 +105,6 @@ def __iter__(self): yield Trace(list(trace_tuple)) -class Explainer: - def __init__(self): - """ - Initializes an Explainer instance. - """ - self.constraints = [] # List to store constraints (regex patterns) - self.adherent_trace = None - - def add_constraint(self, regex): - """ - Adds a new constraint and updates the nodes list. - - :param regex: A regular expression representing the constraint. - """ - self.constraints.append(regex) - if self.contradiction(): - self.constraints.remove(regex) - print(f"Constraint {regex} contradicts the other constraints.") - - def remove_constraint(self, idx): - """ - Removes a constraint by index and updates the nodes list if necessary. - - :param idx: Index of the constraint to be removed. - """ - if 0 <= idx < len(self.constraints): - removed_regex = self.constraints.pop(idx) - removed_nodes = set(filter(str.isalpha, removed_regex)) - - # Re-evaluate nodes to keep based on remaining constraints - remaining_nodes = set(filter(str.isalpha, "".join(self.constraints))) - self.nodes = remaining_nodes - - # Optionally, remove nodes that are no longer in any constraint - for node in removed_nodes: - if node not in remaining_nodes: - self.nodes.discard(node) - - def activation(self, trace, constraints=None): - """ - Checks if any of the nodes in the trace activates any constraint. - - :param trace: A Trace instance. - :return: Boolean indicating if any constraint is activated. - """ - if not constraints: - constraints = self.constraints - con_activation = [0] * len(constraints) - activated = False - for idx, con in enumerate(constraints): - if activated: - activated = False - continue - target = self.identify_existance_constraints(con) - if target: - con_activation[idx] = 1 - continue - for event in trace: - if event in con: - con_activation[idx] = 1 - activated = True - break - return con_activation - - def identify_existance_constraints(self, pattern): - """ - Identifies existance constraints within a pattern. - - :param pattern: The constraint pattern as a string. - :return: A tuple indicating the type of existance constraint and the node involved. - """ - # Check for AtLeastOne constraint - for match in re.finditer(r"(? 100: - return f"{explanation}\n Maximum depth of {depth -1} reached" - score = self.evaluate_similarity(working_trace) - return self.operate_on_trace(working_trace, score, explanation, depth) - - def operate_on_trace(self, trace, score, explanation_path, depth=0): - """ - Finds and applies modifications to the trace to make it conformant. - - :param trace: The trace to be modified. - :param score: The similarity score of the trace. - :param explanation_path: The current explanation path. - :param depth: The current recursion depth. - :return: A string explaining why the best subtrace is non-conformant or a message indicating the maximum depth has been reached. - """ - explanation = None - counter_factuals = self.modify_subtrace(trace) - best_subtrace = None - best_score = -float("inf") - for subtrace in counter_factuals: - current_score = self.evaluate_similarity(subtrace[0]) - if current_score > best_score and current_score > score: - best_score = current_score - best_subtrace = subtrace[0] - explanation = subtrace[1] - if best_subtrace == None: - for subtrace in counter_factuals: - self.operate_on_trace(subtrace[0], score, explanation_path, depth + 1) - explanation_string = explanation_path + "\n" + str(explanation) - return self.counter_factual_helper(best_subtrace, explanation_string, depth + 1) - - def get_nodes_from_constraint(self, constraint=None): - """ - Extracts unique nodes from a constraint pattern. - - :param constraint: The constraint pattern as a string. - :return: A list of unique nodes found within the constraint. - """ - if constraint is None: - all_nodes = set() - for con in self.constraints: - all_nodes.update(re.findall(r"[A-Za-z]", con)) - return list(set(all_nodes)) - else: - return list(set(re.findall(r"[A-Za-z]", constraint))) - - def modify_subtrace(self, trace): - """ - Modifies the given trace to meet constraints by adding nodes where the pattern fails. - - Parameters: - - trace: A list of node identifiers - - Returns: - - A list of potential subtraces each modified to meet constraints. - """ - potential_subtraces = [] - possible_additions = self.get_nodes_from_constraint() - for i, s_trace in enumerate(get_iterative_subtrace(trace)): - for con in self.constraints: - new_trace_str = "".join(s_trace) - match = re.match(new_trace_str, con) - if not match: - for add in possible_additions: - potential_subtraces.append( - [ - Trace(s_trace + [add] + trace.nodes[i + 1 :]), - f"Addition (Added {add} at position {i+1}): " - + "->".join(s_trace + [add] + trace.nodes[i + 1 :]), - ] - ) - potential_subtraces.append( - [ - Trace(s_trace[:-1] + [add] + trace.nodes[i:]), - f"Addition (Added {add} at position {i}): " - + "->".join(s_trace[:-1] + [add] + trace.nodes[i:]), - ] - ) - - potential_subtraces.append( - [ - Trace(s_trace[:-1] + trace.nodes[i + 1 :]), - f"Subtraction (Removed {s_trace[i]} from position {i}): " - + "->".join(s_trace[:-1] + trace.nodes[i + 1 :]), - ] - ) - return potential_subtraces - - def determine_shapley_value(self, log, constraints, index): - """Determines the Shapley value-based contribution of a constraint to a the - overall conformance rate. - Args: - log (dictionary): The event log, where keys are strings and values are - ints - constraints (list): A list of constraints (regexp strings) - index (int): The - Returns: - float: The contribution of the constraint to the overall conformance - rate - """ - if len(constraints) < index: - raise Exception("Constraint not in constraint list.") - contributor = constraints[index] - sub_ctrbs = [] - reduced_constraints = [c for c in constraints if not c == contributor] - subsets = determine_powerset(reduced_constraints) - for subset in subsets: - lsubset = list(subset) - constraints_without = [c for c in constraints if c in lsubset] - constraints_with = [c for c in constraints if c in lsubset + [contributor]] - weight = ( - math.factorial(len(lsubset)) - * math.factorial(len(constraints) - 1 - len(lsubset)) - ) / math.factorial(len(constraints)) - sub_ctrb = weight * ( - self.determine_conformance_rate(log, constraints_without) - - self.determine_conformance_rate(log, constraints_with) - ) - sub_ctrbs.append(sub_ctrb) - return sum(sub_ctrbs) - - def evaluate_similarity(self, trace): - """ - Calculates the similarity between the adherent trace and the given trace using the Levenshtein distance. - - :param trace: The trace to compare with the adherent trace. - :return: A normalized score indicating the similarity between the adherent trace and the given trace. - """ - length = len(self.adherent_trace) - trace_len = len("".join(trace)) - lev_distance = levenshtein_distance(self.adherent_trace, "".join(trace)) - max_distance = max(length, trace_len) - normalized_score = 1 - lev_distance / max_distance - return normalized_score - - def determine_conformance_rate(self, event_log, constraints=None): - """ - Determines the conformance rate of the event log based on the given constraints. - - :param event_log: The event log to analyze. - :param constraints: The constraints to check against the event log. - :return: The conformance rate as a float between 0 and 1, or a message if no constraints are provided. - """ - if not self.constraints and not constraints: - return "The explainer have no constraints" - len_log = len(event_log) - if len_log == 0: - return 1 - non_conformant = 0 - if constraints == None: - constraints = self.constraints - for trace, count in event_log.log.items(): - for con in constraints: - if not re.search(con, "".join(trace)): - non_conformant += count - break - return (len_log - non_conformant) / len_log - - def trace_contribution_to_conformance_loss( - self, event_log, trace, constraints=None - ): - """ - Calculates the contribution of a specific trace to the conformance loss of the event log. - - :param event_log: The event log to analyze. - :param trace: The trace to calculate its contribution. - :param constraints: The constraints to check against the event log. - :return: The contribution of the trace to the conformance loss as a float between 0 and 1. - """ - if not constraints: - constraints = self.constraints - total_traces = len(event_log) - contribution_of_trace = 0 - for t, count in event_log.log.items(): - if not self.conformant(t, constraints): - if trace.nodes == list(t): - contribution_of_trace = count - - return contribution_of_trace / total_traces - - def determine_powerset(elements): """Determines the powerset of a list of elements Args: diff --git a/explainer/tutorial/explainer_tutorial.ipynb b/explainer/tutorial/explainer_tutorial.ipynb deleted file mode 100644 index c8c0504..0000000 --- a/explainer/tutorial/explainer_tutorial.ipynb +++ /dev/null @@ -1,308 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Explainer utility in BPMN2CONSTRAINTS\n", - "\n", - "In this notebook, we explore the `Explainer` class, designed to analyze and explain the conformance of traces against predefined constraints. Trace analysis is crucial in domains such as process mining, where understanding the behavior of system executions against expected models can uncover inefficiencies, deviations, or compliance issues.\n", - "\n", - "The constraints currently consists of basic regex, this is because of it's similiarities and likeness to declarative constraints used in BPMN2CONSTRAINTS\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Step 1: Setup" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "metadata": {}, - "outputs": [], - "source": [ - "import sys\n", - "sys.path.append('../')\n", - "from explainer import Explainer, Trace" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Step 2: Basic Usage\n", - "Let's start by creating an instance of the `Explainer` and adding a simple constraint that a valid trace should contain the sequence \"A\" followed by \"B\" and then \"C\".\n" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "explainer = Explainer()\n", - "explainer.add_constraint('A.*B.*C')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Step 3: Analyzing Trace Conformance\n", - "\n", - "Now, we'll create a trace and check if it conforms to the constraints we've defined." - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Is the trace conformant? True\n" - ] - } - ], - "source": [ - "trace = Trace(['A', 'X', 'B', 'Y', 'C'])\n", - "is_conformant = explainer.conformant(trace)\n", - "print(f\"Is the trace conformant? {is_conformant}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Step 4: Explaining Non-conformance\n", - "\n", - "If a trace is not conformant, we can use the `minimal_expl` and `counterfactual_expl` methods to understand why and how to adjust the trace.\n" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Constraint: A.*B.*C\n", - "Trace:['A', 'C']\n", - "\n", - "Addition (Added B at position 1): A->B->C\n", - "Non-conformance due to: Constraint (A.*B.*C) is violated by subtrace: ('A', 'C')\n", - "-----------\n", - "Constraint: A.*B.*C\n", - "Trace:['C', 'B', 'A']\n", - "\n", - "Addition (Added A at position 1): C->A->B->A\n", - "Subtraction (Removed C from position 0): A->B->A\n", - "Addition (Added C at position 2): A->B->C->A\n", - "Non-conformance due to: Constraint (A.*B.*C) is violated by subtrace: ('C', 'B')\n", - "-----------\n", - "Constraint: A.*B.*C\n", - "Trace:['A', 'A', 'C']\n", - "\n", - "Addition (Added B at position 2): A->A->B->C\n", - "Non-conformance due to: Constraint (A.*B.*C) is violated by subtrace: ('A', 'A')\n", - "-----------\n", - "Constraint: A.*B.*C\n", - "Trace:['A', 'A', 'C', 'A', 'TEST', 'A', 'C', 'X', 'Y']\n", - "-----------\n", - "Constraint: AC\n", - "Trace:['A', 'X', 'C']\n", - "\n", - "Subtraction (Removed X from position 1): A->C\n", - "Non-conformance due to: Constraint (AC) is violated by subtrace: ('A', 'X')\n", - "-----------\n", - "constraint: AC\n", - "constraint: B.*A.*B.*C\n", - "constraint: A.*B.*C.*\n", - "constraint: A.*D.*B*\n", - "constraint: A[^D]*B\n", - "constraint: B.*[^X].*\n", - "Trace:['A', 'X', 'C']\n", - "\n", - "Subtraction (Removed X from position 1): A->C\n", - "Non-conformance due to: Constraint (AC) is violated by subtrace: ('A', 'X')\n" - ] - } - ], - "source": [ - "non_conformant_trace = Trace(['A', 'C'])\n", - "print('Constraint: A.*B.*C')\n", - "print('Trace:' + str(non_conformant_trace.nodes))\n", - "print(explainer.counterfactual_expl(non_conformant_trace))\n", - "print(explainer.minimal_expl(non_conformant_trace))\n", - "\n", - "non_conformant_trace = Trace(['C', 'B', 'A'])\n", - "print('-----------')\n", - "print('Constraint: A.*B.*C')\n", - "print('Trace:' + str(non_conformant_trace.nodes))\n", - "print(explainer.counterfactual_expl(non_conformant_trace))\n", - "print(explainer.minimal_expl(non_conformant_trace))\n", - "\n", - "non_conformant_trace = Trace(['A','A','C'])\n", - "print('-----------')\n", - "print('Constraint: A.*B.*C')\n", - "print('Trace:' + str(non_conformant_trace.nodes))\n", - "print(explainer.counterfactual_expl(non_conformant_trace))\n", - "print(explainer.minimal_expl(non_conformant_trace))\n", - "\n", - "\n", - "non_conformant_trace = Trace(['A','A','C','A','TEST','A','C', 'X', 'Y']) \n", - "print('-----------')\n", - "print('Constraint: A.*B.*C')\n", - "print('Trace:' + str(non_conformant_trace.nodes))\n", - "#print(explainer.counterfactual_expl(non_conformant_trace))\n", - "#print(explainer.minimal_expl(non_conformant_trace))\n", - "\n", - "\n", - "explainer.remove_constraint(0)\n", - "explainer.add_constraint('AC')\n", - "non_conformant_trace = Trace(['A', 'X', 'C']) #Substraction\n", - "print('-----------')\n", - "print('Constraint: AC')\n", - "print('Trace:' + str(non_conformant_trace.nodes))\n", - "print(explainer.counterfactual_expl(non_conformant_trace))\n", - "print(explainer.minimal_expl(non_conformant_trace))\n", - "print('-----------')\n", - "\n", - "explainer.add_constraint('B.*A.*B.*C')\n", - "explainer.add_constraint('A.*B.*C.*')\n", - "explainer.add_constraint('A.*D.*B*')\n", - "explainer.add_constraint('A[^D]*B')\n", - "explainer.add_constraint('B.*[^X].*')\n", - "non_conformant_trace = Trace(['A', 'X', 'C']) #Substraction\n", - "for con in explainer.constraints:\n", - " print(f'constraint: {con}')\n", - "print('Trace:' + str(non_conformant_trace.nodes))\n", - "print(explainer.counterfactual_expl(non_conformant_trace))\n", - "print(explainer.minimal_expl(non_conformant_trace))\n", - "\n", - "\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Step 5: Event Logs and Shapely values\n", - "\n", - "The event logs in this context is built with traces, here's how you set them up." - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Conformance rate: 0.2\n", - "Contribution ^A: 0.5\n", - "Contribution C$: 0.30000000000000004\n" - ] - } - ], - "source": [ - "from explainer import EventLog\n", - "\n", - "event_log = EventLog()\n", - "trace1 = Trace(['A', 'B', 'C'])\n", - "trace2 = Trace(['B', 'C'])\n", - "trace3 = Trace(['A', 'B'])\n", - "trace4 = Trace(['B'])\n", - "\n", - "event_log.add_trace(trace1, 5) # The second is how many traces you'd like to add, leave blank for 1\n", - "event_log.add_trace(trace2, 10)\n", - "event_log.add_trace(trace3, 5)\n", - "event_log.add_trace(trace4, 5)\n", - "\n", - "\n", - "exp = Explainer()\n", - "exp.add_constraint(\"^A\")\n", - "exp.add_constraint(\"C$\")\n", - "print(\"Conformance rate: \"+ str(exp.determine_conformance_rate(event_log)))\n", - "print('Contribution ^A:', exp.determine_shapley_value(event_log, exp.constraints, 0))\n", - "print('Contribution C$:', exp.determine_shapley_value(event_log, exp.constraints, 1))\n" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "conformant AC :True\n", - "Conformance rate: 0.14\n", - "Contribution C$: 0.21\n", - "Contribution ^A: 0.36\n", - "Contribution B+: 0.29\n" - ] - } - ], - "source": [ - "exp = Explainer()\n", - "event_log = EventLog()\n", - "trace1 = Trace(['A', 'B', 'C'])\n", - "trace2 = Trace(['B', 'C'])\n", - "trace3 = Trace(['A', 'B'])\n", - "trace4 = Trace(['B'])\n", - "trace5 = Trace(['A', 'C'])\n", - "\n", - "\n", - "event_log.add_trace(trace1, 5) # The second is how many traces you'd like to add, leave blank for 1\n", - "event_log.add_trace(trace2, 10)\n", - "event_log.add_trace(trace3, 5)\n", - "event_log.add_trace(trace4, 5)\n", - "event_log.add_trace(trace5, 10)\n", - "\n", - "\n", - "exp = Explainer()\n", - "exp.add_constraint(\"C$\")\n", - "exp.add_constraint(\"^A\")\n", - "exp.add_constraint(\"B+\")\n", - "print(\"conformant AC :\" + str(exp.conformant(trace5)))\n", - "print(\"Conformance rate: \"+ str(round(exp.determine_conformance_rate(event_log), 2)))\n", - "print('Contribution C$:', round(exp.determine_shapley_value(event_log, exp.constraints, 0), 2))\n", - "print('Contribution ^A:', round(exp.determine_shapley_value(event_log, exp.constraints, 1), 2))\n", - "print('Contribution B+:', round(exp.determine_shapley_value(event_log, exp.constraints, 2), 2))\n", - "\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": ".venv", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.12" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/tutorial/explainer_tutorial_1.ipynb b/tutorial/explainer_tutorial_1.ipynb index 80b7ea5..3f9fadf 100644 --- a/tutorial/explainer_tutorial_1.ipynb +++ b/tutorial/explainer_tutorial_1.ipynb @@ -229,24 +229,22 @@ "\n", "Example with minimal solution\n", "--------------------------------\n", - "5\n", "\n", - "Addition (Added B at position 1): A->B->B->A->C->B\n", - "Subtraction (Removed B from position 5): A->B->B->A->C\n", + "Addition (Added B at position 3): A->B->A->B->C->B\n", + "Subtraction (Removed B from position 5): A->B->A->B->C\n", "\n", "Example without minimal solution\n", "--------------------------------\n", "\n", - "Addition (Added B at position 1): C->B->B->A\n", - "Addition (Added B at position 1): C->B->B->B->A\n", - "Addition (Added A at position 1): C->A->B->B->B->A\n", - "Subtraction (Removed C from position 0): A->B->B->B->A\n", - "Addition (Added C at position 4): A->B->B->B->C->A\n", - "Subtraction (Removed A from position 5): A->B->B->B->C\n", + "Addition (Added A at position 1): C->A->B->A\n", + "Addition (Added A at position 1): C->A->A->B->A\n", + "Addition (Added A at position 1): C->A->A->A->B->A\n", + "Subtraction (Removed C from position 0): A->A->A->B->A\n", + "Addition (Added C at position 4): A->A->A->B->C->A\n", + "Subtraction (Removed A from position 5): A->A->A->B->C\n", "\n", "Example with minimal solution\n", "--------------------------------\n", - "3\n", "\n", "Addition (Added A at position 1): C->A->B->A\n", "Subtraction (Removed C from position 0): A->B->A\n", @@ -532,7 +530,7 @@ "total_ctrb = exp.constraint_ctrb_to_conformance(event_log, exp.constraints, 0) + exp.constraint_ctrb_to_conformance(event_log, exp.constraints, 1) + exp.constraint_ctrb_to_conformance(event_log, exp.constraints, 2)\n", "conf_rate = round(conf_rate, 2) \n", "total_ctrb = round(total_ctrb, 2)\n", - "print(\"Conformance loss = \" + str(100 - (conf_rate * 100)) + \"%, contribution to loss: \" + str(total_ctrb * 100) + \"%\")\n", + "print(\"Conformance loss : \" + str(100 - (conf_rate * 100)) + \"%, contribution to loss: \" + str(total_ctrb * 100) + \"%\")\n", "print(\"------------------------------------\")\n", "print(\"Fitness rate: \"+ str(exp.determine_fitness_rate(event_log)))\n", "print(\"C$ ctrb to fitness rate: \" + str(exp.constraint_ctrb_to_fitness(event_log, exp.constraints, 0)))\n", @@ -559,7 +557,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.11.9" } }, "nbformat": 4,