diff --git a/go.mod b/go.mod index 839f9b36cd..4b7b620cfc 100644 --- a/go.mod +++ b/go.mod @@ -140,7 +140,7 @@ require ( ) replace ( - github.com/kubewharf/katalyst-api => github.com/kubewharf/katalyst-api v0.0.4 + github.com/kubewharf/katalyst-api => github.com/caohe/katalyst-api v0.0.0-20230423073908-4e492bfdaf40 k8s.io/api => k8s.io/api v0.24.6 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6 k8s.io/apimachinery => k8s.io/apimachinery v0.24.6 diff --git a/go.sum b/go.sum index e040e20421..b9e37626d3 100644 --- a/go.sum +++ b/go.sum @@ -126,6 +126,8 @@ github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2y github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bombsimon/wsl/v3 v3.1.0/go.mod h1:st10JtZYLE4D5sC7b8xV4zTKZwAQjCH/Hy2Pm1FNZIc= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/caohe/katalyst-api v0.0.0-20230423073908-4e492bfdaf40 h1:85nIuJ3Xa88SW1ufcITUMcwEj4WapTSc5YfYo0CbC2g= +github.com/caohe/katalyst-api v0.0.0-20230423073908-4e492bfdaf40/go.mod h1:N477ZHDwzROGwkee/sPGs2mo5lWqtS10hZxYfTcxB88= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -550,8 +552,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubewharf/katalyst-api v0.0.4 h1:P4ezSfAN65+fPIpoQiAcGy03EQ2gbL/R+WRZRwG7QeM= -github.com/kubewharf/katalyst-api v0.0.4/go.mod h1:N477ZHDwzROGwkee/sPGs2mo5lWqtS10hZxYfTcxB88= github.com/kubewharf/kubelet v1.24.6-kubewharf.4 h1:3lg/wqi0jqZZr60JcjDHeOpLcXMKsq3MrEq/4bmfzR8= github.com/kubewharf/kubelet v1.24.6-kubewharf.4/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c= github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8= diff --git a/pkg/agent/qrm-plugins/network/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/network/dynamicpolicy/policy.go index 5598ee1040..2ce7433f04 100644 --- a/pkg/agent/qrm-plugins/network/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/network/dynamicpolicy/policy.go @@ -17,12 +17,11 @@ package dynamicpolicy import ( "context" "fmt" - "github.com/kubewharf/katalyst-core/pkg/util/native" - "github.com/kubewharf/katalyst-core/pkg/util/qos" "strings" "sync" "time" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" @@ -38,6 +37,8 @@ import ( "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" cgroupcmutils "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/native" + "github.com/kubewharf/katalyst-core/pkg/util/qos" ) const ( @@ -183,7 +184,7 @@ func (p *DynamicPolicy) ResourceName() string { // GetTopologyHints returns hints of corresponding resources func (p *DynamicPolicy) GetTopologyHints(ctx context.Context, req *pluginapi.ResourceRequest) (*pluginapi.ResourceHintsResponse, error) { - return nil, nil + return &pluginapi.ResourceHintsResponse{}, nil } func (p *DynamicPolicy) RemovePod(ctx context.Context, req *pluginapi.RemovePodRequest) (*pluginapi.RemovePodResponse, error) { @@ -204,17 +205,17 @@ func (p *DynamicPolicy) RemovePod(ctx context.Context, req *pluginapi.RemovePodR // GetResourcesAllocation returns allocation results of corresponding resources func (p *DynamicPolicy) GetResourcesAllocation(ctx context.Context, req *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) { - return nil, nil + return &pluginapi.GetResourcesAllocationResponse{}, nil } // GetTopologyAwareResources returns allocation results of corresponding resources as topology aware format func (p *DynamicPolicy) GetTopologyAwareResources(ctx context.Context, req *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error) { - return nil, nil + return &pluginapi.GetTopologyAwareResourcesResponse{}, nil } // GetTopologyAwareAllocatableResources returns corresponding allocatable resources as topology aware format func (p *DynamicPolicy) GetTopologyAwareAllocatableResources(ctx context.Context, req *pluginapi.GetTopologyAwareAllocatableResourcesRequest) (*pluginapi.GetTopologyAwareAllocatableResourcesResponse, error) { - return nil, nil + return &pluginapi.GetTopologyAwareAllocatableResourcesResponse{}, nil } // GetResourcePluginOptions returns options to be communicated with Resource Manager @@ -254,17 +255,112 @@ func (p *DynamicPolicy) Allocate(ctx context.Context, req *pluginapi.ResourceReq // before each container start. Resource plugin can run resource specific operations // such as resetting the resource before making resources available to the container func (p *DynamicPolicy) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { - return nil, nil + return &pluginapi.PreStartContainerResponse{}, nil } func (p *DynamicPolicy) applyNetClass() { + if p.metaServer == nil { + klog.Errorf("[NetworkDynamicPolicy.applyNetClass] nil metaServer") + return + } + ctx := context.Background() + podList, err := p.metaServer.GetPodList(ctx, nil) + if err != nil { + klog.Errorf("[NetworkDynamicPolicy.applyNetClass] get pod list failed, err: %v", err) + return + } + for _, pod := range podList { + classID, err := p.getNetClassID(pod) + if err != nil { + klog.Errorf("[NetworkDynamicPolicy.applyNetClass] get net class id failed, pod: %s, err: %s", native.GenerateUniqObjectNameKey(pod), err) + continue + } + netClsData := &common.NetClsData{ + ClassID: classID, + Attributes: native.FilterPodAnnotations(p.podLevelNetAttributesAnnoKeys, pod), + } + + for _, container := range pod.Spec.Containers { + go func(podUID, containerName string, netClsData *common.NetClsData) { + containerID, err := p.metaServer.GetContainerID(podUID, containerName) + if err != nil { + klog.Errorf("[NetworkDynamicPolicy.applyNetClass] get container id failed, pod: %s, container: %s(%s), err: %v", + podUID, containerName, containerID, err) + return + } + + exist, err := common.IsContainerCgroupExist(podUID, containerID) + if err != nil { + klog.Errorf("[NetworkDynamicPolicy.applyNetClass] check if container cgroup exists failed, pod: %s, container: %s(%s), err: %v", + podUID, containerName, containerID, err) + return + } + if !exist { + klog.Infof("[NetworkDynamicPolicy.applyNetClass] container cgroup does not exist, pod: %s, container: %s(%s)", podUID, containerName, containerID) + return + } + + if p.isCgV2Env { + cgID, err := p.metaServer.ExternalManager.GetCgroupIDForContainer(podUID, containerID) + if err != nil { + klog.Errorf("[NetworkDynamicPolicy.applyNetClass] get cgroup id failed, pod: %s, container: %s(%s), err: %v", + podUID, containerName, containerID, err) + return + } + netClsData.CgroupID = cgID + } + + err = p.applyNetClassFunc(podUID, containerID, netClsData) + if err != nil { + klog.Errorf("[NetworkDynamicPolicy.applyNetClass] apply net class failed, pod: %s, container: %s(%s), netClsData: %+v, err: %v", + podUID, containerName, containerID, *netClsData, err) + return + } + + klog.Infof("[NetworkDynamicPolicy.applyNetClass] apply net class successfully, pod: %s, container: %s(%s), netClsData: %+v", podUID, containerName, containerID, *netClsData) + }(string(pod.UID), container.Name, netClsData) + } + } } func (p *DynamicPolicy) removePod(podUID string) error { + cgIDList, err := p.metaServer.ExternalManager.ListCgroupIDsForPod(podUID) + if err != nil { + return fmt.Errorf("[NetworkDynamicPolicy.removePod] list cgroup ids of pod: %s failed with error: %v", podUID, err) + } + for _, cgID := range cgIDList { + go func(cgID uint64) { + if err := p.metaServer.ExternalManager.ClearNetClass(cgID); err != nil { + klog.Errorf("[NetworkDynamicPolicy.removePod] delete net class failed, cgID: %v, err: %v", cgID, err) + return + } + }(cgID) + } return nil } +func (p *DynamicPolicy) getNetClassID(pod *v1.Pod) (uint32, error) { + isPodLevelNetClassExist, classID, err := qos.GetPodNetClassID(pod) + if err != nil { + return 0, err + } + if isPodLevelNetClassExist { + return classID, nil + } + + qosClass, err := p.qosConfig.GetQoSLevelForPod(pod) + if err != nil { + return 0, err + } + return p.getNetClassIDByQoS(qosClass), nil +} + +func (p *DynamicPolicy) getNetClassIDByQoS(qosClass string) uint32 { + p.RLock() + defer p.RUnlock() + return p.netClassMap[qosClass] +} diff --git a/pkg/metaserver/external/manager.go b/pkg/metaserver/external/manager.go index 6f0ddfcbd1..21afa9d466 100644 --- a/pkg/metaserver/external/manager.go +++ b/pkg/metaserver/external/manager.go @@ -17,22 +17,16 @@ package external import ( "context" - "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + "github.com/kubewharf/katalyst-core/pkg/metaserver/external/cgroupid" + "github.com/kubewharf/katalyst-core/pkg/util/external/network" + "github.com/kubewharf/katalyst-core/pkg/util/external/rdt" ) // ExternalManager contains a set of managers that execute configurations beyond the OCI spec. type ExternalManager interface { - Run(ctx context.Context) - - GetCgroupIDForContainer(podUID, containerID string) (uint64, error) - ListCgroupIDsForPod(podUID string) ([]uint64, error) + cgroupid.CgroupIDManager + network.NetworkManager + rdt.RDTManager - ApplyNetClass(podUID, containerId string, data *common.NetClsData) error - ClearNetClass(cgroupID uint64) error - - CheckSupportRDT() (bool, error) - InitRDT() error - ApplyTasks(clos string, tasks []string) error - ApplyCAT(clos string, cat map[int]int) error - ApplyMBA(clos string, mba map[int]int) error + Run(ctx context.Context) } diff --git a/pkg/metaserver/external/manager_unsupported.go b/pkg/metaserver/external/manager_unsupported.go index 8f72ce1755..1cb79c0f26 100644 --- a/pkg/metaserver/external/manager_unsupported.go +++ b/pkg/metaserver/external/manager_unsupported.go @@ -19,6 +19,7 @@ package external import ( "context" + "sync" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" "github.com/kubewharf/katalyst-core/pkg/metaserver/external/cgroupid" @@ -26,19 +27,34 @@ import ( "github.com/kubewharf/katalyst-core/pkg/util/external/rdt" ) -type unsupportedExternalManager struct { +var ( + initUnsupportedManagerOnce sync.Once + unsupportedManager *unsupportedExternalManagerImpl +) + +type unsupportedExternalManagerImpl struct { + start bool cgroupid.CgroupIDManager network.NetworkManager rdt.RDTManager } -// Run starts an unsupportedExternalManager -func (m *unsupportedExternalManager) Run(_ context.Context) { +// Run starts an unsupportedExternalManagerImpl +func (m *unsupportedExternalManagerImpl) Run(_ context.Context) { } // InitExternalManager initializes an externalManagerImpl func InitExternalManager(podFetcher pod.PodFetcher) ExternalManager { - return &unsupportedExternalManager{} + initUnsupportedManagerOnce.Do(func() { + unsupportedManager = &unsupportedExternalManagerImpl{ + start: false, + CgroupIDManager: cgroupid.NewCgroupIDManager(podFetcher), + NetworkManager: network.NewNetworkManager(), + RDTManager: rdt.NewDefaultManager(), + } + }) + + return unsupportedManager } diff --git a/pkg/util/cgroup/common/path.go b/pkg/util/cgroup/common/path.go index 8e144ed961..3591c8352f 100644 --- a/pkg/util/cgroup/common/path.go +++ b/pkg/util/cgroup/common/path.go @@ -113,3 +113,12 @@ func GetPodAbsCgroupPath(subsys, podUID string) (string, error) { func GetContainerAbsCgroupPath(subsys, podUID, containerId string) (string, error) { return GetKubernetesAnyExistAbsCgroupPath(subsys, path.Join(fmt.Sprintf("%s%s", PodCgroupPathPrefix, podUID), containerId)) } + +func IsContainerCgroupExist(podUID, containerID string) (bool, error) { + containerAbsCGPath, err := GetContainerAbsCgroupPath("", podUID, containerID) + if err != nil { + return false, fmt.Errorf("GetContainerAbsCgroupPath failed, err: %v", err) + } + + return general.IsPathExists(containerAbsCGPath), nil +} diff --git a/pkg/util/external/rdt/manager.go b/pkg/util/external/rdt/manager.go index 5b22065cec..0abcd3703d 100644 --- a/pkg/util/external/rdt/manager.go +++ b/pkg/util/external/rdt/manager.go @@ -15,6 +15,8 @@ package rdt // RDTManager provides methods that control RDT related resources. +// Note: OCI Spec and runC already support the configuration of RDT-related parameters, but CRI and containerd do not yet support it. +// Therefore, we plan to support the configuration of RDT-related parameters through NRI or CRI in the future. type RDTManager interface { CheckSupportRDT() (bool, error) InitRDT() error diff --git a/pkg/util/native/pods.go b/pkg/util/native/pods.go index 1638a9b472..1fe7799c88 100644 --- a/pkg/util/native/pods.go +++ b/pkg/util/native/pods.go @@ -326,3 +326,16 @@ func DeepCopyPodContainers(pod *v1.Pod) (containers []v1.Container) { } return } + +// FilterPodAnnotations returns the needed annotations for the given pod. +func FilterPodAnnotations(filterKeys []string, pod *v1.Pod) map[string]string { + netAttrMap := make(map[string]string) + + for _, attrKey := range filterKeys { + if attrVal, ok := pod.GetAnnotations()[attrKey]; ok { + netAttrMap[attrKey] = attrVal + } + } + + return netAttrMap +} diff --git a/pkg/util/qos/net_enhancement.go b/pkg/util/qos/net_enhancement.go new file mode 100644 index 0000000000..07aee4e4de --- /dev/null +++ b/pkg/util/qos/net_enhancement.go @@ -0,0 +1,39 @@ +// Copyright 2022 The Katalyst Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package qos + +import ( + "strconv" + + v1 "k8s.io/api/core/v1" + + "github.com/kubewharf/katalyst-api/pkg/consts" +) + +// GetPodNetClassID parses net class id for the given pod. +// if the given pod doesn't specify a class id, the first value returned will be false +func GetPodNetClassID(pod *v1.Pod) (bool, uint32, error) { + classIDStr, ok := pod.GetAnnotations()[consts.PodAnnotationNetClassKey] + + if !ok { + return false, 0, nil + } + + classID, err := strconv.ParseUint(classIDStr, 10, 64) + if err != nil { + return true, 0, err + } + return true, uint32(classID), nil +}