This repository has been archived by the owner on Jun 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 672
/
annotations.go
154 lines (139 loc) · 4.86 KB
/
annotations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/*
In order to keep track of active weave peers, we use annotations on the Kubernetes cluster.
Kubernetes uses etcd to distribute and synchronise these annotations so we don't have to.
*/
package main
import (
"log"
"time"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
kubeErrors "k8s.io/apimachinery/pkg/api/errors"
api "k8s.io/apimachinery/pkg/apis/meta/v1"
wait "k8s.io/apimachinery/pkg/util/wait"
kubernetes "k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
// configMapAnnotations stores key/value pairs as annotations on a single
// Kubernetes ConfigMap, keeping a local copy of that ConfigMap in cm.
type configMapAnnotations struct {
// ConfigMapName is the name of the ConfigMap that is fetched, created and updated.
ConfigMapName string
// Namespace is the namespace in which the ConfigMap is looked up and created.
Namespace string
// Client is used for the Get, Create and Update calls on the ConfigMap.
Client corev1client.ConfigMapsGetter
// cm is the local copy of the ConfigMap. newConfigMapAnnotations leaves it
// unset, Init assigns it, and every Update call overwrites it with the result.
cm *v1.ConfigMap
}
// newConfigMapAnnotations builds a configMapAnnotations for the ConfigMap
// named configMapName in namespace ns, talking to the cluster through the
// CoreV1 client of c. No ConfigMap is loaded here; Init does that, and the
// update/remove methods return an error until it has been called.
func newConfigMapAnnotations(ns string, configMapName string, c kubernetes.Interface) *configMapAnnotations {
	annotations := new(configMapAnnotations)
	annotations.Client = c.CoreV1()
	annotations.ConfigMapName = configMapName
	annotations.Namespace = ns
	return annotations
}
const (
// retryPeriod is the period LoopUpdate passes to wait.JitterUntil between attempts.
retryPeriod = time.Second * 2
// jitterFactor is the jitter factor LoopUpdate passes to wait.JitterUntil.
jitterFactor = 1.0
// KubePeersPrefix is the string all our annotation keys are prefixed with,
// so they don't clash with anyone else's.
KubePeersPrefix = "kube-peers.weave.works/"
// KubePeersAnnotationKey is the default annotation key.
KubePeersAnnotationKey = KubePeersPrefix + "peers"
)
// Init loads the ConfigMap into cml.cm, creating an empty one if it does not
// exist yet, and makes sure its Annotations map is non-nil.
//
// It returns a wrapped error if the fetch fails for any reason other than
// "not found", or if the create fails for any reason other than "already
// exists".
func (cml *configMapAnnotations) Init() error {
	// GET followed by CREATE-if-missing is racy: somebody else may create the
	// ConfigMap after our GET but before our CREATE. When that happens the
	// CREATE reports "already exists" and we simply go round again to GET it.
	for {
		var err error

		cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Get(cml.ConfigMapName, api.GetOptions{})
		if err == nil {
			break
		}
		if !kubeErrors.IsNotFound(err) {
			return errors.Wrapf(err, "Unable to fetch ConfigMap %s/%s", cml.Namespace, cml.ConfigMapName)
		}

		// Not there yet: try to create an empty ConfigMap with the right identity.
		empty := &v1.ConfigMap{
			ObjectMeta: api.ObjectMeta{
				Name:      cml.ConfigMapName,
				Namespace: cml.Namespace,
			},
		}
		cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Create(empty)
		if err == nil {
			break
		}
		if !kubeErrors.IsAlreadyExists(err) {
			return errors.Wrapf(err, "Unable to create ConfigMap %s/%s", cml.Namespace, cml.ConfigMapName)
		}
		// Lost the race with another creator; loop to fetch what they made.
	}

	// Later writes index into the map directly, so it must exist.
	if cml.cm.Annotations == nil {
		cml.cm.Annotations = make(map[string]string)
	}
	return nil
}
// cleanKey rewrites key so that it only contains characters Kubernetes
// accepts in an annotation key: the name part must consist of alphanumeric
// characters, '-', '_' or '.', and must start and end with an alphanumeric
// character (e.g. 'MyName', or 'my.name', or '123-abc').
//
// Every byte that is not an ASCII letter, digit, '-', '_', '.' or '/' is
// replaced by '_'; '/' is kept as-is. The replacement is per byte, so a
// multi-byte UTF-8 character becomes several underscores. The start/end
// alphanumeric rule is not enforced here.
func cleanKey(key string) string {
	cleaned := make([]byte, len(key))
	for pos := 0; pos < len(key); pos++ {
		switch ch := key[pos]; {
		case 'a' <= ch && ch <= 'z', 'A' <= ch && ch <= 'Z', '0' <= ch && ch <= '9':
			cleaned[pos] = ch
		case ch == '-', ch == '_', ch == '.', ch == '/':
			cleaned[pos] = ch
		default:
			cleaned[pos] = '_'
		}
	}
	return string(cleaned)
}
// GetAnnotation returns the value stored under key in the local copy of the
// ConfigMap's annotations, and whether the key was present. The key is passed
// through cleanKey before the lookup, matching how UpdateAnnotation stores it.
//
// Only cml.cm is consulted; no API call is made. If no ConfigMap has been
// loaded yet (cml.cm is nil, i.e. Init has not run) the key is reported as
// absent instead of dereferencing a nil pointer.
func (cml *configMapAnnotations) GetAnnotation(key string) (string, bool) {
	if cml.cm == nil {
		return "", false
	}
	value, ok := cml.cm.Annotations[cleanKey(key)]
	return value, ok
}
// UpdateAnnotation sets the annotation key (after cleanKey) to value on the
// local copy of the ConfigMap and sends the whole object to the API server
// with Update.
//
// It returns an error without doing anything if no ConfigMap has been loaded
// (cml.cm is nil). Otherwise cml.cm is replaced by whatever Update returns,
// whether or not Update succeeded, and Update's error is returned as-is.
// NOTE(review): the error is presumably left unwrapped so that callers such
// as the function handed to LoopUpdate can test it with kubeErrors.IsConflict
// - confirm before adding any wrapping here.
func (cml *configMapAnnotations) UpdateAnnotation(key, value string) (err error) {
	if cml.cm == nil {
		return errors.New("endpoint not initialized, call Init first")
	}
	// cml.cm is overwritten with the result of every Update, and that result
	// may carry a nil Annotations map; writing to a nil map panics, so make
	// sure the map exists before the speculative write below.
	if cml.cm.Annotations == nil {
		cml.cm.Annotations = make(map[string]string)
	}
	// speculatively change the state, then replace with whatever comes back
	// from Update()
	cml.cm.Annotations[cleanKey(key)] = value
	cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm)
	return err
}
// RemoveAnnotation deletes the annotation key (after cleanKey) from the local
// copy of the ConfigMap and sends the whole object to the API server with
// Update.
//
// It returns an error without doing anything if no ConfigMap has been loaded
// (cml.cm is nil). Otherwise cml.cm is replaced by whatever Update returns
// and Update's error is returned as-is.
func (cml *configMapAnnotations) RemoveAnnotation(key string) (err error) {
	if cml.cm == nil {
		return errors.New("endpoint not initialized, call Init first")
	}

	// Change the local state first; the object returned by Update then
	// replaces it.
	annotationKey := cleanKey(key)
	delete(cml.cm.Annotations, annotationKey)

	updated, err := cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm)
	cml.cm = updated
	return err
}
// RemoveAnnotationsWithValue deletes every annotation whose value equals
// valueToRemove from the local copy of the ConfigMap and sends the whole
// object to the API server with Update. Update is called even if nothing
// matched.
//
// It returns an error without doing anything if no ConfigMap has been loaded
// (cml.cm is nil). Otherwise cml.cm is replaced by whatever Update returns
// and Update's error is returned as-is.
func (cml *configMapAnnotations) RemoveAnnotationsWithValue(valueToRemove string) (err error) {
	if cml.cm == nil {
		return errors.New("endpoint not initialized, call Init first")
	}

	// Change the local state first; the object returned by Update then
	// replaces it.
	annotations := cml.cm.Annotations
	for name := range annotations {
		if annotations[name] != valueToRemove {
			continue
		}
		// The key comes straight from the map, so it needs no cleanKey.
		delete(annotations, name)
	}

	cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm)
	return err
}
// LoopUpdate repeatedly re-fetches the ConfigMap via Init and then calls f,
// with a jittered delay between attempts, for as long as f fails with an
// optimistic locking conflict.
//
// The loop ends as soon as Init fails, f succeeds, or f fails with any
// non-conflict error; the error from that last step (nil on success) is
// returned.
func (cml *configMapAnnotations) LoopUpdate(f func() error) error {
	var result error
	done := make(chan struct{})

	attempt := func() {
		result = cml.Init()
		if result == nil {
			result = f()
			if result != nil && kubeErrors.IsConflict(result) {
				// Leave done open so JitterUntil schedules another attempt.
				log.Printf("Optimistic locking conflict: trying again: %s", result)
				return
			}
		}
		// Success or a non-retryable failure: closing done stops the loop.
		close(done)
	}

	wait.JitterUntil(attempt, retryPeriod, jitterFactor, true, done)
	return result
}