Skip to content

Commit

Permalink
[BUG-5922] Report failing ScaledJob triggers in status (kedacore#5916)
Browse files Browse the repository at this point in the history
Signed-off-by: Josef Karasek <josef@kedify.io>
Signed-off-by: Jorge Turrado <jorge_turrado@hotmail.es>
  • Loading branch information
josefkarasek authored and JorTurFer committed Jul 30, 2024
1 parent c174395 commit 1738a2c
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 23 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ Here is an overview of all new **experimental** features:
### Fixes

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Check for missing CRD references and sample CRs ([#5920](https://github.com/kedacore/keda/issues/5920))
- **General**: Scalers are properly closed after being refreshed ([#5806](https://github.com/kedacore/keda/issues/5806))
- **MongoDB Scaler**: MongoDB url parses correctly `+srv` scheme ([#5760](https://github.com/kedacore/keda/issues/5760))
- **New Relic Scaler**: Fix CVE-2024-6104 in github.com/hashicorp/go-retryablehttp ([#5944](https://github.com/kedacore/keda/issues/5944))
- **ScaledJob**: Fix ScaledJob ignores failing trigger(s) error ([#5922](https://github.com/kedacore/keda/issues/5922))

### Deprecations

Expand Down
8 changes: 4 additions & 4 deletions pkg/mock/mock_scaling/mock_executor/mock_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (

// ScaleExecutor contains methods RequestJobScale and RequestScale
type ScaleExecutor interface {
RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64)
RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, isError bool, scaleTo int64, maxScale int64)
RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool, options *ScaleExecutorOptions)
}

Expand Down
15 changes: 14 additions & 1 deletion pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
defaultFailedJobsHistoryLimit = int32(100)
)

func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) {
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive, isError bool, scaleTo int64, maxScale int64) {
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)

runningJobCount := e.getRunningJobCount(ctx, scaledJob)
Expand All @@ -65,6 +65,19 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
logger.V(1).Info("No change in activity")
}

if isError {
// some triggers responded with error
// Set ScaledJob.Status.ReadyCondition to Unknown
readyCondition := scaledJob.Status.Conditions.GetReadyCondition()
msg := "Some triggers defined in ScaledJob are not working correctly"
logger.V(1).Info(msg)
if !readyCondition.IsUnknown() {
if err := e.setReadyCondition(ctx, logger, scaledJob, metav1.ConditionUnknown, "PartialTriggerError", msg); err != nil {
logger.Error(err, "error setting ready condition")
}
}
}

condition := scaledJob.Status.Conditions.GetActiveCondition()
if condition.IsUnknown() || condition.IsTrue() != isActive {
if isActive {
Expand Down
20 changes: 11 additions & 9 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
return
}

isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, obj)
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
isActive, isError, scaleTo, maxScale := h.isScaledJobActive(ctx, obj)
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, isError, scaleTo, maxScale)
}
}

Expand Down Expand Up @@ -816,15 +816,16 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler,

// getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace.
// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics {
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) ([]scaledjob.ScalerMetrics, bool) {
logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)

cache, err := h.GetScalersCache(ctx, scaledJob)
metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err)
if err != nil {
log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
return nil
return nil, true
}
var isError bool
var scalersMetrics []scaledjob.ScalerMetrics
scalers, scalerConfigs := cache.GetScalers()
for scalerIndex, scaler := range scalers {
Expand Down Expand Up @@ -852,8 +853,9 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, latency)
}
if err != nil {
scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
scalerLogger.Error(err, "Error getting scaler metrics and activity, but continue")
cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
isError = true
continue
}
if isTriggerActive {
Expand Down Expand Up @@ -886,21 +888,21 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
metricscollector.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, isTriggerActive)
}
}
return scalersMetrics
return scalersMetrics, isError
}

// isScaledJobActive returns whether the input ScaledJob:
// is active as the first return value,
// the second and the third return values indicate queueLength and maxValue for scale
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, bool, int64, int64) {
logger := logf.Log.WithName("scalemetrics")

scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob)
scalersMetrics, isError := h.getScaledJobMetrics(ctx, scaledJob)
isActive, queueLength, maxValue, maxFloatValue :=
scaledjob.IsScaledJobActive(scalersMetrics, scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount())

logger.V(1).WithValues("scaledJob.Name", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxFloatValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation)
return isActive, queueLength, maxValue
return isActive, isError, queueLength, maxValue
}

// getTrueMetricArray is a help function made for composite scaler to determine
Expand Down
25 changes: 17 additions & 8 deletions pkg/scaling/scale_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,19 +661,21 @@ func TestIsScaledJobActive(t *testing.T) {
scalerCachesLock: &sync.RWMutex{},
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}
isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
// nosemgrep: context-todo
isActive, isError, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
assert.Equal(t, true, isActive)
assert.Equal(t, false, isError)
assert.Equal(t, int64(20), queueLength)
assert.Equal(t, int64(10), maxValue)
scalerCache.Close(context.Background())

// Test the valiation
scalerTestDatam := []scalerTestData{
newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20),
newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2),
newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9),
newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27),
newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25),
newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 20, 20),
newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 5, 2),
newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 12, 9),
newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 35, 27),
newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 35, 25),
}

for index, scalerTestData := range scalerTestDatam {
Expand Down Expand Up @@ -717,9 +719,11 @@ func TestIsScaledJobActive(t *testing.T) {
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}
fmt.Printf("index: %d", index)
isActive, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob)
// nosemgrep: context-todo
isActive, isError, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob)
// assert.Equal(t, 5, index)
assert.Equal(t, scalerTestData.ResultIsActive, isActive)
assert.Equal(t, scalerTestData.ResultIsError, isError)
assert.Equal(t, scalerTestData.ResultQueueLength, queueLength)
assert.Equal(t, scalerTestData.ResultMaxValue, maxValue)
scalerCache.Close(context.Background())
Expand Down Expand Up @@ -757,8 +761,10 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}

isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
// nosemgrep: context-todo
isActive, isError, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
assert.Equal(t, true, isActive)
assert.Equal(t, false, isError)
assert.Equal(t, int64(0), queueLength)
assert.Equal(t, int64(0), maxValue)
scalerCache.Close(context.Background())
Expand All @@ -781,6 +787,7 @@ func newScalerTestData(
scaler4AverageValue int, //nolint:golint,unparam
scaler4IsActive bool, //nolint:golint,unparam
resultIsActive bool, //nolint:golint,unparam
resultIsError bool, //nolint:golint,unparam
resultQueueLength,
resultMaxLength int) scalerTestData {
return scalerTestData{
Expand All @@ -800,6 +807,7 @@ func newScalerTestData(
Scaler4AverageValue: int64(scaler4AverageValue),
Scaler4IsActive: scaler4IsActive,
ResultIsActive: resultIsActive,
ResultIsError: resultIsError,
ResultQueueLength: int64(resultQueueLength),
ResultMaxValue: int64(resultMaxLength),
}
Expand All @@ -822,6 +830,7 @@ type scalerTestData struct {
Scaler4AverageValue int64
Scaler4IsActive bool
ResultIsActive bool
ResultIsError bool
ResultQueueLength int64
ResultMaxValue int64
MinReplicaCount int32
Expand Down

0 comments on commit 1738a2c

Please sign in to comment.