More robust system Tier creation / update
System Tier initialization, as well as all the APIServer endpoints that
depend on informer caches, should ideally wait for the caches to sync
before using the listers. An easy way to do that is to ensure that the
APIServer does not run until the informer factories have synced all caches.
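
In practice the change boils down to the pattern sketched below. This is a
simplified, hypothetical helper, not code from this commit; the actual wiring
(using informerFactory, crdInformerFactory and apiServer) is in the
controller.go hunk further down.

package sketch

import (
	"context"

	"k8s.io/client-go/informers"
)

// startWhenSynced (hypothetical) runs the given server function only once every
// informer created through the shared factory has synced, so listers backing the
// API handlers never read from an empty cache.
func startWhenSynced(ctx context.Context, stopCh <-chan struct{},
	factory informers.SharedInformerFactory, run func(context.Context)) {
	go func() {
		// WaitForCacheSync blocks until all informers started by this factory have
		// synced, or until stopCh is closed.
		factory.WaitForCacheSync(stopCh)
		run(ctx)
	}()
}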

Additionally, we improve the logic in system Tier initialization (a
condensed sketch of the resulting loop follows this list):
* After every failed API call, we should probably check the state of the
  informer cache again (via the lister) and decide which step is needed
  next (creation / update / nothing). In case of a failed create or
  update, this gives us the opportunity to check again whether the Tier
  actually exists and, if so, what its priority is.
* AlreadyExists is no longer treated differently for create. The reason
  is that if the Tier already exists and for some reason it was not
  returned by the Lister, it will probably be the validation webhook
  that fails (overlapping priority), and the error won't match
  AlreadyExists.
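
The condensed sketch mentioned above: each attempt re-reads the cached state
(the lister in the real code) and then decides between create, update, or
nothing; a failed API call only causes the next attempt, after a backoff, to
re-run the same decision. This is a hypothetical, generic rendering, not the
actual initializeTier code shown in the tier.go diff below.

package sketch

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// ensureObject (hypothetical) retries until the object reaches the desired state
// or the context is cancelled.
func ensureObject(ctx context.Context, backoff wait.Backoff,
	lookup func() (found, upToDate bool),
	create, update func(ctx context.Context) error) error {
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		var err error
		switch found, upToDate := lookup(); {
		case found && upToDate:
			return nil // nothing to do
		case found:
			err = update(ctx) // e.g. the Tier exists with the wrong priority
		default:
			err = create(ctx)
		}
		if err == nil {
			return nil
		}
		// Retry after the current backoff step, unless the context is cancelled first.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff.Step()):
		}
	}
}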

Related to antrea-io#6694

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
antoninbas committed Sep 30, 2024
1 parent 1dad03b commit ae097b1
Showing 4 changed files with 195 additions and 98 deletions.
13 changes: 12 additions & 1 deletion cmd/antrea-controller/controller.go
@@ -367,7 +367,18 @@ func run(o *Options) error {

go networkPolicyController.Run(stopCh)

go apiServer.Run(ctx)
go func() {
klog.InfoS("Waiting for all informer caches to sync before running APIserver")
// The apiserver has dependencies on multiple informer listers, which are obtained
// from the networkPolicyController object when configuring the apiserver. Before
// starting serving requests, it makes sense to wait for the informers to have
// synced. Rather than listing all the individual informers, we can call
// WaitForCacheSync on the informer factories.
informerFactory.WaitForCacheSync(stopCh)
crdInformerFactory.WaitForCacheSync(stopCh)
klog.InfoS("All informer caches are synced, running APIserver")
apiServer.Run(ctx)
}()

if features.DefaultFeatureGate.Enabled(features.NetworkPolicyStats) {
go statsAggregator.Run(stopCh)
3 changes: 2 additions & 1 deletion pkg/apiserver/apiserver.go
@@ -321,7 +321,8 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) {

// Install a post start hook to initialize Tiers on start-up
s.AddPostStartHook("initialize-tiers", func(context genericapiserver.PostStartHookContext) error {
go c.networkPolicyController.InitializeTiers()
// context gets cancelled when the server stops.
go c.networkPolicyController.InitializeTiers(context)
return nil
})
}
151 changes: 85 additions & 66 deletions pkg/controller/networkpolicy/tier.go
@@ -21,9 +21,10 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
@@ -118,83 +119,101 @@ var (
// will first attempt to retrieve the Tier by its name from K8s and if missing,
// create the CR. InitializeTiers will be called as part of a Post-Start hook
// of antrea-controller's APIServer.
func (n *NetworkPolicyController) InitializeTiers() {
func (n *NetworkPolicyController) InitializeTiers(ctx context.Context) error {
// Wait at most 5s for Tier informer to sync (best effort).
// All caches should be synced before starting the APIServer, so this
// should return instantly.
func() {
timeout := 5 * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if !cache.WaitForCacheSync(ctx.Done(), n.tierListerSynced) {
klog.ErrorS(nil, "Caches not synced before timeout for default Tiers initialization", "timeout", timeout)
}
}()
select {
case <-ctx.Done():
return ctx.Err()
default:
}
for _, t := range systemGeneratedTiers {
// Check if Tier is already present.
oldTier, err := n.tierLister.Get(t.Name)
if err == nil {
// Tier is already present.
klog.V(2).Infof("%s Tier already created", t.Name)
// Update Tier Priority if it is not set to desired Priority.
expPrio := priorityMap[t.Name]
if oldTier.Spec.Priority != expPrio {
tToUpdate := oldTier.DeepCopy()
tToUpdate.Spec.Priority = expPrio
n.updateTier(tToUpdate)
}
continue
if err := n.initializeTier(ctx, t); err != nil {
return err
}
n.initTier(t)
}
return nil
}

// initTier attempts to create system Tiers until they are created using an
// exponential backoff period from 1 to max of 8secs.
func (n *NetworkPolicyController) initTier(t *secv1beta1.Tier) {
var err error
const maxBackoffTime = 8 * time.Second
backoff := 1 * time.Second
func (n *NetworkPolicyController) initializeTier(ctx context.Context, t *secv1beta1.Tier) error {
// Tier creation or update may fail because antrea APIService is not yet ready to accept
// requests for validation. We will keep retrying until it succeeds, using an exponential
// backoff (not exceeding 8s), unless the context is cancelled.
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 2.0,
Jitter: 0.0,
Steps: 3, // max duration of 8s
}
retryAttempt := 1
for {
klog.V(2).InfoS("Creating system Tier", "tier", t.Name)
_, err = n.crdClient.CrdV1beta1().Tiers().Create(context.TODO(), t, metav1.CreateOptions{})
// Attempt to recreate Tier after a backoff only if it does not exist.
if err != nil {
if errors.IsAlreadyExists(err) {
klog.InfoS("System Tier already exists", "tier", t.Name)
return
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if success := func() bool {
// Check if Tier is already present.
if oldTier, err := n.tierLister.Get(t.Name); err == nil {
// Tier is already present.
klog.V(2).InfoS("Tier already exists", "tier", klog.KObj(t))
// Update Tier Priority if it is not set to desired Priority.
expPrio := t.Spec.Priority
if oldTier.Spec.Priority == expPrio {
return true
}
tToUpdate := oldTier.DeepCopy()
tToUpdate.Spec.Priority = expPrio
if err := n.updateTier(ctx, tToUpdate); err != nil {
klog.InfoS("Failed to update system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err)
return false
}
return true
}
klog.InfoS("Failed to create system Tier on init, will retry", "tier", t.Name, "attempts", retryAttempt, "err", err)
// Tier creation may fail because antrea APIService is not yet ready
// to accept requests for validation. Retry fixed number of times
// not exceeding 8s.
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoffTime {
backoff = maxBackoffTime
if err := n.createTier(ctx, t); err != nil {
// The error may be that the Tier already exists; in this case, we will
// call tierLister.Get again and compare priorities.
klog.InfoS("Failed to create system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err)
return false
}
retryAttempt += 1
continue
return true
}(); success {
break
}
retryAttempt += 1
waitBeforeRetry := backoff.Step()
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(waitBeforeRetry):
}
klog.InfoS("Created system Tier", "tier", t.Name)
return
}
return nil
}

// updateTier attempts to update Tiers using an
// exponential backoff period from 1 to max of 8secs.
func (n *NetworkPolicyController) updateTier(t *secv1beta1.Tier) {
var err error
const maxBackoffTime = 8 * time.Second
backoff := 1 * time.Second
retryAttempt := 1
for {
klog.V(2).Infof("Updating %s Tier", t.Name)
_, err = n.crdClient.CrdV1beta1().Tiers().Update(context.TODO(), t, metav1.UpdateOptions{})
// Attempt to update Tier after a backoff.
if err != nil {
klog.Warningf("Failed to update %s Tier on init: %v. Retry attempt: %d", t.Name, err, retryAttempt)
// Tier update may fail because antrea APIService is not yet ready
// to accept requests for validation. Retry fixed number of times
// not exceeding 8s.
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoffTime {
backoff = maxBackoffTime
}
retryAttempt += 1
continue
}
return
func (n *NetworkPolicyController) createTier(ctx context.Context, t *secv1beta1.Tier) error {
klog.V(2).InfoS("Creating system Tier", "tier", klog.KObj(t))
if _, err := n.crdClient.CrdV1beta1().Tiers().Create(ctx, t, metav1.CreateOptions{}); err != nil {
return err
}
klog.InfoS("Created system Tier", "tier", klog.KObj(t))
return nil
}

func (n *NetworkPolicyController) updateTier(ctx context.Context, t *secv1beta1.Tier) error {
klog.V(2).InfoS("Updating system Tier", "tier", klog.KObj(t))
if _, err := n.crdClient.CrdV1beta1().Tiers().Update(ctx, t, metav1.UpdateOptions{}); err != nil {
return err
}
klog.InfoS("Updated system Tier", "tier", klog.KObj(t))
return nil
}
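
As a side note, the wait.Backoff parameters used above (Duration 1s, Factor 2,
Steps 3) produce delays of 1s, 2s, 4s and then 8s for every further retry. The
standalone snippet below only illustrates that stepping behavior; it is not part
of the change.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Same parameters as initializeTier: start at 1s, double on each growth step,
	// grow at most 3 times.
	backoff := wait.Backoff{Duration: 1 * time.Second, Factor: 2.0, Steps: 3}
	for i := 0; i < 5; i++ {
		// Step returns the current delay and advances the backoff; once the growth
		// steps are exhausted it keeps returning the final duration.
		fmt.Println(backoff.Step())
	}
	// Prints 1s, 2s, 4s, 8s, 8s (one per line), i.e. retries never wait more than 8s.
}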
126 changes: 96 additions & 30 deletions pkg/controller/networkpolicy/tier_test.go
@@ -15,9 +15,12 @@
package networkpolicy

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -27,61 +30,124 @@ import (
"antrea.io/antrea/pkg/client/clientset/versioned/fake"
)

func TestInitTier(t *testing.T) {
testTier := &secv1beta1.Tier{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: secv1beta1.TierSpec{
Priority: 10,
},
func TestInitializeTier(t *testing.T) {
makeTestTier := func(priority int32) *secv1beta1.Tier {
return &secv1beta1.Tier{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: secv1beta1.TierSpec{
Priority: priority,
},
}
}
testTier := makeTestTier(10)

tests := []struct {
name string
reactor k8stesting.ReactionFunc
expectedCalled int
name string
createReactor k8stesting.ReactionFunc
updateReactor k8stesting.ReactionFunc
existingTier *secv1beta1.Tier
createExpectedCalls int
updateExpectedCalls int
}{
{
name: "create successfully",
expectedCalled: 1,
name: "create successful",
createExpectedCalls: 1,
},
{
name: "tier already exists",
reactor: func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewAlreadyExists(action.GetResource().GroupResource(), testTier.Name)
},
expectedCalled: 1,
name: "create error",
createReactor: func() k8stesting.ReactionFunc {
curFailureCount := 0
return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
if curFailureCount < 1 {
curFailureCount += 1
return true, nil, errors.NewServiceUnavailable("unknown reason")
}
return false, nil, nil
}
}(),
createExpectedCalls: 2,
},
{
name: "transient error",
reactor: func() k8stesting.ReactionFunc {
name: "update successful",
existingTier: makeTestTier(5),
updateExpectedCalls: 1,
},
{
name: "update error",
updateReactor: func() k8stesting.ReactionFunc {
curFailureCount := 0
maxFailureCount := 1
return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
if curFailureCount < maxFailureCount {
if curFailureCount < 1 {
curFailureCount += 1
return true, nil, errors.NewServiceUnavailable("unknown reason")
}
return false, nil, nil
}
}(),
expectedCalled: 2,
existingTier: makeTestTier(5),
updateExpectedCalls: 2,
},
{
name: "no change needed",
existingTier: makeTestTier(10),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
_, c := newController(nil, nil)
if tc.reactor != nil {
c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.reactor)
ctx := context.Background()
crdObjects := []runtime.Object{}
if tc.existingTier != nil {
crdObjects = append(crdObjects, tc.existingTier)
}
_, c := newController(nil, crdObjects)
stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

if tc.createReactor != nil {
c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.createReactor)
}
if tc.updateReactor != nil {
c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", tc.updateReactor)
}
createCalled := 0
createCalls := 0
c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) {
createCalled += 1
createCalls += 1
return false, nil, nil
})
c.initTier(testTier)
assert.Equal(t, tc.expectedCalled, createCalled)
updateCalls := 0
c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) {
updateCalls += 1
return false, nil, nil
})
// Prevent the test from hanging in case of an issue.
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
require.NoError(t, c.initializeTier(ctx, testTier))
assert.Equal(t, tc.createExpectedCalls, createCalls)
assert.Equal(t, tc.updateExpectedCalls, updateCalls)
})
}

}

func TestInitializeTiers(t *testing.T) {
ctx := context.Background()

_, c := newController(nil, nil)
stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

// All system Tiers should be created on the first try, so we can use a small timeout.
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
require.NoError(t, c.InitializeTiers(ctx))
tiers, err := c.crdClient.CrdV1beta1().Tiers().List(ctx, metav1.ListOptions{})
require.NoError(t, err)
assert.Len(t, tiers.Items, len(systemGeneratedTiers))
}
