Skip to content

Commit

Permalink
feat: support ignore strict horizontal scaling validation
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-hui authored and ian-hui committed Nov 6, 2024
1 parent 19fb216 commit 088b7ce
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 68 deletions.
16 changes: 10 additions & 6 deletions apis/operations/v1alpha1/opsrequest_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,12 +443,16 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com
if err := validateHScaleOperation(scaleOut.ReplicaChanger, scaleOut.NewInstances, scaleOut.OfflineInstancesToOnline, false); err != nil {
return err
}
if len(scaleOut.OfflineInstancesToOnline) > 0 {
offlineInstanceSet := sets.New(compSpec.OfflineInstances...)
for _, offlineInsName := range scaleOut.OfflineInstancesToOnline {
if _, ok := offlineInstanceSet[offlineInsName]; !ok {
return fmt.Errorf(`cannot find the offline instance "%s" in component "%s" for scaleOut operation`, offlineInsName, hScale.ComponentName)
}
}
// instance cannot be both in OfflineInstancesToOnline and OnlineInstancesToOffline
if scaleIn != nil && scaleOut != nil {
offlineToOnlineSet := make(map[string]struct{})
for _, instance := range scaleIn.OnlineInstancesToOffline {
offlineToOnlineSet[instance] = struct{}{}
}
for _, instance := range scaleOut.OfflineInstancesToOnline {
if _, exists := offlineToOnlineSet[instance]; exists {
return fmt.Errorf(`instance "%s" cannot be both in "OfflineInstancesToOnline" and "OnlineInstancesToOffline"`, instance)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ const (
DisableHAAnnotationKey = "operations.kubeblocks.io/disable-ha"
RelatedOpsAnnotationKey = "operations.kubeblocks.io/related-ops"
OpsDependentOnSuccessfulOpsAnnoKey = "operations.kubeblocks.io/dependent-on-successful-ops" // OpsDependentOnSuccessfulOpsAnnoKey wait for the dependent ops to succeed before executing the current ops. If it fails, this ops will also fail.
IgnoreHscaleValidateAnnoKey = "apps.kubeblocks.io/ignore-strict-horizontal-scale-validation"
)
170 changes: 139 additions & 31 deletions pkg/operations/horizontal_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ package operations
import (
"fmt"
"slices"
"strings"
"time"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -97,18 +100,9 @@ func (hs horizontalScalingOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli
if err := compOpsSet.updateClusterComponentsAndShardings(opsRes.Cluster, func(compSpec *appsv1.ClusterComponentSpec, obj ComponentOpsInterface) error {
horizontalScaling := obj.(opsv1alpha1.HorizontalScaling)
lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[obj.GetComponentName()]
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
// check if the instances are online.
currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances,
opsRes.Cluster.Name, obj.GetComponentName())
if err != nil {
return err
}
for _, onlineIns := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
if _, ok := currPodSet[onlineIns]; !ok {
return intctrlutil.NewFatalError(fmt.Sprintf(`instance "%s" specified in onlineInstancesToOffline is not online`, onlineIns))
}
}

if err := hs.validateHorizontalScaling(opsRes, lastCompConfiguration, obj); err != nil {
return err
}
replicas, instances, offlineInstances, err := hs.getExpectedCompValues(opsRes, compSpec.DeepCopy(),
lastCompConfiguration, horizontalScaling)
Expand Down Expand Up @@ -205,6 +199,16 @@ func (hs horizontalScalingOpsHandler) getCreateAndDeletePodSet(opsRes *OpsResour
deletePodSet[k] = appsv1.GetInstanceTemplateName(clusterName, fullCompName, k)
}
}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
for _, v := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
deletePodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v)
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
for _, v := range horizontalScaling.ScaleOut.OfflineInstancesToOnline {
createPodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v)
}
}
if opsRes.OpsRequest.Status.Phase == opsv1alpha1.OpsCancellingPhase {
// when cancelling this opsRequest, revert the changes.
return deletePodSet, createPodSet, nil
Expand Down Expand Up @@ -294,16 +298,56 @@ func (hs horizontalScalingOpsHandler) getExpectedCompValues(
compReplicas := *lastCompConfiguration.Replicas
compInstanceTpls := slices.Clone(lastCompConfiguration.Instances)
compOfflineInstances := lastCompConfiguration.OfflineInstances
expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, horizontalScaling)
err := hs.autoSyncReplicaChanges(opsRes, horizontalScaling, compReplicas, compInstanceTpls, expectOfflineInstances)
filteredHorizontal, err := filterHorizontalScalingSpec(opsRes, compReplicas, compInstanceTpls, compOfflineInstances, horizontalScaling.DeepCopy())
if err != nil {
return 0, nil, nil, err
}
expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, *filteredHorizontal)
err = hs.autoSyncReplicaChanges(opsRes, *filteredHorizontal, compReplicas, compInstanceTpls, expectOfflineInstances)
if err != nil {
return 0, nil, nil, err
}
return hs.getCompExpectReplicas(horizontalScaling, compReplicas),
hs.getCompExpectedInstances(compInstanceTpls, horizontalScaling),
return hs.getCompExpectReplicas(*filteredHorizontal, compReplicas),
hs.getCompExpectedInstances(compInstanceTpls, *filteredHorizontal),
expectOfflineInstances, nil
}

// only offlined instances could be taken online.
// and only onlined instances could be taken offline.
func filterHorizontalScalingSpec(
opsRes *OpsResource,
compReplicas int32,
compInstanceTpls []appsv1.InstanceTemplate,
compOfflineInstances []string,
horizontalScaling *opsv1alpha1.HorizontalScaling) (*opsv1alpha1.HorizontalScaling, error) {
offlineInstances := sets.New(compOfflineInstances...)
podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compOfflineInstances,
opsRes.Cluster.Name, horizontalScaling.ComponentName)
if err != nil {
return nil, err
}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
onlinedInstanceFromOps := sets.Set[string]{}
for _, insName := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
if _, ok := podSet[insName]; ok {
onlinedInstanceFromOps.Insert(insName)
}
}
horizontalScaling.ScaleIn.OnlineInstancesToOffline = onlinedInstanceFromOps.UnsortedList()
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
offlinedInstanceFromOps := sets.Set[string]{}
for _, insName := range horizontalScaling.ScaleOut.OfflineInstancesToOnline {
if _, ok := offlineInstances[insName]; ok {
offlinedInstanceFromOps.Insert(insName)
}
}
horizontalScaling.ScaleOut.OfflineInstancesToOnline = offlinedInstanceFromOps.UnsortedList()
}
return horizontalScaling, nil

}

// autoSyncReplicaChanges auto-sync the replicaChanges of the component and instance templates.
func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
opsRes *OpsResource,
Expand Down Expand Up @@ -339,6 +383,7 @@ func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
}
return replicaChanger.Instances, &allReplicaChanges
}

// auto sync the replicaChanges.
scaleIn := horizontalScaling.ScaleIn
if scaleIn != nil {
Expand All @@ -347,21 +392,7 @@ func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
}
scaleOut := horizontalScaling.ScaleOut
if scaleOut != nil {
// get the pod set when removing the specified instances from offlineInstances slice
podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compExpectOfflineInstances,
opsRes.Cluster.Name, horizontalScaling.ComponentName)
if err != nil {
return err
}
onlineInsCountMap := map[string]int32{}
for _, insName := range scaleOut.OfflineInstancesToOnline {
if _, ok := podSet[insName]; !ok {
// if the specified instance will not be created, continue
continue
}
insTplName := appsv1.GetInstanceTemplateName(opsRes.Cluster.Name, horizontalScaling.ComponentName, insName)
onlineInsCountMap[insTplName]++
}
onlineInsCountMap := opsRes.OpsRequest.CountOfflineOrOnlineInstances(opsRes.Cluster.Name, horizontalScaling.ComponentName, scaleOut.OfflineInstancesToOnline)
scaleOut.Instances, scaleOut.ReplicaChanges = getSyncedInstancesAndReplicaChanges(onlineInsCountMap, scaleOut.ReplicaChanger, scaleOut.NewInstances)
}
return nil
Expand Down Expand Up @@ -433,3 +464,80 @@ func (hs horizontalScalingOpsHandler) getCompExpectedOfflineInstances(
}
return compOfflineInstances
}

// validate if there is any instance specified in the request that is not exist, return error.
// if ignoreStrictHorizontalValidation is empty, it would validate the instances if they are already offlined or onlined.
func (hs horizontalScalingOpsHandler) validateHorizontalScaling(
opsRes *OpsResource,
lastCompConfiguration opsv1alpha1.LastComponentConfiguration,
obj ComponentOpsInterface,
) error {
horizontalScaling := obj.(opsv1alpha1.HorizontalScaling)
currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances,
opsRes.Cluster.Name, obj.GetComponentName())
if err != nil {
return err
}
offlineInstances := sets.New(lastCompConfiguration.OfflineInstances...)

notExistInstanceList := sets.Set[string]{}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
onlinedInstances, _, notExistInstances := hs.collectOnlineAndOfflineAndNotExistInstances(
horizontalScaling.ScaleIn.OnlineInstancesToOffline,
offlineInstances,
currPodSet)
notExistInstanceList = notExistInstanceList.Union(notExistInstances)
if value, exist := opsRes.OpsRequest.Annotations[constant.IgnoreHscaleValidateAnnoKey]; !exist || value == "false" {
if onlinedInstances.Len() != len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) {
unscalablePods := getMissingElementsInSetFromList(onlinedInstances, horizontalScaling.ScaleIn.OnlineInstancesToOffline)
if unscalablePods == nil {
return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates")
}
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in onlineInstancesToOffline is not online or not exist`, strings.Join(unscalablePods, ", ")))
}
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
_, offlinedInstances, notExistInstances := hs.collectOnlineAndOfflineAndNotExistInstances(
horizontalScaling.ScaleOut.OfflineInstancesToOnline,
offlineInstances,
currPodSet)
notExistInstanceList = notExistInstanceList.Union(notExistInstances)
if value, exist := opsRes.OpsRequest.Annotations[constant.IgnoreHscaleValidateAnnoKey]; !exist || value == "false" {
if offlinedInstances.Len() != len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) {
unscalablePods := getMissingElementsInSetFromList(offlinedInstances, horizontalScaling.ScaleOut.OfflineInstancesToOnline)
if unscalablePods == nil {
return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates")
}
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in offlineInstancesToOnline is not offline or not exist`, strings.Join(unscalablePods, ", ")))
}
}
}
if notExistInstanceList.Len() > 0 {
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in the request is not exist`, strings.Join(notExistInstanceList.UnsortedList(), ", ")))
}
return nil
}

// collect the online and offline instances specified in the request.
func (hs horizontalScalingOpsHandler) collectOnlineAndOfflineAndNotExistInstances(
instance []string,
offlineInstances sets.Set[string],
currPodSet map[string]string) (sets.Set[string], sets.Set[string], sets.Set[string]) {

offlinedInstanceFromOps := sets.Set[string]{}
onlinedInstanceFromOps := sets.Set[string]{}
notExistInstanceFromOps := sets.Set[string]{}
for _, insName := range instance {
if _, ok := offlineInstances[insName]; ok {
offlinedInstanceFromOps.Insert(insName)
continue
}
if _, ok := currPodSet[insName]; ok {
onlinedInstanceFromOps.Insert(insName)
continue
}
notExistInstanceFromOps.Insert(insName)
}
return onlinedInstanceFromOps, offlinedInstanceFromOps, notExistInstanceFromOps
}
Loading

0 comments on commit 088b7ce

Please sign in to comment.