forked from alibaba-archive/confl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwatcher.go
171 lines (143 loc) · 3.4 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package confl
import (
"encoding/json"
"fmt"
"log"
"os"
"reflect"
"sync"
"github.com/kelseyhightower/envconfig"
"github.com/teambition/confl/etcd"
"github.com/teambition/confl/vault"
)
var (
	// DefaultChangeChan is the buffer capacity New uses when it creates a
	// Watcher's change channel.
	DefaultChangeChan = 10
	// ConfPathEnv is the name of the environment variable NewFromEnv reads
	// the configuration path from.
	ConfPathEnv = "CONFL_CONF_PATH"
	// ErrorNoConfPath is returned by New when the configuration path is empty.
	ErrorNoConfPath = fmt.Errorf("required env %s missing value", ConfPathEnv)
)
// defautlOnError is the error handler used when the caller does not supply
// one: non-nil errors are written with log.Println, nil errors are ignored.
var defautlOnError = func(err error) {
	if err == nil {
		return
	}
	log.Println(err)
}
// Hook is a callback run after the configuration has been reloaded.
// It is passed the value returned by Watcher.Config, i.e. the current
// configuration by value rather than the watcher's own pointer.
type Hook func(config interface{})
// Watcher holds the state needed to load a configuration value from etcd
// and to run hooks each time a change notification arrives.
type Watcher struct {
	// Mutex is taken by AddHook while it appends to hooks.
	sync.Mutex
	// confPath is the key handed to the etcd client to read and watch the
	// configuration.
	confPath string
	// c is the user-defined config struct; loadConfig decodes JSON into it.
	c interface{}
	// etcd is the client used to fetch and watch confPath.
	etcd *etcd.Client
	// changeCh carries change notifications; procHooks reloads the
	// configuration once per value received.
	changeCh chan struct{}
	// hooks are run in order after each successful reload.
	hooks []Hook
	// onError receives the error when a reload fails.
	onError func(error)
}
// NewFromEnv builds a Watcher whose settings come from the environment.
//
// The configuration path is read from the variable named by ConfPathEnv, and
// the etcd and vault settings are filled in by envconfig.Process. When
// onError is nil the package default handler is used. c is passed through to
// New unchanged.
func NewFromEnv(c interface{}, onError func(error)) (*Watcher, error) {
	if onError == nil {
		onError = defautlOnError
	}

	// An unset variable yields "", which New rejects with ErrorNoConfPath.
	confPath := os.Getenv(ConfPathEnv)

	etcdConf := new(etcd.Config)
	if err := envconfig.Process("", etcdConf); err != nil {
		return nil, err
	}

	vaultConf := new(vault.Config)
	if err := envconfig.Process("", vaultConf); err != nil {
		return nil, err
	}

	return New(c, confPath, etcdConf, vaultConf, onError)
}
// New creates a Watcher for the configuration stored at confPath and
// performs the initial load before returning.
//
// c is the user-defined config value the JSON is decoded into; it is handed
// to json.Unmarshal, so it must be a non-nil pointer. confPath must be
// non-empty, otherwise ErrorNoConfPath is returned. etcdConf and vaultConf
// must be non-nil; their OnError fields are set to onError when unset, and
// vaultConf.ChangeCh is pointed at the watcher's change channel. A nil
// onError falls back to the package default handler.
//
// If any step after the etcd client is created fails, whatever was already
// started is closed again before the error is returned.
func New(c interface{}, confPath string, etcdConf *etcd.Config, vaultConf *vault.Config, onError func(error)) (*Watcher, error) {
	if confPath == "" {
		return nil, ErrorNoConfPath
	}
	// Both configs are dereferenced below; report a nil one as an error
	// instead of panicking.
	if etcdConf == nil {
		return nil, fmt.Errorf("confl: etcd config must not be nil")
	}
	if vaultConf == nil {
		return nil, fmt.Errorf("confl: vault config must not be nil")
	}

	if onError == nil {
		onError = defautlOnError
	}
	if etcdConf.OnError == nil {
		etcdConf.OnError = onError
	}
	if vaultConf.OnError == nil {
		vaultConf.OnError = onError
	}

	w := &Watcher{
		c:        c,
		confPath: confPath,
		changeCh: make(chan struct{}, DefaultChangeChan),
		onError:  onError,
	}

	var err error
	if w.etcd, err = etcd.NewClient(etcdConf); err != nil {
		return nil, err
	}

	vaultConf.ChangeCh = w.changeCh
	if err = vault.Init(vaultConf); err != nil {
		// Do not leak the etcd client that was just created.
		w.etcd.Close()
		return nil, err
	}

	if err = w.loadConfig(); err != nil {
		// Same teardown order as Watcher.Close, minus the channel, which
		// nobody is ranging over yet.
		w.etcd.Close()
		vault.Close()
		return nil, err
	}
	return w, nil
}
// Config returns the current configuration by value.
// When the watcher holds a pointer, the pointed-to value is returned, so the
// caller gets a copy of the struct rather than the watcher's own pointer.
// Example:
// cfg := w.Config().(MyConfigStruct)
func (w *Watcher) Config() interface{} {
	v := reflect.ValueOf(w.c)
	if v.Kind() != reflect.Ptr {
		return v.Interface()
	}
	return v.Elem().Interface()
}
// AddHook registers one or more hooks to run after configuration updates.
// Hooks are appended in the order given; the watcher's mutex is held while
// the list is modified.
func (w *Watcher) AddHook(hooks ...Hook) {
	w.Lock()
	defer w.Unlock()

	w.hooks = append(w.hooks, hooks...)
}
// GoWatch starts watching for configuration updates.
// The etcd watch on the configuration path runs in its own goroutine and is
// given the change channel; the hook loop then runs on the calling
// goroutine, so this call blocks until the change channel is closed.
func (w *Watcher) GoWatch() {
	key, changes := w.confPath, w.changeCh
	go w.etcd.WatchKey(key, changes)

	w.procHooks()
}
// Close shuts the watcher down: it calls Close on the etcd client, then
// vault.Close, and finally closes the change channel. It always returns nil.
//
// Closing the change channel ends the range loop in procHooks, which is what
// lets a blocked GoWatch call return.
// NOTE(review): the channel is closed last presumably so that etcd and vault
// have stopped sending on it first (a send on a closed channel panics) —
// confirm against those packages.
// NOTE(review): a second call would close an already-closed channel, which
// panics; callers should call Close once.
func (w *Watcher) Close() error {
	w.etcd.Close()
	vault.Close()
	close(w.changeCh)
	return nil
}
// loadConfig fetches the value stored under the watcher's configuration path
// from etcd and decodes it into the user-supplied config value.
// Only JSON-encoded configuration is supported at the moment.
// It returns the fetch error, or otherwise whatever json.Unmarshal reports.
func (w *Watcher) loadConfig() error {
	raw, err := w.etcd.Key(w.confPath)
	if err == nil {
		err = json.Unmarshal([]byte(raw), w.c)
	}
	return err
}
// procHooks reloads the configuration and runs the registered hooks each time
// a value arrives on the change channel. It returns once that channel is
// closed.
//
// A failed reload is reported through onError and the hooks are skipped for
// that notification.
func (w *Watcher) procHooks() {
	for range w.changeCh {
		if err := w.loadConfig(); err != nil {
			w.onError(err)
			continue
		}

		// AddHook appends to w.hooks under the mutex, so the slice has to be
		// read under the same mutex. Take a snapshot and release the lock
		// before running anything: a hook may itself call AddHook.
		w.Lock()
		hooks := append([]Hook(nil), w.hooks...)
		w.Unlock()

		// hooks must be called one by one
		// because there may be dependencies between them
		for _, hook := range hooks {
			hook(w.Config())
		}
	}
}