Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: h-scale opsRequest validation considers replicas and shards limit #8834

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion apis/apps/v1/shardingdefinition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type ShardsLimit struct {
// +kubebuilder:validation:Required
MinShards int32 `json:"minShards"`

// The maximum limit of replicas.
// The maximum limit of shards.
//
// +kubebuilder:validation:Required
MaxShards int32 `json:"maxShards"`
Expand Down
53 changes: 49 additions & 4 deletions apis/operations/v1alpha1/opsrequest_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func compareQuantity(requestQuantity, limitQuantity *resource.Quantity) bool {
}

// validateHorizontalScaling validates api when spec.type is HorizontalScaling
func (r *OpsRequest) validateHorizontalScaling(_ context.Context, _ client.Client, cluster *appsv1.Cluster) error {
func (r *OpsRequest) validateHorizontalScaling(ctx context.Context, cli client.Client, cluster *appsv1.Cluster) error {
horizontalScalingList := r.Spec.HorizontalScalingList
if len(horizontalScalingList) == 0 {
return notEmptyError("spec.horizontalScaling")
Expand All @@ -338,15 +338,41 @@ func (r *OpsRequest) validateHorizontalScaling(_ context.Context, _ client.Clien
return err
}
for _, comSpec := range cluster.Spec.ComponentSpecs {
// Default values if no limit is found
minNum, maxNum := 0, 16384
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default minimum replicas limit is 1, not 0.

if comSpec.ComponentDef != "" {
compDef := &appsv1.ComponentDefinition{}
if err := cli.Get(ctx, client.ObjectKey{Name: comSpec.ComponentDef, Namespace: r.Namespace}, compDef); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ComponentDefinition should only be fetched when a horizontal scaling is actually requested for this component.

return err
}
if compDef != nil && compDef.Spec.ReplicasLimit != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compDef must not be nil.

minNum = int(compDef.Spec.ReplicasLimit.MinReplicas)
maxNum = int(compDef.Spec.ReplicasLimit.MaxReplicas)
}
}
if hScale, ok := hScaleMap[comSpec.Name]; ok {
if err := r.validateHorizontalScalingSpec(hScale, comSpec, cluster.Name, false); err != nil {
if err := r.validateHorizontalScalingSpec(hScale, comSpec, cluster.Name, false, maxNum, minNum); err != nil {
return err
}
}
}
for _, spec := range cluster.Spec.Shardings {
// Default values if no limit is found
minNum, maxNum := 0, 2048
if spec.ShardingDef != "" {
shardingDef := &appsv1.ShardingDefinition{}
if err := cli.Get(ctx, types.NamespacedName{Name: spec.ShardingDef, Namespace: r.Namespace}, shardingDef); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

return err
}

if shardingDef != nil && shardingDef.Spec.ShardsLimit != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

minNum = int(shardingDef.Spec.ShardsLimit.MinShards)
maxNum = int(shardingDef.Spec.ShardsLimit.MaxShards)
}
}

if hScale, ok := hScaleMap[spec.Name]; ok {
if err := r.validateHorizontalScalingSpec(hScale, spec.Template, cluster.Name, true); err != nil {
if err := r.validateHorizontalScalingSpec(hScale, spec.Template, cluster.Name, true, maxNum, minNum); err != nil {
return err
}
}
Expand All @@ -364,7 +390,7 @@ func (r *OpsRequest) CountOfflineOrOnlineInstances(clusterName, componentName st
return offlineOrOnlineInsCountMap
}

func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, compSpec appsv1.ClusterComponentSpec, clusterName string, isSharding bool) error {
func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, compSpec appsv1.ClusterComponentSpec, clusterName string, isSharding bool, maxReplicasOrShards int, minReplicasOrShards int) error {
scaleIn := hScale.ScaleIn
scaleOut := hScale.ScaleOut
if hScale.Shards != nil {
Expand All @@ -374,6 +400,12 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com
if scaleOut != nil || scaleIn != nil {
return fmt.Errorf(`shards field cannot be used together with scaleOut or scaleIn for the component "%s"`, hScale.ComponentName)
}
if int(*hScale.Shards) < minReplicasOrShards {
return fmt.Errorf(`the number of shards after horizontal scale violates the shards limit "%s"`, hScale.ComponentName)
}
if int(*hScale.Shards) > maxReplicasOrShards {
return fmt.Errorf(`the number of shards after horizontal scale violates the shards limit "%s"`, hScale.ComponentName)
}
return nil
}
if lastCompConfiguration, ok := r.Status.LastConfiguration.Components[hScale.ComponentName]; ok {
Expand Down Expand Up @@ -438,6 +470,19 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com
if replicaChanger.ReplicaChanges != nil && allReplicaChanges > *replicaChanger.ReplicaChanges {
return fmt.Errorf(`%s "replicaChanges" can't be less than the sum of "replicaChanges" for specified instance templates`, msgPrefix)
}

var replicaChange int32
if replicaChanger.ReplicaChanges != nil {
replicaChange = *replicaChanger.ReplicaChanges
} else {
replicaChange = allReplicaChanges
}
if isScaleIn && int(compSpec.Replicas)-int(replicaChange) < minReplicasOrShards {
return fmt.Errorf(`the number of replicas after scaling down violates the replica limit for component "%s"`, hScale.ComponentName)
}
if !isScaleIn && int(compSpec.Replicas)+int(replicaChange) > maxReplicasOrShards {
return fmt.Errorf(`the number of replicas after scaling up violates the replica limit for component "%s"`, hScale.ComponentName)
}
return nil
}
if scaleIn != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ spec:
This field is immutable.
properties:
maxShards:
description: The maximum limit of replicas.
description: The maximum limit of shards.
format: int32
type: integer
minShards:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ spec:
This field is immutable.
properties:
maxShards:
description: The maximum limit of replicas.
description: The maximum limit of shards.
format: int32
type: integer
minShards:
Expand Down
145 changes: 141 additions & 4 deletions pkg/operations/horizontal_scaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ import (
var _ = Describe("HorizontalScaling OpsRequest", func() {

var (
randomStr = testCtx.GetRandomStr()
compDefName = "test-compdef-" + randomStr
clusterName = "test-cluster-" + randomStr
insTplName = "foo"
randomStr = testCtx.GetRandomStr()
compDefName = "test-compdef-" + randomStr
shardingDefName = "test-shardingdef-" + randomStr
clusterName = "test-cluster-" + randomStr
insTplName = "foo"
)

cleanEnv := func() {
Expand Down Expand Up @@ -476,6 +477,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() {
By("expect for opsRequest phase is Creating after doing action")
_, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
fmt.Printf("%v", opsRes.OpsRequest.Status)
Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsCreatingPhase))

By("do Action")
Expand Down Expand Up @@ -733,6 +735,141 @@ var _ = Describe("HorizontalScaling OpsRequest", func() {
g.Expect(pobj.Status.Phase).Should(Equal(opsv1alpha1.OpsSucceedPhase))
})).Should(Succeed())
})
It("should fail when scaling out beyond the max replicas", func() {
By("Setting up the component with a replica limit of 1 to 3")
opsRes, compDef, _ := initOperationsResources(compDefName, clusterName)
Expect(testapps.ChangeObj(&testCtx, compDef, func(compDef *appsv1.ComponentDefinition) {
compDef.Spec.ReplicasLimit = &appsv1.ReplicasLimit{
MinReplicas: 1,
MaxReplicas: 3,
}
})).Should(Succeed())

By("Creating a horizontal scaling operation to scale out beyond the limit")
ops := createHorizontalScaling(clusterName, opsv1alpha1.HorizontalScaling{
ComponentOps: opsv1alpha1.ComponentOps{
ComponentName: defaultCompName,
},
ScaleOut: &opsv1alpha1.ScaleOut{
OfflineInstancesToOnline: []string{clusterName + defaultCompName + "3", clusterName + defaultCompName + "4"},
},
}, false)

ops.Namespace = testCtx.DefaultNamespace
initClusterAnnotationAndPhaseForOps(opsRes)

By("Validating the operation")
err := ops.Validate(ctx, k8sClient, opsRes.Cluster, true)
Expect(err).To(HaveOccurred()) // Expect an error due to exceeding the maximum replica limit
})

It("should fail when scaling in below the min replicas", func() {
By("Setting up the component with a replica limit of 1 to 3")
opsRes, compDef, _ := initOperationsResources(compDefName, clusterName)
Expect(testapps.ChangeObj(&testCtx, compDef, func(compDef *appsv1.ComponentDefinition) {
compDef.Spec.ReplicasLimit = &appsv1.ReplicasLimit{
MinReplicas: 1,
MaxReplicas: 3,
}
})).Should(Succeed())

By("Creating a horizontal scaling operation to scale in below the limit")
ops := createHorizontalScaling(clusterName, opsv1alpha1.HorizontalScaling{
ComponentOps: opsv1alpha1.ComponentOps{
ComponentName: defaultCompName,
},
ScaleIn: &opsv1alpha1.ScaleIn{
OnlineInstancesToOffline: []string{clusterName + defaultCompName + "0", clusterName + defaultCompName + "1", clusterName + defaultCompName + "2"},
},
}, false)

ops.Namespace = testCtx.DefaultNamespace
initClusterAnnotationAndPhaseForOps(opsRes)

By("Validating the operation")
err := ops.Validate(ctx, k8sClient, opsRes.Cluster, true)
Expect(err).To(HaveOccurred()) // Expect an error due to scaling in below the minimum replica limit
})

It("should succeed when scaling within the replicas limit", func() {
By("Setting up the component with a replica limit of 1 to 3")
opsRes, compDef, _ := initOperationsResources(compDefName, clusterName)
Expect(testapps.ChangeObj(&testCtx, compDef, func(compDef *appsv1.ComponentDefinition) {
compDef.Spec.ReplicasLimit = &appsv1.ReplicasLimit{
MinReplicas: 1,
MaxReplicas: 4,
}
})).Should(Succeed())

By("Creating a horizontal scaling operation to scale within the limit")
ops := createHorizontalScaling(clusterName, opsv1alpha1.HorizontalScaling{
ComponentOps: opsv1alpha1.ComponentOps{
ComponentName: defaultCompName,
},
ScaleOut: &opsv1alpha1.ScaleOut{
OfflineInstancesToOnline: []string{clusterName + defaultCompName + "3"},
},
}, false)

ops.Namespace = testCtx.DefaultNamespace
initClusterAnnotationAndPhaseForOps(opsRes)

By("Validating the operation")
err := ops.Validate(ctx, k8sClient, opsRes.Cluster, true)
Expect(err).ToNot(HaveOccurred()) // Should pass as it's within the specified replica limit
})

It("should succeed when scaling within the replicas limit with shards", func() {
By("Setting up the sharding with a shard limit of 1 to 3")
opsRes, _, _ := initOperationsResources(compDefName, clusterName)
shardingDef := testapps.NewShardingDefinitionFactory(shardingDefName, compDefName).Create(&testCtx).GetObject()
// add a sharding component
Expect(testapps.ChangeObj(&testCtx, opsRes.Cluster, func(cluster *appsv1.Cluster) {
cluster.Spec.Shardings = []appsv1.ClusterSharding{
{
Name: defaultCompName,
Shards: int32(3),
Template: cluster.Spec.ComponentSpecs[0],
ShardingDef: shardingDefName,
},
}
cluster.Spec.ComponentSpecs = []appsv1.ClusterComponentSpec{}
})).Should(Succeed())

Expect(testapps.ChangeObj(&testCtx, shardingDef, func(shardingDef *appsv1.ShardingDefinition) {
shardingDef.Spec.ShardsLimit = &appsv1.ShardsLimit{
MinShards: 1,
MaxShards: 3,
}
})).Should(Succeed())

By("Testing horizontal scaling operation with different shard values")
shardValues := []int32{0, 2, 4} // Shard values to test
expectedResults := []bool{false, true, false} // Expected results for each shard value: fail, pass, fail

for i, shards := range shardValues {
ops := createHorizontalScaling(clusterName, opsv1alpha1.HorizontalScaling{
Shards: &shards,
ComponentOps: opsv1alpha1.ComponentOps{
ComponentName: defaultCompName,
},
}, false)

ops.Namespace = testCtx.DefaultNamespace
initClusterAnnotationAndPhaseForOps(opsRes)

By("Validating the operation")
err := ops.Validate(ctx, k8sClient, opsRes.Cluster, true)

// Check if the result matches the expected result
if expectedResults[i] {
Expect(err).ToNot(HaveOccurred()) // Should pass when shards is 2 (within limit)
} else {
Expect(err).To(HaveOccurred()) // Should fail when shards is 4 (beyond limit) and 0 (below limit)
}
}

})
})
})

Expand Down