fix: Prevented stuck status due to timeouts during scalers generation #5084

Merged: 8 commits, Oct 16, 2023
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -69,7 +69,7 @@ Here is an overview of all new **experimental** features:

### Fixes

- - **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
+ - **General**: Prevented stuck status due to timeouts during scalers generation ([#5083](https://github.com/kedacore/keda/issues/5083))

### Deprecations

pkg/scaling/scale_handler.go (32 changes: 16 additions & 16 deletions)
@@ -291,36 +291,25 @@ func (h *scaleHandler) getScalersCacheForScaledObject(ctx context.Context, scale
// performGetScalersCache returns cache for input scalableObject, it is common code used by GetScalersCache() and getScalersCacheForScaledObject() methods
func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, scalableObject interface{}, scalableObjectGeneration *int64, scalableObjectKind, scalableObjectNamespace, scalableObjectName string) (*cache.ScalersCache, error) {
h.scalerCachesLock.RLock()
regenerateCache := false
if cache, ok := h.scalerCaches[key]; ok {
// generation was specified -> let's include it in the check as well
if scalableObjectGeneration != nil {
if cache.ScalableObjectGeneration == *scalableObjectGeneration {
h.scalerCachesLock.RUnlock()
return cache, nil
}
// object was found in cache, but the generation is not correct,
// we'll need to close scalers in the cache and
// proceed further to recreate the cache
regenerateCache = true
} else {
h.scalerCachesLock.RUnlock()
return cache, nil
}
}
h.scalerCachesLock.RUnlock()

h.scalerCachesLock.Lock()
defer h.scalerCachesLock.Unlock()
if cache, ok := h.scalerCaches[key]; ok {
// generation was specified -> let's include it in the check as well
if scalableObjectGeneration != nil {
if cache.ScalableObjectGeneration == *scalableObjectGeneration {
return cache, nil
}
// object was found in cache, but the generation is not correct,
// let's close scalers in the cache and proceed further to recreate the cache
cache.Close(ctx)
} else {
return cache, nil
}
}

if scalableObject == nil {
switch scalableObjectKind {
case "ScaledObject":
@@ -388,7 +377,18 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
default:
}

// A scaler's Close() can itself be blocked by timeouts; closing while
// holding the mutex would block the whole cache until the timeout expires.
// Instead, we take the old cache item and close it in a separate goroutine,
// without holding the lock: https://github.com/kedacore/keda/issues/5083
if regenerateCache {
oldCache := h.scalerCaches[key]
go oldCache.Close(ctx)
}

h.scalerCachesLock.Lock()
h.scalerCaches[key] = newCache
h.scalerCachesLock.Unlock()
return h.scalerCaches[key], nil
}
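
For clarity, here is a minimal, self-contained sketch of the locking pattern this change introduces. The handler and fakeCache types are hypothetical stand-ins, not KEDA's actual cache.ScalersCache API: the write lock is held only for the map swap, while the potentially slow Close() of the stale entry runs in a background goroutine.

// A minimal sketch, assuming hypothetical types that mirror the pattern above.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// fakeCache stands in for a scalers cache whose Close may block on timeouts.
type fakeCache struct{ generation int64 }

func (c *fakeCache) Close(ctx context.Context) {
	time.Sleep(2 * time.Second) // simulate a scaler waiting for its timeout
	fmt.Printf("closed cache for generation %d\n", c.generation)
}

type handler struct {
	mu     sync.RWMutex
	caches map[string]*fakeCache
}

// refresh replaces the cached entry for key; the old entry is closed
// asynchronously so a slow Close cannot block other callers on the lock.
func (h *handler) refresh(ctx context.Context, key string, generation int64) *fakeCache {
	h.mu.RLock()
	old, existed := h.caches[key]
	h.mu.RUnlock()

	newCache := &fakeCache{generation: generation}

	// Close the stale entry without holding any lock.
	if existed {
		go old.Close(ctx)
	}

	// The write lock is held only for the map swap itself.
	h.mu.Lock()
	h.caches[key] = newCache
	h.mu.Unlock()
	return newCache
}

func main() {
	h := &handler{caches: map[string]*fakeCache{"app": {generation: 1}}}
	c := h.refresh(context.Background(), "app", 2)
	fmt.Printf("now serving generation %d\n", c.generation)
	time.Sleep(3 * time.Second) // give the background Close time to finish in this demo
}

The trade-off is that a stale entry may still be closing in the background after the new one is already in use, which the change accepts in exchange for never blocking the cache lock on a scaler timeout.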

@@ -0,0 +1,180 @@
//go:build e2e
// +build e2e

package broken_scaledobject_tolerancy_test

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper"
)

const (
testName = "broken-scaledobject-tolerancy-test"
)

var (
testNamespace = fmt.Sprintf("%s-ns", testName)
deploymentName = fmt.Sprintf("%s-deployment", testName)
monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
)

type templateData struct {
TestNamespace string
DeploymentName string
MonitoredDeploymentName string
ScaledObjectName string
}

const (
monitoredDeploymentTemplate = `apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.MonitoredDeploymentName}}
namespace: {{.TestNamespace}}
labels:
deploy: workload-test
spec:
replicas: 0
selector:
matchLabels:
pod: workload-test
template:
metadata:
labels:
pod: workload-test
spec:
containers:
- name: nginx
image: 'nginxinc/nginx-unprivileged'`

deploymentTemplate = `apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.DeploymentName}}
namespace: {{.TestNamespace}}
labels:
deploy: workload-sut
spec:
replicas: 0
selector:
matchLabels:
pod: workload-sut
template:
metadata:
labels:
pod: workload-sut
spec:
containers:
- name: nginx
image: 'nginxinc/nginx-unprivileged'`

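	// brokenScaledObjectTemplate deliberately points its Kafka trigger at an
	// unreachable broker (1.2.3.4:9092), so generating its scalers times out;
	// the test relies on this to exercise tolerance of a broken ScaledObject.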
brokenScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}-broken
namespace: {{.TestNamespace}}
spec:
scaleTargetRef:
name: {{.MonitoredDeploymentName}}
minReplicaCount: 0
maxReplicaCount: 1
triggers:
- metadata:
activationLagThreshold: '1'
bootstrapServers: 1.2.3.4:9092
consumerGroup: earliest
lagThreshold: '1'
offsetResetPolicy: earliest
topic: kafka-topic
type: kafka
`

scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
spec:
scaleTargetRef:
name: {{.DeploymentName}}
pollingInterval: 1
cooldownPeriod: 0
minReplicaCount: 0
maxReplicaCount: 10
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleDown:
stabilizationWindowSeconds: 5
triggers:
- type: kubernetes-workload
metadata:
podSelector: 'pod=workload-test'
value: '1'
`
)

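// TestBrokenScaledObjectTolerance verifies that a ScaledObject whose scaler
// generation times out (the broken Kafka trigger above) does not prevent the
// healthy kubernetes-workload ScaledObject from scaling its target deployment
// out and back in.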
func TestBrokenScaledObjectTolerance(t *testing.T) {
// setup
t.Log("--- setting up ---")
// Create kubernetes resources
kc := GetKubernetesClient(t)
data, templates := getTemplateData()

CreateKubernetesResources(t, kc, testNamespace, data, templates)

testScaleOut(t, kc)
testScaleIn(t, kc)

// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
}

func getTemplateData() (templateData, []Template) {
return templateData{
TestNamespace: testNamespace,
DeploymentName: deploymentName,
ScaledObjectName: scaledObjectName,
MonitoredDeploymentName: monitoredDeploymentName,
}, []Template{
{Name: "deploymentTemplate", Config: deploymentTemplate},
{Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate},
{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
{Name: "brokenScaledObjectTemplate", Config: brokenScaledObjectTemplate},
}
}

func testScaleOut(t *testing.T, kc *kubernetes.Clientset) {
// scale monitored deployment to 2 replicas
replicas := 2
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
fmt.Sprintf("replica count should be %d after 1 minute", replicas))

// scale monitored deployment to 4 replicas
replicas = 4
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
fmt.Sprintf("replica count should be %d after 1 minute", replicas))
}

func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {
// scale monitored deployment to 2 replicas
replicas := 2
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
fmt.Sprintf("replica count should be %d after 1 minute", replicas))

// scale monitored deployment to 0 replicas
replicas = 0
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
fmt.Sprintf("replica count should be %d after 1 minute", replicas))
}