Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make autoscaler leader aware #6814

Merged
merged 1 commit into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion pkg/scheduler/statefulset/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"
Expand All @@ -35,6 +38,16 @@ import (
st "knative.dev/eventing/pkg/scheduler/state"
)

var (
// ephemeralLeaderElectionObject is the key used to check whether a given autoscaler instance
// is leader or not.
// This is an ephemeral key and must be kept stable and unmodified across releases.
ephemeralLeaderElectionObject = types.NamespacedName{
Namespace: "knative-eventing",
Name: "autoscaler-ephemeral",
}
)

type Autoscaler interface {
// Start runs the autoscaler until cancelled.
Start(ctx context.Context)
Expand All @@ -59,9 +72,35 @@ type autoscaler struct {
// refreshPeriod is how often the autoscaler tries to scale down the statefulset
refreshPeriod time.Duration
lock sync.Locker

// isLeader signals whether a given autoscaler instance is leader or not.
// The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a
// bucket where we've been promoted.
isLeader atomic.Bool
}

var (
_ reconciler.LeaderAware = &autoscaler{}
)

// Promote implements reconciler.LeaderAware.
func (a *autoscaler) Promote(b reconciler.Bucket, _ func(reconciler.Bucket, types.NamespacedName)) error {
if b.Has(ephemeralLeaderElectionObject) {
// The promoted bucket has the ephemeralLeaderElectionObject, so we are leader.
a.isLeader.Store(true)
}
return nil
}

func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) Autoscaler {
// Demote implements reconciler.LeaderAware.
func (a *autoscaler) Demote(b reconciler.Bucket) {
if b.Has(ephemeralLeaderElectionObject) {
// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
a.isLeader.Store(false)
}
}

func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) *autoscaler {
return &autoscaler{
logger: logging.FromContext(ctx),
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace),
Expand All @@ -73,6 +112,7 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces
capacity: cfg.PodCapacity,
refreshPeriod: cfg.RefreshPeriod,
lock: new(sync.Mutex),
isLeader: atomic.Bool{},
}
}

Expand Down Expand Up @@ -111,6 +151,9 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool, p
}

func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pending int32) error {
if !a.isLeader.Load() {
return nil
}
state, err := a.stateAccessor.State(nil)
if err != nil {
a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err))
Expand Down
21 changes: 18 additions & 3 deletions pkg/scheduler/statefulset/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
v1 "k8s.io/client-go/listers/core/v1"
gtesting "k8s.io/client-go/testing"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
Expand Down Expand Up @@ -386,7 +388,8 @@ func TestAutoscaler(t *testing.T) {
RefreshPeriod: 10 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
_ = autoscaler.Promote(reconciler.UniversalBucket(), nil)

for _, vpod := range tc.vpods {
vpodClient.Append(vpod)
Expand Down Expand Up @@ -442,7 +445,8 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {
RefreshPeriod: 2 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
_ = autoscaler.Promote(reconciler.UniversalBucket(), nil)

done := make(chan bool)
go func() {
Expand Down Expand Up @@ -871,7 +875,9 @@ func TestCompactor(t *testing.T) {
RefreshPeriod: 10 * time.Second,
PodCapacity: 10,
}
autoscaler := newAutoscaler(ctx, cfg, stateAccessor).(*autoscaler)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
_ = autoscaler.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {})
assert.Equal(t, true, autoscaler.isLeader.Load())

for _, vpod := range tc.vpods {
vpodClient.Append(vpod)
Expand Down Expand Up @@ -913,6 +919,15 @@ func TestCompactor(t *testing.T) {
if len(evictions) != 0 {
t.Fatalf("unexpected evictions %v", evictions)
}

autoscaler.Demote(reconciler.UniversalBucket())
assert.Equal(t, false, autoscaler.isLeader.Load())
})
}
}

func TestEphemeralKeyStableValues(t *testing.T) {
// Do not modify expected values
assert.Equal(t, "knative-eventing", ephemeralLeaderElectionObject.Namespace)
assert.Equal(t, "autoscaler-ephemeral", ephemeralLeaderElectionObject.Name)
}
23 changes: 22 additions & 1 deletion pkg/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/integer"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client"
statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
Expand Down Expand Up @@ -145,11 +146,31 @@ type StatefulSetScheduler struct {
reserved map[types.NamespacedName]map[string]int32
}

var (
_ reconciler.LeaderAware = &StatefulSetScheduler{}
_ scheduler.Scheduler = &StatefulSetScheduler{}
)

// Promote implements reconciler.LeaderAware.
func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
return v.Promote(b, enq)
}
return nil
}

// Demote implements reconciler.LeaderAware.
func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) {
if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
v.Demote(b)
}
}

func newStatefulSetScheduler(ctx context.Context,
cfg *Config,
stateAccessor st.StateAccessor,
autoscaler Autoscaler,
podlister corev1listers.PodNamespaceLister) scheduler.Scheduler {
podlister corev1listers.PodNamespaceLister) *StatefulSetScheduler {

scheduler := &StatefulSetScheduler{
ctx: ctx,
Expand Down
44 changes: 42 additions & 2 deletions pkg/scheduler/statefulset/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ limitations under the License.
package statefulset

import (
"context"
"fmt"
"reflect"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
"knative.dev/pkg/controller"
"knative.dev/pkg/reconciler"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
listers "knative.dev/eventing/pkg/reconciler/testing/v1"
Expand Down Expand Up @@ -779,7 +783,7 @@ func TestStatefulsetScheduler(t *testing.T) {
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
}
s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler)
s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs))
if tc.pending != nil {
s.pending = tc.pending
}
Expand Down Expand Up @@ -877,13 +881,49 @@ func TestReservePlacements(t *testing.T) {
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
}
s := newStatefulSetScheduler(ctx, cfg, nil, nil, nil).(*StatefulSetScheduler)
fa := newFakeAutoscaler()
s := newStatefulSetScheduler(ctx, cfg, nil, fa, nil)
_ = s.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {})

s.reservePlacements(tc.vpod, tc.vpod.GetPlacements()) //initial reserve

s.reservePlacements(tc.vpod, tc.placements) //new reserve
if !reflect.DeepEqual(s.reserved[tc.vpod.GetKey()], tc.reserved) {
t.Errorf("got %v, want %v", s.reserved[tc.vpod.GetKey()], tc.reserved)
}

assert.Equal(t, true, fa.isLeader.Load())

s.Demote(reconciler.UniversalBucket())
assert.Equal(t, false, fa.isLeader.Load())
})
}
}

type fakeAutoscaler struct {
isLeader atomic.Bool
}

func (f *fakeAutoscaler) Start(ctx context.Context) {
}

func (f *fakeAutoscaler) Autoscale(ctx context.Context, attemptScaleDown bool, pending int32) {
}

func newFakeAutoscaler() *fakeAutoscaler {
return &fakeAutoscaler{
isLeader: atomic.Bool{},
}
}

func (f *fakeAutoscaler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
f.isLeader.Store(true)
return nil
}

func (f *fakeAutoscaler) Demote(bucket reconciler.Bucket) {
f.isLeader.Store(false)
}

var _ reconciler.LeaderAware = &fakeAutoscaler{}
var _ Autoscaler = &fakeAutoscaler{}