-
Notifications
You must be signed in to change notification settings - Fork 799
/
peers.go
88 lines (77 loc) · 1.93 KB
/
peers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package alertmanager
import (
"net"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)
// Prometheus counters describing the SRV lookups issued by srvDiscovery.
// Both are declared in the "cortex" namespace.
var (
// srvRequests is incremented once for every SRV lookup that updatePeers
// performs, regardless of whether the lookup succeeded.
srvRequests = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "srv_lookup_requests_total",
Help: "Total number of SRV requests.",
})
// srvRequestFailures is incremented only when an SRV lookup returns an
// error.
srvRequestFailures = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "srv_lookup_request_failures_total",
Help: "Total number of failed SRV requests.",
})
)
// init registers the SRV lookup counters with the default Prometheus
// registry. MustRegister panics if a collector cannot be registered.
func init() {
	prometheus.MustRegister(srvRequests, srvRequestFailures)
}
// TODO: change memcache_client to use this.

// srvDiscovery periodically resolves the SRV records of a service in a
// background goroutine and publishes each successful result on a channel.
type srvDiscovery struct {
	// service and hostname are handed to net.LookupSRV (together with the
	// protocol "tcp") on every poll.
	service, hostname string
	// pollInterval is the period of the ticker that triggers re-resolution.
	pollInterval time.Duration
	// addresses carries the records returned by each successful lookup.
	// newSRVDiscovery creates it unbuffered.
	addresses chan []*net.SRV
	// stop is closed by Stop to ask the polling goroutine to exit; done is
	// closed by that goroutine when it returns.
	stop, done chan struct{}
}
// newSRVDiscovery builds a srvDiscovery for the given service and hostname
// and immediately starts its polling goroutine, which re-resolves the SRV
// records every pollInterval. All three channels are created unbuffered.
// Call Stop to terminate the goroutine.
func newSRVDiscovery(service, hostname string, pollInterval time.Duration) *srvDiscovery {
	d := new(srvDiscovery)
	d.service = service
	d.hostname = hostname
	d.pollInterval = pollInterval
	d.addresses = make(chan []*net.SRV)
	d.stop = make(chan struct{})
	d.done = make(chan struct{})

	// The goroutine runs until Stop closes d.stop.
	go d.loop()

	return d
}
// Stop the srvDiscovery: signal the polling goroutine to exit and block
// until it has done so.
//
// Stop closes s.stop unconditionally, so calling it a second time panics
// (close of a closed channel).
func (s *srvDiscovery) Stop() {
// Closing stop makes the <-s.stop case in loop ready.
close(s.stop)
// loop closes done when it returns; wait for that before returning.
<-s.done
}
// updatePeers performs a single SRV lookup for s.service over "tcp" at
// s.hostname and, if it succeeds, delivers the returned records on
// s.addresses.
//
// srvRequests is incremented for every lookup. On error srvRequestFailures
// is incremented as well, a warning is logged, and nothing is sent.
//
// The delivery gives up as soon as s.stop is closed. s.addresses may have no
// receiver ready; a plain blocking send would then keep this goroutine from
// ever observing the stop signal, so Stop would wait forever on s.done.
func (s *srvDiscovery) updatePeers() {
	_, addrs, err := net.LookupSRV(s.service, "tcp", s.hostname)
	srvRequests.Inc()
	if err != nil {
		srvRequestFailures.Inc()
		level.Warn(util.Logger).Log("msg", "error discovering services for hostname", "service", s.service, "hostname", s.hostname, "err", err)
		return
	}

	select {
	case s.addresses <- addrs:
		// Delivered to the consumer.
	case <-s.stop:
		// Shutting down: drop this result so the caller can return.
	}
}
// loop is the body of the polling goroutine started by newSRVDiscovery. It
// runs one lookup immediately, then one per tick of a pollInterval ticker,
// until s.stop is closed. On exit it stops the ticker and then closes
// s.done, which is what Stop waits for.
func (s *srvDiscovery) loop() {
	// Deferred calls run last-in first-out: the ticker is stopped before
	// done is closed.
	defer close(s.done)

	ticker := time.NewTicker(s.pollInterval)
	defer ticker.Stop()

	// Resolve once up front instead of waiting for the first tick.
	s.updatePeers()

	for {
		select {
		case <-s.stop:
			return
		case <-ticker.C:
			s.updatePeers()
		}
	}
}