Skip to content

Commit

Permalink
Make autoscaler leader aware (#6814)
Browse files Browse the repository at this point in the history
Fixes #6732 

The autoscaler runs in every controller replica [1]. Each replica
tries to scale down after the given refresh period, and sometimes
the two replicas don't agree on which value to use for the new
number of replicas, since the state is lister/cache based. This
leads to scaling up or down too fast, or sometimes to not
converging at all.
(also because of #6733)

Implementations should use
knative/pkg#2675
to enable the leader-aware autoscaler (PR
knative/pkg#2688).

[1]

https://github.com/knative/eventing/blob/1092472f440586099d6a5cbf1d3234bb36431af4/pkg/scheduler/statefulset/autoscaler.go#L85-L103

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored Mar 14, 2023
1 parent 5988c42 commit 62eb85e
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 7 deletions.
45 changes: 44 additions & 1 deletion pkg/scheduler/statefulset/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"
Expand All @@ -35,6 +38,16 @@ import (
st "knative.dev/eventing/pkg/scheduler/state"
)

// ephemeralLeaderElectionObject is the namespaced key looked up in a
// reconciler.Bucket to decide whether this autoscaler instance is the leader:
// the instance is leader while it has been promoted for the bucket that
// contains this key.
//
// The key is ephemeral, but its value must be kept stable and unmodified
// across releases.
var ephemeralLeaderElectionObject = types.NamespacedName{
	Namespace: "knative-eventing",
	Name:      "autoscaler-ephemeral",
}

type Autoscaler interface {
// Start runs the autoscaler until cancelled.
Start(ctx context.Context)
Expand All @@ -59,9 +72,35 @@ type autoscaler struct {
// refreshPeriod is how often the autoscaler tries to scale down the statefulset
refreshPeriod time.Duration
lock sync.Locker

// isLeader signals whether a given autoscaler instance is leader or not.
// The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a
// bucket where we've been promoted.
isLeader atomic.Bool
}

// Compile-time check that *autoscaler implements reconciler.LeaderAware.
var _ reconciler.LeaderAware = (*autoscaler)(nil)

// Promote implements reconciler.LeaderAware.
//
// The autoscaler marks itself as leader only when the promoted bucket b
// contains ephemeralLeaderElectionObject; a promotion for any other bucket
// leaves the leadership flag as it was. The enqueue callback is not used and
// the returned error is always nil.
func (a *autoscaler) Promote(b reconciler.Bucket, _ func(reconciler.Bucket, types.NamespacedName)) error {
	if !b.Has(ephemeralLeaderElectionObject) {
		// This bucket does not carry our election key, so nothing changes.
		return nil
	}
	// The promoted bucket has the ephemeralLeaderElectionObject, so we are leader.
	a.isLeader.Store(true)
	return nil
}

func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) Autoscaler {
// Demote implements reconciler.LeaderAware.
//
// The autoscaler clears its leadership flag only when the demoted bucket b
// contains ephemeralLeaderElectionObject; a demotion for any other bucket
// leaves the flag as it was.
func (a *autoscaler) Demote(b reconciler.Bucket) {
	if !b.Has(ephemeralLeaderElectionObject) {
		// This bucket does not carry our election key, so nothing changes.
		return
	}
	// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
	a.isLeader.Store(false)
}

func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) *autoscaler {
return &autoscaler{
logger: logging.FromContext(ctx),
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace),
Expand All @@ -73,6 +112,7 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces
capacity: cfg.PodCapacity,
refreshPeriod: cfg.RefreshPeriod,
lock: new(sync.Mutex),
isLeader: atomic.Bool{},
}
}

Expand Down Expand Up @@ -111,6 +151,9 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool, p
}

func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pending int32) error {
if !a.isLeader.Load() {
return nil
}
state, err := a.stateAccessor.State(nil)
if err != nil {
a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err))
Expand Down
21 changes: 18 additions & 3 deletions pkg/scheduler/statefulset/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
v1 "k8s.io/client-go/listers/core/v1"
gtesting "k8s.io/client-go/testing"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
Expand Down Expand Up @@ -386,7 +388,8 @@ func TestAutoscaler(t *testing.T) {
RefreshPeriod: 10 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
_ = autoscaler.Promote(reconciler.UniversalBucket(), nil)

for _, vpod := range tc.vpods {
vpodClient.Append(vpod)
Expand Down Expand Up @@ -442,7 +445,8 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {
RefreshPeriod: 2 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
_ = autoscaler.Promote(reconciler.UniversalBucket(), nil)

done := make(chan bool)
go func() {
Expand Down Expand Up @@ -871,7 +875,9 @@ func TestCompactor(t *testing.T) {
RefreshPeriod: 10 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
_ = autoscaler.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {})
assert.Equal(t, true, autoscaler.isLeader.Load())

for _, vpod := range tc.vpods {
vpodClient.Append(vpod)
Expand Down Expand Up @@ -913,6 +919,15 @@ func TestCompactor(t *testing.T) {
if len(evictions) != 0 {
t.Fatalf("unexpected evictions %v", evictions)
}

autoscaler.Demote(reconciler.UniversalBucket())
assert.Equal(t, false, autoscaler.isLeader.Load())
})
}
}

// TestEphemeralKeyStableValues pins the namespace and name of
// ephemeralLeaderElectionObject so that an accidental change to the key is
// caught by the test suite.
func TestEphemeralKeyStableValues(t *testing.T) {
	// Do not modify expected values
	const (
		wantNamespace = "knative-eventing"
		wantName      = "autoscaler-ephemeral"
	)

	assert.Equal(t, wantNamespace, ephemeralLeaderElectionObject.Namespace)
	assert.Equal(t, wantName, ephemeralLeaderElectionObject.Name)
}
23 changes: 22 additions & 1 deletion pkg/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/integer"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client"
statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
Expand Down Expand Up @@ -145,11 +146,31 @@ type StatefulSetScheduler struct {
reserved map[types.NamespacedName]map[string]int32
}

var (
_ reconciler.LeaderAware = &StatefulSetScheduler{}
_ scheduler.Scheduler = &StatefulSetScheduler{}
)

// Promote implements reconciler.LeaderAware.
//
// The promotion is forwarded to the scheduler's autoscaler when that
// autoscaler itself implements reconciler.LeaderAware, and its error is
// returned. Otherwise (including when no autoscaler is set) Promote does
// nothing and returns nil.
func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
	leaderAware, ok := s.autoscaler.(reconciler.LeaderAware)
	if !ok {
		return nil
	}
	return leaderAware.Promote(b, enq)
}

// Demote implements reconciler.LeaderAware.
//
// The demotion is forwarded to the scheduler's autoscaler when that
// autoscaler itself implements reconciler.LeaderAware; otherwise (including
// when no autoscaler is set) Demote does nothing.
func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) {
	leaderAware, ok := s.autoscaler.(reconciler.LeaderAware)
	if !ok {
		return
	}
	leaderAware.Demote(b)
}

func newStatefulSetScheduler(ctx context.Context,
cfg *Config,
stateAccessor st.StateAccessor,
autoscaler Autoscaler,
podlister corev1listers.PodNamespaceLister) scheduler.Scheduler {
podlister corev1listers.PodNamespaceLister) *StatefulSetScheduler {

scheduler := &StatefulSetScheduler{
ctx: ctx,
Expand Down
44 changes: 42 additions & 2 deletions pkg/scheduler/statefulset/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ limitations under the License.
package statefulset

import (
"context"
"fmt"
"reflect"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
"knative.dev/pkg/controller"
"knative.dev/pkg/reconciler"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
listers "knative.dev/eventing/pkg/reconciler/testing/v1"
Expand Down Expand Up @@ -779,7 +783,7 @@ func TestStatefulsetScheduler(t *testing.T) {
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
}
s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler)
s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs))
if tc.pending != nil {
s.pending = tc.pending
}
Expand Down Expand Up @@ -877,13 +881,49 @@ func TestReservePlacements(t *testing.T) {
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
}
s := newStatefulSetScheduler(ctx, cfg, nil, nil, nil).(*StatefulSetScheduler)
fa := newFakeAutoscaler()
s := newStatefulSetScheduler(ctx, cfg, nil, fa, nil)
_ = s.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {})

s.reservePlacements(tc.vpod, tc.vpod.GetPlacements()) //initial reserve

s.reservePlacements(tc.vpod, tc.placements) //new reserve
if !reflect.DeepEqual(s.reserved[tc.vpod.GetKey()], tc.reserved) {
t.Errorf("got %v, want %v", s.reserved[tc.vpod.GetKey()], tc.reserved)
}

assert.Equal(t, true, fa.isLeader.Load())

s.Demote(reconciler.UniversalBucket())
assert.Equal(t, false, fa.isLeader.Load())
})
}
}

// fakeAutoscaler is a test double that satisfies Autoscaler and
// reconciler.LeaderAware. Its Start and Autoscale methods do nothing; it only
// records the most recent Promote/Demote call.
type fakeAutoscaler struct {
// isLeader is set to true by Promote and to false by Demote.
isLeader atomic.Bool
}

// Start implements Autoscaler. The fake has nothing to run, so it returns
// immediately.
func (f *fakeAutoscaler) Start(ctx context.Context) {}

// Autoscale implements Autoscaler. The fake ignores all arguments and performs
// no scaling.
func (f *fakeAutoscaler) Autoscale(ctx context.Context, attemptScaleDown bool, pending int32) {}

// newFakeAutoscaler returns a fakeAutoscaler whose isLeader flag starts out
// false (the zero value of atomic.Bool).
func newFakeAutoscaler() *fakeAutoscaler {
	return new(fakeAutoscaler)
}

// Promote implements reconciler.LeaderAware. Unlike the real autoscaler, the
// fake does not inspect the bucket: every call marks it as leader. It always
// returns nil.
func (f *fakeAutoscaler) Promote(_ reconciler.Bucket, _ func(reconciler.Bucket, types.NamespacedName)) error {
	f.isLeader.Store(true)
	return nil
}

// Demote implements reconciler.LeaderAware. The fake does not inspect the
// bucket: every call clears the leader flag.
func (f *fakeAutoscaler) Demote(_ reconciler.Bucket) {
	f.isLeader.Store(false)
}

// Compile-time checks that *fakeAutoscaler implements both
// reconciler.LeaderAware and Autoscaler.
var (
	_ reconciler.LeaderAware = (*fakeAutoscaler)(nil)
	_ Autoscaler             = (*fakeAutoscaler)(nil)
)

0 comments on commit 62eb85e

Please sign in to comment.