-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
registrar.go
120 lines (105 loc) · 3.04 KB
/
registrar.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package etcd
import (
"sync"
"time"
etcd "go.etcd.io/etcd/client/v2"
"github.com/go-kit/log"
)
const minHeartBeatTime = 500 * time.Millisecond
// Registrar registers service instance liveness information to etcd.
type Registrar struct {
client Client
service Service
logger log.Logger
quitmtx sync.Mutex
quit chan struct{}
}
// Service holds the instance identifying data you want to publish to etcd. Key
// must be unique, and value is the string returned to subscribers, typically
// called the "instance" string in other parts of package sd.
type Service struct {
Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080"
Value string // returned to subscribers, e.g. "http://1.2.3.4:8080"
TTL *TTLOption
DeleteOptions *etcd.DeleteOptions
}
// TTLOption allow setting a key with a TTL. This option will be used by a loop
// goroutine which regularly refreshes the lease of the key.
type TTLOption struct {
heartbeat time.Duration // e.g. time.Second * 3
ttl time.Duration // e.g. time.Second * 10
}
// NewTTLOption returns a TTLOption that contains proper TTL settings. Heartbeat
// is used to refresh the lease of the key periodically; its value should be at
// least 500ms. TTL defines the lease of the key; its value should be
// significantly greater than heartbeat.
//
// Good default values might be 3s heartbeat, 10s TTL.
func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption {
if heartbeat <= minHeartBeatTime {
heartbeat = minHeartBeatTime
}
if ttl <= heartbeat {
ttl = 3 * heartbeat
}
return &TTLOption{
heartbeat: heartbeat,
ttl: ttl,
}
}
// NewRegistrar returns a etcd Registrar acting on the provided catalog
// registration (service).
func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar {
return &Registrar{
client: client,
service: service,
logger: log.With(logger, "key", service.Key, "value", service.Value),
}
}
// Register implements the sd.Registrar interface. Call it when you want your
// service to be registered in etcd, typically at startup.
func (r *Registrar) Register() {
if err := r.client.Register(r.service); err != nil {
r.logger.Log("err", err)
} else {
r.logger.Log("action", "register")
}
if r.service.TTL != nil {
go r.loop()
}
}
func (r *Registrar) loop() {
r.quitmtx.Lock()
if r.quit != nil {
return // already running
}
r.quit = make(chan struct{})
r.quitmtx.Unlock()
tick := time.NewTicker(r.service.TTL.heartbeat)
defer tick.Stop()
for {
select {
case <-tick.C:
if err := r.client.Register(r.service); err != nil {
r.logger.Log("err", err)
}
case <-r.quit:
return
}
}
}
// Deregister implements the sd.Registrar interface. Call it when you want your
// service to be deregistered from etcd, typically just prior to shutdown.
func (r *Registrar) Deregister() {
if err := r.client.Deregister(r.service); err != nil {
r.logger.Log("err", err)
} else {
r.logger.Log("action", "deregister")
}
r.quitmtx.Lock()
defer r.quitmtx.Unlock()
if r.quit != nil {
close(r.quit)
r.quit = nil
}
}