diff --git a/cmd/base/options/qos.go b/cmd/base/options/qos.go index 46272085a..33b759c3a 100644 --- a/cmd/base/options/qos.go +++ b/cmd/base/options/qos.go @@ -106,7 +106,7 @@ func (o *QoSOptions) applyToExpandQoSEnhancement(c *generic.QoSConfiguration, po enhancementMap[strings.TrimSpace(sList[0])] = strings.TrimSpace(sList[1]) } - c.SetExpandQoSEnhancementSelector(enhancementMap) + c.SetExpandQoSEnhancementKey(enhancementMap) return nil } diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index fa1d4d0c2..51eae5b07 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -29,14 +29,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" utilfs "k8s.io/kubernetes/pkg/util/filesystem" - "github.com/stretchr/testify/require" - "github.com/kubewharf/katalyst-api/pkg/consts" cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator" @@ -679,7 +678,7 @@ func TestAllocate(t *testing.T) { as.Nil(err) if tc.enhancementDefaultValues != nil { - dynamicPolicy.qosConfig.EnhancementDefaultValues = tc.enhancementDefaultValues + dynamicPolicy.qosConfig.QoSEnhancementDefaultValues = tc.enhancementDefaultValues } resp, err := dynamicPolicy.Allocate(context.Background(), tc.req) @@ -1095,7 +1094,7 @@ func TestGetTopologyHints(t *testing.T) { as.Nil(err) if tc.enhancementDefaultValues != nil { - dynamicPolicy.qosConfig.EnhancementDefaultValues = tc.enhancementDefaultValues + dynamicPolicy.qosConfig.QoSEnhancementDefaultValues = tc.enhancementDefaultValues } resp, err := dynamicPolicy.GetTopologyHints(context.Background(), tc.req) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index a45cc0dff..3bed3a64e 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -32,7 +32,6 @@ import ( apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-api/pkg/plugins/skeleton" - "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent/qrm" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc" diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index e2d407e0f..f563a8cc2 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -823,7 +823,7 @@ func TestAllocate(t *testing.T) { as.Nil(err) if tc.enhancementDefaultValues != nil { - dynamicPolicy.qosConfig.EnhancementDefaultValues = tc.enhancementDefaultValues + dynamicPolicy.qosConfig.QoSEnhancementDefaultValues = tc.enhancementDefaultValues } dynamicPolicy.enableMemoryAdvisor = true @@ -1238,7 +1238,7 @@ func TestGetTopologyHints(t *testing.T) { as.Nil(err) if tc.enhancementDefaultValues != nil { - dynamicPolicy.qosConfig.EnhancementDefaultValues = tc.enhancementDefaultValues + dynamicPolicy.qosConfig.QoSEnhancementDefaultValues = tc.enhancementDefaultValues } resp, err := dynamicPolicy.GetTopologyHints(context.Background(), tc.req) diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go index 93a8953bf..ca6076509 100644 --- a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go +++ b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go @@ -962,7 +962,7 @@ func (p *StaticPolicy) getNetClassID(podAnnotations map[string]string, podLevelN return classID, nil } - qosLevel, err := p.qosConfig.GetQoSLevel(podAnnotations) + qosLevel, err := p.qosConfig.GetQoSLevel(nil, podAnnotations) if err != nil { return 0, err } diff --git a/pkg/agent/qrm-plugins/util/util.go b/pkg/agent/qrm-plugins/util/util.go index 6ceccc818..a0578ff88 100644 --- a/pkg/agent/qrm-plugins/util/util.go +++ b/pkg/agent/qrm-plugins/util/util.go @@ -80,7 +80,7 @@ func GetKatalystQoSLevelFromResourceReq(qosConf *generic.QoSConfiguration, req * } var getErr error - qosLevel, getErr = qosConf.GetQoSLevel(req.Annotations) + qosLevel, getErr = qosConf.GetQoSLevel(nil, req.Annotations) if getErr != nil { err = fmt.Errorf("resource type mismatches: %v", getErr) return @@ -91,12 +91,7 @@ func GetKatalystQoSLevelFromResourceReq(qosConf *generic.QoSConfiguration, req * req.Annotations = make(map[string]string) } req.Annotations[apiconsts.PodAnnotationQoSLevelKey] = qosLevel - parsedAnnotations, err := qosConf.FilterQoSAndEnhancement(req.Annotations) - if err != nil { - err = fmt.Errorf("ParseKatalystAnnotations failed with error: %v", err) - return - } - req.Annotations = parsedAnnotations + req.Annotations = qosConf.FilterQoSAndEnhancementMap(req.Annotations) if req.Labels == nil { req.Labels = make(map[string]string) diff --git a/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/borwein_test.go b/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/borwein_test.go index cdad3578c..f0d2892b6 100644 --- a/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/borwein_test.go +++ b/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/borwein_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "google.golang.org/grpc" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,8 +37,6 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" - "github.com/stretchr/testify/require" - internalfake "github.com/kubewharf/katalyst-api/pkg/client/clientset/versioned/fake" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" diff --git a/pkg/config/generic/qos.go b/pkg/config/generic/qos.go index 1e1ba7d99..559d536a2 100644 --- a/pkg/config/generic/qos.go +++ b/pkg/config/generic/qos.go @@ -17,6 +17,7 @@ limitations under the License. package generic import ( + "encoding/json" "fmt" "sync" @@ -26,13 +27,20 @@ import ( apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" "github.com/kubewharf/katalyst-core/pkg/util/general" - "github.com/kubewharf/katalyst-core/pkg/util/qos/helper" ) // defaultQoSLevel willed b e use as default QoS Level if nothing in annotation const defaultQoSLevel = apiconsts.PodAnnotationQoSLevelSharedCores -type qosValidationFunc func(map[string]string) (bool, error) +type qosValidationFunc func(pod *v1.Pod, annotation map[string]string) (bool, error) + +// validQosKey contains all the qos-level that Katalyst supports +var validQosKey = sets.NewString( + apiconsts.PodAnnotationQoSLevelSharedCores, + apiconsts.PodAnnotationQoSLevelDedicatedCores, + apiconsts.PodAnnotationQoSLevelReclaimedCores, + apiconsts.PodAnnotationQoSLevelSystemCores, +) // validQosEnhancementKey contains all the enhancement that Katalyst supports var validQosEnhancementKey = sets.NewString( @@ -58,17 +66,17 @@ type QoSConfiguration struct { // QoSEnhancementAnnotationKey is used as an expanded way to match legacy specified // QoS annotations into standard katalyst QoS enhancement - QoSEnhancementAnnotationSelector map[string]string - - // qosCheckFunc is used as a syntactic sugar to easily walk through - // all QoS Level validation functions - qosCheckFuncMap map[string]qosValidationFunc + QoSEnhancementAnnotationKey map[string]string // for different situation, there may be different default values for enhancement keys // we use options to control those different values // the key here is specific enhancement key such as "numa_binding", "numa_exclusive" // the value is the default value of the key - EnhancementDefaultValues map[string]string + QoSEnhancementDefaultValues map[string]string + + // qosCheckFunc is used as a syntactic sugar to easily walk through + // all QoS Level validation functions + qosCheckFuncMap map[string]qosValidationFunc } // NewQoSConfiguration creates a new qos configuration. @@ -88,8 +96,8 @@ func NewQoSConfiguration() *QoSConfiguration { apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelSystemCores, }, }, - QoSEnhancementAnnotationSelector: make(map[string]string), - EnhancementDefaultValues: make(map[string]string), + QoSEnhancementAnnotationKey: make(map[string]string), + QoSEnhancementDefaultValues: make(map[string]string), } c.qosCheckFuncMap = map[string]qosValidationFunc{ @@ -101,36 +109,34 @@ func NewQoSConfiguration() *QoSConfiguration { return c } -// FilterQoSAndEnhancement filter map that are related to katalyst QoS and katalyst Enhancement. -// for enhancements,we should unmarshal and store the unmarshal key-value. -// it works both for default katalyst QoS keys and expanded QoS keys. -func (c *QoSConfiguration) FilterQoSAndEnhancement(annotations map[string]string) (map[string]string, error) { - filteredAnnotations := c.FilterQoSMap(annotations) - wrappedEnhancements := c.GetQoSEnhancements(annotations) +func (c *QoSConfiguration) SetExpandQoSLevelSelector(qosLevel string, selectorMap map[string]string) { + if _, ok := c.qosCheckFuncMap[qosLevel]; !ok { + return + } - c.RLock() - defer c.RUnlock() + c.Lock() + defer c.Unlock() + c.QoSClassAnnotationSelector[qosLevel] = general.MergeMap(c.QoSClassAnnotationSelector[qosLevel], selectorMap) +} - for _, enhancementKey := range validQosEnhancementKey.List() { - enhancementKVs := helper.ParseKatalystQOSEnhancement(wrappedEnhancements, annotations, enhancementKey) - for key, val := range enhancementKVs { - if filteredAnnotations[key] != "" { - general.Warningf("get enhancements %s:%s from %s, but the kv already exists: %s:%s", - key, val, enhancementKey, key, filteredAnnotations[key]) - } - filteredAnnotations[key] = val - } - } +func (c *QoSConfiguration) SetExpandQoSEnhancementKey(enhancementKeys map[string]string) { + c.Lock() + defer c.Unlock() - for enhancementKey, defaultValue := range c.EnhancementDefaultValues { - if _, found := filteredAnnotations[enhancementKey]; !found { - general.Infof("enhancementKey: %s isn't declared, set its value to defaultValue: %s", - enhancementKey, defaultValue) - filteredAnnotations[enhancementKey] = defaultValue + for defaultKey, expandedKey := range enhancementKeys { + if validQosEnhancementKey.Has(defaultKey) { + c.QoSEnhancementAnnotationKey[expandedKey] = defaultKey } } +} + +// SetEnhancementDefaultValues set default values for enhancement keys +// because sometimes we need different default values for enhancement keys in different types of clusters +func (c *QoSConfiguration) SetEnhancementDefaultValues(enhancementDefaultValues map[string]string) { + c.Lock() + defer c.Unlock() - return filteredAnnotations, nil + c.QoSEnhancementDefaultValues = general.MergeMap(c.QoSEnhancementDefaultValues, enhancementDefaultValues) } // FilterQoSMap filter map that are related to katalyst QoS. @@ -150,47 +156,66 @@ func (c *QoSConfiguration) FilterQoSMap(annotations map[string]string) map[strin return filteredAnnotations } -func (c *QoSConfiguration) SetExpandQoSLevelSelector(qosLevel string, selectorMap map[string]string) { - if _, ok := c.qosCheckFuncMap[qosLevel]; !ok { - return +// FilterQoSEnhancementMap filter map that are related to katalyst Enhancement. +// for enhancements,we should unmarshal and store the unmarshal key-value. +// it works both for default katalyst QoS keys and expanded QoS keys. +func (c *QoSConfiguration) FilterQoSEnhancementMap(annotations map[string]string) map[string]string { + c.RLock() + defer c.RUnlock() + + filteredAnnotations := make(map[string]string) + + for _, enhancementKey := range validQosEnhancementKey.List() { + enhancementKVs := c.GetQoSEnhancementKVs(nil, annotations, enhancementKey) + for key, val := range enhancementKVs { + if filteredAnnotations[key] != "" { + general.Warningf("get enhancements %s:%s from %s, but the kv already exists: %s:%s", + key, val, enhancementKey, key, filteredAnnotations[key]) + } + filteredAnnotations[key] = val + } } - c.Lock() - defer c.Unlock() - c.QoSClassAnnotationSelector[qosLevel] = general.MergeMap(c.QoSClassAnnotationSelector[qosLevel], selectorMap) + for enhancementKey, defaultValue := range c.QoSEnhancementDefaultValues { + if _, found := filteredAnnotations[enhancementKey]; !found { + general.Infof("enhancementKey: %s isn't declared, set its value to defaultValue: %s", + enhancementKey, defaultValue) + filteredAnnotations[enhancementKey] = defaultValue + } + } + + return filteredAnnotations +} + +func (c *QoSConfiguration) FilterQoSAndEnhancementMap(annotations map[string]string) map[string]string { + return general.MergeMap(c.FilterQoSMap(annotations), c.FilterQoSEnhancementMap(annotations)) } func (c *QoSConfiguration) GetQoSLevelForPod(pod *v1.Pod) (string, error) { - if pod == nil { - return "", fmt.Errorf("nil pod") - } - return c.GetQoSLevel(pod.Annotations) + return c.GetQoSLevel(pod, map[string]string{}) } // GetQoSLevel returns the standard katalyst QoS Level for given annotations; // - returns error if there is conflict in qos level annotations or can't get valid qos level. // - returns defaultQoSLevel if nothing matches and isNotDefaultQoSLevel is false. -func (c *QoSConfiguration) GetQoSLevel(annotations map[string]string) (qosLevel string, retErr error) { +func (c *QoSConfiguration) GetQoSLevel(pod *v1.Pod, expandedAnnotations map[string]string) (qosLevel string, retErr error) { + annotations := MergeAnnotations(pod, expandedAnnotations) + defer func() { if retErr != nil { return } - // redirect qos-level according to user-specified qos judgement function - qosLevelUpdater := helper.GetQoSLevelUpdateFunc() - if qosLevelUpdater != nil { - updatedQoSLevel := qosLevelUpdater(qosLevel, annotations) - if updatedQoSLevel != qosLevel { - general.Infof("update qosLevel from %s to %s", qosLevel, updatedQoSLevel) - } - qosLevel = updatedQoSLevel + overrideQoSLevel, ok := getQoSLevelExpander().Override(qosLevel, pod, annotations) + if ok { + general.Infof("update qosLevel from %s to %s", qosLevel, overrideQoSLevel) + qosLevel = overrideQoSLevel } }() isNotDefaultQoSLevel := false - for qos := range c.QoSClassAnnotationSelector { + for qos := range validQosKey { identified, matched, err := c.checkQosMatched(annotations, qos) - if err != nil { general.Errorf("check qos level %v for annotation failed: %v", qos, err) return "", err @@ -209,55 +234,14 @@ func (c *QoSConfiguration) GetQoSLevel(annotations map[string]string) (qosLevel return defaultQoSLevel, nil } -func (c *QoSConfiguration) SetExpandQoSEnhancementSelector(enhancementAdapter map[string]string) { - c.Lock() - defer c.Unlock() - - for defaultKey, expandedKey := range enhancementAdapter { - if validQosEnhancementKey.Has(defaultKey) { - c.QoSEnhancementAnnotationSelector[expandedKey] = defaultKey - } - } -} - -func (c *QoSConfiguration) GetQoSEnhancementsForPod(pod *v1.Pod) map[string]string { - if pod == nil { - return map[string]string{} - } - return c.GetQoSEnhancements(pod.Annotations) -} - -// GetQoSEnhancements returns the standard katalyst QoS Enhancement Map for given annotations; -// - ignore conflict cases: default enhancement key always prior to expand enhancement key -func (c *QoSConfiguration) GetQoSEnhancements(annotations map[string]string) map[string]string { - res := make(map[string]string) - - c.RLock() - defer c.RUnlock() - for k, v := range annotations { - if validQosEnhancementKey.Has(k) { - res[k] = v - } else if defaultK, ok := c.QoSEnhancementAnnotationSelector[k]; ok { - if _, exist := res[defaultK]; !exist { - res[defaultK] = v - } - } - } - - return res -} - func (c *QoSConfiguration) CheckReclaimedQoSForPod(pod *v1.Pod) (bool, error) { - if pod == nil { - return false, nil - } - return c.CheckReclaimedQoS(pod.Annotations) + return c.CheckReclaimedQoS(pod, map[string]string{}) } // CheckReclaimedQoS returns true if the annotation indicates for ReclaimedCores; // - returns error if different QoS configurations conflict with each other. -func (c *QoSConfiguration) CheckReclaimedQoS(annotations map[string]string) (bool, error) { - if qosLevel, err := c.GetQoSLevel(annotations); err != nil { +func (c *QoSConfiguration) CheckReclaimedQoS(pod *v1.Pod, expandedAnnotations map[string]string) (bool, error) { + if qosLevel, err := c.GetQoSLevel(pod, expandedAnnotations); err != nil { return false, err } else { return qosLevel == apiconsts.PodAnnotationQoSLevelReclaimedCores, nil @@ -265,16 +249,13 @@ func (c *QoSConfiguration) CheckReclaimedQoS(annotations map[string]string) (boo } func (c *QoSConfiguration) CheckSharedQoSForPod(pod *v1.Pod) (bool, error) { - if pod == nil { - return false, nil - } - return c.CheckSharedQoS(pod.Annotations) + return c.CheckSharedQoS(pod, map[string]string{}) } // CheckSharedQoS returns true if the annotation indicates for SharedCores; // - returns error if different QoS configurations conflict with each other. -func (c *QoSConfiguration) CheckSharedQoS(annotations map[string]string) (bool, error) { - if qosLevel, err := c.GetQoSLevel(annotations); err != nil { +func (c *QoSConfiguration) CheckSharedQoS(pod *v1.Pod, expandedAnnotations map[string]string) (bool, error) { + if qosLevel, err := c.GetQoSLevel(pod, expandedAnnotations); err != nil { return false, err } else { return qosLevel == apiconsts.PodAnnotationQoSLevelSharedCores, nil @@ -282,16 +263,13 @@ func (c *QoSConfiguration) CheckSharedQoS(annotations map[string]string) (bool, } func (c *QoSConfiguration) CheckDedicatedQoSForPod(pod *v1.Pod) (bool, error) { - if pod == nil { - return false, nil - } - return c.CheckDedicatedQoS(pod.Annotations) + return c.CheckDedicatedQoS(pod, map[string]string{}) } // CheckDedicatedQoS returns true if the annotation indicates for DedicatedCores; // - returns error if different QoS configurations conflict with each other. -func (c *QoSConfiguration) CheckDedicatedQoS(annotations map[string]string) (bool, error) { - if qosLevel, err := c.GetQoSLevel(annotations); err != nil { +func (c *QoSConfiguration) CheckDedicatedQoS(pod *v1.Pod, expandedAnnotations map[string]string) (bool, error) { + if qosLevel, err := c.GetQoSLevel(pod, expandedAnnotations); err != nil { return false, err } else { return qosLevel == apiconsts.PodAnnotationQoSLevelDedicatedCores, nil @@ -299,30 +277,19 @@ func (c *QoSConfiguration) CheckDedicatedQoS(annotations map[string]string) (boo } func (c *QoSConfiguration) CheckSystemQoSForPod(pod *v1.Pod) (bool, error) { - if pod == nil { - return false, nil - } - return c.CheckSystemQoS(pod.Annotations) + return c.CheckSystemQoS(pod, map[string]string{}) } // CheckSystemQoS returns true if the annotation indicates for SystemCores; // - returns error if different QoS configurations conflict with each other. -func (c *QoSConfiguration) CheckSystemQoS(annotations map[string]string) (bool, error) { - if qosLevel, err := c.GetQoSLevel(annotations); err != nil { +func (c *QoSConfiguration) CheckSystemQoS(pod *v1.Pod, expandedAnnotations map[string]string) (bool, error) { + if qosLevel, err := c.GetQoSLevel(pod, expandedAnnotations); err != nil { return false, err } else { return qosLevel == apiconsts.PodAnnotationQoSLevelSystemCores, nil } } -// SetEnhancementDefaultValues set default values for enhancement keys -// because sometimes we need different default values for enhancement keys in different types of clusters -func (c *QoSConfiguration) SetEnhancementDefaultValues(enhancementDefaultValues map[string]string) { - c.Lock() - defer c.Unlock() - c.EnhancementDefaultValues = general.MergeMap(c.EnhancementDefaultValues, enhancementDefaultValues) -} - // checkQosMatched is a unified helper function to judge whether annotation // matches with the given QoS Level; // return @@ -342,7 +309,8 @@ func (c *QoSConfiguration) checkQosMatched(annotations map[string]string, qosVal if valueEqualCnt > 0 { // some key-value list match while others don't if valueNotEqualCnt > 0 { - return false, false, fmt.Errorf("qos %v conflicts, matched count %v, mis matched count %v", qosValue, valueEqualCnt, valueNotEqualCnt) + return false, false, + fmt.Errorf("qos %v conflicts, matched count %v, mis matched count %v", qosValue, valueEqualCnt, valueNotEqualCnt) } // some key-value list match and some key may not exist return true, true, nil @@ -354,24 +322,78 @@ func (c *QoSConfiguration) checkQosMatched(annotations map[string]string, qosVal } func (c *QoSConfiguration) GetSpecifiedPoolNameForPod(pod *v1.Pod) (string, error) { - if pod == nil { - return "", fmt.Errorf("nil pod") - } - return c.GetSpecifiedPoolName(c.GetQoSEnhancementsForPod(pod), pod.Annotations) + return c.GetSpecifiedPoolName(pod, map[string]string{}) } // GetSpecifiedPoolName returns the specified cpuset pool name for given enhancements and annotations; -func (c *QoSConfiguration) GetSpecifiedPoolName(enhancements, annotations map[string]string) (string, error) { - qosLevel, err := c.GetQoSLevel(annotations) +func (c *QoSConfiguration) GetSpecifiedPoolName(pod *v1.Pod, expandedAnnotations map[string]string) (string, error) { + qosLevel, err := c.GetQoSLevel(pod, expandedAnnotations) if err != nil { return "", fmt.Errorf("GetQoSLevel failed with error: %v", err) } - enhancementKVs := helper.ParseKatalystQOSEnhancement(enhancements, annotations, - apiconsts.PodAnnotationCPUEnhancementKey) + enhancementKVs := c.GetQoSEnhancementKVs(pod, expandedAnnotations, apiconsts.PodAnnotationCPUEnhancementKey) return state.GetSpecifiedPoolName(qosLevel, enhancementKVs[apiconsts.PodAnnotationCPUEnhancementCPUSet]), nil } +// GetQoSEnhancementKVs parses enhancements from annotations by given key, +// since enhancement values are stored as k-v, so we should unmarshal it into maps. +func (c *QoSConfiguration) GetQoSEnhancementKVs(pod *v1.Pod, expandedAnnotations map[string]string, enhancementKey string) (flattenedEnhancements map[string]string) { + annotations := MergeAnnotations(pod, expandedAnnotations) + + defer func() { + overrideFlattenedEnhancements, ok := getQoSEnhancementExpander().Override(flattenedEnhancements, pod, annotations) + if ok { + general.Infof("update enhancements from %+v to %+v", flattenedEnhancements, overrideFlattenedEnhancements) + flattenedEnhancements = overrideFlattenedEnhancements + } + }() + + flattenedEnhancements = map[string]string{} + enhancementValue, ok := annotations[enhancementKey] + if !ok { + return + } + + err := json.Unmarshal([]byte(enhancementValue), &flattenedEnhancements) + if err != nil { + general.Errorf("parse enhancement %s failed: %v", enhancementKey, err) + return + } + return flattenedEnhancements +} + +// GetQoSEnhancements returns the standard katalyst QoS Enhancement Map for given annotations; +// - ignore conflict cases: default enhancement key always prior to expand enhancement key +func (c *QoSConfiguration) getQoSEnhancements(annotations map[string]string) map[string]string { + res := make(map[string]string) + + c.RLock() + defer c.RUnlock() + for k, v := range annotations { + if validQosEnhancementKey.Has(k) { + res[k] = v + } else if defaultK, ok := c.QoSEnhancementAnnotationKey[k]; ok { + if _, exist := res[defaultK]; !exist { + res[defaultK] = v + } + } + } + + return res +} + +func MergeAnnotations(pod *v1.Pod, expandAnnotations map[string]string) map[string]string { + if pod == nil { + if expandAnnotations == nil { + return map[string]string{} + } + return expandAnnotations + } else { + return general.MergeMap(pod.Annotations, expandAnnotations) + } +} + // checkKeyValueMatched checks whether the given key-value pair exists in the map // if the returns value equals 1, it represents // - key not exists diff --git a/pkg/config/generic/qos_expander.go b/pkg/config/generic/qos_expander.go new file mode 100644 index 000000000..eebe2a96d --- /dev/null +++ b/pkg/config/generic/qos_expander.go @@ -0,0 +1,77 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "sync" + + v1 "k8s.io/api/core/v1" +) + +// QoSLevelExpander provides a mechanism for user-specified qos judgement +// since we may need to set qos-level for some customized cases +type QoSLevelExpander interface { + Override(qosLevel string, pod *v1.Pod, expandedAnnotations map[string]string) (string, bool) +} + +type dummyQoSLevelExpander struct{} + +func (d dummyQoSLevelExpander) Override(qos string, _ *v1.Pod, _ map[string]string) (string, bool) { + return qos, false +} + +var qosLevelExpander QoSLevelExpander = dummyQoSLevelExpander{} +var qosLevelExpanderLock sync.RWMutex + +func SetQoSLevelExpander(e QoSLevelExpander) { + qosLevelExpanderLock.Lock() + qosLevelExpander = e + qosLevelExpanderLock.Unlock() +} + +func getQoSLevelExpander() QoSLevelExpander { + qosLevelExpanderLock.RLock() + defer qosLevelExpanderLock.RUnlock() + return qosLevelExpander +} + +// QoSEnhancementExpander provides a mechanism for user-specified qos-enhancement judgement +// since we may need to set qos-enhancement for some customized cases +type QoSEnhancementExpander interface { + Override(flattenedEnhancements map[string]string, pod *v1.Pod, expandedAnnotations map[string]string) (map[string]string, bool) +} + +type dummyQoSEnhancementExpander struct{} + +func (d dummyQoSEnhancementExpander) Override(flattenedEnhancements map[string]string, _ *v1.Pod, _ map[string]string) (map[string]string, bool) { + return flattenedEnhancements, false +} + +var qosEnhancementExpander QoSEnhancementExpander = dummyQoSEnhancementExpander{} +var qosEnhancementExpanderLock sync.RWMutex + +func SetQoSEnhancementExpander(e QoSEnhancementExpander) { + qosEnhancementExpanderLock.Lock() + qosEnhancementExpander = e + qosEnhancementExpanderLock.Unlock() +} + +func getQoSEnhancementExpander() QoSEnhancementExpander { + qosEnhancementExpanderLock.RLock() + defer qosEnhancementExpanderLock.RUnlock() + return qosEnhancementExpander +} diff --git a/pkg/controller/spd/spd.go b/pkg/controller/spd/spd.go index 7dde9016a..b1b26595a 100644 --- a/pkg/controller/spd/spd.go +++ b/pkg/controller/spd/spd.go @@ -551,7 +551,7 @@ func (sc *SPDController) defaultBaselinePercent(workload *unstructured.Unstructu general.ErrorS(err, "failed to GetUnstructuredTemplateAnnotations") return pointer.Int32(100) } - qosLevel, err := sc.qosConfig.GetQoSLevel(annotations) + qosLevel, err := sc.qosConfig.GetQoSLevel(nil, annotations) if err != nil { general.ErrorS(err, "failed to GetQoSLevel") return pointer.Int32(100) diff --git a/pkg/scheduler/util/qos.go b/pkg/scheduler/util/qos.go index 398711e75..69ab0eac9 100644 --- a/pkg/scheduler/util/qos.go +++ b/pkg/scheduler/util/qos.go @@ -20,12 +20,10 @@ import ( "encoding/json" "sync" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "github.com/kubewharf/katalyst-api/pkg/consts" - - v1 "k8s.io/api/core/v1" - "github.com/kubewharf/katalyst-core/pkg/config/generic" ) diff --git a/pkg/util/qos/cpu_enhancement.go b/pkg/util/qos/cpu_enhancement.go index 61ee51058..c753f518a 100644 --- a/pkg/util/qos/cpu_enhancement.go +++ b/pkg/util/qos/cpu_enhancement.go @@ -25,20 +25,18 @@ import ( "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/config/generic" - "github.com/kubewharf/katalyst-core/pkg/util/qos/helper" ) // GetPodCPUSuppressionToleranceRate parses cpu suppression tolerance rate for the given pod, // and cpu suppression is only supported for reclaim pods. if the given is not nominated with // cpu suppression, return max to indicate that it can be suppressed for any degree. func GetPodCPUSuppressionToleranceRate(qosConf *generic.QoSConfiguration, pod *v1.Pod) (float64, error) { - qosLevel, _ := qosConf.GetQoSLevelForPod(pod) + qosLevel, _ := qosConf.GetQoSLevel(pod, map[string]string{}) if qosLevel != consts.PodAnnotationQoSLevelReclaimedCores { return 0, fmt.Errorf("qos level %s not support cpu suppression", qosLevel) } - cpuEnhancement := helper.ParseKatalystQOSEnhancement(qosConf.GetQoSEnhancementsForPod(pod), pod.Annotations, - consts.PodAnnotationCPUEnhancementKey) + cpuEnhancement := qosConf.GetQoSEnhancementKVs(pod, map[string]string{}, consts.PodAnnotationCPUEnhancementKey) suppressionToleranceRateStr, ok := cpuEnhancement[consts.PodAnnotationCPUEnhancementSuppressionToleranceRate] if ok { suppressionToleranceRate, err := strconv.ParseFloat(suppressionToleranceRateStr, 64) diff --git a/pkg/util/qos/helper/enhancement_helper.go b/pkg/util/qos/helper/enhancement_helper.go deleted file mode 100644 index a0bf09f94..000000000 --- a/pkg/util/qos/helper/enhancement_helper.go +++ /dev/null @@ -1,78 +0,0 @@ -/* -Copyright 2022 The Katalyst Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package helper - -import ( - "encoding/json" - "sync" - - "github.com/kubewharf/katalyst-core/pkg/util/general" -) - -// EnhancementUpdateFunc provides a mechanism for user-specified enhancement kvs for specific key -// since we may need to set qos-level for some customized cases -type EnhancementUpdateFunc func(enhancementKVs, podAnnotations map[string]string) map[string]string - -var enhancementUpdaters sync.Map - -func RegisterEnhancementUpdateFunc(enhancementKey string, f EnhancementUpdateFunc) { - enhancementUpdaters.Store(enhancementKey, f) -} - -func GetRegisteredEnhancementUpdateFuncs() map[string]EnhancementUpdateFunc { - updaters := make(map[string]EnhancementUpdateFunc) - enhancementUpdaters.Range(func(key, value interface{}) bool { - updaters[key.(string)] = value.(EnhancementUpdateFunc) - return true - }) - return updaters -} - -func GetRegisteredEnhancementUpdateFunc(enhancementKey string) EnhancementUpdateFunc { - value, ok := enhancementUpdaters.Load(enhancementKey) - if !ok { - return nil - } - return value.(EnhancementUpdateFunc) -} - -// ParseKatalystQOSEnhancement parses enhancements from annotations by given key, -// since enhancement values are stored as k-v, so we should unmarshal it into maps. -func ParseKatalystQOSEnhancement(enhancements, podAnnotations map[string]string, enhancementKey string) (enhancementConfig map[string]string) { - defer func() { - updatedEnhancementConfig := enhancementConfig - enhancementUpdateFunc := GetRegisteredEnhancementUpdateFunc(enhancementKey) - if enhancementUpdateFunc != nil { - updatedEnhancementConfig = enhancementUpdateFunc(enhancementConfig, podAnnotations) - general.Infof("update enhancementConfig from %+v to %+v", enhancementConfig, updatedEnhancementConfig) - } - enhancementConfig = updatedEnhancementConfig - }() - - enhancementValue, ok := enhancements[enhancementKey] - if !ok { - return nil - } - - enhancementConfig = map[string]string{} - err := json.Unmarshal([]byte(enhancementValue), &enhancementConfig) - if err != nil { - general.Errorf("parse enhancement %s failed: %v", enhancementKey, err) - return nil - } - return enhancementConfig -} diff --git a/pkg/util/qos/helper/qos_helper.go b/pkg/util/qos/helper/qos_helper.go deleted file mode 100644 index 939399d0b..000000000 --- a/pkg/util/qos/helper/qos_helper.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Copyright 2022 The Katalyst Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package helper - -import ( - "sync" -) - -// QoSLevelUpdateFunc provides a mechanism for user-specified qos judgement -// since we may need to set qos-level for some customized cases -type QoSLevelUpdateFunc func(qosLevel string, podAnnotations map[string]string) string - -var qosLevelUpdater QoSLevelUpdateFunc -var qosLevelUpdaterLock sync.RWMutex - -func SetQoSLevelUpdateFunc(f QoSLevelUpdateFunc) { - qosLevelUpdaterLock.Lock() - qosLevelUpdater = f - qosLevelUpdaterLock.Unlock() -} - -func GetQoSLevelUpdateFunc() QoSLevelUpdateFunc { - qosLevelUpdaterLock.RLock() - defer qosLevelUpdaterLock.RUnlock() - return qosLevelUpdater -} diff --git a/pkg/util/qos/mem_enhancement.go b/pkg/util/qos/mem_enhancement.go index d169e53ae..457286d6d 100644 --- a/pkg/util/qos/mem_enhancement.go +++ b/pkg/util/qos/mem_enhancement.go @@ -23,19 +23,18 @@ import ( apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/config/generic" - "github.com/kubewharf/katalyst-core/pkg/util/qos/helper" ) func ParseMemoryEnhancement(qosConf *generic.QoSConfiguration, pod *v1.Pod) map[string]string { if pod == nil || qosConf == nil { return nil } - return helper.ParseKatalystQOSEnhancement(qosConf.GetQoSEnhancementsForPod(pod), pod.Annotations, apiconsts.PodAnnotationMemoryEnhancementKey) + return qosConf.GetQoSEnhancementKVs(pod, map[string]string{}, apiconsts.PodAnnotationMemoryEnhancementKey) } // IsPodNumaBinding checks whether the pod needs numa-binding func IsPodNumaBinding(qosConf *generic.QoSConfiguration, pod *v1.Pod) bool { - isDedicatedPod, err := qosConf.CheckDedicatedQoSForPod(pod) + isDedicatedPod, err := qosConf.CheckDedicatedQoS(pod, map[string]string{}) if err != nil || !isDedicatedPod { return false }