diff --git a/pkg/scheduler/statefulset/autoscaler.go b/pkg/scheduler/statefulset/autoscaler.go index a829668d35d..203d3963fc3 100644 --- a/pkg/scheduler/statefulset/autoscaler.go +++ b/pkg/scheduler/statefulset/autoscaler.go @@ -20,13 +20,16 @@ import ( "context" "math" "sync" + "sync/atomic" "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" + "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" @@ -35,6 +38,16 @@ import ( st "knative.dev/eventing/pkg/scheduler/state" ) +var ( + // ephemeralLeaderElectionObject is the key used to check whether a given autoscaler instance + // is leader or not. + // This is an ephemeral key and must be kept stable and unmodified across releases. + ephemeralLeaderElectionObject = types.NamespacedName{ + Namespace: "knative-eventing", + Name: "autoscaler-ephemeral", + } +) + type Autoscaler interface { // Start runs the autoscaler until cancelled. Start(ctx context.Context) @@ -59,9 +72,35 @@ type autoscaler struct { // refreshPeriod is how often the autoscaler tries to scale down the statefulset refreshPeriod time.Duration lock sync.Locker + + // isLeader signals whether a given autoscaler instance is leader or not. + // The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a + // bucket where we've been promoted. + isLeader atomic.Bool +} + +var ( + _ reconciler.LeaderAware = &autoscaler{} +) + +// Promote implements reconciler.LeaderAware. +func (a *autoscaler) Promote(b reconciler.Bucket, _ func(reconciler.Bucket, types.NamespacedName)) error { + if b.Has(ephemeralLeaderElectionObject) { + // The promoted bucket has the ephemeralLeaderElectionObject, so we are leader. + a.isLeader.Store(true) + } + return nil } -func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) Autoscaler { +// Demote implements reconciler.LeaderAware. +func (a *autoscaler) Demote(b reconciler.Bucket) { + if b.Has(ephemeralLeaderElectionObject) { + // The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore. + a.isLeader.Store(false) + } +} + +func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) *autoscaler { return &autoscaler{ logger: logging.FromContext(ctx), statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), @@ -73,6 +112,7 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces capacity: cfg.PodCapacity, refreshPeriod: cfg.RefreshPeriod, lock: new(sync.Mutex), + isLeader: atomic.Bool{}, } } @@ -111,6 +151,9 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool, p } func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pending int32) error { + if !a.isLeader.Load() { + return nil + } state, err := a.stateAccessor.State(nil) if err != nil { a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) diff --git a/pkg/scheduler/statefulset/autoscaler_test.go b/pkg/scheduler/statefulset/autoscaler_test.go index 6f0de384329..976a2e1cf8a 100644 --- a/pkg/scheduler/statefulset/autoscaler_test.go +++ b/pkg/scheduler/statefulset/autoscaler_test.go @@ -23,12 +23,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" v1 "k8s.io/client-go/listers/core/v1" gtesting "k8s.io/client-go/testing" + "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" @@ -386,7 +388,8 @@ func TestAutoscaler(t *testing.T) { RefreshPeriod: 10 * time.Second, PodCapacity: 10, } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler) + autoscaler := newAutoscaler(ctx, cfg, stateAccessor) + _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) for _, vpod := range tc.vpods { vpodClient.Append(vpod) @@ -442,7 +445,8 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { RefreshPeriod: 2 * time.Second, PodCapacity: 10, } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler) + autoscaler := newAutoscaler(ctx, cfg, stateAccessor) + _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) done := make(chan bool) go func() { @@ -871,7 +875,9 @@ func TestCompactor(t *testing.T) { RefreshPeriod: 10 * time.Second, PodCapacity: 10, } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler) + autoscaler := newAutoscaler(ctx, cfg, stateAccessor) + _ = autoscaler.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {}) + assert.Equal(t, true, autoscaler.isLeader.Load()) for _, vpod := range tc.vpods { vpodClient.Append(vpod) @@ -913,6 +919,15 @@ func TestCompactor(t *testing.T) { if len(evictions) != 0 { t.Fatalf("unexpected evictions %v", evictions) } + + autoscaler.Demote(reconciler.UniversalBucket()) + assert.Equal(t, false, autoscaler.isLeader.Load()) }) } } + +func TestEphemeralKeyStableValues(t *testing.T) { + // Do not modify expected values + assert.Equal(t, "knative-eventing", ephemeralLeaderElectionObject.Namespace) + assert.Equal(t, "autoscaler-ephemeral", ephemeralLeaderElectionObject.Name) +} diff --git a/pkg/scheduler/statefulset/scheduler.go b/pkg/scheduler/statefulset/scheduler.go index f3ecb7c0c6e..a1b5985e669 100644 --- a/pkg/scheduler/statefulset/scheduler.go +++ b/pkg/scheduler/statefulset/scheduler.go @@ -32,6 +32,7 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/utils/integer" + "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset" @@ -145,11 +146,31 @@ type StatefulSetScheduler struct { reserved map[types.NamespacedName]map[string]int32 } +var ( + _ reconciler.LeaderAware = &StatefulSetScheduler{} + _ scheduler.Scheduler = &StatefulSetScheduler{} +) + +// Promote implements reconciler.LeaderAware. +func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { + if v, ok := s.autoscaler.(reconciler.LeaderAware); ok { + return v.Promote(b, enq) + } + return nil +} + +// Demote implements reconciler.LeaderAware. +func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) { + if v, ok := s.autoscaler.(reconciler.LeaderAware); ok { + v.Demote(b) + } +} + func newStatefulSetScheduler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, autoscaler Autoscaler, - podlister corev1listers.PodNamespaceLister) scheduler.Scheduler { + podlister corev1listers.PodNamespaceLister) *StatefulSetScheduler { scheduler := &StatefulSetScheduler{ ctx: ctx, diff --git a/pkg/scheduler/statefulset/scheduler_test.go b/pkg/scheduler/statefulset/scheduler_test.go index 9c534b32e31..f9ac3a25669 100644 --- a/pkg/scheduler/statefulset/scheduler_test.go +++ b/pkg/scheduler/statefulset/scheduler_test.go @@ -17,11 +17,14 @@ limitations under the License. package statefulset import ( + "context" "fmt" "reflect" + "sync/atomic" "testing" "time" + "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -29,6 +32,7 @@ import ( kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" "knative.dev/pkg/controller" + "knative.dev/pkg/reconciler" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" listers "knative.dev/eventing/pkg/reconciler/testing/v1" @@ -779,7 +783,7 @@ func TestStatefulsetScheduler(t *testing.T) { StatefulSetName: sfsName, VPodLister: vpodClient.List, } - s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler) + s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs)) if tc.pending != nil { s.pending = tc.pending } @@ -877,13 +881,49 @@ func TestReservePlacements(t *testing.T) { StatefulSetName: sfsName, VPodLister: vpodClient.List, } - s := newStatefulSetScheduler(ctx, cfg, nil, nil, nil).(*StatefulSetScheduler) + fa := newFakeAutoscaler() + s := newStatefulSetScheduler(ctx, cfg, nil, fa, nil) + _ = s.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {}) + s.reservePlacements(tc.vpod, tc.vpod.GetPlacements()) //initial reserve s.reservePlacements(tc.vpod, tc.placements) //new reserve if !reflect.DeepEqual(s.reserved[tc.vpod.GetKey()], tc.reserved) { t.Errorf("got %v, want %v", s.reserved[tc.vpod.GetKey()], tc.reserved) } + + assert.Equal(t, true, fa.isLeader.Load()) + + s.Demote(reconciler.UniversalBucket()) + assert.Equal(t, false, fa.isLeader.Load()) }) } } + +type fakeAutoscaler struct { + isLeader atomic.Bool +} + +func (f *fakeAutoscaler) Start(ctx context.Context) { +} + +func (f *fakeAutoscaler) Autoscale(ctx context.Context, attemptScaleDown bool, pending int32) { +} + +func newFakeAutoscaler() *fakeAutoscaler { + return &fakeAutoscaler{ + isLeader: atomic.Bool{}, + } +} + +func (f *fakeAutoscaler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { + f.isLeader.Store(true) + return nil +} + +func (f *fakeAutoscaler) Demote(bucket reconciler.Bucket) { + f.isLeader.Store(false) +} + +var _ reconciler.LeaderAware = &fakeAutoscaler{} +var _ Autoscaler = &fakeAutoscaler{}