diff --git a/apis/apps/v1/shardingdefinition_types.go b/apis/apps/v1/shardingdefinition_types.go index 756df6fd401..bae85bb03e3 100644 --- a/apis/apps/v1/shardingdefinition_types.go +++ b/apis/apps/v1/shardingdefinition_types.go @@ -150,7 +150,7 @@ type ShardsLimit struct { // +kubebuilder:validation:Required MinShards int32 `json:"minShards"` - // The maximum limit of replicas. + // The maximum limit of shards. // // +kubebuilder:validation:Required MaxShards int32 `json:"maxShards"` diff --git a/apis/operations/v1alpha1/opsrequest_validation.go b/apis/operations/v1alpha1/opsrequest_validation.go index dcc9d2b146f..3debdb503fe 100644 --- a/apis/operations/v1alpha1/opsrequest_validation.go +++ b/apis/operations/v1alpha1/opsrequest_validation.go @@ -323,7 +323,7 @@ func compareQuantity(requestQuantity, limitQuantity *resource.Quantity) bool { } // validateHorizontalScaling validates api when spec.type is HorizontalScaling -func (r *OpsRequest) validateHorizontalScaling(_ context.Context, _ client.Client, cluster *appsv1.Cluster) error { +func (r *OpsRequest) validateHorizontalScaling(ctx context.Context, cli client.Client, cluster *appsv1.Cluster) error { horizontalScalingList := r.Spec.HorizontalScalingList if len(horizontalScalingList) == 0 { return notEmptyError("spec.horizontalScaling") @@ -338,15 +338,41 @@ func (r *OpsRequest) validateHorizontalScaling(_ context.Context, _ client.Clien return err } for _, comSpec := range cluster.Spec.ComponentSpecs { + // Default values if no limit is found + minNum, maxNum := 0, 16384 + if comSpec.ComponentDef != "" { + compDef := &appsv1.ComponentDefinition{} + if err := cli.Get(ctx, client.ObjectKey{Name: comSpec.ComponentDef, Namespace: r.Namespace}, compDef); err != nil { + return err + } + if compDef != nil && compDef.Spec.ReplicasLimit != nil { + minNum = int(compDef.Spec.ReplicasLimit.MinReplicas) + maxNum = int(compDef.Spec.ReplicasLimit.MaxReplicas) + } + } if hScale, ok := hScaleMap[comSpec.Name]; ok { - if err := r.validateHorizontalScalingSpec(hScale, comSpec, cluster.Name, false); err != nil { + if err := r.validateHorizontalScalingSpec(hScale, comSpec, cluster.Name, false, maxNum, minNum); err != nil { return err } } } for _, spec := range cluster.Spec.Shardings { + // Default values if no limit is found + minNum, maxNum := 0, 2048 + if spec.ShardingDef != "" { + shardingDef := &appsv1.ShardingDefinition{} + if err := cli.Get(ctx, types.NamespacedName{Name: spec.ShardingDef, Namespace: r.Namespace}, shardingDef); err != nil { + return err + } + + if shardingDef != nil && shardingDef.Spec.ShardsLimit != nil { + minNum = int(shardingDef.Spec.ShardsLimit.MinShards) + maxNum = int(shardingDef.Spec.ShardsLimit.MaxShards) + } + } + if hScale, ok := hScaleMap[spec.Name]; ok { - if err := r.validateHorizontalScalingSpec(hScale, spec.Template, cluster.Name, true); err != nil { + if err := r.validateHorizontalScalingSpec(hScale, spec.Template, cluster.Name, true, maxNum, minNum); err != nil { return err } } @@ -364,7 +390,7 @@ func (r *OpsRequest) CountOfflineOrOnlineInstances(clusterName, componentName st return offlineOrOnlineInsCountMap } -func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, compSpec appsv1.ClusterComponentSpec, clusterName string, isSharding bool) error { +func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, compSpec appsv1.ClusterComponentSpec, clusterName string, isSharding bool, maxReplicasOrShards int, minReplicasOrShards int) error { scaleIn := hScale.ScaleIn scaleOut := hScale.ScaleOut if hScale.Shards != nil { @@ -374,6 +400,12 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com if scaleOut != nil || scaleIn != nil { return fmt.Errorf(`shards field cannot be used together with scaleOut or scaleIn for the component "%s"`, hScale.ComponentName) } + if int(*hScale.Shards) < minReplicasOrShards { + return fmt.Errorf(`the number of shards after horizontal scale violates the shards limit "%s"`, hScale.ComponentName) + } + if int(*hScale.Shards) > maxReplicasOrShards { + return fmt.Errorf(`the number of shards after horizontal scale violates the shards limit "%s"`, hScale.ComponentName) + } return nil } if lastCompConfiguration, ok := r.Status.LastConfiguration.Components[hScale.ComponentName]; ok { @@ -438,6 +470,19 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com if replicaChanger.ReplicaChanges != nil && allReplicaChanges > *replicaChanger.ReplicaChanges { return fmt.Errorf(`%s "replicaChanges" can't be less than the sum of "replicaChanges" for specified instance templates`, msgPrefix) } + + var replicaChange int32 + if replicaChanger.ReplicaChanges != nil { + replicaChange = *replicaChanger.ReplicaChanges + } else { + replicaChange = allReplicaChanges + } + if isScaleIn && int(compSpec.Replicas)-int(replicaChange) < minReplicasOrShards { + return fmt.Errorf(`the number of replicas after scaling down violates the replica limit for component "%s"`, hScale.ComponentName) + } + if !isScaleIn && int(compSpec.Replicas)+int(replicaChange) > maxReplicasOrShards { + return fmt.Errorf(`the number of replicas after scaling up violates the replica limit for component "%s"`, hScale.ComponentName) + } return nil } if scaleIn != nil { diff --git a/config/crd/bases/apps.kubeblocks.io_shardingdefinitions.yaml b/config/crd/bases/apps.kubeblocks.io_shardingdefinitions.yaml index a5ad8a526dd..0fa5e083463 100644 --- a/config/crd/bases/apps.kubeblocks.io_shardingdefinitions.yaml +++ b/config/crd/bases/apps.kubeblocks.io_shardingdefinitions.yaml @@ -1192,7 +1192,7 @@ spec: This field is immutable. properties: maxShards: - description: The maximum limit of replicas. + description: The maximum limit of shards. format: int32 type: integer minShards: diff --git a/deploy/helm/crds/apps.kubeblocks.io_shardingdefinitions.yaml b/deploy/helm/crds/apps.kubeblocks.io_shardingdefinitions.yaml index a5ad8a526dd..0fa5e083463 100644 --- a/deploy/helm/crds/apps.kubeblocks.io_shardingdefinitions.yaml +++ b/deploy/helm/crds/apps.kubeblocks.io_shardingdefinitions.yaml @@ -1192,7 +1192,7 @@ spec: This field is immutable. properties: maxShards: - description: The maximum limit of replicas. + description: The maximum limit of shards. format: int32 type: integer minShards: diff --git a/pkg/operations/horizontal_scaling_test.go b/pkg/operations/horizontal_scaling_test.go index ee9d4120773..b827a2b9259 100644 --- a/pkg/operations/horizontal_scaling_test.go +++ b/pkg/operations/horizontal_scaling_test.go @@ -46,10 +46,11 @@ import ( var _ = Describe("HorizontalScaling OpsRequest", func() { var ( - randomStr = testCtx.GetRandomStr() - compDefName = "test-compdef-" + randomStr - clusterName = "test-cluster-" + randomStr - insTplName = "foo" + randomStr = testCtx.GetRandomStr() + compDefName = "test-compdef-" + randomStr + shardingDefName = "test-shardingdef-" + randomStr + clusterName = "test-cluster-" + randomStr + insTplName = "foo" ) cleanEnv := func() { @@ -476,6 +477,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { By("expect for opsRequest phase is Creating after doing action") _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) Expect(err).ShouldNot(HaveOccurred()) + fmt.Printf("%v", opsRes.OpsRequest.Status) Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsCreatingPhase)) By("do Action") @@ -733,6 +735,141 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { g.Expect(pobj.Status.Phase).Should(Equal(opsv1alpha1.OpsSucceedPhase)) })).Should(Succeed()) }) + It("should fail when scaling out beyond the max replicas", func() { + By("Setting up the component with a replica limit of 1 to 3") + opsRes, compDef, _ := initOperationsResources(compDefName, clusterName) + Expect(testapps.ChangeObj(&testCtx, compDef, func(compDef *appsv1.ComponentDefinition) { + compDef.Spec.ReplicasLimit = &appsv1.ReplicasLimit{ + MinReplicas: 1, + MaxReplicas: 3, + } + })).Should(Succeed()) + + By("Creating a horizontal scaling operation to scale out beyond the limit") + ops := createHorizontalScaling(clusterName, opsv1alpha1.HorizontalScaling{ + ComponentOps: opsv1alpha1.ComponentOps{ + ComponentName: defaultCompName, + }, + ScaleOut: &opsv1alpha1.ScaleOut{ + OfflineInstancesToOnline: []string{clusterName + defaultCompName + "3", clusterName + defaultCompName + "4"}, + }, + }, false) + + ops.Namespace = testCtx.DefaultNamespace + initClusterAnnotationAndPhaseForOps(opsRes) + + By("Validating the operation") + err := ops.Validate(ctx, k8sClient, opsRes.Cluster, true) + Expect(err).To(HaveOccurred()) // Expect an error due to exceeding the maximum replica limit + }) + + It("should fail when scaling in below the min replicas", func() { + By("Setting up the component with a replica limit of 1 to 3") + opsRes, compDef, _ := initOperationsResources(compDefName, clusterName) + Expect(testapps.ChangeObj(&testCtx, compDef, func(compDef *appsv1.ComponentDefinition) { + compDef.Spec.ReplicasLimit = &appsv1.ReplicasLimit{ + MinReplicas: 1, + MaxReplicas: 3, + } + })).Should(Succeed()) + + By("Creating a horizontal scaling operation to scale in below the limit") + ops := createHorizontalScaling(clusterName, opsv1alpha1.HorizontalScaling{ + ComponentOps: opsv1alpha1.ComponentOps{ + ComponentName: defaultCompName, + }, + ScaleIn: &opsv1alpha1.ScaleIn{ + OnlineInstancesToOffline: []string{clusterName + defaultCompName + "0", clusterName + defaultCompName + "1", clusterName + defaultCompName + "2"}, + }, + }, false) + + ops.Namespace = testCtx.DefaultNamespace + initClusterAnnotationAndPhaseForOps(opsRes) + + By("Validating the operation") + err := ops.Validate(ctx, k8sClient, opsRes.Cluster, true) + Expect(err).To(HaveOccurred()) // Expect an error due to scaling in below the minimum replica limit + }) + + It("should succeed when scaling within the replicas limit", func() { + By("Setting up the component with a replica limit of 1 to 3") + opsRes, compDef, _ := initOperationsResources(compDefName, clusterName) + Expect(testapps.ChangeObj(&testCtx, compDef, func(compDef *appsv1.ComponentDefinition) { + compDef.Spec.ReplicasLimit = &appsv1.ReplicasLimit{ + MinReplicas: 1, + MaxReplicas: 4, + } + })).Should(Succeed()) + + By("Creating a horizontal scaling operation to scale within the limit") + ops := createHorizontalScaling(clusterName, opsv1alpha1.HorizontalScaling{ + ComponentOps: opsv1alpha1.ComponentOps{ + ComponentName: defaultCompName, + }, + ScaleOut: &opsv1alpha1.ScaleOut{ + OfflineInstancesToOnline: []string{clusterName + defaultCompName + "3"}, + }, + }, false) + + ops.Namespace = testCtx.DefaultNamespace + initClusterAnnotationAndPhaseForOps(opsRes) + + By("Validating the operation") + err := ops.Validate(ctx, k8sClient, opsRes.Cluster, true) + Expect(err).ToNot(HaveOccurred()) // Should pass as it's within the specified replica limit + }) + + It("should succeed when scaling within the replicas limit with shards", func() { + By("Setting up the sharding with a shard limit of 1 to 3") + opsRes, _, _ := initOperationsResources(compDefName, clusterName) + shardingDef := testapps.NewShardingDefinitionFactory(shardingDefName, compDefName).Create(&testCtx).GetObject() + // add a sharding component + Expect(testapps.ChangeObj(&testCtx, opsRes.Cluster, func(cluster *appsv1.Cluster) { + cluster.Spec.Shardings = []appsv1.ClusterSharding{ + { + Name: defaultCompName, + Shards: int32(3), + Template: cluster.Spec.ComponentSpecs[0], + ShardingDef: shardingDefName, + }, + } + cluster.Spec.ComponentSpecs = []appsv1.ClusterComponentSpec{} + })).Should(Succeed()) + + Expect(testapps.ChangeObj(&testCtx, shardingDef, func(shardingDef *appsv1.ShardingDefinition) { + shardingDef.Spec.ShardsLimit = &appsv1.ShardsLimit{ + MinShards: 1, + MaxShards: 3, + } + })).Should(Succeed()) + + By("Testing horizontal scaling operation with different shard values") + shardValues := []int32{0, 2, 4} // Shard values to test + expectedResults := []bool{false, true, false} // Expected results for each shard value: fail, pass, fail + + for i, shards := range shardValues { + ops := createHorizontalScaling(clusterName, opsv1alpha1.HorizontalScaling{ + Shards: &shards, + ComponentOps: opsv1alpha1.ComponentOps{ + ComponentName: defaultCompName, + }, + }, false) + + ops.Namespace = testCtx.DefaultNamespace + initClusterAnnotationAndPhaseForOps(opsRes) + + By("Validating the operation") + err := ops.Validate(ctx, k8sClient, opsRes.Cluster, true) + + // Check if the result matches the expected result + if expectedResults[i] { + Expect(err).ToNot(HaveOccurred()) // Should pass when shards is 2 (within limit) + } else { + Expect(err).To(HaveOccurred()) // Should fail when shards is 4 (beyond limit) and 0 (below limit) + } + } + + }) }) })