Skip to content

Commit

Permalink
Linting
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcusRostSAP committed May 24, 2024
1 parent 29c9e0f commit f836c62
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions explainer/explainer_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import requests
from tutorial.conf import system_instance, workspace_id, user_name, pw


class ExplainerSignal:
def __init__(self):
self.constraints = [] # List to store constraints (constraint patterns)
Expand Down Expand Up @@ -80,7 +81,7 @@ def conformant(self, trace, constraints=None):
:return: Boolean indicating if the trace is conformant with all constraints.
"""
if constraints == None:
return self.post_query_trace_in_dataset(trace, self.constraints)
return self.post_query_trace_in_dataset(trace, self.constraints)
return self.post_query_trace_in_dataset(trace, constraints)

def minimal_expl(self, trace):
Expand Down Expand Up @@ -224,13 +225,14 @@ def get_nodes_from_constraint(self, constraint=None):
return list(set(self.filter_keywords(constraint)))

def filter_keywords(self, text):
""" Extracts the events from a SIGNAL constraint
"""
Extracts the events from a SIGNAL constraint
Args:
text (String): The SIGNAL constraint
Returns:
[String]: A list of the filtered events from the SIGNAL constraint
[String]: A list of the filtered events from the SIGNAL constraint
"""
text = re.sub(r"\s+", "_", text.strip())
words = re.findall(r"\b[A-Z_a-z]+\b", text)
Expand Down Expand Up @@ -392,7 +394,7 @@ def constraint_ctrb_to_fitness(self, log=None, constraints=None, index=-1):
len_log = self.get_total_cases()
return ctrb_count / (len_log * len(constraints))

def check_conformance(self, constraint, negative = True):
def check_conformance(self, constraint, negative=True):
"""
Checks the conformance of the event log against a specific constraint.
Expand All @@ -419,7 +421,9 @@ def check_violations(self, constraints):
combined_constraints = " OR ".join(
[f"NOT event_name MATCHES {constraint}" for constraint in constraints]
)
query = f'SELECT COUNT(CASE_ID) FROM "defaultview-4" WHERE {combined_constraints}'
query = (
f'SELECT COUNT(CASE_ID) FROM "defaultview-4" WHERE {combined_constraints}'
)
return self.post_query(query) # Execute the query and return the result

def get_total_cases(self):
Expand All @@ -442,7 +446,7 @@ def post_query(self, query):
cache_key = hash(query) # Generate a cache key for the query
if cache_key in self.cache: # Check if the result is already in the cache
return self.cache[cache_key] # Return cached result if available

# Send the query to the server
request = requests.post(
self.signal_endpoint,
Expand All @@ -464,15 +468,15 @@ def post_query_trace_in_dataset(self, trace, constraints):
"""
if not constraints:
constraints = self.constraints # Use self.constraints if none are provided

# Combine constraints with AND if there are multiple, otherwise use the single constraint
if len(constraints) > 1:
constraints = " AND ".join(
[f"event_name MATCHES {constraint}" for constraint in constraints]
)
else:
constraints = "".join(f"event_name MATCHES {constraints[0]}")

# Formulate the query
query = f'SELECT ACTIVITY, COUNT(CASE_ID) FROM "defaultview-4" WHERE {constraints}'
cache_key = hash(query) # Generate a cache key for the query
Expand Down Expand Up @@ -503,8 +507,12 @@ def get_all_conformant_traces(self):
constraints = "".join(f"event_name MATCHES {constraints[0]}")

# Formulate the query
query = f'SELECT ACTIVITY, COUNT(CASE_ID) FROM "defaultview-4" WHERE {constraints}'
return self.post_query_return_all(query) # Execute the query and return the list of conformant traces
query = (
f'SELECT ACTIVITY, COUNT(CASE_ID) FROM "defaultview-4" WHERE {constraints}'
)
return self.post_query_return_all(
query
) # Execute the query and return the list of conformant traces

def post_query_return_all(self, query):
"""
Expand All @@ -516,7 +524,7 @@ def post_query_return_all(self, query):
cache_key = hash(query) # Generate a cache key for the query
if cache_key in self.cache: # Check if the result is already in the cache
return self.cache[cache_key] # Return cached result if available

# Send the query to the server
request = requests.post(
self.signal_endpoint,
Expand Down

0 comments on commit f836c62

Please sign in to comment.