Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support validate policy of opsrequest #8232

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions apis/operations/v1alpha1/opsrequest_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,12 +443,16 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com
if err := validateHScaleOperation(scaleOut.ReplicaChanger, scaleOut.NewInstances, scaleOut.OfflineInstancesToOnline, false); err != nil {
return err
}
if len(scaleOut.OfflineInstancesToOnline) > 0 {
offlineInstanceSet := sets.New(compSpec.OfflineInstances...)
for _, offlineInsName := range scaleOut.OfflineInstancesToOnline {
if _, ok := offlineInstanceSet[offlineInsName]; !ok {
return fmt.Errorf(`cannot find the offline instance "%s" in component "%s" for scaleOut operation`, offlineInsName, hScale.ComponentName)
}
}
// instance cannot be both in OfflineInstancesToOnline and OnlineInstancesToOffline
if scaleIn != nil && scaleOut != nil {
offlineToOnlineSet := make(map[string]struct{})
for _, instance := range scaleIn.OnlineInstancesToOffline {
offlineToOnlineSet[instance] = struct{}{}
}
for _, instance := range scaleOut.OfflineInstancesToOnline {
if _, exists := offlineToOnlineSet[instance]; exists {
return fmt.Errorf(`instance "%s" cannot be both in "OfflineInstancesToOnline" and "OnlineInstancesToOffline"`, instance)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ const (
DisableHAAnnotationKey = "operations.kubeblocks.io/disable-ha"
RelatedOpsAnnotationKey = "operations.kubeblocks.io/related-ops"
OpsDependentOnSuccessfulOpsAnnoKey = "operations.kubeblocks.io/dependent-on-successful-ops" // OpsDependentOnSuccessfulOpsAnnoKey wait for the dependent ops to succeed before executing the current ops. If it fails, this ops will also fail.
IgnoreHscaleValidateAnnoKey = "apps.kubeblocks.io/ignore-strict-horizontal-scale-validation"
)
1 change: 0 additions & 1 deletion pkg/controller/instanceset/object_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rebase main branch

"k8s.io/apimachinery/pkg/util/sets"

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
Expand Down
170 changes: 139 additions & 31 deletions pkg/operations/horizontal_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ package operations
import (
"fmt"
"slices"
"strings"
"time"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -97,18 +100,9 @@ func (hs horizontalScalingOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli
if err := compOpsSet.updateClusterComponentsAndShardings(opsRes.Cluster, func(compSpec *appsv1.ClusterComponentSpec, obj ComponentOpsInterface) error {
horizontalScaling := obj.(opsv1alpha1.HorizontalScaling)
lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[obj.GetComponentName()]
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
// check if the instances are online.
currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances,
opsRes.Cluster.Name, obj.GetComponentName())
if err != nil {
return err
}
for _, onlineIns := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
if _, ok := currPodSet[onlineIns]; !ok {
return intctrlutil.NewFatalError(fmt.Sprintf(`instance "%s" specified in onlineInstancesToOffline is not online`, onlineIns))
}
}

if err := hs.validateHorizontalScaling(opsRes, lastCompConfiguration, obj); err != nil {
return err
}
replicas, instances, offlineInstances, err := hs.getExpectedCompValues(opsRes, compSpec.DeepCopy(),
lastCompConfiguration, horizontalScaling)
Expand Down Expand Up @@ -205,6 +199,16 @@ func (hs horizontalScalingOpsHandler) getCreateAndDeletePodSet(opsRes *OpsResour
deletePodSet[k] = appsv1.GetInstanceTemplateName(clusterName, fullCompName, k)
}
}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
for _, v := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
deletePodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v)
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
for _, v := range horizontalScaling.ScaleOut.OfflineInstancesToOnline {
createPodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v)
}
}
if opsRes.OpsRequest.Status.Phase == opsv1alpha1.OpsCancellingPhase {
// when cancelling this opsRequest, revert the changes.
return deletePodSet, createPodSet, nil
Expand Down Expand Up @@ -294,16 +298,56 @@ func (hs horizontalScalingOpsHandler) getExpectedCompValues(
compReplicas := *lastCompConfiguration.Replicas
compInstanceTpls := slices.Clone(lastCompConfiguration.Instances)
compOfflineInstances := lastCompConfiguration.OfflineInstances
expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, horizontalScaling)
err := hs.autoSyncReplicaChanges(opsRes, horizontalScaling, compReplicas, compInstanceTpls, expectOfflineInstances)
filteredHorizontal, err := filterHorizontalScalingSpec(opsRes, compReplicas, compInstanceTpls, compOfflineInstances, horizontalScaling.DeepCopy())
if err != nil {
return 0, nil, nil, err
}
expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, *filteredHorizontal)
err = hs.autoSyncReplicaChanges(opsRes, *filteredHorizontal, compReplicas, compInstanceTpls, expectOfflineInstances)
if err != nil {
return 0, nil, nil, err
}
return hs.getCompExpectReplicas(horizontalScaling, compReplicas),
hs.getCompExpectedInstances(compInstanceTpls, horizontalScaling),
return hs.getCompExpectReplicas(*filteredHorizontal, compReplicas),
hs.getCompExpectedInstances(compInstanceTpls, *filteredHorizontal),
expectOfflineInstances, nil
}

// only offlined instances could be taken online.
// and only onlined instances could be taken offline.
func filterHorizontalScalingSpec(
opsRes *OpsResource,
compReplicas int32,
compInstanceTpls []appsv1.InstanceTemplate,
compOfflineInstances []string,
horizontalScaling *opsv1alpha1.HorizontalScaling) (*opsv1alpha1.HorizontalScaling, error) {
offlineInstances := sets.New(compOfflineInstances...)
podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compOfflineInstances,
opsRes.Cluster.Name, horizontalScaling.ComponentName)
if err != nil {
return nil, err
}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
onlinedInstanceFromOps := sets.Set[string]{}
for _, insName := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
if _, ok := podSet[insName]; ok {
onlinedInstanceFromOps.Insert(insName)
}
}
horizontalScaling.ScaleIn.OnlineInstancesToOffline = onlinedInstanceFromOps.UnsortedList()
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
offlinedInstanceFromOps := sets.Set[string]{}
for _, insName := range horizontalScaling.ScaleOut.OfflineInstancesToOnline {
if _, ok := offlineInstances[insName]; ok {
offlinedInstanceFromOps.Insert(insName)
}
}
horizontalScaling.ScaleOut.OfflineInstancesToOnline = offlinedInstanceFromOps.UnsortedList()
}
return horizontalScaling, nil

}

// autoSyncReplicaChanges auto-sync the replicaChanges of the component and instance templates.
func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
opsRes *OpsResource,
Expand Down Expand Up @@ -339,6 +383,7 @@ func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
}
return replicaChanger.Instances, &allReplicaChanges
}

// auto sync the replicaChanges.
scaleIn := horizontalScaling.ScaleIn
if scaleIn != nil {
Expand All @@ -347,21 +392,7 @@ func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
}
scaleOut := horizontalScaling.ScaleOut
if scaleOut != nil {
// get the pod set when removing the specified instances from offlineInstances slice
podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compExpectOfflineInstances,
opsRes.Cluster.Name, horizontalScaling.ComponentName)
if err != nil {
return err
}
onlineInsCountMap := map[string]int32{}
for _, insName := range scaleOut.OfflineInstancesToOnline {
if _, ok := podSet[insName]; !ok {
// if the specified instance will not be created, continue
continue
}
insTplName := appsv1.GetInstanceTemplateName(opsRes.Cluster.Name, horizontalScaling.ComponentName, insName)
onlineInsCountMap[insTplName]++
}
onlineInsCountMap := opsRes.OpsRequest.CountOfflineOrOnlineInstances(opsRes.Cluster.Name, horizontalScaling.ComponentName, scaleOut.OfflineInstancesToOnline)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert it.

There is a scenario where the original offlineInstances were [pod1, pod3, pod10], and the offlineInstancesToOnline was pod10. However, the current replicas count is 2, which means that pod10 will not actually go online.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or, calculate the real onlineInstances in filterHorizontalScalingSpec

scaleOut.Instances, scaleOut.ReplicaChanges = getSyncedInstancesAndReplicaChanges(onlineInsCountMap, scaleOut.ReplicaChanger, scaleOut.NewInstances)
}
return nil
Expand Down Expand Up @@ -433,3 +464,80 @@ func (hs horizontalScalingOpsHandler) getCompExpectedOfflineInstances(
}
return compOfflineInstances
}

// validate if there is any instance specified in the request that is not exist, return error.
// if ignoreStrictHorizontalValidation is empty, it would validate the instances if they are already offlined or onlined.
func (hs horizontalScalingOpsHandler) validateHorizontalScaling(
opsRes *OpsResource,
lastCompConfiguration opsv1alpha1.LastComponentConfiguration,
obj ComponentOpsInterface,
) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better with the following function:

func (hs horizontalScalingOpsHandler) validateHorizontalScaling(
	opsRes *OpsResource,
	lastCompConfiguration opsv1alpha1.LastComponentConfiguration,
	horizontalScaling opsv1alpha1.HorizontalScaling,
) error {
	if opsRes.OpsRequest.Annotations[constant.IgnoreHscaleValidateAnnoKey] == "true" {
		return nil
	}
	if horizontalScaling.ScaleIn != nil {
		if err := hs.validateOnlineInstancesToOffline(lastCompConfiguration,
			horizontalScaling.ScaleIn.OnlineInstancesToOffline, opsRes.Cluster.Name, horizontalScaling.ComponentName); err != nil {
			return err
		}
	}
	if horizontalScaling.ScaleOut != nil {
		if err := hs.validateOfflineInstancesToOnline(lastCompConfiguration,
			horizontalScaling.ScaleOut.OfflineInstancesToOnline, horizontalScaling.ComponentName); err != nil {
			return err
		}
	}
	return nil
}

func (hs horizontalScalingOpsHandler) validateOnlineInstancesToOffline(
	lastCompConfiguration opsv1alpha1.LastComponentConfiguration,
	onlineInstancesToOffline []string,
	clusterName, componentName string) error {
	if len(onlineInstancesToOffline) == 0 {
		return nil
	}
	toOfflineSet := sets.New(onlineInstancesToOffline...)
	if len(toOfflineSet) < len(onlineInstancesToOffline) {
		return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates")
	}
	currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances,
		lastCompConfiguration.OfflineInstances, clusterName, componentName)
	if err != nil {
		return err
	}
	for _, onlineIns := range onlineInstancesToOffline {
		if _, ok := currPodSet[onlineIns]; !ok {
			return intctrlutil.NewFatalError(fmt.Sprintf(`instance "%s" specified in onlineInstancesToOffline is not onlinee`, onlineIns))
		}
	}
	return nil
}

func (hs horizontalScalingOpsHandler) validateOfflineInstancesToOnline(
	lastCompConfiguration opsv1alpha1.LastComponentConfiguration,
	offlineInstancesToOnline []string,
	componentName string) error {
	if len(offlineInstancesToOnline) == 0 {
		return nil
	}
	toOnlineSet := sets.New(offlineInstancesToOnline...)
	if len(toOnlineSet) < len(offlineInstancesToOnline) {
		return intctrlutil.NewFatalError("instances specified in offlineInstancesToOnline has duplicates")
	}
	offlineInstanceSet := sets.New(lastCompConfiguration.OfflineInstances...)
	for _, offlineIns := range offlineInstancesToOnline {
		if _, ok := offlineInstanceSet[offlineIns]; !ok {
			return intctrlutil.NewFatalError(fmt.Sprintf(`cannot find the offline instance "%s" in component "%s" for scaleOut operation`, offlineIns, componentName))
		}
	}
	return nil
}

horizontalScaling := obj.(opsv1alpha1.HorizontalScaling)
currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances,
opsRes.Cluster.Name, obj.GetComponentName())
if err != nil {
return err
}
offlineInstances := sets.New(lastCompConfiguration.OfflineInstances...)

notExistInstanceList := sets.Set[string]{}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
onlinedInstances, _, notExistInstances := hs.collectOnlineAndOfflineAndNotExistInstances(
horizontalScaling.ScaleIn.OnlineInstancesToOffline,
offlineInstances,
currPodSet)
notExistInstanceList = notExistInstanceList.Union(notExistInstances)
if value, exist := opsRes.OpsRequest.Annotations[constant.IgnoreHscaleValidateAnnoKey]; !exist || value == "false" {
if onlinedInstances.Len() != len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) {
unscalablePods := getMissingElementsInSetFromList(onlinedInstances, horizontalScaling.ScaleIn.OnlineInstancesToOffline)
if unscalablePods == nil {
return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates")
}
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in onlineInstancesToOffline is not online or not exist`, strings.Join(unscalablePods, ", ")))
}
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
_, offlinedInstances, notExistInstances := hs.collectOnlineAndOfflineAndNotExistInstances(
horizontalScaling.ScaleOut.OfflineInstancesToOnline,
offlineInstances,
currPodSet)
notExistInstanceList = notExistInstanceList.Union(notExistInstances)
if value, exist := opsRes.OpsRequest.Annotations[constant.IgnoreHscaleValidateAnnoKey]; !exist || value == "false" {
if offlinedInstances.Len() != len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) {
unscalablePods := getMissingElementsInSetFromList(offlinedInstances, horizontalScaling.ScaleOut.OfflineInstancesToOnline)
if unscalablePods == nil {
return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates")
}
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in offlineInstancesToOnline is not offline or not exist`, strings.Join(unscalablePods, ", ")))
}
}
}
if notExistInstanceList.Len() > 0 {
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in the request is not exist`, strings.Join(notExistInstanceList.UnsortedList(), ", ")))
}
return nil
}

// collect the online and offline instances specified in the request.
func (hs horizontalScalingOpsHandler) collectOnlineAndOfflineAndNotExistInstances(
instance []string,
offlineInstances sets.Set[string],
currPodSet map[string]string) (sets.Set[string], sets.Set[string], sets.Set[string]) {

offlinedInstanceFromOps := sets.Set[string]{}
onlinedInstanceFromOps := sets.Set[string]{}
notExistInstanceFromOps := sets.Set[string]{}
for _, insName := range instance {
if _, ok := offlineInstances[insName]; ok {
offlinedInstanceFromOps.Insert(insName)
continue
}
if _, ok := currPodSet[insName]; ok {
onlinedInstanceFromOps.Insert(insName)
continue
}
notExistInstanceFromOps.Insert(insName)
}
return onlinedInstanceFromOps, offlinedInstanceFromOps, notExistInstanceFromOps
}
Loading
Loading