From da2b678c28159f8dfb91f02440ec9ba66e52b9f0 Mon Sep 17 00:00:00 2001 From: ian-hui Date: Mon, 2 Sep 2024 12:46:12 +0000 Subject: [PATCH] feat: support ignore strict horizontal scaling validation --- .../v1alpha1/opsrequest_validation.go | 16 +- pkg/constant/ops.go | 1 + pkg/controller/instanceset/object_builder.go | 1 - pkg/operations/horizontal_scaling.go | 170 ++++++++++++++---- pkg/operations/horizontal_scaling_test.go | 146 ++++++++++++--- pkg/operations/ops_progress_util_test.go | 4 +- pkg/operations/ops_util.go | 11 ++ pkg/operations/ops_util_test.go | 12 +- 8 files changed, 292 insertions(+), 69 deletions(-) diff --git a/apis/operations/v1alpha1/opsrequest_validation.go b/apis/operations/v1alpha1/opsrequest_validation.go index ee3eaada1bb..2b09b2b231e 100644 --- a/apis/operations/v1alpha1/opsrequest_validation.go +++ b/apis/operations/v1alpha1/opsrequest_validation.go @@ -443,12 +443,16 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com if err := validateHScaleOperation(scaleOut.ReplicaChanger, scaleOut.NewInstances, scaleOut.OfflineInstancesToOnline, false); err != nil { return err } - if len(scaleOut.OfflineInstancesToOnline) > 0 { - offlineInstanceSet := sets.New(compSpec.OfflineInstances...) 
- for _, offlineInsName := range scaleOut.OfflineInstancesToOnline { - if _, ok := offlineInstanceSet[offlineInsName]; !ok { - return fmt.Errorf(`cannot find the offline instance "%s" in component "%s" for scaleOut operation`, offlineInsName, hScale.ComponentName) - } + } + // instance cannot be both in OfflineInstancesToOnline and OnlineInstancesToOffline + if scaleIn != nil && scaleOut != nil { + onlineToOfflineSet := make(map[string]struct{}) + for _, instance := range scaleIn.OnlineInstancesToOffline { + onlineToOfflineSet[instance] = struct{}{} + } + for _, instance := range scaleOut.OfflineInstancesToOnline { + if _, exists := onlineToOfflineSet[instance]; exists { + return fmt.Errorf(`instance "%s" cannot be both in "OfflineInstancesToOnline" and "OnlineInstancesToOffline"`, instance) } } } diff --git a/pkg/constant/ops.go b/pkg/constant/ops.go index 91621957c8c..da1a62d8cac 100644 --- a/pkg/constant/ops.go +++ b/pkg/constant/ops.go @@ -32,4 +32,5 @@ const ( DisableHAAnnotationKey = "operations.kubeblocks.io/disable-ha" RelatedOpsAnnotationKey = "operations.kubeblocks.io/related-ops" OpsDependentOnSuccessfulOpsAnnoKey = "operations.kubeblocks.io/dependent-on-successful-ops" // OpsDependentOnSuccessfulOpsAnnoKey wait for the dependent ops to succeed before executing the current ops. If it fails, this ops will also fail. 
+ IgnoreHscaleValidateAnnoKey = "apps.kubeblocks.io/ignore-strict-horizontal-scale-validation" ) diff --git a/pkg/controller/instanceset/object_builder.go b/pkg/controller/instanceset/object_builder.go index e5c0b93a44e..6e49fee6ca4 100644 --- a/pkg/controller/instanceset/object_builder.go +++ b/pkg/controller/instanceset/object_builder.go @@ -26,7 +26,6 @@ import ( "strings" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" workloads "github.com/apecloud/kubeblocks/apis/workloads/v1" diff --git a/pkg/operations/horizontal_scaling.go b/pkg/operations/horizontal_scaling.go index 86ca7199905..b28e2d5e04f 100644 --- a/pkg/operations/horizontal_scaling.go +++ b/pkg/operations/horizontal_scaling.go @@ -22,8 +22,11 @@ package operations import ( "fmt" "slices" + "strings" "time" + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/pointer" @@ -97,18 +100,9 @@ func (hs horizontalScalingOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli if err := compOpsSet.updateClusterComponentsAndShardings(opsRes.Cluster, func(compSpec *appsv1.ClusterComponentSpec, obj ComponentOpsInterface) error { horizontalScaling := obj.(opsv1alpha1.HorizontalScaling) lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[obj.GetComponentName()] - if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 { - // check if the instances are online. 
- currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances, - opsRes.Cluster.Name, obj.GetComponentName()) - if err != nil { - return err - } - for _, onlineIns := range horizontalScaling.ScaleIn.OnlineInstancesToOffline { - if _, ok := currPodSet[onlineIns]; !ok { - return intctrlutil.NewFatalError(fmt.Sprintf(`instance "%s" specified in onlineInstancesToOffline is not online`, onlineIns)) - } - } + + if err := hs.validateHorizontalScaling(opsRes, lastCompConfiguration, obj); err != nil { + return err } replicas, instances, offlineInstances, err := hs.getExpectedCompValues(opsRes, compSpec.DeepCopy(), lastCompConfiguration, horizontalScaling) @@ -205,6 +199,16 @@ func (hs horizontalScalingOpsHandler) getCreateAndDeletePodSet(opsRes *OpsResour deletePodSet[k] = appsv1.GetInstanceTemplateName(clusterName, fullCompName, k) } } + if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 { + for _, v := range horizontalScaling.ScaleIn.OnlineInstancesToOffline { + deletePodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v) + } + } + if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 { + for _, v := range horizontalScaling.ScaleOut.OfflineInstancesToOnline { + createPodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v) + } + } if opsRes.OpsRequest.Status.Phase == opsv1alpha1.OpsCancellingPhase { // when cancelling this opsRequest, revert the changes. 
return deletePodSet, createPodSet, nil @@ -294,16 +298,56 @@ func (hs horizontalScalingOpsHandler) getExpectedCompValues( compReplicas := *lastCompConfiguration.Replicas compInstanceTpls := slices.Clone(lastCompConfiguration.Instances) compOfflineInstances := lastCompConfiguration.OfflineInstances - expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, horizontalScaling) - err := hs.autoSyncReplicaChanges(opsRes, horizontalScaling, compReplicas, compInstanceTpls, expectOfflineInstances) + filteredHorizontal, err := filterHorizontalScalingSpec(opsRes, compReplicas, compInstanceTpls, compOfflineInstances, horizontalScaling.DeepCopy()) + if err != nil { + return 0, nil, nil, err + } + expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, *filteredHorizontal) + err = hs.autoSyncReplicaChanges(opsRes, *filteredHorizontal, compReplicas, compInstanceTpls, expectOfflineInstances) if err != nil { return 0, nil, nil, err } - return hs.getCompExpectReplicas(horizontalScaling, compReplicas), - hs.getCompExpectedInstances(compInstanceTpls, horizontalScaling), + return hs.getCompExpectReplicas(*filteredHorizontal, compReplicas), + hs.getCompExpectedInstances(compInstanceTpls, *filteredHorizontal), expectOfflineInstances, nil } +// only offlined instances could be taken online. +// and only onlined instances could be taken offline. +func filterHorizontalScalingSpec( + opsRes *OpsResource, + compReplicas int32, + compInstanceTpls []appsv1.InstanceTemplate, + compOfflineInstances []string, + horizontalScaling *opsv1alpha1.HorizontalScaling) (*opsv1alpha1.HorizontalScaling, error) { + offlineInstances := sets.New(compOfflineInstances...) 
+ podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compOfflineInstances, + opsRes.Cluster.Name, horizontalScaling.ComponentName) + if err != nil { + return nil, err + } + if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 { + onlinedInstanceFromOps := sets.Set[string]{} + for _, insName := range horizontalScaling.ScaleIn.OnlineInstancesToOffline { + if _, ok := podSet[insName]; ok { + onlinedInstanceFromOps.Insert(insName) + } + } + horizontalScaling.ScaleIn.OnlineInstancesToOffline = onlinedInstanceFromOps.UnsortedList() + } + if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 { + offlinedInstanceFromOps := sets.Set[string]{} + for _, insName := range horizontalScaling.ScaleOut.OfflineInstancesToOnline { + if _, ok := offlineInstances[insName]; ok { + offlinedInstanceFromOps.Insert(insName) + } + } + horizontalScaling.ScaleOut.OfflineInstancesToOnline = offlinedInstanceFromOps.UnsortedList() + } + return horizontalScaling, nil + +} + // autoSyncReplicaChanges auto-sync the replicaChanges of the component and instance templates. func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges( opsRes *OpsResource, @@ -339,6 +383,7 @@ func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges( } return replicaChanger.Instances, &allReplicaChanges } + // auto sync the replicaChanges. 
scaleIn := horizontalScaling.ScaleIn if scaleIn != nil { @@ -347,21 +392,7 @@ func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges( } scaleOut := horizontalScaling.ScaleOut if scaleOut != nil { - // get the pod set when removing the specified instances from offlineInstances slice - podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compExpectOfflineInstances, - opsRes.Cluster.Name, horizontalScaling.ComponentName) - if err != nil { - return err - } - onlineInsCountMap := map[string]int32{} - for _, insName := range scaleOut.OfflineInstancesToOnline { - if _, ok := podSet[insName]; !ok { - // if the specified instance will not be created, continue - continue - } - insTplName := appsv1.GetInstanceTemplateName(opsRes.Cluster.Name, horizontalScaling.ComponentName, insName) - onlineInsCountMap[insTplName]++ - } + onlineInsCountMap := opsRes.OpsRequest.CountOfflineOrOnlineInstances(opsRes.Cluster.Name, horizontalScaling.ComponentName, scaleOut.OfflineInstancesToOnline) scaleOut.Instances, scaleOut.ReplicaChanges = getSyncedInstancesAndReplicaChanges(onlineInsCountMap, scaleOut.ReplicaChanger, scaleOut.NewInstances) } return nil @@ -433,3 +464,80 @@ func (hs horizontalScalingOpsHandler) getCompExpectedOfflineInstances( } return compOfflineInstances } + +// validateHorizontalScaling returns an error if any instance specified in the request does not exist. +// Unless the ignore-strict-validation annotation (IgnoreHscaleValidateAnnoKey) is set to "true", it also validates +// that instances in onlineInstancesToOffline are currently online and instances in offlineInstancesToOnline are currently offline. 
+func (hs horizontalScalingOpsHandler) validateHorizontalScaling( + opsRes *OpsResource, + lastCompConfiguration opsv1alpha1.LastComponentConfiguration, + obj ComponentOpsInterface, +) error { + horizontalScaling := obj.(opsv1alpha1.HorizontalScaling) + currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances, + opsRes.Cluster.Name, obj.GetComponentName()) + if err != nil { + return err + } + offlineInstances := sets.New(lastCompConfiguration.OfflineInstances...) + + notExistInstanceList := sets.Set[string]{} + if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 { + onlinedInstances, _, notExistInstances := hs.collectOnlineAndOfflineAndNotExistInstances( + horizontalScaling.ScaleIn.OnlineInstancesToOffline, + offlineInstances, + currPodSet) + notExistInstanceList = notExistInstanceList.Union(notExistInstances) + if value, exist := opsRes.OpsRequest.Annotations[constant.IgnoreHscaleValidateAnnoKey]; !exist || value == "false" { + if onlinedInstances.Len() != len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) { + unscalablePods := getMissingElementsInSetFromList(onlinedInstances, horizontalScaling.ScaleIn.OnlineInstancesToOffline) + if unscalablePods == nil { + return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates") + } + return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in onlineInstancesToOffline is not online or not exist`, strings.Join(unscalablePods, ", "))) + } + } + } + if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 { + _, offlinedInstances, notExistInstances := hs.collectOnlineAndOfflineAndNotExistInstances( + horizontalScaling.ScaleOut.OfflineInstancesToOnline, + offlineInstances, + currPodSet) + notExistInstanceList = notExistInstanceList.Union(notExistInstances) + if value, exist := 
opsRes.OpsRequest.Annotations[constant.IgnoreHscaleValidateAnnoKey]; !exist || value == "false" { + if offlinedInstances.Len() != len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) { + unscalablePods := getMissingElementsInSetFromList(offlinedInstances, horizontalScaling.ScaleOut.OfflineInstancesToOnline) + if unscalablePods == nil { + return intctrlutil.NewFatalError("instances specified in offlineInstancesToOnline has duplicates") + } + return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in offlineInstancesToOnline is not offline or not exist`, strings.Join(unscalablePods, ", "))) + } + } + } + if notExistInstanceList.Len() > 0 { + return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in the request is not exist`, strings.Join(notExistInstanceList.UnsortedList(), ", "))) + } + return nil +} + +// collect the online and offline instances specified in the request. +func (hs horizontalScalingOpsHandler) collectOnlineAndOfflineAndNotExistInstances( + instance []string, + offlineInstances sets.Set[string], + currPodSet map[string]string) (sets.Set[string], sets.Set[string], sets.Set[string]) { + + offlinedInstanceFromOps := sets.Set[string]{} + onlinedInstanceFromOps := sets.Set[string]{} + notExistInstanceFromOps := sets.Set[string]{} + for _, insName := range instance { + if _, ok := offlineInstances[insName]; ok { + offlinedInstanceFromOps.Insert(insName) + continue + } + if _, ok := currPodSet[insName]; ok { + onlinedInstanceFromOps.Insert(insName) + continue + } + notExistInstanceFromOps.Insert(insName) + } + return onlinedInstanceFromOps, offlinedInstanceFromOps, notExistInstanceFromOps +} diff --git a/pkg/operations/horizontal_scaling_test.go b/pkg/operations/horizontal_scaling_test.go index 4698c9fc386..fd20e78fee2 100644 --- a/pkg/operations/horizontal_scaling_test.go +++ b/pkg/operations/horizontal_scaling_test.go @@ -21,6 +21,7 @@ package operations import ( "fmt" + "strings" "time" . 
"github.com/onsi/ginkgo/v2" @@ -86,7 +87,8 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { Context("Test OpsRequest", func() { commonHScaleConsensusCompTest := func(reqCtx intctrlutil.RequestCtx, changeClusterSpec func(cluster *appsv1.Cluster), - horizontalScaling opsv1alpha1.HorizontalScaling) (*OpsResource, []*corev1.Pod) { + horizontalScaling opsv1alpha1.HorizontalScaling, + ignoreHscaleStrictValidate bool) (*OpsResource, []*corev1.Pod) { By("init operations resources with CLusterDefinition/Hybrid components Cluster/consensus Pods") opsRes, _, _ := initOperationsResources(compDefName, clusterName) its := testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) @@ -99,7 +101,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { By("create opsRequest for horizontal scaling of consensus component") initClusterAnnotationAndPhaseForOps(opsRes) horizontalScaling.ComponentName = defaultCompName - opsRes.OpsRequest = createHorizontalScaling(clusterName, horizontalScaling) + opsRes.OpsRequest = createHorizontalScaling(clusterName, horizontalScaling, ignoreHscaleStrictValidate) // set ops phase to Pending opsRes.OpsRequest.Status.Phase = opsv1alpha1.OpsPendingPhase mockComponentIsOperating(opsRes.Cluster, appsv1.UpdatingComponentPhase, defaultCompName) @@ -190,7 +192,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { horizontalScaling opsv1alpha1.HorizontalScaling, mockHScale func(podList []*corev1.Pod)) { reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx} - opsRes, podList := commonHScaleConsensusCompTest(reqCtx, changeClusterSpec, horizontalScaling) + opsRes, podList := commonHScaleConsensusCompTest(reqCtx, changeClusterSpec, horizontalScaling, false) mockHScale(podList) testapps.MockInstanceSetStatus(testCtx, opsRes.Cluster, defaultCompName) checkOpsRequestPhaseIsSucceed(reqCtx, opsRes) @@ -200,7 +202,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { horizontalScaling opsv1alpha1.HorizontalScaling, 
isScaleDown bool) { reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx} - opsRes, podList := commonHScaleConsensusCompTest(reqCtx, nil, horizontalScaling) + opsRes, podList := commonHScaleConsensusCompTest(reqCtx, nil, horizontalScaling, false) var pod *corev1.Pod if isScaleDown { By("delete the pod") @@ -289,9 +291,10 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { testHScaleWithSpecifiedPod := func(changeClusterSpec func(cluster *appsv1.Cluster), horizontalScaling opsv1alpha1.HorizontalScaling, expectOfflineInstances []string, - mockHScale func(podList []*corev1.Pod)) *OpsResource { + mockHScale func(podList []*corev1.Pod), + ignoreHscalingStrictValidate bool) *OpsResource { reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx} - opsRes, podList := commonHScaleConsensusCompTest(reqCtx, changeClusterSpec, horizontalScaling) + opsRes, podList := commonHScaleConsensusCompTest(reqCtx, changeClusterSpec, horizontalScaling, ignoreHscalingStrictValidate) By("verify cluster spec is correct") targetSpec := opsRes.Cluster.Spec.GetComponentByName(defaultCompName) Expect(targetSpec.OfflineInstances).Should(HaveLen(len(expectOfflineInstances))) @@ -323,7 +326,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { }, offlineInstances, func(podList []*corev1.Pod) { By(fmt.Sprintf(`delete the specified pod "%s"`, toDeletePodName)) deletePods(podList[2]) - }) + }, false) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/1")) }) @@ -347,7 +350,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { deletePods(podList[2]) By("create a new pod(ordinal:2) by replicas") createPods("", 2) - }) + }, false) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("2/2")) }) @@ -365,7 +368,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { }, offlineInstances, func(podList []*corev1.Pod) { By("delete the specified pod " + offlineInstanceName) deletePods(podList[0]) - }) + }, false) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/1")) 
By("expect replicas to 2 and template " + insTplName + " replicas to 0") compSpec := opsRes.Cluster.Spec.GetComponentByName(defaultCompName) @@ -387,7 +390,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { }, []string{}, func(podList []*corev1.Pod) { By("create the specified pod " + offlineInstanceName) testapps.MockInstanceSetPod(&testCtx, nil, clusterName, defaultCompName, offlineInstanceName, "follower", "Readonly") - }) + }, false) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/1")) By("expect replicas to 4") compSpec := opsRes.Cluster.Spec.GetComponentByName(defaultCompName) @@ -415,7 +418,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { By(fmt.Sprintf(`create the pod "%s" which is removed from offlineInstances`, onlinePodName)) createPods("", 1) - }) + }, false) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("2/2")) By("expect replicas to 3") Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(3)) @@ -439,7 +442,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { ScaleIn: &opsv1alpha1.ScaleIn{ ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(3)}, }, - }) + }, false) By("verify cluster spec is correct") var targetSpec *appsv1.ClusterComponentSpec for i := range opsRes.Cluster.Spec.ComponentSpecs { @@ -461,9 +464,9 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { testapps.MockInstanceSetStatus(testCtx, opsRes.Cluster, defaultCompName) checkOpsRequestPhaseIsSucceed(reqCtx, opsRes) }) - createOpsAndToCreatingPhase := func(reqCtx intctrlutil.RequestCtx, opsRes *OpsResource, horizontalScaling opsv1alpha1.HorizontalScaling) *opsv1alpha1.OpsRequest { + createOpsAndToCreatingPhase := func(reqCtx intctrlutil.RequestCtx, opsRes *OpsResource, horizontalScaling opsv1alpha1.HorizontalScaling, ignoreHscalingStrictValidate bool) *opsv1alpha1.OpsRequest { horizontalScaling.ComponentName = defaultCompName - opsRes.OpsRequest = 
createHorizontalScaling(clusterName, horizontalScaling) + opsRes.OpsRequest = createHorizontalScaling(clusterName, horizontalScaling, ignoreHscalingStrictValidate) opsRes.OpsRequest.Spec.Force = true // set ops phase to Pending opsRes.OpsRequest.Status.Phase = opsv1alpha1.OpsPendingPhase @@ -480,24 +483,113 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { return opsRes.OpsRequest } - It("test offline the specified pod but it is not online", func() { + It("test offline the specified pod but it is not online with the ignore policy", func() { By("init operations resources with CLusterDefinition/Hybrid components Cluster/consensus Pods") opsRes, _, _ := initOperationsResources(compDefName, clusterName) testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) reqCtx := intctrlutil.RequestCtx{Ctx: ctx} - By("offline the specified pod but it is not online") + By("offline the specified pod but it is not exist, expect replicas not be changed") + offlineInsName := fmt.Sprintf("%s-%s-4", clusterName, defaultCompName) + _ = createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ + ScaleIn: &opsv1alpha1.ScaleIn{ + OnlineInstancesToOffline: []string{offlineInsName}, + }, + }, true) + By("expect replicas not be changed") + Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase), fmt.Sprintf("info: %v", opsRes.OpsRequest)) + }) + + It("test offline the specified pod but it is not exist", func() { + By("init operations resources with CLusterDefinition/ClusterVersion/Hybrid components Cluster/consensus Pods") + opsRes, _, _ := initOperationsResources(compDefName, clusterName) + testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + + By("offline the specified pod but it is not exist") offlineInsName := fmt.Sprintf("%s-%s-4", clusterName, defaultCompName) _ = 
createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ ScaleIn: &opsv1alpha1.ScaleIn{ ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}, OnlineInstancesToOffline: []string{offlineInsName}, }, - }) + }, true) Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase)) conditions := opsRes.OpsRequest.Status.Conditions + unscalablePods := []string{offlineInsName} Expect(conditions[len(conditions)-1].Message).Should(ContainSubstring( - fmt.Sprintf(`instance "%s" specified in onlineInstancesToOffline is not online`, offlineInsName))) + fmt.Sprintf(`instances "%s" specified in the request is not exist`, strings.Join(unscalablePods, ", ")))) + }) + + It("test offline two specified pods with same pod name with ignore policy", func() { + By("init operations resources with CLusterDefinition/ClusterVersion/Hybrid components Cluster/consensus Pods") + opsRes, _, _ := initOperationsResources(compDefName, clusterName) + testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + testPodName := fmt.Sprintf("%s-%s-1", clusterName, defaultCompName) + + By("offline two pod with same pod name") + _ = createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ + ScaleIn: &opsv1alpha1.ScaleIn{ + OnlineInstancesToOffline: []string{testPodName, testPodName}, + }, + }, true) + Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsCreatingPhase)) + Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(2)) + // expect the not exist pod still in opsRequest + onlineToOfflineInstances := opsRes.OpsRequest.Spec.HorizontalScalingList[0].ScaleIn.OnlineInstancesToOffline + Expect(onlineToOfflineInstances).Should(Equal([]string{testPodName, testPodName}), fmt.Sprintf("info: %v", 
opsRes.OpsRequest)) + // expect for opsRequest phase is Succeed after pods has been scaled and component phase is Running + checkOpsRequestPhaseIsSucceed(reqCtx, opsRes) + }) + + It("test online two specified pods with same pod name with ignore policy", func() { + By("init operations resources with CLusterDefinition/ClusterVersion/Hybrid components Cluster/consensus Pods") + By("init operations resources with CLusterDefinition/ClusterVersion/Hybrid components Cluster/consensus Pods") + opsRes, _, _ := initOperationsResources(compDefName, clusterName) + testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + testPodName := fmt.Sprintf("%s-%s-1", clusterName, defaultCompName) + + By("offline two pod with same pod name") + _ = createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ + ScaleIn: &opsv1alpha1.ScaleIn{ + OnlineInstancesToOffline: []string{testPodName, testPodName}, + }, + }, true) + Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsCreatingPhase)) + Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(2)) + By("expect the not exist pod still in opsRequest") + onlineToOfflineInstances := opsRes.OpsRequest.Spec.HorizontalScalingList[0].ScaleIn.OnlineInstancesToOffline + Expect(onlineToOfflineInstances).Should(Equal([]string{testPodName, testPodName}), fmt.Sprintf("info: %v", opsRes.OpsRequest)) + By("expect for opsRequest phase is Succeed after pods has been scaled and component phase is Running") + checkOpsRequestPhaseIsSucceed(reqCtx, opsRes) + Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/1"), fmt.Sprintf("info: %v", opsRes.OpsRequest)) + + }) + + It("test offline and online two pods in the same time with the ignore policy", func() { + onlinePodName := fmt.Sprintf("%s-%s-1", clusterName, defaultCompName) + offlinePodName := 
fmt.Sprintf("%s-%s-%s-0", clusterName, defaultCompName, insTplName) + opsRes := testHScaleWithSpecifiedPod(func(cluster *appsv1.Cluster) { + setClusterCompSpec(cluster, []appsv1.InstanceTemplate{ + {Name: insTplName, Replicas: pointer.Int32(1)}, + }, []string{onlinePodName}) + }, opsv1alpha1.HorizontalScaling{ + ScaleIn: &opsv1alpha1.ScaleIn{ + OnlineInstancesToOffline: []string{offlinePodName}, + }, + ScaleOut: &opsv1alpha1.ScaleOut{ + OfflineInstancesToOnline: []string{onlinePodName}, + }, + }, []string{offlinePodName}, func(podList []*corev1.Pod) { + By(fmt.Sprintf(`delete the specified pod"%s"`, offlinePodName)) + deletePods(podList[0]) + + By(fmt.Sprintf(`create the pod "%s" which is removed from offlineInstances`, onlinePodName)) + createPods("", 1) + }, true) + Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("2/2")) }) It("test run multi horizontalScaling opsRequest with force flag", func() { @@ -508,13 +600,13 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { By("create first opsRequest to add 1 replicas with `scaleOut` field and expect replicas to 4") createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ ScaleOut: &opsv1alpha1.ScaleOut{ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}}, - }) + }, false) Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(4)) By("create secondary opsRequest to add 1 replicas with `replicasToAdd` field and expect replicas to 5") createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ ScaleOut: &opsv1alpha1.ScaleOut{ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}}, - }) + }, false) Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(5)) By("create third opsRequest to offline a pod which is created by another running opsRequest and expect it to fail") @@ -524,7 +616,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { 
ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}, OnlineInstancesToOffline: []string{offlineInsName}, }, - }) + }, false) Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase)) conditions := opsRes.OpsRequest.Status.Conditions Expect(conditions[len(conditions)-1].Message).Should(ContainSubstring(fmt.Sprintf(`instance "%s" cannot be taken offline as it has been created by another running opsRequest`, offlineInsName))) @@ -532,7 +624,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { By("create a opsRequest to delete 1 replicas which is created by another running opsRequest and expect it to fail") _ = createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ ScaleIn: &opsv1alpha1.ScaleIn{ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}}, - }) + }, false) Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase)) conditions = opsRes.OpsRequest.Status.Conditions Expect(conditions[len(conditions)-1].Message).Should(ContainSubstring(`cannot be taken offline as it has been created by another running opsRequest`)) @@ -540,13 +632,21 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { }) }) -func createHorizontalScaling(clusterName string, horizontalScaling opsv1alpha1.HorizontalScaling) *opsv1alpha1.OpsRequest { +func createHorizontalScaling(clusterName string, horizontalScaling opsv1alpha1.HorizontalScaling, ifIgnore bool) *opsv1alpha1.OpsRequest { horizontalOpsName := "horizontal-scaling-ops-" + testCtx.GetRandomStr() + var ignoreStrictValidation string + if ifIgnore { + ignoreStrictValidation = "true" + } else { + ignoreStrictValidation = "false" + } ops := testops.NewOpsRequestObj(horizontalOpsName, testCtx.DefaultNamespace, clusterName, opsv1alpha1.HorizontalScalingType) ops.Spec.HorizontalScalingList = 
[]opsv1alpha1.HorizontalScaling{ horizontalScaling, } + ops.Annotations = map[string]string{} + ops.Annotations[constant.IgnoreHscaleValidateAnnoKey] = ignoreStrictValidation opsRequest := testops.CreateOpsRequest(ctx, testCtx, ops) opsRequest.Status.Phase = opsv1alpha1.OpsPendingPhase return opsRequest diff --git a/pkg/operations/ops_progress_util_test.go b/pkg/operations/ops_progress_util_test.go index f8e7ef54a3e..28c0d6c351f 100644 --- a/pkg/operations/ops_progress_util_test.go +++ b/pkg/operations/ops_progress_util_test.go @@ -131,7 +131,7 @@ var _ = Describe("Ops ProgressDetails", func() { ReplicaChanges: pointer.Int32(2), }, }, - }) + }, false) mockComponentIsOperating(opsRes.Cluster, appsv1.UpdatingComponentPhase, defaultCompName) // appsv1.HorizontalScalingPhase initClusterForOps(opsRes) @@ -188,7 +188,7 @@ var _ = Describe("Ops ProgressDetails", func() { ReplicaChanges: pointer.Int32(1), }, }, - }) + }, false) mockComponentIsOperating(opsRes.Cluster, appsv1.UpdatingComponentPhase, defaultCompName) // appsv1.HorizontalScalingPhase initClusterForOps(opsRes) diff --git a/pkg/operations/ops_util.go b/pkg/operations/ops_util.go index ee4133a3c8b..974cca91944 100644 --- a/pkg/operations/ops_util.go +++ b/pkg/operations/ops_util.go @@ -28,6 +28,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" @@ -349,3 +350,13 @@ func getComponentSpecOrShardingTemplate(cluster *appsv1.Cluster, componentName s } return nil } + +func getMissingElementsInSetFromList(set sets.Set[string], list []string) []string { + var diff []string + for _, v := range list { + if !set.Has(v) { + diff = append(diff, v) + } + } + return diff +} diff --git a/pkg/operations/ops_util_test.go b/pkg/operations/ops_util_test.go index c96eb2ec455..53ce59e9eb0 100644 --- 
a/pkg/operations/ops_util_test.go +++ b/pkg/operations/ops_util_test.go @@ -87,7 +87,7 @@ var _ = Describe("OpsUtil functions", func() { ReplicaChanges: pointer.Int32(2), }, }, - }) + }, false) Expect(patchValidateErrorCondition(ctx, k8sClient, opsRes, "validate error")).Should(Succeed()) Expect(PatchOpsHandlerNotSupported(ctx, k8sClient, opsRes)).Should(Succeed()) Expect(isOpsRequestFailedPhase(opsv1alpha1.OpsFailedPhase)).Should(BeTrue()) @@ -209,7 +209,7 @@ var _ = Describe("OpsUtil functions", func() { ReplicaChanges: pointer.Int32(1), }, }, - }) + }, false) opsRes.OpsRequest = ops _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) Expect(err).ShouldNot(HaveOccurred()) @@ -266,7 +266,7 @@ var _ = Describe("OpsUtil functions", func() { ScaleIn: &opsv1alpha1.ScaleIn{ ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}, }, - }) + }, false) opsRes.OpsRequest = ops1 _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) Expect(err).ShouldNot(HaveOccurred()) @@ -278,7 +278,7 @@ var _ = Describe("OpsUtil functions", func() { ScaleOut: &opsv1alpha1.ScaleOut{ ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}, }, - }) + }, false) ops2.Annotations = map[string]string{constant.OpsDependentOnSuccessfulOpsAnnoKey: ops1.Name} ops2.Spec.Force = true opsRes.OpsRequest = ops2 @@ -329,7 +329,7 @@ var _ = Describe("OpsUtil functions", func() { ReplicaChanges: pointer.Int32(1), }, }, - }) + }, false) reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx} _, _ = GetOpsManager().Do(reqCtx, k8sClient, opsRes) Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase)) @@ -342,7 +342,7 @@ var _ = Describe("OpsUtil functions", func() { ReplicaChanges: pointer.Int32(1), }, }, - }) + }, false) opsRes.OpsRequest.Spec.Force = true opsRes.OpsRequest.Spec.EnqueueOnForce = true opsRes.OpsRequest.Status.Phase = opsv1alpha1.OpsPendingPhase