Skip to content

Commit 2c540c9

Browse files
committed
update: Allow save and repeat for standard steps
1 parent 12ec56b commit 2c540c9

File tree

8 files changed

+135
-17
lines changed

8 files changed

+135
-17
lines changed

record.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package workflow
22

3-
import "time"
3+
import (
4+
"strconv"
5+
"time"
6+
)
47

58
// Record is the cornerstone of Workflow. Record must always be wire compatible with no generics as it's intended
69
// purpose is to be the persisted data structure of a Run.
@@ -74,6 +77,32 @@ type Meta struct {
7477
// It provides a line trace or path that helps track where the execution was initiated,
7578
// offering context for debugging or auditing purposes by capturing the origin of the workflow trigger.
7679
TraceOrigin string
80+
81+
// UpdateType indicates the type of update that is performed relating to the state being updated in the RecordStore
82+
// and whether it results in a new event being created. UpdateType is useful for understanding the nature of
83+
// the change made to the record and the intended consequences of that change.
84+
UpdateType UpdateType
85+
}
86+
87+
type UpdateType int
88+
89+
const (
90+
// UpdateTypeDefault indicates a standard update to the record that results in the state being updated and a new
91+
// corresponding event is created.
92+
UpdateTypeDefault UpdateType = 0
93+
// UpdateTypeStateOnly indicates an update that only modifies the RunState of the record without creating a new event.
94+
UpdateTypeStateOnly = 1
95+
)
96+
97+
func (u UpdateType) String() string {
98+
switch u {
99+
case UpdateTypeDefault:
100+
return "Default"
101+
case UpdateTypeStateOnly:
102+
return "State Only"
103+
default:
104+
return "UpdateType(" + strconv.FormatInt(int64(u), 10) + ")"
105+
}
77106
}
78107

79108
// TypedRecord differs from Record in that it contains a Typed Object and Typed Status

run.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func (r *Run[Type, Status]) Cancel(ctx context.Context, reason string) (Status,
4444
return Status(SkipTypeRunStateUpdate), nil
4545
}
4646

47+
func (r *Run[Type, Status]) SaveAndRepeat() (Status, error) {
48+
return Status(SkipTypeSaveAndRepeat), nil
49+
}
50+
4751
func buildRun[Type any, Status StatusType](store storeFunc, wr *Record) (*Run[Type, Status], error) {
4852
var t Type
4953
err := Unmarshal(wr.Object, &t)

runstate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (rs RunState) Stopped() bool {
6868
}
6969
}
7070

71-
// RunStateController allows the interaction with a specific workflow record.
71+
// RunStateController allows the interaction with a specific workflow run.
7272
type RunStateController interface {
7373
// Pause will take the workflow run specified and move it into a temporary state where it will no longer be processed.
7474
// A paused workflow run can be resumed by calling Resume. ErrUnableToPause is returned when a workflow is not in a

status.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ func skipUpdate[Status StatusType](status Status) bool {
1111
return ok
1212
}
1313

14+
func isSaveAndRepeat[Status StatusType](status Status) bool {
15+
return SkipType(status) == SkipTypeSaveAndRepeat
16+
}
17+
1418
func skipUpdateDescription[Status StatusType](status Status) string {
1519
description, ok := skipConfig[SkipType(status)]
1620
if !ok {
@@ -25,6 +29,7 @@ type SkipType int
2529
var (
2630
SkipTypeDefault SkipType = 0
2731
SkipTypeRunStateUpdate SkipType = -1
32+
SkipTypeSaveAndRepeat SkipType = -2
2833
)
2934

3035
// skipConfig holds the skip values and descriptions as documentation as to what they mean.

testing.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ func TriggerCallbackOn[Type any, Status StatusType, Payload any](
2828
panic("*workflow.Workflow required for testing utility functions")
2929
}
3030

31-
_ = waitFor(t, w, foreignID, func(r *Record) (bool, error) {
31+
_, err := waitFor(t, w, foreignID, func(r *Record) (bool, error) {
3232
return r.Status == int(waitForStatus), nil
3333
})
34+
require.NoError(t, err)
3435

3536
b, err := json.Marshal(p)
3637
require.NoError(t, err)
@@ -109,12 +110,13 @@ func Require[Type any, Status StatusType](
109110
return
110111
}
111112

112-
wr := waitFor(t, w, foreignID, func(r *Record) (bool, error) {
113+
wr, err := waitFor(t, w, foreignID, func(r *Record) (bool, error) {
113114
return r.Status == int(waitForStatus), nil
114115
})
116+
require.NoError(t, err)
115117

116118
var actual Type
117-
err := Unmarshal(wr.Object, &actual)
119+
err = Unmarshal(wr.Object, &actual)
118120
require.NoError(t, err)
119121

120122
// Due to nuances in encoding libraries such as json with the ability to implement custom
@@ -147,27 +149,28 @@ func WaitFor[Type any, Status StatusType](
147149
panic("*workflow.Workflow required for testing utility functions")
148150
}
149151

150-
waitFor(t, w, foreignID, func(r *Record) (bool, error) {
152+
_, err := waitFor(t, w, foreignID, func(r *Record) (bool, error) {
151153
run, err := buildRun[Type, Status](w.recordStore.Store, r)
152154
require.NoError(t, err)
153155

154156
return fn(run)
155157
})
158+
require.NoError(t, err)
156159
}
157160

158161
func waitFor[Type any, Status StatusType](
159162
t testing.TB,
160163
w *Workflow[Type, Status],
161164
foreignID string,
162165
fn func(r *Record) (bool, error),
163-
) *Record {
166+
) (*Record, error) {
164167
testingStore, ok := w.recordStore.(TestingRecordStore)
165168
if !ok {
166169
panic("TestingRecordStore implementation for record store dependency required")
167170
}
168171

169172
var runID string
170-
for runID == "" {
173+
for runID == "" && w.ctx.Err() == nil {
171174
latest, err := w.recordStore.Latest(context.Background(), w.Name(), foreignID)
172175
if errors.Is(err, ErrRecordNotFound) {
173176
continue
@@ -182,7 +185,7 @@ func waitFor[Type any, Status StatusType](
182185
// testingStore.SetSnapshotOffset(w.name, foreignID, runID, 0)
183186

184187
var wr Record
185-
for wr.RunID == "" {
188+
for wr.RunID == "" && w.ctx.Err() == nil {
186189
snapshots := testingStore.Snapshots(w.Name(), foreignID, runID)
187190
for _, r := range snapshots {
188191
ok, err := fn(r)
@@ -194,7 +197,7 @@ func waitFor[Type any, Status StatusType](
194197
}
195198
}
196199

197-
return &wr
200+
return &wr, w.ctx.Err()
198201
}
199202

200203
// NewTestingRun should be used when testing logic that defines a workflow.Run as a parameter. This is usually the
@@ -255,10 +258,11 @@ func WithDeleteDataFn(deleteData func(ctx context.Context) error) TestingRunOpti
255258
}
256259

257260
type testingRunStateController struct {
258-
pause func(ctx context.Context) error
259-
cancel func(ctx context.Context) error
260-
resume func(ctx context.Context) error
261-
deleteData func(ctx context.Context) error
261+
pause func(ctx context.Context) error
262+
cancel func(ctx context.Context) error
263+
resume func(ctx context.Context) error
264+
deleteData func(ctx context.Context) error
265+
saveAndRepeat func(ctx context.Context) error
262266
}
263267

264268
func (c *testingRunStateController) Pause(ctx context.Context, reason string) error {
@@ -293,4 +297,12 @@ func (c *testingRunStateController) DeleteData(ctx context.Context, reason strin
293297
return c.deleteData(ctx)
294298
}
295299

300+
func (c *testingRunStateController) SaveAndRepeat(ctx context.Context) error {
301+
if c.saveAndRepeat == nil {
302+
return nil
303+
}
304+
305+
return c.saveAndRepeat(ctx)
306+
}
307+
296308
var _ RunStateController = (*testingRunStateController)(nil)

update.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ func newUpdater[Type any, Status StatusType](
5858
return nil
5959
}
6060

61+
// Save and repeat skips transition validation and keeps the current status.
62+
if isSaveAndRepeat(next) {
63+
updatedRecord.Status = int(current)
64+
return updateRecord(ctx, store, updatedRecord, record.RunState, current.String())
65+
}
66+
6167
err = validateTransition(current, next, graph)
6268
if err != nil {
6369
return err

workflow.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,9 @@ func runOnce(
272272
if errors.Is(err, context.Canceled) {
273273
// Exit cleanly if error returned is cancellation of context
274274
return err
275+
} else if errors.Is(err, context.DeadlineExceeded) {
276+
// Exit cleanly if context's deadline was exceeded
277+
return err
275278
} else if err != nil {
276279
logger.Error(ctx, fmt.Errorf("run error [role=%s], [process=%s]: %v", role, processName, err))
277280

@@ -287,6 +290,9 @@ func runOnce(
287290
// Context can be cancelled by the role scheduler and thus return nil to attempt to gain the role again
288291
// and if the parent context was cancelled then that will exit safely.
289292
return nil
293+
} else if errors.Is(err, context.DeadlineExceeded) {
294+
// Exit cleanly if context's deadline was exceeded
295+
return err
290296
} else if err != nil {
291297
logger.Error(ctx, fmt.Errorf("run error [role=%s], [process=%s]: %v", role, processName, err))
292298
metrics.ProcessErrors.WithLabelValues(workflowName, processName).Inc()

workflow_test.go

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,7 @@ func BenchmarkWorkflow(b *testing.B) {
231231

232232
func benchmarkWorkflow(b *testing.B, numberOfSteps int) {
233233
ctx, cancel := context.WithCancel(context.Background())
234-
b.Cleanup(func() {
235-
cancel()
236-
})
234+
b.Cleanup(cancel)
237235

238236
bldr := workflow.NewBuilder[MyType, status]("benchmark")
239237

@@ -729,3 +727,61 @@ func TestExpectedProcesses_outboxDisabled(t *testing.T) {
729727
require.Truef(t, expected[process], "process '%s' is missing expected value", process)
730728
}
731729
}
730+
731+
func TestSaveAndRepeat(t *testing.T) {
732+
iterations := 100
733+
ctx, cancel := context.WithCancel(context.Background())
734+
t.Cleanup(cancel)
735+
736+
type Custom struct {
737+
Count int
738+
Collection []string
739+
}
740+
741+
bldr := workflow.NewBuilder[Custom, status]("save and repeat")
742+
bldr.AddStep(StatusStart, func(ctx context.Context, r *workflow.Run[Custom, status]) (status, error) {
743+
r.Object.Count++
744+
r.Object.Collection = append(r.Object.Collection, fmt.Sprintf("item-%d", r.Object.Count))
745+
746+
if r.Object.Count < iterations {
747+
return r.SaveAndRepeat()
748+
}
749+
750+
return StatusEnd, nil
751+
}, StatusEnd)
752+
753+
recordStore := memrecordstore.New()
754+
clock := clock_testing.NewFakeClock(time.Now())
755+
wf := bldr.Build(
756+
memstreamer.New(),
757+
recordStore,
758+
memrolescheduler.New(),
759+
workflow.WithClock(clock),
760+
workflow.WithDefaultOptions(
761+
workflow.PollingFrequency(time.Nanosecond),
762+
),
763+
workflow.WithOutboxOptions(
764+
workflow.OutboxPollingFrequency(time.Nanosecond),
765+
),
766+
workflow.WithDebugMode(),
767+
)
768+
769+
wf.Run(ctx)
770+
t.Cleanup(wf.Stop)
771+
772+
fid := "custom-save-and-repeat"
773+
_, err := wf.Trigger(ctx, fid)
774+
if err != nil {
775+
t.Fatal(err)
776+
}
777+
778+
var expected []string
779+
for i := range iterations {
780+
expected = append(expected, fmt.Sprintf("item-%d", i+1))
781+
}
782+
783+
workflow.Require(t, wf, fid, StatusEnd, Custom{
784+
Count: iterations,
785+
Collection: expected,
786+
})
787+
}

0 commit comments

Comments
 (0)