Skip to content

Commit

Permalink
fix: scaleobject ready condition 'False/Unknow' to 'True' requeue (#3097
Browse files Browse the repository at this point in the history
)

Signed-off-by: champly <champly@outlook.com>
  • Loading branch information
champly authored Jun 7, 2022
1 parent c00294c commit e816248
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Use metricName from GetMetricsSpec in ScaledJobs instead of `queueLength` ([#3032](https://github.com/kedacore/keda/issue/3032))
- **General:** Refactor adapter startup to ensure proper log initilization. ([2316](https://github.com/kedacore/keda/issues/2316))
- **Azure Eventhub Scaler:** KEDA operator crashes on nil memory panic if the eventhub connectionstring for Azure Eventhub Scaler contains an invalid character ([#3082](https://github.com/kedacore/keda/issues/3082))
- **General:** Scaleobject ready condition 'False/Unknow' to 'True' requeue([#3096](https://github.com/kedacore/keda/issues/3096))

### Deprecations

Expand Down
6 changes: 5 additions & 1 deletion controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
// (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates(
predicate.Or(kedacontrollerutil.PausedReplicasPredicate{}, predicate.GenerationChangedPredicate{}),
predicate.Or(
kedacontrollerutil.PausedReplicasPredicate{},
kedacontrollerutil.ScaleObjectReadyConditionPredicate{},
predicate.GenerationChangedPredicate{},
),
)).
Owns(&autoscalingv2beta2.HorizontalPodAutoscaler{}).
Complete(r)
Expand Down
112 changes: 112 additions & 0 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package keda
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/golang/mock/gomock"
Expand All @@ -27,6 +28,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -598,6 +600,116 @@ var _ = Describe("ScaledObjectController", func() {
}, 20*time.Second).Should(Equal(metav1.ConditionFalse))
})
})

It("scaleobject ready condition 'False/Unknow' to 'True' will requeue", func() {
var (
deploymentName = "conditionchange"
soName = "so-" + deploymentName
min int32 = 1
max int32 = 5
pollingInterVal int32 = 1
)

// Create the scaling target.
err := k8sClient.Create(context.Background(), generateDeployment(deploymentName))
Expect(err).ToNot(HaveOccurred())

so := &kedav1alpha1.ScaledObject{
ObjectMeta: metav1.ObjectMeta{Name: soName, Namespace: "default"},
Spec: kedav1alpha1.ScaledObjectSpec{
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
Name: deploymentName,
},
MinReplicaCount: &min,
MaxReplicaCount: &max,
PollingInterval: &pollingInterVal,
Triggers: []kedav1alpha1.ScaleTriggers{
{
Type: "cpu",
MetricType: autoscalingv2beta2.UtilizationMetricType,
Metadata: map[string]string{
"value": "50",
},
},
{
Type: "external-mock",
MetricType: autoscalingv2beta2.AverageValueMetricType,
Metadata: map[string]string{},
},
},
},
}
err = k8sClient.Create(context.Background(), so)
Expect(err).ToNot(HaveOccurred())

// wait so's ready condition Ready
Eventually(func() metav1.ConditionStatus {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Equal(metav1.ConditionTrue))

// check hpa
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
Eventually(func() int {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
return len(hpa.Spec.Metrics)
}, 1*time.Second).Should(Equal(2))

// mock external server offline
atomic.StoreInt32(&scalers.MockExternalServerStatus, scalers.MockExternalServerStatusOffline)

// wait so's ready condition not
Eventually(func() metav1.ConditionStatus {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown)))

// mock kube-controller-manager request v1beta1.custom.metrics.k8s.io api GetMetrics
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
hpa.Status.CurrentMetrics = []autoscalingv2beta2.MetricStatus{
{
Type: autoscalingv2beta2.ResourceMetricSourceType,
Resource: &autoscalingv2beta2.ResourceMetricStatus{
Name: corev1.ResourceCPU,
Current: autoscalingv2beta2.MetricValueStatus{
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
},
},
},
}
err = k8sClient.Status().Update(ctx, hpa)
Expect(err).ToNot(HaveOccurred())

// hpa metrics will only left CPU metric
Eventually(func() int {
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
return len(hpa.Spec.Metrics)
}, 5*time.Second).Should(Equal(1))

// mock external server online
atomic.StoreInt32(&scalers.MockExternalServerStatus, scalers.MockExternalServerStatusOnline)

// wait so's ready condition Ready
Eventually(func() metav1.ConditionStatus {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Equal(metav1.ConditionTrue))

// hpa will recover
Eventually(func() int {
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
return len(hpa.Spec.Metrics)
}, 5*time.Second).Should(Equal(2))
})
})

func generateDeployment(name string) *appsv1.Deployment {
Expand Down
33 changes: 33 additions & 0 deletions controllers/keda/util/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package util
import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"
Expand All @@ -28,3 +30,34 @@ func (PausedReplicasPredicate) Update(e event.UpdateEvent) bool {
}
return false
}

type ScaleObjectReadyConditionPredicate struct {
predicate.Funcs
}

func (ScaleObjectReadyConditionPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}

var newReadyCondition, oldReadyCondition kedav1alpha1.Condition

oldObj, ok := e.ObjectOld.(*kedav1alpha1.ScaledObject)
if !ok {
return false
}
oldReadyCondition = oldObj.Status.Conditions.GetReadyCondition()

newObj, ok := e.ObjectNew.(*kedav1alpha1.ScaledObject)
if !ok {
return false
}
newReadyCondition = newObj.Status.Conditions.GetReadyCondition()

// False/Unknown -> True
if !oldReadyCondition.IsTrue() && newReadyCondition.IsTrue() {
return true
}

return false
}
91 changes: 91 additions & 0 deletions pkg/scalers/external_mock_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package scalers

import (
"context"
"errors"
"sync/atomic"

"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
)

const (
MockExternalServerStatusOffline int32 = 0
MockExternalServerStatusOnline int32 = 1
)

var (
MockExternalServerStatus = MockExternalServerStatusOnline
ErrMock = errors.New("mock error")
MockMetricName = "mockMetricName"
MockMetricTarget int64 = 50
MockMetricValue int64 = 100
)

type externalMockScaler struct{}

func NewExternalMockScaler(config *ScalerConfig) (Scaler, error) {
return &externalMockScaler{}, nil
}

// IsActive implements Scaler
func (*externalMockScaler) IsActive(ctx context.Context) (bool, error) {
if atomic.LoadInt32(&MockExternalServerStatus) != MockExternalServerStatusOnline {
return false, ErrMock
}

return true, nil
}

// Close implements Scaler
func (*externalMockScaler) Close(ctx context.Context) error {
return nil
}

// GetMetricSpecForScaling implements Scaler
func (*externalMockScaler) GetMetricSpecForScaling(ctx context.Context) []v2beta2.MetricSpec {
if atomic.LoadInt32(&MockExternalServerStatus) != MockExternalServerStatusOnline {
return nil
}

return getMockMetricsSpecs()
}

// GetMetrics implements Scaler
func (*externalMockScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
if atomic.LoadInt32(&MockExternalServerStatus) != MockExternalServerStatusOnline {
return nil, ErrMock
}

return getMockExternalMetricsValue(), nil
}

func getMockMetricsSpecs() []v2beta2.MetricSpec {
return []v2beta2.MetricSpec{
{
Type: v2beta2.ExternalMetricSourceType,
External: &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: MockMetricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.ValueMetricType,
Value: resource.NewQuantity(MockMetricValue, resource.DecimalSI),
},
},
},
}
}

func getMockExternalMetricsValue() []external_metrics.ExternalMetricValue {
return []external_metrics.ExternalMetricValue{
{
MetricName: MockMetricName,
Value: *resource.NewQuantity(MockMetricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
},
}
}
2 changes: 1 addition & 1 deletion pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
// but ScaledObject.Status.ReadyCondition is set not set to 'true' -> set it back to 'true'
readyCondition := scaledObject.Status.Conditions.GetReadyCondition()
if !isError && !readyCondition.IsTrue() {
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionFalse,
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionTrue,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
return scalers.NewElasticsearchScaler(config)
case "external":
return scalers.NewExternalScaler(config)
// TODO: use other way for test.
case "external-mock":
return scalers.NewExternalMockScaler(config)
case "external-push":
return scalers.NewExternalPushScaler(config)
case "gcp-pubsub":
Expand Down

0 comments on commit e816248

Please sign in to comment.