This repository has been archived by the owner on Mar 31, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathenv.go
146 lines (128 loc) · 3.27 KB
/
env.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package well
import (
"context"
"sync"
"github.com/cybozu-go/log"
)
// Environment implements context-based goroutine management.
//
// It bundles a cancelable base context with a WaitGroup that tracks
// the goroutines started by Go, plus the bookkeeping used by Stop,
// Cancel, and Wait.
type Environment struct {
// ctx is the base context; Go derives each goroutine's context from it.
ctx context.Context
// cancel cancels ctx.
cancel context.CancelFunc
// wg counts goroutines started by Go that have not finished yet.
wg sync.WaitGroup
// generator produces the request tracking IDs used by GoWithID.
generator *IDGenerator
// mu guards stopped, canceled, and err; stopCh is closed only while
// mu is held.
mu sync.RWMutex
// stopped is set once Stop or Cancel has been called; Go starts
// nothing after that.
stopped bool
// stopCh is closed at the moment stopped becomes true; Wait blocks on it.
stopCh chan struct{}
// canceled is set by the first call to Cancel.
canceled bool
// err is the error passed to the first Cancel call; Wait returns it.
err error
}
// NewEnvironment returns a fresh Environment whose base context is a
// cancelable child of ctx.
//
// No SIGINT/SIGTERM handlers are installed for environments created
// here; only the global environment is canceled on those signals.
func NewEnvironment(ctx context.Context) *Environment {
	base, cancelBase := context.WithCancel(ctx)
	return &Environment{
		ctx:       base,
		cancel:    cancelBase,
		generator: NewIDGenerator(),
		stopCh:    make(chan struct{}),
	}
}
// Stop declares that Go will not be called any more.
//
// Stop may be omitted if, and only if, Cancel is guaranteed to be
// called eventually -- for example when the program runs until it
// receives SIGINT or SIGTERM.
//
// Calling Stop more than once is harmless; only the first call has
// an effect.
func (e *Environment) Stop() {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.stopped {
		return
	}
	e.stopped = true
	close(e.stopCh)
}
// Cancel cancels the base context and records err as the value that
// Wait() will return.
//
// After Cancel, Go() no longer starts new goroutines.
//
// Cancel(nil) is perfectly valid. Unlike Stop(), it cancels the base
// context, so it can gracefully stop goroutines started by
// Server.Serve or HTTPServer.ListenAndServe.
//
// The return value is true only for the first caller. Subsequent
// calls do nothing and return false.
func (e *Environment) Cancel(err error) bool {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.canceled {
		return false
	}

	e.canceled, e.err = true, err
	e.cancel()

	// Cancel implies Stop: release Wait unless Stop already did so.
	if !e.stopped {
		e.stopped = true
		close(e.stopCh)
	}
	return true
}
// Wait blocks until Stop or Cancel has been called and every
// goroutine started by Go has returned.
//
// It returns the error given to Cancel, or nil if Cancel was never
// called with an error. Use IsSignaled on the result to find out
// whether the program received SIGINT or SIGTERM.
func (e *Environment) Wait() error {
	<-e.stopCh

	if log.Enabled(log.LvDebug) {
		log.Debug("well: waiting for all goroutines to complete", nil)
	}
	e.wg.Wait()

	// Release the base context even when nobody called Cancel.
	e.cancel()

	e.mu.Lock()
	result := e.err
	e.mu.Unlock()
	return result
}
// Go starts a goroutine that executes f.
//
// f takes a context derived from the base context. The derived
// context is canceled when f returns.
//
// Goroutines started by this function will be waited for by
// Wait until all such goroutines return.
//
// If f returns a non-nil error, Cancel is called immediately
// with that error.
//
// If the environment has already been stopped or canceled, Go
// returns without starting anything.
//
// f should watch the ctx.Done() channel and return quickly when the
// channel is closed.
func (e *Environment) Go(f func(ctx context.Context) error) {
	e.mu.RLock()
	if e.stopped {
		e.mu.RUnlock()
		return
	}
	// Register with the WaitGroup while still holding the read lock.
	// Stop and Cancel set stopped under the write lock, so they cannot
	// release Wait between the check above and this Add.
	e.wg.Add(1)
	e.mu.RUnlock()

	go func() {
		// Deferred so the WaitGroup is released even when f terminates
		// the goroutine without returning (e.g. runtime.Goexit);
		// otherwise Wait would block forever.
		defer e.wg.Done()

		ctx, cancel := context.WithCancel(e.ctx)
		defer cancel()

		if err := f(ctx); err != nil {
			e.Cancel(err)
		}
	}()
}
// GoWithID behaves like Go, except that the context handed to f
// carries a newly generated request tracking ID.
func (e *Environment) GoWithID(f func(ctx context.Context) error) {
	withID := func(ctx context.Context) error {
		id := e.generator.Generate()
		return f(WithRequestID(ctx, id))
	}
	e.Go(withID)
}