Skip to content

Commit

Permalink
Fixed linting
Browse files Browse the repository at this point in the history
  • Loading branch information
rasmusSAP committed Oct 21, 2024
1 parent 1759051 commit 8de0980
Showing 1 changed file with 40 additions and 19 deletions.
59 changes: 40 additions & 19 deletions bpmnconstraints/parser/bpmn_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def __create_model(self, bpmn, is_file):
if is_file:
try:
file_extension = Path(bpmn).suffix
if not file_extension or file_extension not in [".json", ".xml"]:
if not file_extension or file_extension not in [
".json", ".xml"]:
return None
elif file_extension == ".xml":
return ElementTree.parse(bpmn).getroot()
Expand Down Expand Up @@ -72,18 +73,23 @@ def validate_splitting_and_joining_gateway_cases(self):
Otherwise, the parser interprets the gateways as start/end events instead of the activities.
"""

item_indices = {item["name"]: index for index, item in enumerate(self.sequence)}
item_indices = {
item["name"]: index for index,
item in enumerate(
self.sequence)}
for cfo in self.sequence:
if cfo["is start"] and (cfo["type"] in DISCARDED_START_GATEWAYS):
cfo["is start"] = False
for successor in cfo["successor"]:
self.sequence[item_indices[successor["name"]]]["is start"] = True
self.sequence[item_indices[successor["name"]]
]["is start"] = True
if cfo["is end"] and (
cfo["name"] in GATEWAY_NAMES or cfo["type"] == "exclusivegateway"
):
cfo["is end"] = False
for predecessor in cfo["predecessor"]:
self.sequence[item_indices[predecessor["name"]]]["is end"] = True
self.sequence[item_indices[predecessor["name"]]
]["is end"] = True

def __mark_gateway_elements(self):
for cfo in self.sequence:
Expand All @@ -100,7 +106,8 @@ def __mark_gateway_elements(self):
"type"
) in ALLOWED_GATEWAYS and predecessor_cfo.get("joining"):
continue
if cfo.get("type") in ALLOWED_GATEWAYS and cfo.get("joining"):
if cfo.get("type") in ALLOWED_GATEWAYS and cfo.get(
"joining"):
continue
if "is in gateway" in predecessor_cfo:
cfo.update({"is in gateway": True})
Expand All @@ -119,7 +126,8 @@ def __get_parsed_cfo_by_bpmn_element(self, elem):
def __find_transitive_closure(self, cfo, transitivity, visited):
# Check if the current node has already been visited to prevent a loop
if cfo.get("id") in visited:
print(f"Already visited {cfo.get('id')}, skipping to avoid a loop.")
print(
f"Already visited {cfo.get('id')}, skipping to avoid a loop.")
return

# Mark the current node as visited
Expand All @@ -134,8 +142,10 @@ def __find_transitive_closure(self, cfo, transitivity, visited):
if "is in gateway" not in successor:
transitivity.append(successor)
for successor in cfo.get("successor"):
successor_cfo = self.__get_cfo_by_id(successor.get("id"))
self.__find_transitive_closure(successor_cfo, transitivity, visited)
successor_cfo = self.__get_cfo_by_id(
successor.get("id"))
self.__find_transitive_closure(
successor_cfo, transitivity, visited)

def __add_transitivity(self):
for cfo in self.sequence:
Expand Down Expand Up @@ -199,7 +209,8 @@ def __valid_cfo_element(self, elem):
return True
if self.__is_element_gateway(elem):
return True
if self.__is_element_start_event(elem) and self.__valid_start_name(elem):
if self.__is_element_start_event(
elem) and self.__valid_start_name(elem):
return True
if self.__is_element_end_event(elem) and self.__valid_end_name(elem):
return True
Expand Down Expand Up @@ -249,7 +260,8 @@ def __get_successors(self, elem):
else self.model.get_id(connection)
)
elem = self.__get_element_by_id(connection_id)
if self.model.get_element_type(elem) in ALLOWED_CONNECTING_OBJECTS:
if self.model.get_element_type(
elem) in ALLOWED_CONNECTING_OBJECTS:
connection = self.model.get_outgoing_connection(elem)
if connection:
elem = (
Expand Down Expand Up @@ -295,7 +307,8 @@ def __format_list(self, elems, gateway=False):
}

try:
cfo.update({"splitting": len(self.__get_successors(elem)) >= 2})
cfo.update(
{"splitting": len(self.__get_successors(elem)) >= 2})
except Exception:
pass
formatted.append(cfo)
Expand All @@ -312,29 +325,36 @@ def __get_element_by_id(self, connection_id):
raise Exception(f"Could not find element with ID {connection_id}")

def __get_activity_type(self, elem):
return ACTIVITY_MAPPING.get(self.model.get_element_type(elem), "Activity")
return ACTIVITY_MAPPING.get(
self.model.get_element_type(elem), "Activity")

def __get_gateway_type(self, elem):
return GATEWAY_MAPPING.get(self.model.get_element_type(elem), "Gateway")
return GATEWAY_MAPPING.get(
self.model.get_element_type(elem), "Gateway")

def __get_end_type(self, elem):
return END_EVENT_MAPPING.get(self.model.get_element_type(elem), "EndEvent")
return END_EVENT_MAPPING.get(
self.model.get_element_type(elem), "EndEvent")

def __get_label(self, elem):
try:
if self.__is_element_activity(elem):
try:
return self.sanitizer.sanitize_label(self.model.get_name(elem))
return self.sanitizer.sanitize_label(
self.model.get_name(elem))
except KeyError:
return self.__get_activity_type(elem)
if self.__is_element_gateway(elem):
try:
return self.sanitizer.sanitize_label(self.model.get_name(elem))
return self.sanitizer.sanitize_label(
self.model.get_name(elem))
except KeyError:
return self.__get_gateway_type(elem)
if self.__is_element_start_event(elem) or self.__is_element_end_event(elem):
if self.__is_element_start_event(
elem) or self.__is_element_end_event(elem):
try:
return self.sanitizer.sanitize_label(self.model.get_name(elem))
return self.sanitizer.sanitize_label(
self.model.get_name(elem))
except KeyError:
return self.model.get_element_type(elem)
except KeyError:
Expand Down Expand Up @@ -368,7 +388,8 @@ def __is_predecessor_start_event(self, predecessors):
def __is_successor_end_event(self, successors):
for successor in successors:
if successor:
if self.model.get_element_type(successor) in ALLOWED_END_EVENTS:
if self.model.get_element_type(
successor) in ALLOWED_END_EVENTS:
if self.__valid_end_name(successor):
return False
return True
Expand Down

0 comments on commit 8de0980

Please sign in to comment.