-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
instancer.go
187 lines (163 loc) · 4.39 KB
/
instancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package consul
import (
"errors"
"fmt"
"time"
consul "github.com/hashicorp/consul/api"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/internal/instance"
"github.com/go-kit/kit/util/conn"
"github.com/go-kit/log"
)
const defaultIndex = 0
// errStopped notifies the loop to quit. aka stopped via quitc
var errStopped = errors.New("quit and closed consul instancer")
// Instancer yields instances for a service in Consul.
type Instancer struct {
cache *instance.Cache
client Client
logger log.Logger
service string
tags []string
passingOnly bool
quitc chan struct{}
}
// NewInstancer returns a Consul instancer that publishes instances for the
// requested service. It only returns instances for which all of the passed tags
// are present.
func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
s := &Instancer{
cache: instance.NewCache(),
client: client,
logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
service: service,
tags: tags,
passingOnly: passingOnly,
quitc: make(chan struct{}),
}
instances, index, err := s.getInstances(defaultIndex, nil)
if err == nil {
s.logger.Log("instances", len(instances))
} else {
s.logger.Log("err", err)
}
s.cache.Update(sd.Event{Instances: instances, Err: err})
go s.loop(index)
return s
}
// Stop terminates the instancer.
func (s *Instancer) Stop() {
close(s.quitc)
}
func (s *Instancer) loop(lastIndex uint64) {
var (
instances []string
err error
d time.Duration = 10 * time.Millisecond
index uint64
)
for {
instances, index, err = s.getInstances(lastIndex, s.quitc)
switch {
case errors.Is(err, errStopped):
return // stopped via quitc
case err != nil:
s.logger.Log("err", err)
time.Sleep(d)
d = conn.Exponential(d)
s.cache.Update(sd.Event{Err: err})
case index == defaultIndex:
s.logger.Log("err", "index is not sane")
time.Sleep(d)
d = conn.Exponential(d)
case index < lastIndex:
s.logger.Log("err", "index is less than previous; resetting to default")
lastIndex = defaultIndex
time.Sleep(d)
d = conn.Exponential(d)
default:
lastIndex = index
s.cache.Update(sd.Event{Instances: instances})
d = 10 * time.Millisecond
}
}
}
func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
tag := ""
if len(s.tags) > 0 {
tag = s.tags[0]
}
// Consul doesn't support more than one tag in its service query method.
// https://github.com/hashicorp/consul/issues/294
// Hashi suggest prepared queries, but they don't support blocking.
// https://www.consul.io/docs/agent/http/query.html#execute
// If we want blocking for efficiency, we must filter tags manually.
type response struct {
instances []string
index uint64
}
var (
errc = make(chan error, 1)
resc = make(chan response, 1)
)
go func() {
entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{
WaitIndex: lastIndex,
})
if err != nil {
errc <- err
return
}
if len(s.tags) > 1 {
entries = filterEntries(entries, s.tags[1:]...)
}
resc <- response{
instances: makeInstances(entries),
index: meta.LastIndex,
}
}()
select {
case err := <-errc:
return nil, 0, err
case res := <-resc:
return res.instances, res.index, nil
case <-interruptc:
return nil, 0, errStopped
}
}
// Register implements Instancer.
func (s *Instancer) Register(ch chan<- sd.Event) {
s.cache.Register(ch)
}
// Deregister implements Instancer.
func (s *Instancer) Deregister(ch chan<- sd.Event) {
s.cache.Deregister(ch)
}
func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
var es []*consul.ServiceEntry
ENTRIES:
for _, entry := range entries {
ts := make(map[string]struct{}, len(entry.Service.Tags))
for _, tag := range entry.Service.Tags {
ts[tag] = struct{}{}
}
for _, tag := range tags {
if _, ok := ts[tag]; !ok {
continue ENTRIES
}
}
es = append(es, entry)
}
return es
}
func makeInstances(entries []*consul.ServiceEntry) []string {
instances := make([]string, len(entries))
for i, entry := range entries {
addr := entry.Node.Address
if entry.Service.Address != "" {
addr = entry.Service.Address
}
instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)
}
return instances
}