-
Notifications
You must be signed in to change notification settings - Fork 40.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pvc as part of equivalence hash #56577
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
/* | ||
Copyright 2017 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package predicates | ||
|
||
import ( | ||
"fmt" | ||
|
||
"k8s.io/api/core/v1" | ||
storagev1 "k8s.io/api/storage/v1" | ||
) | ||
|
||
type FakePersistentVolumeClaimInfo []v1.PersistentVolumeClaim | ||
|
||
func (pvcs FakePersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*v1.PersistentVolumeClaim, error) { | ||
for _, pvc := range pvcs { | ||
if pvc.Name == pvcID && pvc.Namespace == namespace { | ||
return &pvc, nil | ||
} | ||
} | ||
return nil, fmt.Errorf("Unable to find persistent volume claim: %s/%s", namespace, pvcID) | ||
} | ||
|
||
type FakeNodeInfo v1.Node | ||
|
||
func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*v1.Node, error) { | ||
node := v1.Node(n) | ||
return &node, nil | ||
} | ||
|
||
type FakeNodeListInfo []v1.Node | ||
|
||
func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*v1.Node, error) { | ||
for _, node := range nodes { | ||
if node.Name == nodeName { | ||
return &node, nil | ||
} | ||
} | ||
return nil, fmt.Errorf("Unable to find node: %s", nodeName) | ||
} | ||
|
||
type FakePersistentVolumeInfo []v1.PersistentVolume | ||
|
||
func (pvs FakePersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) { | ||
for _, pv := range pvs { | ||
if pv.Name == pvID { | ||
return &pv, nil | ||
} | ||
} | ||
return nil, fmt.Errorf("Unable to find persistent volume: %s", pvID) | ||
} | ||
|
||
type FakeStorageClassInfo []storagev1.StorageClass | ||
|
||
func (classes FakeStorageClassInfo) GetStorageClassInfo(name string) (*storagev1.StorageClass, error) { | ||
for _, sc := range classes { | ||
if sc.Name == name { | ||
return &sc, nil | ||
} | ||
} | ||
return nil, fmt.Errorf("Unable to find storage class: %s", name) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -173,7 +173,7 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri | |
|
||
// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case | ||
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { | ||
// MatchInterPodAffinity: we assume scheduler can make sure newly binded pod | ||
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod | ||
// will not break the existing inter pod affinity. So we do not need to invalidate | ||
// MatchInterPodAffinity when pod added. | ||
// | ||
|
@@ -188,12 +188,29 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, | |
|
||
// GeneralPredicates: will always be affected by adding a new pod | ||
invalidPredicates := sets.NewString("GeneralPredicates") | ||
|
||
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision. | ||
for _, vol := range pod.Spec.Volumes { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this need to be added here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed Hope I am understanding right :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the volume count checker counts the volumes on pods that are assigned on nodes. I guess for the case where you create the Pod with NodeName already set, then yes, the predicate should be invalidated. |
||
if vol.PersistentVolumeClaim != nil { | ||
invalidPredicates.Insert("MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount") | ||
} else { | ||
if vol.AWSElasticBlockStore != nil { | ||
invalidPredicates.Insert("MaxEBSVolumeCount") | ||
} | ||
if vol.GCEPersistentDisk != nil { | ||
invalidPredicates.Insert("MaxGCEPDVolumeCount") | ||
} | ||
if vol.AzureDisk != nil { | ||
invalidPredicates.Insert("MaxAzureDiskVolumeCount") | ||
} | ||
} | ||
} | ||
ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates) | ||
} | ||
|
||
// getHashEquivalencePod returns the hash of equivalence pod. | ||
// 1. equivalenceHash | ||
// 2. if equivalence pod is found | ||
// 2. if equivalence hash is valid | ||
func (ec *EquivalenceCache) getHashEquivalencePod(pod *v1.Pod) (uint64, bool) { | ||
equivalencePod := ec.getEquivalencePod(pod) | ||
if equivalencePod != nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,11 +71,11 @@ const ( | |
) | ||
|
||
var ( | ||
serviceAffinitySet = sets.NewString("ServiceAffinity") | ||
maxPDVolumeCountPredicateSet = sets.NewString("MaxPDVolumeCountPredicate") | ||
matchInterPodAffinitySet = sets.NewString("MatchInterPodAffinity") | ||
generalPredicatesSets = sets.NewString("GeneralPredicates") | ||
noDiskConflictSet = sets.NewString("NoDiskConflict") | ||
serviceAffinitySet = sets.NewString("ServiceAffinity") | ||
matchInterPodAffinitySet = sets.NewString("MatchInterPodAffinity") | ||
generalPredicatesSets = sets.NewString("GeneralPredicates") | ||
noDiskConflictSet = sets.NewString("NoDiskConflict") | ||
maxPDVolumeCountPredicateKeys = []string{"MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount", "MaxEBSVolumeCount"} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought I had asked this question before, but looks like I haven't: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually the registered predicates are three separated names instead of "MaxPDVolumeCount". Otherwise we may need to check the type of volume before invalidating, while just as you pointed out, it's rare to have more than one of them as PV, so I chose to invalidate them all in most cases unless we can check the PV type very easily. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. Thanks for pointing it out. |
||
) | ||
|
||
// configFactory is the default implementation of the scheduler.Configurator interface. | ||
|
@@ -384,7 +384,11 @@ func (c *configFactory) onPvDelete(obj interface{}) { | |
} | ||
|
||
func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { | ||
invalidPredicates := sets.NewString("MaxPDVolumeCountPredicate") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this needs to stay. You could have a PVC that points to a PV, but the PV object doesn't exist. So when the PV object gets added, we can recount. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed, and also updated other comments to clarify the invalidation op reasons. |
||
// You could have a PVC that points to a PV, but the PV object doesn't exist. | ||
// So when the PV object gets added, we can recount. | ||
invalidPredicates := sets.NewString() | ||
|
||
// PV types which impact MaxPDVolumeCountPredicate | ||
if pv.Spec.AWSElasticBlockStore != nil { | ||
invalidPredicates.Insert("MaxEBSVolumeCount") | ||
} | ||
|
@@ -395,6 +399,14 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { | |
invalidPredicates.Insert("MaxAzureDiskVolumeCount") | ||
} | ||
|
||
// If PV contains zone related label, it may impact cached NoVolumeZoneConflict | ||
for k := range pv.ObjectMeta.Labels { | ||
if k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion { | ||
invalidPredicates.Insert("NoVolumeZoneConflict") | ||
break | ||
} | ||
} | ||
|
||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { | ||
// Add/delete impacts the available PVs to choose from | ||
invalidPredicates.Insert(predicates.CheckVolumeBinding) | ||
|
@@ -458,24 +470,36 @@ func (c *configFactory) onPvcDelete(obj interface{}) { | |
} | ||
|
||
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) { | ||
if pvc.Spec.VolumeName != "" { | ||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(maxPDVolumeCountPredicateSet) | ||
// We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod | ||
|
||
// The bound volume type may change | ||
invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...) | ||
|
||
// The bound volume's label may change | ||
invalidPredicates.Insert("NoVolumeZoneConflict") | ||
|
||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { | ||
// Add/delete impacts the available PVs to choose from | ||
invalidPredicates.Insert(predicates.CheckVolumeBinding) | ||
} | ||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) | ||
} | ||
|
||
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) { | ||
invalidPredicates := sets.NewString() | ||
|
||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { | ||
if old.Spec.VolumeName != new.Spec.VolumeName { | ||
if old.Spec.VolumeName != new.Spec.VolumeName { | ||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { | ||
// PVC volume binding has changed | ||
invalidPredicates.Insert(predicates.CheckVolumeBinding) | ||
} | ||
// The bound volume type may change | ||
invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...) | ||
// The bound volume's label may change | ||
invalidPredicates.Insert("NoVolumeZoneConflict") | ||
} | ||
|
||
if invalidPredicates.Len() > 0 { | ||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) | ||
} | ||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) | ||
} | ||
|
||
func (c *configFactory) onServiceAdd(obj interface{}) { | ||
|
@@ -541,7 +565,7 @@ func (c *configFactory) addPodToCache(obj interface{}) { | |
c.podQueue.AssignedPodAdded(pod) | ||
|
||
// NOTE: Updating equivalence cache of addPodToCache has been | ||
// handled optimistically in InvalidateCachedPredicateItemForPodAdd. | ||
// handled optimistically in: plugin/pkg/scheduler/scheduler.go#assume() | ||
} | ||
|
||
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) { | ||
|
@@ -566,8 +590,8 @@ func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) { | |
|
||
func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { | ||
if c.enableEquivalenceClassCache { | ||
// if the pod does not have binded node, updating equivalence cache is meaningless; | ||
// if pod's binded node has been changed, that case should be handled by pod add & delete. | ||
// if the pod does not have bound node, updating equivalence cache is meaningless; | ||
// if pod's bound node has been changed, that case should be handled by pod add & delete. | ||
if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName { | ||
if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) { | ||
// MatchInterPodAffinity need to be reconsidered for this node, | ||
|
@@ -898,8 +922,14 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, | |
} | ||
|
||
// Init equivalence class cache | ||
if f.enableEquivalenceClassCache && getEquivalencePodFunc != nil { | ||
f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc) | ||
if f.enableEquivalenceClassCache && getEquivalencePodFuncFactory != nil { | ||
pluginArgs, err := f.getPluginArgs() | ||
if err != nil { | ||
return nil, err | ||
} | ||
f.equivalencePodCache = core.NewEquivalenceCache( | ||
getEquivalencePodFuncFactory(*pluginArgs), | ||
) | ||
glog.Info("Created equivalence class cache") | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you also need to `*ref.Controller`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@msau42 `ref.Controller` is a `*bool`. Its value can be false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah thanks, makes sense