Skip to content

Commit

Permalink
fix a bug where the structured log is not displayed correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
qiushida committed Dec 12, 2023
1 parent f0f7ebb commit 50d8a53
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ type topology struct {
// topologySpreadConstraint is an internal version for v1.TopologySpreadConstraint
// and where the selector is parsed.
// This mirrors scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L37
// Fields are exported for structured logging.
type topologySpreadConstraint struct {
maxSkew int32
topologyKey string
selector labels.Selector
nodeAffinityPolicy v1.NodeInclusionPolicy
nodeTaintsPolicy v1.NodeInclusionPolicy
podNodeAffinity nodeaffinity.RequiredNodeAffinity
podTolerations []v1.Toleration
MaxSkew int32
TopologyKey string
Selector labels.Selector
NodeAffinityPolicy v1.NodeInclusionPolicy
NodeTaintsPolicy v1.NodeInclusionPolicy
PodNodeAffinity nodeaffinity.RequiredNodeAffinity
PodTolerations []v1.Toleration
}

// RemovePodsViolatingTopologySpreadConstraint evicts pods which violate their topology spread constraints
Expand Down Expand Up @@ -180,9 +181,9 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
// pre-populate the topologyPair map with all the topologies available from the nodeMap
// (we can't just build it from existing pods' nodes because a topology may have 0 pods)
for _, node := range nodeMap {
if val, ok := node.Labels[tsc.topologyKey]; ok {
if val, ok := node.Labels[tsc.TopologyKey]; ok {
if matchNodeInclusionPolicies(tsc, node) {
constraintTopologies[topologyPair{key: tsc.topologyKey, value: val}] = make([]*v1.Pod, 0)
constraintTopologies[topologyPair{key: tsc.TopologyKey, value: val}] = make([]*v1.Pod, 0)
}
}
}
Expand All @@ -196,7 +197,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
continue
}
// 4. if the pod matches this TopologySpreadConstraint LabelSelector
if !tsc.selector.Matches(labels.Set(pod.Labels)) {
if !tsc.Selector.Matches(labels.Set(pod.Labels)) {
continue
}

Expand All @@ -206,12 +207,12 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
// If ok is false, node is nil in which case node.Labels will panic. In which case a pod is yet to be scheduled. So it's safe to just continue here.
continue
}
nodeValue, ok := node.Labels[tsc.topologyKey]
nodeValue, ok := node.Labels[tsc.TopologyKey]
if !ok {
continue
}
// 6. create a topoPair with key as this TopologySpreadConstraint
topoPair := topologyPair{key: tsc.topologyKey, value: nodeValue}
topoPair := topologyPair{key: tsc.TopologyKey, value: nodeValue}
// 7. add the pod with key as this topoPair
constraintTopologies[topoPair] = append(constraintTopologies[topoPair], pod)
sumPods++
Expand Down Expand Up @@ -266,7 +267,7 @@ func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, tsc topologySpreadC
if len(pods) > maxDomainSize {
maxDomainSize = len(pods)
}
if int32(maxDomainSize-minDomainSize) > tsc.maxSkew {
if int32(maxDomainSize-minDomainSize) > tsc.MaxSkew {
return false
}
}
Expand Down Expand Up @@ -307,7 +308,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
topologyBalanceNodeFit := utilpointer.BoolDeref(d.args.TopologyBalanceNodeFit, true)

eligibleNodes := filterEligibleNodes(nodes, tsc)
nodesBelowIdealAvg := filterNodesBelowIdealAvg(eligibleNodes, sortedDomains, tsc.topologyKey, idealAvg)
nodesBelowIdealAvg := filterNodesBelowIdealAvg(eligibleNodes, sortedDomains, tsc.TopologyKey, idealAvg)

// i is the index for belowOrEqualAvg
// j is the index for aboveAvg
Expand All @@ -323,7 +324,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
skew := float64(len(sortedDomains[j].pods) - len(sortedDomains[i].pods))

// if k and j are within the maxSkew of each other, move to next belowOrEqualAvg
if int32(skew) <= tsc.maxSkew {
if int32(skew) <= tsc.MaxSkew {
i++
continue
}
Expand All @@ -337,7 +338,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
aboveAvg := math.Ceil(float64(len(sortedDomains[j].pods)) - idealAvg)
belowAvg := math.Ceil(idealAvg - float64(len(sortedDomains[i].pods)))
smallestDiff := math.Min(aboveAvg, belowAvg)
halfSkew := math.Ceil((skew - float64(tsc.maxSkew)) / 2)
halfSkew := math.Ceil((skew - float64(tsc.MaxSkew)) / 2)
movePods := int(math.Min(smallestDiff, halfSkew))
if movePods <= 0 {
i++
Expand Down Expand Up @@ -475,15 +476,15 @@ func filterEligibleNodes(nodes []*v1.Node, tsc topologySpreadConstraint) []*v1.N
}

func matchNodeInclusionPolicies(tsc topologySpreadConstraint, node *v1.Node) bool {
if tsc.nodeAffinityPolicy == v1.NodeInclusionPolicyHonor {
if tsc.NodeAffinityPolicy == v1.NodeInclusionPolicyHonor {
// We ignore parsing errors here for backwards compatibility.
if match, _ := tsc.podNodeAffinity.Match(node); !match {
if match, _ := tsc.PodNodeAffinity.Match(node); !match {
return false
}
}

if tsc.nodeTaintsPolicy == v1.NodeInclusionPolicyHonor {
if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, tsc.podTolerations, doNotScheduleTaintsFilterFunc()); untolerated {
if tsc.NodeTaintsPolicy == v1.NodeInclusionPolicyHonor {
if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, tsc.PodTolerations, doNotScheduleTaintsFilterFunc()); untolerated {
return false
}
}
Expand All @@ -510,19 +511,19 @@ func newTopologySpreadConstraint(constraint v1.TopologySpreadConstraint, pod *v1
}

tsc := topologySpreadConstraint{
maxSkew: constraint.MaxSkew,
topologyKey: constraint.TopologyKey,
selector: selector,
nodeAffinityPolicy: v1.NodeInclusionPolicyHonor, // If NodeAffinityPolicy is nil, we treat NodeAffinityPolicy as "Honor".
nodeTaintsPolicy: v1.NodeInclusionPolicyIgnore, // If NodeTaintsPolicy is nil, we treat NodeTaintsPolicy as "Ignore".
podNodeAffinity: nodeaffinity.GetRequiredNodeAffinity(pod),
podTolerations: pod.Spec.Tolerations,
MaxSkew: constraint.MaxSkew,
TopologyKey: constraint.TopologyKey,
Selector: selector,
NodeAffinityPolicy: v1.NodeInclusionPolicyHonor, // If NodeAffinityPolicy is nil, we treat NodeAffinityPolicy as "Honor".
NodeTaintsPolicy: v1.NodeInclusionPolicyIgnore, // If NodeTaintsPolicy is nil, we treat NodeTaintsPolicy as "Ignore".
PodNodeAffinity: nodeaffinity.GetRequiredNodeAffinity(pod),
PodTolerations: pod.Spec.Tolerations,
}
if constraint.NodeAffinityPolicy != nil {
tsc.nodeAffinityPolicy = *constraint.NodeAffinityPolicy
tsc.NodeAffinityPolicy = *constraint.NodeAffinityPolicy
}
if constraint.NodeTaintsPolicy != nil {
tsc.nodeTaintsPolicy = *constraint.NodeTaintsPolicy
tsc.NodeTaintsPolicy = *constraint.NodeTaintsPolicy
}

return tsc, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1608,20 +1608,20 @@ func TestCheckIdenticalConstraints(t *testing.T) {
selector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}})

newConstraintSame := topologySpreadConstraint{
maxSkew: 2,
topologyKey: "zone",
selector: selector.DeepCopySelector(),
MaxSkew: 2,
TopologyKey: "zone",
Selector: selector.DeepCopySelector(),
}
newConstraintDifferent := topologySpreadConstraint{
maxSkew: 3,
topologyKey: "node",
selector: selector.DeepCopySelector(),
MaxSkew: 3,
TopologyKey: "node",
Selector: selector.DeepCopySelector(),
}
namespaceTopologySpreadConstraint := []topologySpreadConstraint{
{
maxSkew: 2,
topologyKey: "zone",
selector: selector.DeepCopySelector(),
MaxSkew: 2,
TopologyKey: "zone",
Selector: selector.DeepCopySelector(),
},
}
testCases := []struct {
Expand Down

0 comments on commit 50d8a53

Please sign in to comment.