Add resync cache logic upon watcher reset (#305)
Fixes a race condition in which the watcher is trying to re-connect while an
ANP is deleted by the user. In this case, when the watcher re-connects, it
will not receive delete events for those objects, so they remain in the
network policy controller cache and in the cloud as well.

Here, we introduce resync logic: upon any re-connect, the cache is resynced
first, and the new cache is then compared with the in-memory cache to identify
stale objects and explicitly delete them.

Signed-off-by: Rahul Jain <rahulj@vmware.com>
reachjainrahul authored Sep 6, 2023
1 parent 71c522a commit 2e45df4
Showing 2 changed files with 177 additions and 7 deletions.
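
The commit message above describes the core of the fix: after a re-connect, compare the freshly resynced objects against what is still held in the controller's in-memory cache, and explicitly delete anything that no longer exists upstream. Below is a minimal, hypothetical Go sketch of that comparison step only (findStale, cachedKeys and freshKeys are illustrative names that do not appear in this commit; the actual logic lives in reSyncCache in the diff that follows).

// Illustrative sketch of stale-object detection after a watcher resync.
package main

import "fmt"

// findStale returns the keys present in the in-memory cache but absent from
// the freshly resynced set; those objects must be explicitly deleted.
func findStale(cachedKeys []string, freshKeys map[string]struct{}) []string {
	var stale []string
	for _, k := range cachedKeys {
		if _, ok := freshKeys[k]; !ok {
			stale = append(stale, k)
		}
	}
	return stale
}

func main() {
	cached := []string{"ns1/anp-a", "ns1/anp-b"}  // objects known before the watcher dropped
	fresh := map[string]struct{}{"ns1/anp-a": {}} // objects seen again after re-connect
	fmt.Println(findStale(cached, fresh))         // [ns1/anp-b] -> delete from cache and cloud
}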
2 changes: 1 addition & 1 deletion go.sum
@@ -111,7 +111,7 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
182 changes: 176 additions & 6 deletions pkg/controllers/networkpolicy/controller.go
@@ -16,6 +16,7 @@ package networkpolicy

import (
"context"
"errors"
"fmt"
"reflect"
"time"
@@ -90,11 +91,14 @@ type NetworkPolicyReconciler struct {
networkPolicyWatcher watch.Interface

// Indexers
networkPolicyIndexer cache.Indexer
addrSGIndexer cache.Indexer
appliedToSGIndexer cache.Indexer
npTrackerIndexer cache.Indexer
cloudRuleIndexer cache.Indexer
networkPolicyIndexer cache.Indexer
appliedToGroupIndexer cache.Indexer
addressGroupIndexer cache.Indexer

addrSGIndexer cache.Indexer
appliedToSGIndexer cache.Indexer
npTrackerIndexer cache.Indexer
cloudRuleIndexer cache.Indexer

Inventory inventory.Interface

@@ -372,6 +376,16 @@ func (r *NetworkPolicyReconciler) processAddressGroup(event watch.Event) error {
added = patch.AddedGroupMembers
removed = patch.RemovedGroupMembers
}
// Just storing AddressGroup, not AddressGroupPatch. We only need the object key to remove stale members.
if event.Type == watch.Added {
if err := r.addressGroupIndexer.Add(event.Object); err != nil {
r.Log.Error(err, "failed to add addressGroup")
}
} else if event.Type == watch.Deleted {
if err := r.addressGroupIndexer.Delete(event.Object); err != nil {
r.Log.Error(err, "failed to delete addressGroup")
}
}
return r.processGroup(getNormalizedName(accessor.GetName()), event.Type, true, added, removed)
}

@@ -392,6 +406,17 @@ func (r *NetworkPolicyReconciler) processAppliedToGroup(event watch.Event) error
added = patch.AddedGroupMembers
removed = patch.RemovedGroupMembers
}

// Just storing AppliedToGroup, not AppliedToGroupPatch. We only need the object key to remove stale members.
if event.Type == watch.Added {
if err := r.appliedToGroupIndexer.Add(event.Object); err != nil {
r.Log.Error(err, "failed to add appliedToGroup")
}
} else if event.Type == watch.Deleted {
if err := r.appliedToGroupIndexer.Delete(event.Object); err != nil {
r.Log.Error(err, "failed to delete appliedToGroup")
}
}
return r.processGroup(getNormalizedName(accessor.GetName()), event.Type, false, added, removed)
}

@@ -682,6 +707,18 @@ func (r *NetworkPolicyReconciler) SetupWithManager(mgr ctrl.Manager, antreaKubec
return appliedToGroups, nil
},
})
r.addressGroupIndexer = cache.NewIndexer(
func(obj interface{}) (string, error) {
addressGroup := obj.(*antreanetworking.AddressGroup)
return types.NamespacedName{Name: addressGroup.Name, Namespace: addressGroup.Namespace}.String(), nil
}, cache.Indexers{},
)
r.appliedToGroupIndexer = cache.NewIndexer(
func(obj interface{}) (string, error) {
appliedToGroup := obj.(*antreanetworking.AppliedToGroup)
return types.NamespacedName{Name: appliedToGroup.Name, Namespace: appliedToGroup.Namespace}.String(), nil
}, cache.Indexers{},
)
r.npTrackerIndexer = cache.NewIndexer(
// Each CloudResourceNpTracker is uniquely identified by cloud resource.
func(obj interface{}) (string, error) {
@@ -758,11 +795,141 @@ func (r *NetworkPolicyReconciler) createAntreaClient(mgr ctrl.Manager, antreaKub
r.antreaClient = antreanetworkingclient.NewForConfigOrDie(config)
}

// reSyncCache re-syncs the internal caches of appliedToGroup, addressGroup and networkPolicy upon watcher reset.
// It buffers all the events till the bookmark, figures out stale objects (if any) and issues explicit
// deletes for those objects.
func (r *NetworkPolicyReconciler) reSyncCache() error {
var (
npInitialEventObjects []watch.Event
addressGroupInitialEventObjects []watch.Event
appliedToInitialEventObjects []watch.Event

npInitialObjectKeys = map[types.NamespacedName]struct{}{}
addressGroupInitialObjectKeys = map[types.NamespacedName]struct{}{}
appliedToInitialObjectKeys = map[types.NamespacedName]struct{}{}
)

// TODO: A better choice is to create a common watch interface shared by networkPolicy, addressGroup and
// appliedToGroup. That way these events could be processed in parallel.
syncCount := 0
for {
if syncCount == 3 {
break
}
select {
case event, ok := <-r.addrGroupWatcher.ResultChan():
if !ok || event.Type == watch.Error {
return errors.New("closed addressGroupWatcher channel, restart")
}
// Cache in initial objects till bookmark event.
addressGroupInitialEventObjects = append(addressGroupInitialEventObjects, event)

if event.Type == watch.Bookmark {
syncCount++
break
} else {
// Store the key so that it can be used for figuring out stale objects later.
addressGroup := event.Object.(*antreanetworking.AddressGroup)
key := types.NamespacedName{Namespace: addressGroup.Namespace, Name: addressGroup.Name}
addressGroupInitialObjectKeys[key] = struct{}{}
}
case event, ok := <-r.appliedToGroupWatcher.ResultChan():
if !ok || event.Type == watch.Error {
return errors.New("closed appliedToGroupWatcher channel, restart")
}
appliedToInitialEventObjects = append(appliedToInitialEventObjects, event)
if event.Type == watch.Bookmark {
syncCount++
break
} else {
appliedToGroup := event.Object.(*antreanetworking.AppliedToGroup)
key := types.NamespacedName{Namespace: appliedToGroup.Namespace, Name: appliedToGroup.Name}
appliedToInitialObjectKeys[key] = struct{}{}
}
case event, ok := <-r.networkPolicyWatcher.ResultChan():
if !ok || event.Type == watch.Error {
return errors.New("closed networkPolicyWatcher channel, restart")
}
npInitialEventObjects = append(npInitialEventObjects, event)
if event.Type == watch.Bookmark {
syncCount++
break
} else {
np := event.Object.(*antreanetworking.NetworkPolicy)
key := types.NamespacedName{Name: np.SourceRef.Name, Namespace: np.SourceRef.Namespace}
npInitialObjectKeys[key] = struct{}{}
}
}
}

// Delete Stale Network Policy, AppliedToGroup and AddressGroup.
cachedNp := r.networkPolicyIndexer.List()
for _, obj := range cachedNp {
np := obj.(*networkPolicy)
npKey := types.NamespacedName{Name: np.Name, Namespace: np.Namespace}
if _, ok := npInitialObjectKeys[npKey]; !ok {
r.Log.V(1).Info("Deleting stale networkPolicy", "name", npKey)
obj := watch.Event{Object: &np.NetworkPolicy, Type: watch.Deleted}
if err := r.processNetworkPolicy(obj); err != nil {
r.Log.Error(err, "processing")
}
}
}

cachedAppliedToGroup := r.appliedToGroupIndexer.List()
for _, obj := range cachedAppliedToGroup {
appliedToGroup := obj.(*antreanetworking.AppliedToGroup)
key := types.NamespacedName{Name: appliedToGroup.Name, Namespace: appliedToGroup.Namespace}
if _, ok := appliedToInitialObjectKeys[key]; !ok {
r.Log.V(1).Info("Deleting stale appliedToGroup", "name", key)
obj := watch.Event{Object: appliedToGroup, Type: watch.Deleted}
if err := r.processAppliedToGroup(obj); err != nil {
r.Log.Error(err, "processing")
}
}
}

cachedAddressGroup := r.addressGroupIndexer.List()
for _, obj := range cachedAddressGroup {
addressGroup := obj.(*antreanetworking.AddressGroup)
key := types.NamespacedName{Name: addressGroup.Name, Namespace: addressGroup.Namespace}
if _, ok := addressGroupInitialObjectKeys[key]; !ok {
r.Log.V(1).Info("Deleting stale addressGroup", "name", key)
obj := watch.Event{Object: addressGroup, Type: watch.Deleted}
if err := r.processAddressGroup(obj); err != nil {
r.Log.Error(err, "processing")
}
}
}

// Add AddressGroup, AppliedToGroup and NetworkPolicy.
for _, obj := range addressGroupInitialEventObjects {
if err := r.processAddressGroup(obj); err != nil {
r.Log.Error(err, "processing")
}
}

for _, obj := range appliedToInitialEventObjects {
if err := r.processAppliedToGroup(obj); err != nil {
r.Log.Error(err, "processing")
}
}

for _, obj := range npInitialEventObjects {
if err := r.processNetworkPolicy(obj); err != nil {
r.Log.Error(err, "processing")
}
}
return nil
}

func (r *NetworkPolicyReconciler) resetWatchers() error {
var err error
options := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("nodeName", config.ANPNepheController).String(),
}

// TODO: Evaluate if we can avoid resetting all the watchers, if one of the watcher fails.
for {
if r.addrGroupWatcher, err = r.antreaClient.AddressGroups().Watch(context.Background(), options); err != nil {
r.Log.Error(err, "watcher connect to AddressGroup")
@@ -779,7 +946,10 @@ func (r *NetworkPolicyReconciler) resetWatchers() error {
time.Sleep(time.Second * 5)
continue
}
break
// Re-sync cache after reset.
if err := r.reSyncCache(); err == nil {
break
}
}
return err
}
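
As an aside (not part of this commit), the reSyncCache function above relies on the watch Bookmark event to mark the end of the initial object list delivered after a re-connect. The following standalone sketch shows that drain-until-bookmark pattern using only k8s.io/apimachinery/pkg/watch; drainUntilBookmark is an illustrative name and is not the controller's code.

// Illustrative sketch: buffer watch events up to and including the Bookmark,
// so the caller can diff the buffered keys against its existing cache.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/watch"
)

func drainUntilBookmark(ch <-chan watch.Event) ([]watch.Event, error) {
	var buffered []watch.Event
	for event := range ch {
		if event.Type == watch.Error {
			return nil, fmt.Errorf("watch channel reported an error, restart the watcher")
		}
		buffered = append(buffered, event)
		if event.Type == watch.Bookmark {
			return buffered, nil
		}
	}
	return nil, fmt.Errorf("watch channel closed before Bookmark, restart the watcher")
}

func main() {
	ch := make(chan watch.Event, 3)
	ch <- watch.Event{Type: watch.Added}    // Object omitted for brevity
	ch <- watch.Event{Type: watch.Added}
	ch <- watch.Event{Type: watch.Bookmark} // end of the initial list
	events, err := drainUntilBookmark(ch)
	fmt.Println(len(events), err) // 3 <nil>
}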
