Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/depth calc and task dict fixes #355

Merged
merged 2 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions SpiffWorkflow/bpmn/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,27 +61,32 @@ def _next(self):
if len(self.task_list) == 0:
raise StopIteration()

task = self.task_list.pop(-1)
task = self.task_list.pop(0)
subprocess = task.workflow.top_workflow.subprocesses.get(task.id)

if all([
if task.task_spec.name == self.end_at_spec:
self.task_list = []
elif all([
len(task._children) > 0 or subprocess is not None,
task.state >= self.min_state,
self.depth < self.max_depth,
task.task_spec.name != self.end_at_spec,
]):
add_tasks = [t for t in reversed(task.children)]
if subprocess is None:
next_tasks = task.children
elif self.depth_first:
next_tasks = [subprocess.task_tree] + task.children
else:
next_tasks = task.children = [subprocess.task_tree]

if self.depth_first:
if subprocess is not None:
add_tasks.append(subprocess.task_tree)
self.task_list.extend(add_tasks)
self.task_list = next_tasks + self.task_list
else:
if subprocess is not None:
add_tasks = [subprocess.task_tree] + add_tasks
self.task_list = add_tasks + self.task_list
self.depth += 1
elif len(self.task_list) > 0 and task.parent != self.task_list[0].parent:
self.depth -= 1
self.task_list.extend(next_tasks)
self._update_depth(task)

elif self.depth_first and len(self.task_list) > 0:
self._handle_leaf_depth(task)

return task


Expand Down
5 changes: 3 additions & 2 deletions SpiffWorkflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def _sync_children(self, task_specs, state=TaskState.MAYBE):

# Update children accordingly
for child in unneeded_children:
self._children.remove(child.id)
self.workflow._remove_task(child.id)
for task_spec in new_children:
self._add_child(task_spec, state)

Expand All @@ -290,8 +290,9 @@ def _drop_children(self, force=False):
drop.append(child)
else:
child._drop_children()

for task in drop:
self._children.remove(task.id)
self.workflow._remove_task(task.id)

def _set_state(self, value):
"""Force set the state on a task"""
Expand Down
40 changes: 32 additions & 8 deletions SpiffWorkflow/util/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,19 +204,43 @@ def _next(self):
if len(self.task_list) == 0:
raise StopIteration()

task = self.task_list.pop(-1)
if all([
task = self.task_list.pop(0)

if task.task_spec.name == self.end_at_spec:
self.task_list = []
elif all([
len(task._children) > 0,
task.state >= self.min_state,
self.depth < self.max_depth,
task.task_spec.name != self.end_at_spec,
]):
if self.depth_first:
self.task_list.extend(reversed(task.children))
self.task_list = task.children + self.task_list
else:
self.task_list = reversed(task.children) + self.task_list
self.depth += 1
elif len(self.task_list) > 0 and task.parent != self.task_list[0].parent:
self.depth -= 1
self.task_list.extend(task.children)
self._update_depth(task)
elif self.depth_first and len(self.task_list) > 0:
self._handle_leaf_depth(task)

return task

def _update_depth(self, task):

if self.depth_first:
# Since we visit the children before siblings, we always increment depth when adding children
self.depth += 1
else:
# In this case, we have to check for a common ancestor at the same depth
first, second = task, self.task_list[0]
for i in range(self.depth):
first = first.parent if first is not None else None
second = second.parent if second is not None else None
if first != second:
self.depth += 1

def _handle_leaf_depth(self, task):

ancestor = self.task_list[0].parent
current = task.parent
while current is not None and current != ancestor:
current = current.parent
self.depth -= 1
7 changes: 7 additions & 0 deletions SpiffWorkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,13 @@ def _task_completed_notify(self, task):
# Since is_completed() is expensive it makes sense to bail out if calling it is not necessary.
self.completed_event(self)

def _remove_task(self, task_id):
task = self.tasks[task_id]
for child in task.children:
self._remove_task(child.id)
task.parent._children.remove(task.id)
self.tasks.pop(task_id)

def _get_mutex(self, name):
"""Get or create a mutex"""
if name not in self.locks:
Expand Down
79 changes: 79 additions & 0 deletions tests/SpiffWorkflow/core/IteratorTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import unittest
import os
from datetime import datetime

from lxml import etree

from SpiffWorkflow.workflow import Workflow
from SpiffWorkflow.specs.Cancel import Cancel
from SpiffWorkflow.specs.Simple import Simple
from SpiffWorkflow.specs.WorkflowSpec import WorkflowSpec
from SpiffWorkflow.util.task import TaskState, TaskIterator, TaskFilter
from SpiffWorkflow.serializer.prettyxml import XmlSerializer

data_dir = os.path.join(os.path.dirname(__file__), 'data')

class IterationTest(unittest.TestCase):

def setUp(self):
xml_file = os.path.join(data_dir, 'iteration_test.xml')
with open(xml_file) as fp:
xml = etree.parse(fp).getroot()
wf_spec = WorkflowSpec.deserialize(XmlSerializer(), xml)
self.workflow = Workflow(wf_spec)

def get_tasks_updated_after(self):
start = self.workflow.get_next_task(end_at_spec='Start')
start.run()
updated = datetime.now().timestamp()
for task in self.workflow.get_tasks(state=TaskState.READY):
task.run()
return updated

class DepthFirstTest(IterationTest):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you writing a computer science textbook here or what? :)


def test_get_tasks_updated_after(self):
updated = super().get_tasks_updated_after()
tasks = self.workflow.get_tasks(updated_ts=updated)
self.assertListEqual(
[t.task_spec.name for t in tasks],
['a', 'a1', 'a2', 'c', 'b', 'b1', 'b2']
)

def test_get_tasks_end_at(self):
tasks = self.workflow.get_tasks(end_at_spec='c')
self.assertEqual(
[t.task_spec.name for t in tasks],
['Start', 'a', 'a1', 'last', 'End', 'a2', 'last', 'End', 'c']
)

def test_get_tasks_max_depth(self):
tasks = self.workflow.get_tasks(max_depth=2)
self.assertEqual(
[t.task_spec.name for t in tasks],
['Start', 'a', 'a1', 'a2', 'c', 'b', 'b1', 'b2']
)

class BreadthFirstTest(IterationTest):

def test_get_tasks_updated_after(self):
updated = super().get_tasks_updated_after()
tasks = self.workflow.get_tasks(updated_ts=updated, depth_first=False)
self.assertListEqual(
[t.task_spec.name for t in tasks],
['a', 'b', 'a1', 'a2', 'c', 'b1', 'b2']
)

def test_get_tasks_end_at(self):
tasks = self.workflow.get_tasks(end_at_spec='c', depth_first=False)
self.assertEqual(
[t.task_spec.name for t in tasks],
['Start', 'a', 'b', 'a1', 'a2', 'c']
)

def test_get_tasks_max_depth(self):
tasks = self.workflow.get_tasks(max_depth=2, depth_first=False)
self.assertEqual(
[t.task_spec.name for t in tasks],
['Start', 'a', 'b', 'a1', 'a2', 'c', 'b1', 'b2']
)
37 changes: 6 additions & 31 deletions tests/SpiffWorkflow/core/WorkflowTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

class WorkflowTest(unittest.TestCase):

ready_task_filter = TaskFilter(state=TaskState.READY)

def setUp(self):
xml_file = os.path.join(data_dir, 'workflow1.xml')
with open(xml_file) as fp:
Expand All @@ -27,13 +25,13 @@ def setUp(self):
def test_interactive_calls(self):
"""Simulates interactive calls, as would be issued by a user."""

tasks = self.workflow.get_tasks(task_filter=self.ready_task_filter)
tasks = self.workflow.get_tasks(state=TaskState.READY)
self.assertEqual(len(tasks), 1)
self.assertEqual(tasks[0].task_spec.name, 'Start')
self.workflow.run_task_from_id(tasks[0].id)
self.assertEqual(tasks[0].state, TaskState.COMPLETED)

tasks = self.workflow.get_tasks(task_filter=self.ready_task_filter)
tasks = self.workflow.get_tasks(state=TaskState.READY)
self.assertEqual(len(tasks), 2)
task_a1 = tasks[0]
task_b1 = tasks[1]
Expand All @@ -44,46 +42,23 @@ def test_interactive_calls(self):
self.workflow.run_task_from_id(task_a1.id)
self.assertEqual(task_a1.state, TaskState.COMPLETED)

tasks = self.workflow.get_tasks(task_filter=self.ready_task_filter)
tasks = self.workflow.get_tasks(state=TaskState.READY)
self.assertEqual(len(tasks), 2)
self.assertTrue(task_b1 in tasks)
task_a2 = tasks[0]
self.assertEqual(task_a2.task_spec.__class__, Simple)
self.assertEqual(task_a2.task_spec.name, 'task_a2')
self.workflow.run_task_from_id(task_a2.id)

tasks = self.workflow.get_tasks(task_filter=self.ready_task_filter)
tasks = self.workflow.get_tasks(state=TaskState.READY)
self.assertEqual(len(tasks), 1)
self.assertTrue(task_b1 in tasks)

self.workflow.run_task_from_id(task_b1.id)
tasks = self.workflow.get_tasks(task_filter=self.ready_task_filter)
tasks = self.workflow.get_tasks(state=TaskState.READY)
self.assertEqual(len(tasks), 1)
self.workflow.run_task_from_id(tasks[0].id)

tasks = self.workflow.get_tasks(task_filter=self.ready_task_filter)
tasks = self.workflow.get_tasks(state=TaskState.READY)
self.assertEqual(len(tasks), 1)
self.assertEqual(tasks[0].task_spec.name, 'synch_1')

def test_get_tasks_updated_after(self):

start = self.workflow.get_next_task(end_at_spec='Start')
start.run()
updated = datetime.now().timestamp()
for task in self.workflow.get_tasks(task_filter=self.ready_task_filter):
task.run()
tasks = self.workflow.get_tasks(task_filter=TaskFilter(updated_ts=updated))
self.assertListEqual([t.task_spec.name for t in tasks], ['task_a1', 'task_a2', 'task_b1', 'task_b2'])

def test_get_tasks_end_at(self):

tasks = self.workflow.get_tasks(end_at_spec='excl_choice_1')
spec_names = [t.task_spec.name for t in tasks]
self.assertEqual(len([name for name in spec_names if name == 'excl_choice_1']), 2)
self.assertNotIn('task_c1', spec_names)
self.assertNotIn('task_c2', spec_names)
self.assertNotIn('task_c3', spec_names)

def test_get_tasks_max_depth(self):
tasks = [t for t in self.workflow.get_tasks(max_depth=2)]
self.assertListEqual([t.task_spec.name for t in tasks], ['Start', 'task_a1', 'task_a2', 'task_b1', 'task_b2'])
53 changes: 53 additions & 0 deletions tests/SpiffWorkflow/core/data/iteration_test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<process-definition name="Test Iteration" revision="1.0">
<description>
A test workflow to be used to test task iteration.
</description>

<!-- Start with an implicit simple split. -->
<start-task>
<successor>a</successor>
<successor>b</successor>
</start-task>

<task name="a">
<successor>a1</successor>
<successor>a2</successor>
<successor>c</successor>
</task>

<task name="b">
<successor>b1</successor>
<successor>b2</successor>
</task>

<task name="c">
<successor>c1</successor>
<successor>c2</successor>
</task>

<task name="a1">
<successor>last</successor>
</task>
<task name="a2">
<successor>last</successor>
</task>

<task name="b1">
<successor>last</successor>
</task>
<task name="b2">
<successor>last</successor>
</task>

<task name="c1">
<successor>last</successor>
</task>
<task name="c2">
<successor>last</successor>
</task>

<task name="last">
<successor>end</successor>
</task>
</process-definition>