Skip to content

Commit

Permalink
feat: support node resource topology scheduler plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
WangZzzhe committed Oct 7, 2023
1 parent 435e927 commit d292543
Show file tree
Hide file tree
Showing 28 changed files with 3,712 additions and 11 deletions.
2 changes: 2 additions & 0 deletions cmd/katalyst-scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/component-base/logs"

"github.com/kubewharf/katalyst-core/cmd/katalyst-scheduler/app"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/noderesourcetopology"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/qosawarenoderesources"

// Ensure scheme package is initialized.
Expand All @@ -36,6 +37,7 @@ func main() {
command := app.NewSchedulerCommand(
app.WithPlugin(qosawarenoderesources.FitName, qosawarenoderesources.NewFit),
app.WithPlugin(qosawarenoderesources.BalancedAllocationName, qosawarenoderesources.NewBalancedAllocation),
app.WithPlugin(noderesourcetopology.TopologyMatchName, noderesourcetopology.New),
)

if err := runCommand(command); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
go.uber.org/atomic v1.7.0
golang.org/x/sys v0.7.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
gonum.org/v1/gonum v0.6.2
google.golang.org/grpc v1.51.0
k8s.io/api v0.24.6
k8s.io/apimachinery v0.24.6
Expand Down Expand Up @@ -141,6 +142,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/WangZzzhe/katalyst-api v0.0.0-20231007050042-c874884f6194
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand All @@ -162,7 +164,7 @@ replace (
k8s.io/kube-proxy => k8s.io/kube-proxy v0.24.6
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.24.6
k8s.io/kubectl => k8s.io/kubectl v0.24.6
k8s.io/kubelet => github.com/kubewharf/kubelet v1.24.6-kubewharf.7
k8s.io/kubelet => github.com/WangZzzhe/kubelet v0.0.0-20230921082135-659f02530842
k8s.io/kubernetes => k8s.io/kubernetes v1.24.6
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.24.6
k8s.io/metrics => k8s.io/metrics v0.24.6
Expand Down
11 changes: 7 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/WangZzzhe/katalyst-api v0.0.0-20231007050042-c874884f6194 h1:JnDoiwOId+94AAYEZb8MjWmgKEdbTbVW55BnWuvgmMs=
github.com/WangZzzhe/katalyst-api v0.0.0-20231007050042-c874884f6194/go.mod h1:iXRZxVyry2XhEzVDpcp24c3BAgZySPvxNyjSpQnWylY=
github.com/WangZzzhe/kubelet v0.0.0-20230921082135-659f02530842 h1:b3evJL9bAgi/i9dzA/7Sr+T1nPv5V8oDVquH9y0yJg0=
github.com/WangZzzhe/kubelet v0.0.0-20230921082135-659f02530842/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -543,10 +547,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.1.17-0.20230926151705-2bf76444f84a h1:+d7MWCTk1uCaQ833TufEfd3IJ5ZYc1nq0l5/XAABqYc=
github.com/kubewharf/katalyst-api v0.1.17-0.20230926151705-2bf76444f84a/go.mod h1:iVILS5UL5PRtkUPH2Iu1K/gFGTPMNItnth5fmQ80VGE=
github.com/kubewharf/kubelet v1.24.6-kubewharf.7 h1:zex5NjgWh3b+fk8sey5Hp/hOVoSKdqf4mJu8MeE8T4k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.7/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/libopenstorage/openstorage v1.0.0/go.mod h1:Sp1sIObHjat1BeXhfMqLZ14wnOzEhNx2YQedreMcUyc=
Expand Down Expand Up @@ -988,6 +988,7 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f h1:GrkO5AtFUU9U/1f5ctbIBXtBGeSJbWwIYfIsTcFMaX4=
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f/go.mod h1:I6l2HNBLBZEcrOoCpyKLdY2lHoRZ8lI4x60KMCQDft4=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
Expand Down Expand Up @@ -1323,8 +1324,10 @@ gomodules.xyz/orderedmap v0.1.0 h1:fM/+TGh/O1KkqGR5xjTKg6bU8OKBkg7p0Y+x/J9m8Os=
gomodules.xyz/orderedmap v0.1.0/go.mod h1:g9/TPUCm1t2gwD3j3zfV8uylyYhVdCNSi+xCEIu7yTU=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
gonum.org/v1/gonum v0.6.2 h1:4r+yNT0+8SWcOkXP+63H2zQbN+USnC73cjGUxnDF94Q=
gonum.org/v1/gonum v0.6.2/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e h1:jRyg0XfpwWlhEV8mDfdNGBeSJM2fuyh9Yjrnd8kF2Ts=
gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e/go.mod h1:kS+toOQn6AQKjmKJ7gzohV1XkqsFehRA2FbsbkopSuQ=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
Expand Down
6 changes: 4 additions & 2 deletions pkg/agent/qrm-plugins/cpu/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ limitations under the License.

package consts

import "github.com/kubewharf/katalyst-api/pkg/consts"

const (
// CPUResourcePluginPolicyNameDynamic is the name of the dynamic policy.
CPUResourcePluginPolicyNameDynamic = "dynamic"
CPUResourcePluginPolicyNameDynamic = string(consts.ResourcePluginPolicyNameDynamic)

// CPUResourcePluginPolicyNameNative is the name of the native policy.
CPUResourcePluginPolicyNameNative = "native"
CPUResourcePluginPolicyNameNative = string(consts.ResourcePluginPolicyNameNative)
)
3 changes: 2 additions & 1 deletion pkg/agent/qrm-plugins/io/staticpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent/qrm"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
Expand All @@ -37,7 +38,7 @@ import (

const (
// IOResourcePluginPolicyNameStatic is the policy name of static io resource plugin
IOResourcePluginPolicyNameStatic = "static"
IOResourcePluginPolicyNameStatic = string(consts.ResourcePluginPolicyNameStatic)
)

// StaticPolicy is the static io policy
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (
)

const (
MemoryResourcePluginPolicyNameDynamic = "dynamic"
MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic)

memoryPluginStateFileName = "memory_plugin_state"
memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers"
Expand Down
3 changes: 1 addition & 2 deletions pkg/agent/qrm-plugins/network/staticpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

apinode "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"

"github.com/kubewharf/katalyst-api/pkg/plugins/skeleton"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state"
Expand All @@ -52,7 +51,7 @@ import (

const (
// NetworkResourcePluginPolicyNameStatic is the policy name of static network resource plugin
NetworkResourcePluginPolicyNameStatic = "static"
NetworkResourcePluginPolicyNameStatic = string(apiconsts.ResourcePluginPolicyNameStatic)

NetworkPluginStateFileName = "network_plugin_state"

Expand Down
2 changes: 2 additions & 0 deletions pkg/consts/qrm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ package consts
const (
// KubeletQoSResourceManagerCheckpoint is the name of the checkpoint file for kubelet QoS resource manager
KubeletQoSResourceManagerCheckpoint = "kubelet_qrm_checkpoint"

MainContainerNameAnnotationKey = "kubernetes.io/main-container-name"
)
38 changes: 38 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,41 @@ func (cache *extendedCache) GetNodeInfo(name string) (*NodeInfo, error) {

return nodeInfo, nil
}

func (cache *extendedCache) ReserveNodeResource(nodeName string, pod *v1.Pod) {
cache.mu.Lock()
defer cache.mu.Unlock()

nodeInfo, ok := cache.nodes[nodeName]
if !ok {
nodeInfo = NewNodeInfo()
}

nodeInfo.AddAssumedPod(pod)
cache.nodes[nodeName] = nodeInfo
}

func (cache *extendedCache) UnreserveNodeResource(nodeName string, pod *v1.Pod) {
cache.mu.Lock()
defer cache.mu.Unlock()

nodeInfo, ok := cache.nodes[nodeName]
if !ok {
klog.Warningf("UnreserveNodeResource fail, node %v not exist in extendedCache", nodeName)
return
}

nodeInfo.DeleteAssumedPod(pod)
}

// GetNodeResourceTopology assumedPodResource will be added to nodeResourceTopology
func (cache *extendedCache) GetNodeResourceTopology(nodeName string) *ResourceTopology {
cache.mu.RLock()
defer cache.mu.RUnlock()

nodeInfo, ok := cache.nodes[nodeName]
if !ok {
return nil
}
return nodeInfo.GetResourceTopologyCopy()
}
71 changes: 71 additions & 0 deletions pkg/scheduler/cache/nodeinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sync"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

apis "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
"github.com/kubewharf/katalyst-api/pkg/consts"
Expand Down Expand Up @@ -52,6 +54,13 @@ type NodeInfo struct {
// record PodInfo here since we may have the functionality to
// change pod resources.
Pods map[string]*PodInfo

// node TopologyPolicy and TopologyZones from CNR status.
// is total CNR data necessary in extendedCache ?
ResourceTopology *ResourceTopology

// record assumed pod resource util pod is watched in CNR updated events.
AssumedPodResources native.PodResource
}

// NewNodeInfo returns a ready to use empty NodeInfo object.
Expand All @@ -63,6 +72,8 @@ func NewNodeInfo() *NodeInfo {
QoSResourcesNonZeroRequested: &native.QoSResource{},
QoSResourcesAllocatable: &native.QoSResource{},
Pods: make(map[string]*PodInfo),
ResourceTopology: new(ResourceTopology),
AssumedPodResources: native.PodResource{},
}
return ni
}
Expand All @@ -72,6 +83,12 @@ func (n *NodeInfo) UpdateNodeInfo(cnr *apis.CustomNodeResource) {
n.Mutex.Lock()
defer n.Mutex.Unlock()

n.updateReclaimed(cnr)

n.updateTopology(cnr)
}

func (n *NodeInfo) updateReclaimed(cnr *apis.CustomNodeResource) {
if cnr.Status.Resources.Allocatable != nil {
beResourceList := *cnr.Status.Resources.Allocatable
if reclaimedMilliCPU, ok := beResourceList[consts.ReclaimedResourceMilliCPU]; ok {
Expand All @@ -88,6 +105,36 @@ func (n *NodeInfo) UpdateNodeInfo(cnr *apis.CustomNodeResource) {
}
}

func (n *NodeInfo) updateTopology(cnr *apis.CustomNodeResource) {
for _, topologyZone := range cnr.Status.TopologyZone {
if topologyZone.Type != apis.TopologyTypeSocket {
continue
}
for _, child := range topologyZone.Children {
if child.Type != apis.TopologyTypeNuma {
continue
}

for _, alloc := range child.Allocations {
namespace, name, _, err := native.ParseNamespaceNameUIDKey(alloc.Consumer)
if err != nil {
klog.Errorf("unexpected CNR numa consumer: %v", err)
continue
}
// delete all pod from AssumedPodResource
n.AssumedPodResources.DeletePod(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
})
}
}
}

n.ResourceTopology.Update(cnr)
}

// AddPod adds pod information to this NodeInfo.
func (n *NodeInfo) AddPod(key string, pod *v1.Pod) {
// always try to clean previous pod, and then insert
Expand Down Expand Up @@ -129,3 +176,27 @@ func (n *NodeInfo) RemovePod(key string, pod *v1.Pod) {
n.QoSResourcesNonZeroRequested.ReclaimedMilliCPU -= podInfo.QoSResourcesNonZeroRequested.ReclaimedMilliCPU
n.QoSResourcesNonZeroRequested.ReclaimedMemory -= podInfo.QoSResourcesNonZeroRequested.ReclaimedMemory
}

func (n *NodeInfo) AddAssumedPod(pod *v1.Pod) {
n.Mutex.Lock()
defer n.Mutex.Unlock()
n.AssumedPodResources.AddPod(pod)
}

func (n *NodeInfo) DeleteAssumedPod(pod *v1.Pod) {
n.Mutex.Lock()
defer n.Mutex.Unlock()

n.AssumedPodResources.DeletePod(pod)
}

func (n *NodeInfo) GetResourceTopologyCopy() *ResourceTopology {
n.Mutex.RLock()
defer n.Mutex.RUnlock()

if n.ResourceTopology == nil {
return nil
}

return n.ResourceTopology.WithPodReousrce(n.AssumedPodResources)
}
82 changes: 82 additions & 0 deletions pkg/scheduler/cache/resourcetopology.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"fmt"

"github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

type ResourceTopology struct {
TopologyZone []*v1alpha1.TopologyZone

TopologyPolicy v1alpha1.TopologyPolicy
}

func (rt *ResourceTopology) Update(nrt *v1alpha1.CustomNodeResource) {
cp := nrt.DeepCopy()

rt.TopologyZone = cp.Status.TopologyZone

rt.TopologyPolicy = cp.Status.TopologyPolicy
}

// WithPodReousrce add assumedPodResource to ResourceTopology,
// performing pessimistic overallocation across all the NUMA zones.
func (rt *ResourceTopology) WithPodReousrce(podResource native.PodResource) *ResourceTopology {
cp := rt.DeepCopy()

if len(podResource) == 0 {
return cp
}
for _, topologyZone := range cp.TopologyZone {
if topologyZone.Type != v1alpha1.TopologyTypeSocket {
continue
}
for _, child := range topologyZone.Children {
if child.Type != v1alpha1.TopologyTypeNuma {
continue
}
for key, podReq := range podResource {
copyReq := podReq.DeepCopy()
fakeAllocation := v1alpha1.Allocation{
Consumer: fmt.Sprintf("fake-consumer/%s/uid", key),
Requests: &copyReq,
}
child.Allocations = append(child.Allocations, &fakeAllocation)
}
}
}

return cp
}

func (rt *ResourceTopology) DeepCopy() *ResourceTopology {
out := new(ResourceTopology)
if rt.TopologyZone != nil {
out.TopologyZone = make([]*v1alpha1.TopologyZone, len(rt.TopologyZone))
for i := range rt.TopologyZone {
if rt.TopologyZone[i] != nil {
out.TopologyZone[i] = rt.TopologyZone[i].DeepCopy()
}
}
}
out.TopologyPolicy = rt.TopologyPolicy
return out
}
Loading

0 comments on commit d292543

Please sign in to comment.