-
Notifications
You must be signed in to change notification settings - Fork 7
/
run.go
72 lines (60 loc) · 2.5 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package workflow
import (
"context"
)
// Run is a representation of a workflow run. It incorporates all the fields from the Record as well as
// having defined types for the Status and Object fields along with access to the RunStateController which
// controls the state of the run aka "RunState".
type Run[Type any, Status StatusType] struct {
TypedRecord[Type, Status]
// stopper provides controls over the run state of the record. Run is not serializable and is not
// intended to be and thus Record exists as a serializable representation of a record.
controller RunStateController
}
// Pause is intended to be used inside a workflow process where (Status, error) are the return signature. This allows
// the user to simply type "return r.Pause(ctx)" to pause a record from inside a workflow which results in the record
// being temporarily left alone and will not be processed until it is resumed.
func (r *Run[Type, Status]) Pause(ctx context.Context) (Status, error) {
err := r.controller.Pause(ctx)
if err != nil {
return 0, err
}
return Status(SkipTypeRunStateUpdate), nil
}
// Skip is a util function to skip the update and move on to the next event (consumer) or execution (callback)
func (r *Run[Type, Status]) Skip() (Status, error) {
return Status(SkipTypeDefault), nil
}
// Cancel is intended to be used inside a workflow process where (Status, error) are the return signature. This allows
// the user to simply type "return r.Cancel(ctx)" to cancel a record from inside a workflow which results in the record
// being permanently left alone and will not be processed.
func (r *Run[Type, Status]) Cancel(ctx context.Context) (Status, error) {
err := r.controller.Cancel(ctx)
if err != nil {
return 0, err
}
return Status(SkipTypeRunStateUpdate), nil
}
func buildRun[Type any, Status StatusType](store storeFunc, wr *Record) (*Run[Type, Status], error) {
var t Type
err := Unmarshal(wr.Object, &t)
if err != nil {
return nil, err
}
// The first time the record is consumed, it needs to be marked as RunStateRunning to represent that the record
// has begun being processed. Even if the consumer errors then this should update should remain in place and
// not be executed on the subsequent retries.
if wr.RunState == RunStateInitiated {
wr.RunState = RunStateRunning
}
controller := NewRunStateController(store, wr)
record := Run[Type, Status]{
TypedRecord: TypedRecord[Type, Status]{
Record: *wr,
Status: Status(wr.Status),
Object: &t,
},
controller: controller,
}
return &record, nil
}