Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Fix ipam reclaim #3192

Merged
merged 3 commits into from
Dec 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions prog/kube-peers/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ const (
retryPeriod = time.Second * 2
jitterFactor = 1.0

// Prefix all our annotation keys with this string so they don't clash with anyone else's
KubePeersPrefix = "kube-peers.weave.works/"
// KubePeersAnnotationKey is the default annotation key
KubePeersAnnotationKey = "kube-peers.weave.works/peers"
KubePeersAnnotationKey = KubePeersPrefix + "peers"
)

func (cml *configMapAnnotations) Init() error {
Expand Down Expand Up @@ -130,46 +132,59 @@ func (cml *configMapAnnotations) UpdatePeerList(list peerList) error {
return cml.UpdateAnnotation(KubePeersAnnotationKey, string(recordBytes))
}

func (cml *configMapAnnotations) UpdateAnnotation(key, value string) error {
// Clean up a string so it meets the Kubernetes requiremements for Annotation keys:
// name part must consist of alphanumeric characters, '-', '_' or '.', and must
// start and end with an alphanumeric character (e.g. 'MyName', or 'my.name', or '123-abc')
func cleanKey(key string) string {
buf := []byte(key)
for i, c := range buf {
if (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '/' {
continue
}
buf[i] = '_'
}
return string(buf)
}

func (cml *configMapAnnotations) GetAnnotation(key string) (string, bool) {
value, ok := cml.cm.Annotations[cleanKey(key)]
return value, ok
}

func (cml *configMapAnnotations) UpdateAnnotation(key, value string) (err error) {
if cml.cm == nil {
return errors.New("endpoint not initialized, call Init first")
}
cm := cml.cm
cm.Annotations[key] = value
cm, err := cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm)
if err == nil {
cml.cm = cm
}
// speculatively change the state, then replace with whatever comes back
// from Update(), which will be the latest on the server whatever happened
cml.cm.Annotations[cleanKey(key)] = value
cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm)
return err
}

func (cml *configMapAnnotations) RemoveAnnotation(key string) error {
func (cml *configMapAnnotations) RemoveAnnotation(key string) (err error) {
if cml.cm == nil {
return errors.New("endpoint not initialized, call Init first")
}
cm := cml.cm
delete(cm.Annotations, key)
cm, err := cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm)
if err == nil {
cml.cm = cm
}
// speculatively change the state, then replace with whatever comes back
// from Update(), which will be the latest on the server whatever happened
delete(cml.cm.Annotations, cleanKey(key))
cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm)
return err
}

func (cml *configMapAnnotations) RemoveAnnotationsWithValue(valueToRemove string) error {
func (cml *configMapAnnotations) RemoveAnnotationsWithValue(valueToRemove string) (err error) {
if cml.cm == nil {
return errors.New("endpoint not initialized, call Init first")
}
cm := cml.cm
for key, value := range cm.Annotations {
// speculatively change the state, then replace with whatever comes back
// from Update(), which will be the latest on the server whatever happened
for key, value := range cml.cm.Annotations {
if value == valueToRemove {
delete(cm.Annotations, key)
delete(cml.cm.Annotations, key) // don't need to clean this key as it came from the map
}
}
cm, err := cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm)
if err == nil {
cml.cm = cm
}
cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm)
return err
}

Expand Down
16 changes: 9 additions & 7 deletions prog/kube-peers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func checkIamInPeerList(cml *configMapAnnotations, c *kubernetes.Clientset, peer
// Kubernetes, remove it from Weave IPAM
func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, nodes []nodeInfo, myPeerName string) error {
for {
if err := cml.Init(); err != nil {
return err
}
// 1. Compare peers stored in the peerList against all peers reported by k8s now.
storedPeerList, err := cml.GetPeerList()
if err != nil {
Expand All @@ -119,7 +122,7 @@ func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, node
common.Log.Debugln("[kube-peers] Preparing to remove disappeared peer", peer)
okToRemove := false
// 3. Check if there is an existing annotation with key X
if existingAnnotation, found := cml.cm.Annotations[peer.PeerName]; found {
if existingAnnotation, found := cml.GetAnnotation(KubePeersPrefix + peer.PeerName); found {
common.Log.Debugln("[kube-peers] Existing annotation", existingAnnotation)
// 4. If annotation already contains my identity, ok;
if existingAnnotation == myPeerName {
Expand All @@ -128,14 +131,16 @@ func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, node
} else {
// 5. If non-existent, write an annotation with key X and contents "my identity"
common.Log.Debugln("[kube-peers] Noting I plan to remove ", peer.PeerName)
if err := cml.UpdateAnnotation(peer.PeerName, myPeerName); err == nil {
if err := cml.UpdateAnnotation(KubePeersPrefix+peer.PeerName, myPeerName); err == nil {
okToRemove = true
} else {
common.Log.Debugln("[kube-peers] error from UpdateAnnotation: ", err)
}
}
if okToRemove {
// 6. If step 4 or 5 succeeded, rmpeer X
result, err := weave.RmPeer(peer.PeerName)
common.Log.Infoln("[kube-peers] rmpeer of", peer.PeerName, ":", result)
common.Log.Infof("[kube-peers] rmpeer of %s: %s", peer.PeerName, result)
if err != nil {
return err
}
Expand All @@ -146,15 +151,12 @@ func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, node
}
// 7a. Remove X from peerList
storedPeerList.remove(peer.PeerName)
common.Log.Infoln("[kube-peers] Removing peer ", peer.PeerName, ". Expecting to remove linked annotation next.")
if err := cml.UpdatePeerList(*storedPeerList); err != nil {
return err
}
common.Log.Infoln("[kube-peers] Removing annotation ", peer.PeerName)
// 7b. Remove annotation with key X
return cml.RemoveAnnotation(peer.PeerName)
return cml.RemoveAnnotation(KubePeersPrefix + peer.PeerName)
})
common.Log.Debugln("[kube-peers] Finished removal of ", peer.PeerName)
}
// 8. If step 5 failed due to optimistic lock conflict, stop: someone else is handling X

Expand Down