-
Notifications
You must be signed in to change notification settings - Fork 0
/
runner.go
138 lines (117 loc) · 3.79 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: Runner interfaces
package plumber
import (
"context"
)
// Runner describes basic runnable unit. Runner can be started.
// This interface is used as a common denominator used in api but,
// it is more that recommended to implement other interface Closable
// and optionally Readyable.
type Runner interface {
Run(ctx context.Context) error
}
// Readier describes Runner that can signal whether it is ready.
// This is useful when Runners needs to be execute with the Pipeline sequentially.
type Readier interface {
Ready() (<-chan struct{}, error)
}
// Closeable describes Runner that can be graceful closed. Close method must be idempotent.
// Once called runner is required exit from main Run method within defined duration otherwise run context will be canceled.
// Close can block for configured duration. When exceeded close context is canceled
type Closeable interface {
Close(ctx context.Context) error
}
// ErrorNotifier describes Runner that can report that error has occurred before it returns from the main Run method.
// This is signal might bubble up the execution tree and on top it might be used to start Close sequence.
type ErrorNotifier interface {
Errored() <-chan struct{}
}
// SmartRunner implements all interfaces that makes the runner good citizen
type SmartRunner interface {
Runner
Readier
Closeable
}
// RunnerOptions holds runner optional callbacks
type RunnerOptions struct {
close func(ctx context.Context) error
ready func() <-chan struct{}
}
func (o *RunnerOptions) apply(opts ...RunnerOption) {
o.ready = func() <-chan struct{} {
return closedCh
}
for _, op := range opts {
op(o)
}
}
// RunnerOption is option pattern function
type RunnerOption func(*RunnerOptions)
func WithReady(s *Signal) RunnerOption {
return func(ro *RunnerOptions) {
ro.ready = func() <-chan struct{} {
return s.C()
}
}
}
func WithClose(closeFunc func(context.Context) error) RunnerOption {
return func(ro *RunnerOptions) {
ro.close = closeFunc
}
}
// runner represent a struct that complies with Runner interfaces
type runner struct {
options RunnerOptions
run func(ctx context.Context) error
}
// NewRunner returns an instance of the runner. Optionally supplied options might redefine other Runner method Close and Ready
func NewRunner(run func(ctx context.Context) error, opts ...RunnerOption) SmartRunner {
r := &runner{
run: run,
}
r.options.apply(opts...)
return r
}
// Ready signals that runner is ready
func (r *runner) Ready() (<-chan struct{}, error) {
return r.options.ready(), nil
}
// Run executes a task
func (r *runner) Run(ctx context.Context) error {
return r.run(ctx)
}
// Close runner. Once called runner is required to exit from main Run method
// within defined duration otherwise run context will be canceled.
// Close can block for configured duration. When exceeded close context is canceled
func (r *runner) Close(ctx context.Context) error {
if r.options.close != nil {
return r.options.close(ctx)
}
return nil
}
// GracefulRunner is runner supporting Run and Close methods
func GracefulRunner(run, closeFn func(ctx context.Context) error) Runner {
return NewRunner(run, WithClose(closeFn))
}
// closedCh is a ready made closed channel
var closedCh = func() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}()
// RunnerReady return channel that can be used to check if runner is ready.
// When channel is closed runner can be considered ready.
func RunnerReady(runner Runner) (<-chan struct{}, error) {
if r, ok := runner.(Readier); ok {
return r.Ready()
}
return closedCh, nil
}
// RunnerClose calls Close method on given Runner when supported
func RunnerClose(ctx context.Context, runner Runner) error {
if r, ok := runner.(Closeable); ok {
return r.Close(ctx)
}
return nil
}