Skip to content

Commit

Permalink
set schedulingPolicy for each component/shard
Browse files Browse the repository at this point in the history
  • Loading branch information
yipeng1030 committed Nov 8, 2024
1 parent 891d171 commit 0a65def
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 26 deletions.
33 changes: 28 additions & 5 deletions pkg/cmd/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ var _ = Describe("Cluster", func() {
Expect(o.Name).ShouldNot(BeEmpty())
Expect(o.Run()).Should(Succeed())
})
It("with schedulingPolicy", func() {
It("should apply SharedNode tenancy and Preferred PodAntiAffinity", func() {
o, err := NewSubCmdsOptions(createOptions, clusterType)
Expect(err).Should(Succeed())
o.Tenancy = "SharedNode"
o.TopologyKeys = []string{"zone", "hostname"}
o.NodeLabels = map[string]string{"environment": "environment", "region": "region"}
o.TolerationsRaw = []string{"key=value:effect", " key:effect"}
o.TopologyKeys = []string{"test-topology1", "test-topology2"}
o.NodeLabels = map[string]string{"environment": "test-env", "region": "test-region"}
o.TolerationsRaw = []string{"testKey1=testValue1:NoSchedule", "testKey2=testValue2:NoExecute"}
o.PodAntiAffinity = "Preferred"

Expect(err).Should(Succeed())
Expect(o).ShouldNot(BeNil())
Expect(o.ChartInfo).ShouldNot(BeNil())
o.Format = printer.YAML
Expand All @@ -124,6 +124,29 @@ var _ = Describe("Cluster", func() {
Expect(o.Name).ShouldNot(BeEmpty())
Expect(o.Run()).Should(Succeed())
})

// Verifies that cluster creation accepts DedicatedNode tenancy combined with a
// Required pod anti-affinity strategy, along with topology keys, node labels,
// and raw tolerations, and that the full Complete/Validate/Run pipeline succeeds.
It("should apply DedicatedNode tenancy and Required PodAntiAffinity", func() {
o, err := NewSubCmdsOptions(createOptions, clusterType)
Expect(err).Should(Succeed())
// Scheduling inputs under test; TolerationsRaw uses the key=value:effect format.
o.Tenancy = "DedicatedNode"
o.TopologyKeys = []string{"test-region", "test-zone"}
o.NodeLabels = map[string]string{"cluster": "test-cluster", "env": "test-production"}
o.TolerationsRaw = []string{"testKey3=testValue3:NoSchedule", "testKey4=testValue4:NoExecute"}
o.PodAntiAffinity = "Required"

Expect(o).ShouldNot(BeNil())
Expect(o.ChartInfo).ShouldNot(BeNil())
o.Format = printer.YAML

Expect(o.CreateOptions.Complete()).To(Succeed())
// Fake clientset plus a stubbed discovery server version so Complete/Run
// never contact a real API server.
o.Client = testing.FakeClientSet()
fakeDiscovery2, _ := o.Client.Discovery().(*fakediscovery.FakeDiscovery)
fakeDiscovery2.FakedServerVersion = &version.Info{Major: "1", Minor: "27", GitVersion: "v1.27.0"}
Expect(o.Complete(nil)).To(Succeed())
Expect(o.Validate()).To(Succeed())
Expect(o.Name).ShouldNot(BeEmpty())
Expect(o.Run()).Should(Succeed())
})
})

Context("create validate", func() {
Expand Down
56 changes: 38 additions & 18 deletions pkg/cmd/cluster/create_subcmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
cmdutil "k8s.io/kubectl/pkg/cmd/util"

v1 "github.com/apecloud/kubeblocks/apis/apps/v1"

"github.com/apecloud/kbcli/pkg/action"
"github.com/apecloud/kbcli/pkg/cluster"
"github.com/apecloud/kbcli/pkg/printer"
Expand All @@ -62,11 +60,11 @@ type CreateSubCmdsOptions struct {
// Configuration and options for cluster affinity and tolerations
PodAntiAffinity string `json:"podAntiAffinity"`
// TopologyKeys if TopologyKeys is nil, add omitempty json tag, because CueLang can not covert null to list.
TopologyKeys []string `json:"topologyKeys,omitempty"`
NodeLabels map[string]string `json:"nodeLabels,omitempty"`
Tenancy string `json:"tenancy"`
TolerationsRaw []string `json:"-"`
schedulingPolicy *v1.SchedulingPolicy
TopologyKeys []string `json:"topologyKeys,omitempty"`
NodeLabels map[string]string `json:"nodeLabels,omitempty"`
Tenancy string `json:"tenancy"`
TolerationsRaw []string `json:"-"`
Tolerations []corev1.Toleration

// SkipSchemaValidation is used to skip the schema validation of the helm chart.
SkipSchemaValidation bool `json:"-"`
Expand Down Expand Up @@ -203,7 +201,6 @@ func (o *CreateSubCmdsOptions) Complete(cmd *cobra.Command) error {
}

// Build tolerations if raw toleration rules are configured
tolerations := make([]corev1.Toleration, 0)
if o.TolerationsRaw != nil {
tolerationsResult, err := util.BuildTolerations(o.TolerationsRaw)
if err != nil {
Expand All @@ -213,14 +210,11 @@ func (o *CreateSubCmdsOptions) Complete(cmd *cobra.Command) error {
if err != nil {
return err
}
err = json.Unmarshal(jsonData, &tolerations)
err = json.Unmarshal(jsonData, &o.Tolerations)
if err != nil {
return err
}
}

o.schedulingPolicy = util.BuildSchedulingPolicy(o.Name, tolerations, o.NodeLabels, o.PodAntiAffinity, o.TopologyKeys)

return nil
}

Expand Down Expand Up @@ -256,15 +250,27 @@ func (o *CreateSubCmdsOptions) Run() error {
return err
}

if clusterObj != nil {
if o.schedulingPolicy != nil {
converted, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o.schedulingPolicy)
if err != nil {
spec, _ := clusterObj.Object["spec"].(map[string]interface{})
if compSpec, ok := spec["componentSpecs"].([]interface{}); ok {
for i := range compSpec {
comp := compSpec[i].(map[string]interface{})
if err := o.applySchedulingPolicyToComponent(comp); err != nil {
return err
}
_ = unstructured.SetNestedField(clusterObj.Object, converted, "spec", "schedulingPolicy")
compSpec[i] = comp
}
}

if shardingSpec, ok := spec["shardings"].([]interface{}); ok {
for i := range shardingSpec {
shard := shardingSpec[i].(map[string]interface{})
if compSpec, ok := shard["template"].(map[string]interface{}); ok {
if err := o.applySchedulingPolicyToComponent(compSpec); err != nil {
return err
}
shard["template"] = compSpec
}
}
_ = unstructured.SetNestedField(clusterObj.Object, o.Tenancy, "spec", "tenancy")
}

// only edits the cluster object, other dependency objects are created directly
Expand Down Expand Up @@ -356,3 +362,17 @@ func (o *CreateSubCmdsOptions) getClusterObj(objs []*objectInfo) (*unstructured.
}
return nil, fmt.Errorf("failed to find cluster object from manifests rendered from %s chart", o.ClusterType)
}

// applySchedulingPolicyToComponent builds a SchedulingPolicy from the
// user-supplied scheduling options (tenancy, tolerations, node labels,
// pod anti-affinity, topology keys) and injects it into the given
// component spec (or sharding template) map under "schedulingPolicy".
// Entries whose "name" field is missing or not a string are left untouched.
func (o *CreateSubCmdsOptions) applySchedulingPolicyToComponent(item map[string]interface{}) error {
	// Comma-ok assertion: the original unchecked item["name"].(string) would
	// panic on a non-string name coming from the rendered manifest.
	compName, ok := item["name"].(string)
	if !ok {
		return nil
	}
	schedulingPolicy, needSet := util.BuildSchedulingPolicy(o.Tenancy, o.Name, compName, o.Tolerations, o.NodeLabels, o.PodAntiAffinity, o.TopologyKeys)
	if !needSet {
		// No scheduling options were provided; keep the component as rendered.
		return nil
	}
	converted, err := runtime.DefaultUnstructuredConverter.ToUnstructured(schedulingPolicy)
	if err != nil {
		return err
	}
	// Propagate the error instead of discarding it, so malformed nested
	// values surface to the caller rather than being silently dropped.
	return unstructured.SetNestedField(item, converted, "schedulingPolicy")
}
64 changes: 61 additions & 3 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,14 @@ func DisplayDiffWithColor(out io.Writer, diffText string) {
}
}

func BuildSchedulingPolicy(clusterName string, tolerations []corev1.Toleration, nodeLabels map[string]string, podAntiAffinity string, topologyKeys []string) *kbappsv1.SchedulingPolicy {
func BuildSchedulingPolicy(tenancy string, clusterName string, compName string, tolerations []corev1.Toleration, nodeLabels map[string]string, podAntiAffinity string, topologyKeys []string) (*kbappsv1.SchedulingPolicy, bool) {
if len(tolerations) == 0 && len(nodeLabels) == 0 && len(topologyKeys) == 0 {
return nil, false
}
affinity := &corev1.Affinity{}
if podAntiAffinity != "" || len(topologyKeys) > 0 {
affinity.PodAntiAffinity = BuildPodAntiAffinityForComponent(tenancy, clusterName, compName, podAntiAffinity, topologyKeys)
}

var topologySpreadConstraints []corev1.TopologySpreadConstraint

Expand All @@ -1100,19 +1107,21 @@ func BuildSchedulingPolicy(clusterName string, tolerations []corev1.Toleration,
TopologyKey: topologyKey,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
constant.AppInstanceLabelKey: clusterName,
constant.AppInstanceLabelKey: clusterName,
constant.KBAppComponentLabelKey: compName,
},
},
})
}

schedulingPolicy := &kbappsv1.SchedulingPolicy{
Affinity: affinity,
NodeSelector: nodeLabels,
Tolerations: tolerations,
TopologySpreadConstraints: topologySpreadConstraints,
}

return schedulingPolicy
return schedulingPolicy, true
}

// BuildTolerations toleration format: key=value:effect or key:effect,
Expand Down Expand Up @@ -1203,6 +1212,55 @@ func BuildPodAntiAffinity(podAntiAffinityStrategy string, topologyKeys []string)
return podAntiAffinity
}

// BuildPodAntiAffinityForComponent assembles the pod anti-affinity rules for a
// single component of a cluster. Each topology key yields one affinity term
// selecting this cluster's component pods; the strategy decides whether those
// terms are hard (Required) or soft (Preferred, weight 100). DedicatedNode
// tenancy additionally appends a hard hostname-spread rule.
func BuildPodAntiAffinityForComponent(tenancy string, clusterName string, compName string, podAntiAffinityStrategy string, topologyKeys []string) *corev1.PodAntiAffinity {
	// All terms target pods belonging to this cluster's component.
	newSelector := func() *metav1.LabelSelector {
		return &metav1.LabelSelector{
			MatchLabels: map[string]string{
				constant.AppInstanceLabelKey:    clusterName,
				constant.KBAppComponentLabelKey: compName,
			},
		}
	}

	var terms []corev1.PodAffinityTerm
	for _, key := range topologyKeys {
		terms = append(terms, corev1.PodAffinityTerm{
			TopologyKey:   key,
			LabelSelector: newSelector(),
		})
	}

	var antiAffinity *corev1.PodAntiAffinity
	if podAntiAffinityStrategy == string(kbappsv1alpha1.Required) {
		antiAffinity = &corev1.PodAntiAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: terms,
		}
	} else {
		// Any non-Required strategy is treated as Preferred.
		var weighted []corev1.WeightedPodAffinityTerm
		for _, term := range terms {
			weighted = append(weighted, corev1.WeightedPodAffinityTerm{
				Weight:          100,
				PodAffinityTerm: term,
			})
		}
		antiAffinity = &corev1.PodAntiAffinity{
			PreferredDuringSchedulingIgnoredDuringExecution: weighted,
		}
	}

	// DedicatedNode tenancy forces one component pod per node via a hard
	// hostname anti-affinity term, regardless of the chosen strategy.
	if kbappsv1alpha1.TenancyType(tenancy) == kbappsv1alpha1.DedicatedNode {
		antiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
			antiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
			corev1.PodAffinityTerm{
				TopologyKey:   corev1.LabelHostname,
				LabelSelector: newSelector(),
			})
	}

	return antiAffinity
}

// AddDirToPath add a dir to the PATH environment variable
func AddDirToPath(dir string) error {
if dir == "" {
Expand Down

0 comments on commit 0a65def

Please sign in to comment.