Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/improved diff utils and docs #426

Merged
merged 2 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 133 additions & 54 deletions SpiffWorkflow/bpmn/util/diff.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,46 @@
from SpiffWorkflow import TaskState
from .task import BpmnTaskFilter

class SpecDiff:
"""This class is used to hold results for comparisions between two workflow specs.

def __init__(self, serializer, original, new):
"""This class is used to hold results for comparisions between two workflow specs.
Attributes:
added (list(`TaskSpec`)): task specs from the new version that cannot be aligned
alignment (dict): a mapping of old task spec to new
comparisons (dict): a mapping of old task spec to changed attributes

Attributes:
registry: a serializer's registry
unmatched (list(`TaskSpec`)): a list of task specs that cannot be aligned
alignment (dict): a mapping of old task spec to new
updated (dict): a mapping of old task spec to changed attributes
Properties:
removed (list of `TaskSpec`: specs from the original that cannot be aligned
changed (dict): a filtered version of comparisons that contains only changed items

The chief basis for alignment is `TaskSpec.name` (ie, the BPMN ID of the spec): if the IDs are identical,
it is assumed the task specs correspond. If a spec in the old version does not have an ID in the new,
some attempt to match based on inputs and outputs is made.

The general procdedure is to attempt to align as many tasks based on ID as possible first, and then
attempt to match by other means while backing out of the traversal.
The chief basis for alignment is `TaskSpec.name` (ie, the BPMN ID of the spec): if the IDs are identical,
it is assumed the task specs correspond. If a spec in the old version does not have an ID in the new,
some attempt to match based on inputs and outputs is made.

Updates are organized primarily by the specs from the original version.
"""
The general procdedure is to attempt to align as many tasks based on ID as possible first, and then
attempt to match by other means while backing out of the traversal.

Updates are organized primarily by the specs from the original version.
"""

def __init__(self, registry, original, new):
"""Constructor for a spec diff.

Args:
registry (`DictionaryConverter`): a serislizer registry
original (`BpmnProcessSpec`): the original spec
new (`BpmnProcessSpec`): the spec to compare

self.registry = serializer.registry
self.unmatched = [spec for spec in new.task_specs.values() if spec.name not in original.task_specs]
Aligns specs from the original with specs from the new workflow and checks each aligned pair
for chames.
"""
self.added = [spec for spec in new.task_specs.values() if spec.name not in original.task_specs]
self.alignment = {}
self.updated = {}
self.comparisons = {}
self._registry = registry
self._align(original.start, new)

@property
def added(self):
"""Task specs from the new version that did not exist in the old"""
return self.unmatched

@property
def removed(self):
"""Task specs from the old version that were removed from the new"""
Expand All @@ -40,14 +49,14 @@ def removed(self):
@property
def changed(self):
"""Task specs with updated attributes"""
return dict((ts, changes) for ts, changes in self.updated.items() if changes)
return dict((ts, changes) for ts, changes in self.comparisons.items() if changes)

def _align(self, spec, new):

candidate = new.task_specs.get(spec.name)
self.alignment[spec] = candidate
if candidate is not None:
self.updated[spec] = self._compare_task_specs(spec, candidate)
self.comparisons[spec] = self._compare_task_specs(spec, candidate)

# Traverse the spec, prioritizing matching by name
# Without this starting point, alignment would be way too difficult
Expand All @@ -62,7 +71,7 @@ def _align(self, spec, new):
def _search_unmatched(self, spec):
# If any outputs were matched, we can use its unmatched inputs as candidates
for match in self._substitutions(spec.outputs):
for parent in [ts for ts in match.inputs if ts in self.unmatched]:
for parent in [ts for ts in match.inputs if ts in self.added]:
if self._aligned(spec.outputs, parent.outputs):
path = [parent] # We may need to check ancestor inputs as well as this spec's inputs
searched = [] # We have to keep track of what we've looked at in case of loops
Expand All @@ -72,15 +81,17 @@ def _find_ancestor(self, spec, path, searched):
if path[-1] not in searched:
searched.append(path[-1])
# Stop if we reach a previously matched spec or if an ancestor's inputs match
if path[-1] not in self.unmatched or self._aligned(spec.inputs, path[-1].inputs):
if path[-1] not in self.added or self._aligned(spec.inputs, path[-1].inputs):
self.alignment[spec] = path[0]
self.unmatched.remove(path[0])
if path[0] in self.added:
self.added.remove(path[0])
self.comparisons[spec] = self._compare_task_specs(spec, path[0])
else:
for parent in [ts for ts in path[-1].inputs if ts not in searched]:
self._find_ancestor(spec, path + [parent], searched)

def _substitutions(self, spec_list, skip_unaligned=True):
subs = [self.alignment[ts] for ts in spec_list]
subs = [self.alignment.get(ts) for ts in spec_list]
return [ts for ts in subs if ts is not None] if skip_unaligned else subs

def _aligned(self, original, candidates):
Expand All @@ -89,8 +100,8 @@ def _aligned(self, original, candidates):
all(first is not None and first.name == second.name for first, second in zip(subs, candidates))

def _compare_task_specs(self, spec, candidate):
s1 = self.registry.convert(spec)
s2 = self.registry.convert(candidate)
s1 = self._registry.convert(spec)
s2 = self._registry.convert(candidate)
if s1.get('typename') != s2.get('typename'):
return ['typename']
else:
Expand All @@ -101,41 +112,109 @@ class WorkflowDiff:
to its WorkflowSpec.

Attributes
workflow (`BpmnWorkflow`): a workflow instance
spec_diff (`SpecDiff`): the results of a comparision of two specs
removed (list(`Task`)): a list of tasks whose specs do not exist in the new version
changed (list(`Task`)): a list of tasks with aligned specs where attributes have changed
alignment (dict): a mapping of old task spec to new task spec
"""

def __init__(self, workflow, spec_diff):
self.workflow = workflow
self.spec_diff = spec_diff
self.removed = []
self.changed = []
self.alignment = {}
self._align()
self._align(workflow, spec_diff)

def filter_tasks(self, tasks, **kwargs):
"""Applies task filtering arguments to a list of tasks.
def _align(self, workflow, spec_diff):
for task in workflow.get_tasks(skip_subprocesses=True):
if task.task_spec in spec_diff.changed:
self.changed.append(task)
if task.task_spec in spec_diff.removed:
self.removed.append(task)
else:
self.alignment[task] = spec_diff.alignment[task.task_spec]

Args:
tasks (list(`Task`)): a list of of tasks

Keyword Args:
any keyword arg that may be passed to `BpmnTaskFilter`
def diff_dependencies(registry, original, new):
"""Helper method for comparing sets of spec dependencies.

Returns:
a list containing tasks matching the filter
"""
task_filter = BpmnTaskFilter(**kwargs)
return [t for t in tasks if task_filter.matches(t)]
Args:
registry (`DictionaryConverter`): a serislizer registry
original (dict): the name -> `BpmnProcessSpec` mapping for the original spec
new (dict): the name -> `BpmnProcessSpec` mapping for the updated spec

def _align(self):
for task in self.workflow.get_tasks(skip_subprocesses=True):
if task.task_spec in self.spec_diff.changed:
self.changed.append(task)
if task.task_spec in self.spec_diff.removed:
self.removed.append(task)
else:
self.alignment[task] = self.spec_diff.alignment[task.task_spec]
Returns:
a tuple of:
mapping from name -> `SpecDiff` (or None) for each original dependency
list of names of specs in the new dependencies that did not previously exist
"""
result = {}
subprocesses = {}
for name, spec in original.items():
if name in new:
result[name] = SpecDiff(registry, spec, new[name])
else:
result[name] = None

return result, [name for name in new if name not in original]


def diff_workflow(registry, workflow, new_spec, new_dependencies):
"""Helper method to handle diffing a workflow and all its dependencies at once.

Args:
registry (`DictionaryConverter`): a serislizer registry
workflow (`BpmnWorkflow`): a workflow instance
new_spec (`BpmnProcessSpec`): the new top level spec
new_depedencies (dict): a dictionary of name -> `BpmnProcessSpec`

Returns:
tuple of `WorkflowDiff` and mapping of subworkflow id -> `WorkflowDiff`

This method checks the top level workflow against the new spec as well as any
existing subprocesses for missing or updated specs.
"""
spec_diff = SpecDiff(registry, workflow.spec, new_spec)
top_diff = WorkflowDiff(workflow, spec_diff)
sp_diffs = {}
for sp_id, sp in workflow.subprocesses.items():
if sp.spec.name in new_dependencies:
dep_diff = SpecDiff(registry, sp.spec, new_dependencies[sp.spec.name])
sp_diffs[sp_id] = WorkflowDiff(sp, dep_diff)
else:
sp_diffs[sp_id] = None
return top_diff, sp_diffs

def filter_tasks(tasks, **kwargs):
"""Applies task filtering arguments to a list of tasks.

Args:
tasks (list(`Task`)): a list of of tasks

Keyword Args:
any keyword arg that may be passed to `BpmnTaskFilter`

Returns:
a list containing tasks matching the filter
"""
task_filter = BpmnTaskFilter(**kwargs)
return [t for t in tasks if task_filter.matches(t)]

def migrate_workflow(diff, workflow, spec, reset_mask=None):
"""Update the spec for workflow.

Args:
diff (`WorkflowDiff`): the diff of this workflow and spec
workflow (`BpmnWorkflow` or `BpmnSubWorkflow`): the workflow
spec (`BpmnProcessSpec`): the new spec

Keyword Args:
reset_mask (`TaskState`): reset and repredict tasks in this state

The default rest_mask is TaskState.READY|TaskState.WAITING but can be overridden.
"""
workflow.spec = spec
for task in workflow.get_tasks():
task.task_spec = diff.alignment.get(task)

default_mask = TaskState.READY|TaskState.WAITING
for task in list(workflow.get_tasks(state=reset_mask or default_mask)):
task.reset_branch(None)
Loading
Loading