Provides a RESTful viewflow.
Much of the code in this project resembles django-viewflow, because I want to keep the interface the same as django-viewflow's.
Thank you to all the contributors of viewflow.
This project is under the ONLY USE NO PRIVATE CHANGE LICENSE: anyone who changes the source code (even for use only on an intranet or at home) should publish their code publicly.
The flow graph can be cloned and then changed from this link.
The graph in the picture above can be written as the following code:
# example_project/exam/flows.py
class ExamFlow(flows.Flow):
    process_class = models.ExamProcess
    task_class = models.ExamTask

    register = nodes.Start(
        viewclass=rest_extensions.AutoCreateAPIView,
        serializer_class=serializers.RegisterExamSerializer,
    ).Next(
        this.select_term
    )
    select_term = nodes.View(
        viewclass=rest_extensions.AutoUpdateAPIView,
        fields=["term"],
    ).Next(this.take_exam)
    take_exam = nodes.View(
        viewclass=rest_extensions.AutoUpdateAPIView,
        fields=["grade"],
    ).Next(this.check_grade)
    check_grade = nodes.If(
        cond=lambda activation: activation.process.passed
    ).Then(this.end).Else(this.select_term)
    end = nodes.End()
Quite simple and intuitive, right?
class HireFlow(flows.Flow):
    process_class = models.HireProcess
    task_class = models.HireTask

    start = nodes.Start(
        viewclass=rest_extensions.AutoCreateAPIView,
        serializer_class=serializers.AddCandidateSerializer,
    ).Permission(
        group=Group.objects.get_or_create(name="hr")[0]
    ).Next(
        this.split_to_3rd_and_direct_leader
    )
    split_to_3rd_and_direct_leader = nodes.Split(
    ).Always(
        this.approve
    ).Always(
        this.background_research
    )
    background_research = nodes.View(
        viewclass=rest_extensions.AutoUpdateAPIView,
        fields=["background_ok"],
    ).Next(
        this.check_background
    )
    check_background = nodes.If(
        cond=lambda activation: activation.process.background_ok
    ).Then(
        this.join_on_both_approve
    ).Else(
        this.end
    )
    join_on_both_approve = nodes.Join().Next(
        this.notify
    )
    notify = nodes.View(
        viewclass=rest_extensions.AutoUpdateAPIView,
        fields=["notified"],
    ).Next(
        this.end
    )
    approve = nodes.View(
        viewclass=rest_extensions.AutoUpdateAPIView,
        serializer_class=serializers.ApproveSerializer,
        # fields=["approved"],
    ).Permission(
        group=Group.objects.get_or_create(name="leader")[0]
    ).Next(
        this.check_if_approve
    )
    check_if_approve = nodes.If(
        cond=lambda activation: activation.process.approved
    ).Then(
        this.set_salary
    ).Else(
        this.notify
    )
    set_salary = nodes.View(
        viewclass=rest_extensions.AutoUpdateAPIView,
        fields=["salary"],
    ).Permission(
        group=Group.objects.get_or_create(name="hr")[0]
    ).Next(
        this.join_on_both_approve
    )
    end = nodes.End()
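The Split and Join nodes in HireFlow can be hard to picture from the declaration alone. Below is a minimal, library-independent sketch of the join idea, assuming a join simply waits until every expected incoming branch has arrived. `ToyJoin` and `arrive` are hypothetical names used only for illustration; they are not viewflow-rest's implementation, which may handle branches differently.

```python
class ToyJoin:
    """Hypothetical stand-in for a join node: fires once all expected branches arrive."""

    def __init__(self, expected):
        self.expected = set(expected)  # names of the incoming branches
        self.arrived = set()           # branches that have finished so far

    def arrive(self, branch):
        """Record a finished branch; return True when the join may continue."""
        if branch not in self.expected:
            raise ValueError(f"unexpected branch: {branch}")
        self.arrived.add(branch)
        return self.arrived == self.expected


# In HireFlow, join_on_both_approve is reached from check_background and set_salary.
join = ToyJoin(["check_background", "set_salary"])
first = join.arrive("check_background")  # only one branch done: keep waiting
second = join.arrive("set_salary")       # both branches done: continue to notify
print(first, second)
```

In this toy model the flow would move on to notify only after the second call returns True.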
Use the example_project as an example:
git clone git@github.com:ramwin/viewflow-rest.git
cd viewflow-rest/example_project/
sudo pip3 install -r ./requirements.txt
python3 manage.py migrate
python3 manage.py runserver
# visit http://localhost:8000/exam/ or http://localhost:8000/hire/ to get the api
The post_finish and post_start signals now use the flow_task instead of a flow_process as the sender. You should change your code from
task_started.connect(function, ProcessClass)
to
task_started.connect(function, ProcessClass.one_of_its_flow_task)
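To see why the old connection stops firing, here is a small sketch of sender filtering. It assumes the signals behave like ordinary Django signals, which only call receivers registered for the exact sender object (or for no sender at all). `ToySignal`, the stand-in `ProcessClass`, and `flow_task` below are hypothetical and exist only so the example runs without Django.

```python
class ToySignal:
    """Tiny stand-in for a signal that filters receivers by sender (illustration only)."""

    def __init__(self):
        self._receivers = []  # list of (receiver, wanted_sender) pairs

    def connect(self, receiver, sender=None):
        self._receivers.append((receiver, sender))

    def send(self, sender, **kwargs):
        # Call only receivers registered for this exact sender, or for any sender.
        return [
            receiver(sender=sender, **kwargs)
            for receiver, wanted in self._receivers
            if wanted is None or wanted is sender
        ]


class ProcessClass:  # hypothetical process class
    pass


flow_task = object()  # hypothetical flow_task (a node instance)

task_started = ToySignal()
calls = []
task_started.connect(lambda sender, **kw: calls.append("old"), ProcessClass)
task_started.connect(lambda sender, **kw: calls.append("new"), flow_task)

task_started.send(sender=flow_task)  # the flow_task is the sender
print(calls)
```

Only the receiver connected with the flow_task as sender runs; the one connected with the process class is skipped.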
- How to update a task manually
task = models.FlowTaskModel.objects.get(id=4)
task.auto_finish(operator=user)  # or operator=None
- Create project
- Create Flow
- Create Start & End Node
- Create ViewActivation
- Create If Node
- Create Split Node
- Create Join Node
- Create Permission
- 0.3.0
    - one task for every flow_task
    - add serializer_class parameters for rest_extensions.views
    - add operator for every task
- Fork and clone this repository.
    git clone git@github.com:<your_username>/viewflow-rest.git
- Make your changes in your own repository.
- Make sure the unit tests pass.
    cd example_project
    python manage.py test
- Create a pull request.
A flow contains many flow_tasks/nodes
# Here exam_flow is a workflow.
# It contains three flow_tasks: register, do, and end.
class ExamFlow(flows.Flow):
    register = nodes.Start(
        ...
    ).Next(this.do)
    do = nodes.View(
        ...
    ).Next(this.end)
    end = nodes.End()

exam_flow = ExamFlow()
Every flow_task is an instance of Node.
Every flow_task has an activation_class.
Every activation_class instance will activate_next by:
    self.flow_task._next  # the next node instance
    self.flow_task._next.activate
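The chain of activations can be sketched with a toy model. This is a simplified illustration under stated assumptions, not the project's actual classes: the `Node` and `Activation` classes below are hypothetical, each node has at most one `_next`, and every activation finishes immediately (a real View node would wait for a request).

```python
class Activation:
    """Hypothetical activation: runs one node, then activates the next one."""

    def __init__(self, flow_task, log):
        self.flow_task = flow_task
        self.log = log

    def done(self):
        self.log.append(self.flow_task.name)  # record that this node ran
        self.activate_next()

    def activate_next(self):
        nxt = self.flow_task._next  # the next node instance, or None
        if nxt is not None:
            nxt.activate(self.log)


class Node:
    activation_class = Activation

    def __init__(self, name):
        self.name = name
        self._next = None

    def Next(self, node):
        self._next = node
        return self

    def activate(self, log):
        # Each node builds its own activation, which then moves the flow forward.
        self.activation_class(self, log).done()


end = Node("end")
do = Node("do").Next(end)
register = Node("register").Next(do)

log = []
register.activate(log)
print(log)
```

Activating the first node walks the whole chain, so the log lists register, do, and end in order.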
- src: source Node instance
- dst: target Node instance
- Attribute
    - flow_class
    - flow_task: Node instance defined in the flows.py
    - task: current Task
- Function
    - _incoming: Edge instance list
    - _outgoing: Edge instance list
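As a rough illustration of how src/dst edges relate to _incoming and _outgoing, the sketch below builds the edges of the exam flow by hand and filters them per node. It is an assumption-laden toy: the `Edge` dataclass and the `incoming`/`outgoing` helpers are hypothetical, and node names stand in for the Node instances the project uses.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    src: str  # source node (a name here; a Node instance in the project)
    dst: str  # target node


def incoming(edges, node):
    """Edges that point at the given node."""
    return [e for e in edges if e.dst == node]


def outgoing(edges, node):
    """Edges that leave the given node."""
    return [e for e in edges if e.src == node]


# Edges of the exam flow: check_grade either ends or loops back to select_term.
edges = [
    Edge("register", "select_term"),
    Edge("select_term", "take_exam"),
    Edge("take_exam", "check_grade"),
    Edge("check_grade", "end"),
    Edge("check_grade", "select_term"),
]

into_select_term = [e.src for e in incoming(edges, "select_term")]
out_of_check_grade = [e.dst for e in outgoing(edges, "check_grade")]
print(into_select_term, out_of_check_grade)
```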
- models
- Views
- Flow
- rest_extensions