diff --git a/CHANGELOG.md b/CHANGELOG.md index 302c4d57ae3..83515cf4631 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,11 @@ Here is an overview of all new **experimental** features: ### Fixes - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **General**: Check for missing CRD references and sample CRs ([#5920](https://github.com/kedacore/keda/issues/5920)) +- **General**: Scalers are properly closed after being refreshed ([#5806](https://github.com/kedacore/keda/issues/5806)) +- **MongoDB Scaler**: MongoDB url parses correctly `+srv` scheme ([#5760](https://github.com/kedacore/keda/issues/5760)) +- **New Relic Scaler**: Fix CVE-2024-6104 in github.com/hashicorp/go-retryablehttp ([#5944](https://github.com/kedacore/keda/issues/5944)) +- **ScaledJob**: Fix ScaledJob ignores failing trigger(s) error ([#5922](https://github.com/kedacore/keda/issues/5922)) ### Deprecations diff --git a/pkg/mock/mock_scaling/mock_executor/mock_interface.go b/pkg/mock/mock_scaling/mock_executor/mock_interface.go index 53836a597fb..eef9c5fc4bd 100644 --- a/pkg/mock/mock_scaling/mock_executor/mock_interface.go +++ b/pkg/mock/mock_scaling/mock_executor/mock_interface.go @@ -42,15 +42,15 @@ func (m *MockScaleExecutor) EXPECT() *MockScaleExecutorMockRecorder { } // RequestJobScale mocks base method. -func (m *MockScaleExecutor) RequestJobScale(ctx context.Context, scaledJob *v1alpha1.ScaledJob, isActive bool, scaleTo, maxScale int64) { +func (m *MockScaleExecutor) RequestJobScale(ctx context.Context, scaledJob *v1alpha1.ScaledJob, isActive, isError bool, scaleTo, maxScale int64) { m.ctrl.T.Helper() - m.ctrl.Call(m, "RequestJobScale", ctx, scaledJob, isActive, scaleTo, maxScale) + m.ctrl.Call(m, "RequestJobScale", ctx, scaledJob, isActive, isError, scaleTo, maxScale) } // RequestJobScale indicates an expected call of RequestJobScale. -func (mr *MockScaleExecutorMockRecorder) RequestJobScale(ctx, scaledJob, isActive, scaleTo, maxScale any) *gomock.Call { +func (mr *MockScaleExecutorMockRecorder) RequestJobScale(ctx, scaledJob, isActive, isError, scaleTo, maxScale any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestJobScale", reflect.TypeOf((*MockScaleExecutor)(nil).RequestJobScale), ctx, scaledJob, isActive, scaleTo, maxScale) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestJobScale", reflect.TypeOf((*MockScaleExecutor)(nil).RequestJobScale), ctx, scaledJob, isActive, isError, scaleTo, maxScale) } // RequestScale mocks base method. diff --git a/pkg/scaling/executor/scale_executor.go b/pkg/scaling/executor/scale_executor.go index ee5f15aa171..b28061a495e 100644 --- a/pkg/scaling/executor/scale_executor.go +++ b/pkg/scaling/executor/scale_executor.go @@ -39,7 +39,7 @@ const ( // ScaleExecutor contains methods RequestJobScale and RequestScale type ScaleExecutor interface { - RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) + RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, isError bool, scaleTo int64, maxScale int64) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool, options *ScaleExecutorOptions) } diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index 52f7ea37fdc..1e9407284ae 100644 --- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -38,7 +38,7 @@ const ( defaultFailedJobsHistoryLimit = int32(100) ) -func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) { +func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive, isError bool, scaleTo int64, maxScale int64) { logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace) runningJobCount := e.getRunningJobCount(ctx, scaledJob) @@ -65,6 +65,19 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al logger.V(1).Info("No change in activity") } + if isError { + // some triggers responded with error + // Set ScaledJob.Status.ReadyCondition to Unknown + readyCondition := scaledJob.Status.Conditions.GetReadyCondition() + msg := "Some triggers defined in ScaledJob are not working correctly" + logger.V(1).Info(msg) + if !readyCondition.IsUnknown() { + if err := e.setReadyCondition(ctx, logger, scaledJob, metav1.ConditionUnknown, "PartialTriggerError", msg); err != nil { + logger.Error(err, "error setting ready condition") + } + } + } + condition := scaledJob.Status.Conditions.GetActiveCondition() if condition.IsUnknown() || condition.IsTrue() != isActive { if isActive { diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 52ff3c3c742..8655cf50dc9 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -259,8 +259,8 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac return } - isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, obj) - h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale) + isActive, isError, scaleTo, maxScale := h.isScaledJobActive(ctx, obj) + h.scaleExecutor.RequestJobScale(ctx, obj, isActive, isError, scaleTo, maxScale) } } @@ -816,15 +816,16 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, // getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. -func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics { +func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) ([]scaledjob.ScalerMetrics, bool) { logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) cache, err := h.GetScalersCache(ctx, scaledJob) metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err) if err != nil { log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) - return nil + return nil, true } + var isError bool var scalersMetrics []scaledjob.ScalerMetrics scalers, scalerConfigs := cache.GetScalers() for scalerIndex, scaler := range scalers { @@ -852,8 +853,9 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, latency) } if err != nil { - scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err) + scalerLogger.Error(err, "Error getting scaler metrics and activity, but continue") cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + isError = true continue } if isTriggerActive { @@ -886,21 +888,21 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav metricscollector.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, isTriggerActive) } } - return scalersMetrics + return scalersMetrics, isError } // isScaledJobActive returns whether the input ScaledJob: // is active as the first return value, // the second and the third return values indicate queueLength and maxValue for scale -func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { +func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, bool, int64, int64) { logger := logf.Log.WithName("scalemetrics") - scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob) + scalersMetrics, isError := h.getScaledJobMetrics(ctx, scaledJob) isActive, queueLength, maxValue, maxFloatValue := scaledjob.IsScaledJobActive(scalersMetrics, scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount()) logger.V(1).WithValues("scaledJob.Name", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxFloatValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) - return isActive, queueLength, maxValue + return isActive, isError, queueLength, maxValue } // getTrueMetricArray is a help function made for composite scaler to determine diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index 7a822c163a8..23180308841 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -661,19 +661,21 @@ func TestIsScaledJobActive(t *testing.T) { scalerCachesLock: &sync.RWMutex{}, scaledObjectsMetricCache: metricscache.NewMetricsCache(), } - isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle) + // nosemgrep: context-todo + isActive, isError, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle) assert.Equal(t, true, isActive) + assert.Equal(t, false, isError) assert.Equal(t, int64(20), queueLength) assert.Equal(t, int64(10), maxValue) scalerCache.Close(context.Background()) // Test the valiation scalerTestDatam := []scalerTestData{ - newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20), - newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2), - newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9), - newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27), - newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25), + newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 20, 20), + newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 5, 2), + newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 12, 9), + newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 35, 27), + newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 35, 25), } for index, scalerTestData := range scalerTestDatam { @@ -717,9 +719,11 @@ func TestIsScaledJobActive(t *testing.T) { scaledObjectsMetricCache: metricscache.NewMetricsCache(), } fmt.Printf("index: %d", index) - isActive, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob) + // nosemgrep: context-todo + isActive, isError, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob) // assert.Equal(t, 5, index) assert.Equal(t, scalerTestData.ResultIsActive, isActive) + assert.Equal(t, scalerTestData.ResultIsError, isError) assert.Equal(t, scalerTestData.ResultQueueLength, queueLength) assert.Equal(t, scalerTestData.ResultMaxValue, maxValue) scalerCache.Close(context.Background()) @@ -757,8 +761,10 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T scaledObjectsMetricCache: metricscache.NewMetricsCache(), } - isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle) + // nosemgrep: context-todo + isActive, isError, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle) assert.Equal(t, true, isActive) + assert.Equal(t, false, isError) assert.Equal(t, int64(0), queueLength) assert.Equal(t, int64(0), maxValue) scalerCache.Close(context.Background()) @@ -781,6 +787,7 @@ func newScalerTestData( scaler4AverageValue int, //nolint:golint,unparam scaler4IsActive bool, //nolint:golint,unparam resultIsActive bool, //nolint:golint,unparam + resultIsError bool, //nolint:golint,unparam resultQueueLength, resultMaxLength int) scalerTestData { return scalerTestData{ @@ -800,6 +807,7 @@ func newScalerTestData( Scaler4AverageValue: int64(scaler4AverageValue), Scaler4IsActive: scaler4IsActive, ResultIsActive: resultIsActive, + ResultIsError: resultIsError, ResultQueueLength: int64(resultQueueLength), ResultMaxValue: int64(resultMaxLength), } @@ -822,6 +830,7 @@ type scalerTestData struct { Scaler4AverageValue int64 Scaler4IsActive bool ResultIsActive bool + ResultIsError bool ResultQueueLength int64 ResultMaxValue int64 MinReplicaCount int32