Skip to content

Commit 97e3cbb

Browse files
update: Allow save and repeat for standard steps (#154)
* update: Allow save and repeat for standard steps * add testing for testingRunStateController impl * make skip types unexported * panic if user uses internally reserved node values
1 parent 59dfc8b commit 97e3cbb

17 files changed

+397
-54
lines changed

builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status]
3030
consumers: make(map[Status]consumerConfig[Type, Status]),
3131
callback: make(map[Status][]callback[Type, Status]),
3232
timeouts: make(map[Status]timeouts[Type, Status]),
33-
statusGraph: graph.New(),
33+
statusGraph: graph.New(allSkipTypes()...),
3434
errorCounter: errorcounter.New(),
3535
internalState: make(map[string]State),
3636
logger: &logger{

builder_internal_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,3 +561,21 @@ func TestOnComplete(t *testing.T) {
561561
require.True(t, ok)
562562
require.Equal(t, testErr, actualFn(nil, nil))
563563
}
564+
565+
func TestPanicForUsingReservedNodeValues(t *testing.T) {
566+
b := NewBuilder[string, testStatus]("")
567+
568+
for _, skipType := range allSkipTypes() {
569+
require.PanicsWithValue(t,
570+
"cannot use reserved node as 'from' node",
571+
func() {
572+
b.AddStep(testStatus(skipType), nil, statusEnd)
573+
}, "Using reserved nodes should panic")
574+
575+
require.PanicsWithValue(t,
576+
"cannot use reserved node as 'to' node",
577+
func() {
578+
b.AddStep(statusStart, nil, testStatus(skipType))
579+
}, "Using reserved nodes should panic")
580+
}
581+
}

callback_internal_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func TestProcessCallback(t *testing.T) {
136136

137137
callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) {
138138
calls["callbackFunc"] += 1
139-
return testStatus(SkipTypeDefault), nil
139+
return testStatus(skipTypeDefault), nil
140140
})
141141

142142
updater := func(ctx context.Context, current testStatus, next testStatus, record *Run[string, testStatus], workingVersion uint) error {

internal/graph/graph.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
package graph
22

3-
import "slices"
3+
import (
4+
"slices"
5+
)
6+
7+
func New(reservedNodes ...int) *Graph {
8+
rs := make(map[int]bool)
9+
for _, node := range reservedNodes {
10+
rs[node] = true
11+
}
412

5-
func New() *Graph {
613
return &Graph{
7-
graph: make(map[int][]int),
8-
starting: make(map[int]bool),
9-
terminal: make(map[int]bool),
10-
validNodes: make(map[int]bool),
14+
graph: make(map[int][]int),
15+
starting: make(map[int]bool),
16+
terminal: make(map[int]bool),
17+
validNodes: make(map[int]bool),
18+
reservedNodes: rs,
1119
}
1220
}
1321

@@ -17,9 +25,20 @@ type Graph struct {
1725
starting map[int]bool
1826
terminal map[int]bool
1927
validNodes map[int]bool
28+
29+
// reservedNode is for values that are reserved for the user and cannot be used as nodes in the graph.
30+
reservedNodes map[int]bool
2031
}
2132

2233
func (g *Graph) AddTransition(from int, to int) {
34+
if _, reserved := g.reservedNodes[from]; reserved {
35+
panic("cannot use reserved node as 'from' node")
36+
}
37+
38+
if _, reserved := g.reservedNodes[to]; reserved {
39+
panic("cannot use reserved node as 'to' node")
40+
}
41+
2342
if _, ok := g.validNodes[from]; !ok {
2443
g.nodeOrder = append(g.nodeOrder, from)
2544
}

internal/graph/graph_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,36 @@ func TestGraph(t *testing.T) {
5858
expectedNodes := []int{1, 2, 3, 4, 5}
5959
require.Equal(t, expectedNodes, actualNodes)
6060
}
61+
62+
func TestGraphReservedNodes(t *testing.T) {
63+
g := graph.New(-1, 99)
64+
// Ensure no panic on adding transitions that are not reserved nodes
65+
g.AddTransition(1, 2)
66+
require.True(t, g.IsTerminal(2))
67+
require.True(t, g.IsValid(1))
68+
require.True(t, g.IsValid(2))
69+
70+
require.PanicsWithValue(t,
71+
"cannot use reserved node as 'from' node",
72+
func() {
73+
g.AddTransition(-1, 1)
74+
}, "Using reserved nodes should panic")
75+
76+
require.PanicsWithValue(t,
77+
"cannot use reserved node as 'to' node",
78+
func() {
79+
g.AddTransition(1, -1)
80+
}, "Using reserved nodes should panic")
81+
82+
require.PanicsWithValue(t,
83+
"cannot use reserved node as 'from' node",
84+
func() {
85+
g.AddTransition(99, 1)
86+
}, "Using reserved nodes should panic")
87+
88+
require.PanicsWithValue(t,
89+
"cannot use reserved node as 'to' node",
90+
func() {
91+
g.AddTransition(1, 99)
92+
}, "Using reserved nodes should panic")
93+
}

run.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ func (r *Run[Type, Status]) Pause(ctx context.Context, reason string) (Status, e
2424
return 0, err
2525
}
2626

27-
return Status(SkipTypeRunStateUpdate), nil
27+
return Status(skipTypeRunStateUpdate), nil
2828
}
2929

3030
// Skip is a util function to skip the update and move on to the next event (consumer) or execution (callback)
3131
func (r *Run[Type, Status]) Skip() (Status, error) {
32-
return Status(SkipTypeDefault), nil
32+
return Status(skipTypeDefault), nil
3333
}
3434

3535
// Cancel is intended to be used inside a workflow process where (Status, error) are the return signature. This allows
@@ -41,7 +41,11 @@ func (r *Run[Type, Status]) Cancel(ctx context.Context, reason string) (Status,
4141
return 0, err
4242
}
4343

44-
return Status(SkipTypeRunStateUpdate), nil
44+
return Status(skipTypeRunStateUpdate), nil
45+
}
46+
47+
func (r *Run[Type, Status]) SaveAndRepeat() (Status, error) {
48+
return Status(skipTypeSaveAndRepeat), nil
4549
}
4650

4751
func buildRun[Type any, Status StatusType](store storeFunc, wr *Record) (*Run[Type, Status], error) {
Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,29 @@
1-
package workflow_test
1+
package workflow
22

33
import (
44
"testing"
55

66
"github.com/stretchr/testify/require"
7-
8-
"github.com/luno/workflow"
97
)
108

119
func TestNewTestingRun(t *testing.T) {
12-
r := workflow.NewTestingRun[string, status](t, workflow.Record{}, "test")
10+
r := NewTestingRun[string, testStatus](t, Record{}, "test")
1311
ctx := t.Context()
1412

1513
pauseStatus, err := r.Pause(ctx, "")
1614
require.NoError(t, err)
17-
require.Equal(t, status(workflow.SkipTypeRunStateUpdate), pauseStatus)
15+
require.Equal(t, testStatus(skipTypeRunStateUpdate), pauseStatus)
1816

1917
cancelStatus, err := r.Cancel(ctx, "")
2018
require.NoError(t, err)
21-
require.Equal(t, status(workflow.SkipTypeRunStateUpdate), cancelStatus)
19+
require.Equal(t, testStatus(skipTypeRunStateUpdate), cancelStatus)
2220
}
2321

2422
func TestNewTestingRun_requiresTestingParam(t *testing.T) {
2523
require.PanicsWithValue(t,
2624
"Cannot use NewTestingRun without *testing.T parameter",
2725
func() {
28-
_ = workflow.NewTestingRun[string, status](nil, workflow.Record{}, "test")
26+
_ = NewTestingRun[string, testStatus](nil, Record{}, "test")
2927
},
3028
)
3129
}

runstate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (rs RunState) Stopped() bool {
6868
}
6969
}
7070

71-
// RunStateController allows the interaction with a specific workflow record.
71+
// RunStateController allows the interaction with a specific workflow run.
7272
type RunStateController interface {
7373
// Pause will take the workflow run specified and move it into a temporary state where it will no longer be processed.
7474
// A paused workflow run can be resumed by calling Resume. ErrUnableToPause is returned when a workflow is not in a

runstate_internal_test.go

Lines changed: 102 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,117 @@ package workflow
22

33
import (
44
"context"
5+
"errors"
56
"testing"
67

78
"github.com/stretchr/testify/require"
89
)
910

10-
func TestNoopRunStateController(t *testing.T) {
11-
ctrl := testingRunStateController{}
11+
func TestTestingRunStateController(t *testing.T) {
12+
ctx := context.Background()
1213

13-
ctx := t.Context()
14-
err := ctrl.Pause(ctx, "")
15-
require.NoError(t, err)
14+
t.Run("methods return nil error when no functions set", func(t *testing.T) {
15+
ctrl := testingRunStateController{}
1616

17-
err = ctrl.Resume(ctx)
18-
require.NoError(t, err)
17+
err := ctrl.Pause(ctx, "test reason")
18+
require.NoError(t, err)
1919

20-
err = ctrl.Cancel(ctx, "")
21-
require.NoError(t, err)
20+
err = ctrl.Resume(ctx)
21+
require.NoError(t, err)
2222

23-
err = ctrl.DeleteData(ctx, "")
24-
require.NoError(t, err)
23+
err = ctrl.Cancel(ctx, "test reason")
24+
require.NoError(t, err)
25+
26+
err = ctrl.DeleteData(ctx, "test reason")
27+
require.NoError(t, err)
28+
29+
err = ctrl.SaveAndRepeat(ctx)
30+
require.NoError(t, err)
31+
})
32+
33+
t.Run("methods call provided functions", func(t *testing.T) {
34+
var pauseCalled, resumeCalled, cancelCalled, deleteDataCalled, saveAndRepeatCalled bool
35+
36+
ctrl := testingRunStateController{
37+
pause: func(ctx context.Context) error {
38+
pauseCalled = true
39+
return nil
40+
},
41+
resume: func(ctx context.Context) error {
42+
resumeCalled = true
43+
return nil
44+
},
45+
cancel: func(ctx context.Context) error {
46+
cancelCalled = true
47+
return nil
48+
},
49+
deleteData: func(ctx context.Context) error {
50+
deleteDataCalled = true
51+
return nil
52+
},
53+
saveAndRepeat: func(ctx context.Context) error {
54+
saveAndRepeatCalled = true
55+
return nil
56+
},
57+
}
58+
59+
err := ctrl.Pause(ctx, "test reason")
60+
require.NoError(t, err)
61+
require.True(t, pauseCalled)
62+
63+
err = ctrl.Resume(ctx)
64+
require.NoError(t, err)
65+
require.True(t, resumeCalled)
66+
67+
err = ctrl.Cancel(ctx, "test reason")
68+
require.NoError(t, err)
69+
require.True(t, cancelCalled)
70+
71+
err = ctrl.DeleteData(ctx, "test reason")
72+
require.NoError(t, err)
73+
require.True(t, deleteDataCalled)
74+
75+
err = ctrl.SaveAndRepeat(ctx)
76+
require.NoError(t, err)
77+
require.True(t, saveAndRepeatCalled)
78+
})
79+
80+
t.Run("methods propagate errors", func(t *testing.T) {
81+
testErr := errors.New("test error")
82+
83+
ctrl := testingRunStateController{
84+
pause: func(ctx context.Context) error {
85+
return testErr
86+
},
87+
resume: func(ctx context.Context) error {
88+
return testErr
89+
},
90+
cancel: func(ctx context.Context) error {
91+
return testErr
92+
},
93+
deleteData: func(ctx context.Context) error {
94+
return testErr
95+
},
96+
saveAndRepeat: func(ctx context.Context) error {
97+
return testErr
98+
},
99+
}
100+
101+
err := ctrl.Pause(ctx, "test reason")
102+
require.Equal(t, testErr, err)
103+
104+
err = ctrl.Resume(ctx)
105+
require.Equal(t, testErr, err)
106+
107+
err = ctrl.Cancel(ctx, "test reason")
108+
require.Equal(t, testErr, err)
109+
110+
err = ctrl.DeleteData(ctx, "test reason")
111+
require.Equal(t, testErr, err)
112+
113+
err = ctrl.SaveAndRepeat(ctx)
114+
require.Equal(t, testErr, err)
115+
})
25116
}
26117

27118
func TestRunStateControllerTransitions(t *testing.T) {

status.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,43 @@ type StatusType interface {
77
}
88

99
func skipUpdate[Status StatusType](status Status) bool {
10-
_, ok := skipConfig[SkipType(status)]
11-
return ok
10+
return skipType(status) == skipTypeDefault || skipType(status) == skipTypeRunStateUpdate
11+
}
12+
13+
func isSaveAndRepeat[Status StatusType](status Status) bool {
14+
return skipType(status) == skipTypeSaveAndRepeat
1215
}
1316

1417
func skipUpdateDescription[Status StatusType](status Status) string {
15-
description, ok := skipConfig[SkipType(status)]
18+
description, ok := skipConfig[skipType(status)]
1619
if !ok {
1720
return "Unknown skip reason '" + status.String() + "'"
1821
}
1922

2023
return description
2124
}
2225

23-
type SkipType int
26+
type skipType int
2427

2528
var (
26-
SkipTypeDefault SkipType = 0
27-
SkipTypeRunStateUpdate SkipType = -1
29+
skipTypeDefault skipType = 0
30+
skipTypeRunStateUpdate skipType = -1
31+
skipTypeSaveAndRepeat skipType = -2
32+
skipTypeSentinel skipType = -3
2833
)
2934

35+
func allSkipTypes() []int {
36+
var all []int
37+
for i := skipTypeDefault; i > skipTypeSentinel; i-- {
38+
all = append(all, int(i))
39+
}
40+
41+
return all
42+
}
43+
3044
// skipConfig holds the skip values and descriptions as documentation as to what they mean.
31-
var skipConfig = map[SkipType]string{
32-
SkipTypeDefault: "Zero status with nil error value should result in a skip",
33-
SkipTypeRunStateUpdate: "Internal run state update taken place. Skip normal newUpdater",
45+
var skipConfig = map[skipType]string{
46+
skipTypeDefault: "Zero status with nil error value should result in a skip",
47+
skipTypeRunStateUpdate: "Internal run state update taken place. Skip normal newUpdater",
48+
skipTypeSaveAndRepeat: "Save and repeat operation: record saved and re-queued for processing",
3449
}

0 commit comments

Comments
 (0)