Skip to content

Commit

Permalink
feat(qrm): implement packet tagging
Browse files Browse the repository at this point in the history
Signed-off-by: caohe <caohe9603@gmail.com>
  • Loading branch information
caohe committed Apr 24, 2023
1 parent 9ec1d3d commit 6821446
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/kubewharf/katalyst-api v0.0.4
github.com/kubewharf/katalyst-api => github.com/caohe/katalyst-api v0.0.0-20230423073908-4e492bfdaf40
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2y
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bombsimon/wsl/v3 v3.1.0/go.mod h1:st10JtZYLE4D5sC7b8xV4zTKZwAQjCH/Hy2Pm1FNZIc=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/caohe/katalyst-api v0.0.0-20230423073908-4e492bfdaf40 h1:85nIuJ3Xa88SW1ufcITUMcwEj4WapTSc5YfYo0CbC2g=
github.com/caohe/katalyst-api v0.0.0-20230423073908-4e492bfdaf40/go.mod h1:N477ZHDwzROGwkee/sPGs2mo5lWqtS10hZxYfTcxB88=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -550,8 +552,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.0.4 h1:P4ezSfAN65+fPIpoQiAcGy03EQ2gbL/R+WRZRwG7QeM=
github.com/kubewharf/katalyst-api v0.0.4/go.mod h1:N477ZHDwzROGwkee/sPGs2mo5lWqtS10hZxYfTcxB88=
github.com/kubewharf/kubelet v1.24.6-kubewharf.4 h1:3lg/wqi0jqZZr60JcjDHeOpLcXMKsq3MrEq/4bmfzR8=
github.com/kubewharf/kubelet v1.24.6-kubewharf.4/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
110 changes: 103 additions & 7 deletions pkg/agent/qrm-plugins/network/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ package dynamicpolicy
import (
"context"
"fmt"
"github.com/kubewharf/katalyst-core/pkg/util/native"
"github.com/kubewharf/katalyst-core/pkg/util/qos"
"strings"
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
Expand All @@ -38,6 +37,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
cgroupcmutils "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
"github.com/kubewharf/katalyst-core/pkg/util/qos"
)

const (
Expand Down Expand Up @@ -183,7 +184,7 @@ func (p *DynamicPolicy) ResourceName() string {

// GetTopologyHints returns hints of corresponding resources
func (p *DynamicPolicy) GetTopologyHints(ctx context.Context, req *pluginapi.ResourceRequest) (*pluginapi.ResourceHintsResponse, error) {
return nil, nil
return &pluginapi.ResourceHintsResponse{}, nil
}

func (p *DynamicPolicy) RemovePod(ctx context.Context, req *pluginapi.RemovePodRequest) (*pluginapi.RemovePodResponse, error) {
Expand All @@ -204,17 +205,17 @@ func (p *DynamicPolicy) RemovePod(ctx context.Context, req *pluginapi.RemovePodR

// GetResourcesAllocation returns allocation results of corresponding resources
func (p *DynamicPolicy) GetResourcesAllocation(ctx context.Context, req *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) {
return nil, nil
return &pluginapi.GetResourcesAllocationResponse{}, nil
}

// GetTopologyAwareResources returns allocation results of corresponding resources as topology aware format
func (p *DynamicPolicy) GetTopologyAwareResources(ctx context.Context, req *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error) {
return nil, nil
return &pluginapi.GetTopologyAwareResourcesResponse{}, nil
}

// GetTopologyAwareAllocatableResources returns corresponding allocatable resources as topology aware format
func (p *DynamicPolicy) GetTopologyAwareAllocatableResources(ctx context.Context, req *pluginapi.GetTopologyAwareAllocatableResourcesRequest) (*pluginapi.GetTopologyAwareAllocatableResourcesResponse, error) {
return nil, nil
return &pluginapi.GetTopologyAwareAllocatableResourcesResponse{}, nil
}

// GetResourcePluginOptions returns options to be communicated with Resource Manager
Expand Down Expand Up @@ -254,17 +255,112 @@ func (p *DynamicPolicy) Allocate(ctx context.Context, req *pluginapi.ResourceReq
// before each container start. Resource plugin can run resource specific operations
// such as resetting the resource before making resources available to the container
func (p *DynamicPolicy) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
return nil, nil
return &pluginapi.PreStartContainerResponse{}, nil
}

func (p *DynamicPolicy) applyNetClass() {
if p.metaServer == nil {
klog.Errorf("[NetworkDynamicPolicy.applyNetClass] nil metaServer")
return
}
ctx := context.Background()
podList, err := p.metaServer.GetPodList(ctx, nil)
if err != nil {
klog.Errorf("[NetworkDynamicPolicy.applyNetClass] get pod list failed, err: %v", err)
return
}

for _, pod := range podList {
classID, err := p.getNetClassID(pod)
if err != nil {
klog.Errorf("[NetworkDynamicPolicy.applyNetClass] get net class id failed, pod: %s, err: %s", native.GenerateUniqObjectNameKey(pod), err)
continue
}
netClsData := &common.NetClsData{
ClassID: classID,
Attributes: native.FilterPodAnnotations(p.podLevelNetAttributesAnnoKeys, pod),
}

for _, container := range pod.Spec.Containers {
go func(podUID, containerName string, netClsData *common.NetClsData) {
containerID, err := p.metaServer.GetContainerID(podUID, containerName)
if err != nil {
klog.Errorf("[NetworkDynamicPolicy.applyNetClass] get container id failed, pod: %s, container: %s(%s), err: %v",
podUID, containerName, containerID, err)
return
}

exist, err := common.IsContainerCgroupExist(podUID, containerID)
if err != nil {
klog.Errorf("[NetworkDynamicPolicy.applyNetClass] check if container cgroup exists failed, pod: %s, container: %s(%s), err: %v",
podUID, containerName, containerID, err)
return
}
if !exist {
klog.Infof("[NetworkDynamicPolicy.applyNetClass] container cgroup does not exist, pod: %s, container: %s(%s)", podUID, containerName, containerID)
return
}

if p.isCgV2Env {
cgID, err := p.metaServer.ExternalManager.GetCgroupIDForContainer(podUID, containerID)
if err != nil {
klog.Errorf("[NetworkDynamicPolicy.applyNetClass] get cgroup id failed, pod: %s, container: %s(%s), err: %v",
podUID, containerName, containerID, err)
return
}
netClsData.CgroupID = cgID
}

err = p.applyNetClassFunc(podUID, containerID, netClsData)
if err != nil {
klog.Errorf("[NetworkDynamicPolicy.applyNetClass] apply net class failed, pod: %s, container: %s(%s), netClsData: %+v, err: %v",
podUID, containerName, containerID, *netClsData, err)
return
}

klog.Infof("[NetworkDynamicPolicy.applyNetClass] apply net class successfully, pod: %s, container: %s(%s), netClsData: %+v", podUID, containerName, containerID, *netClsData)
}(string(pod.UID), container.Name, netClsData)
}
}
}

func (p *DynamicPolicy) removePod(podUID string) error {
cgIDList, err := p.metaServer.ExternalManager.ListCgroupIDsForPod(podUID)
if err != nil {
return fmt.Errorf("[NetworkDynamicPolicy.removePod] list cgroup ids of pod: %s failed with error: %v", podUID, err)
}

for _, cgID := range cgIDList {
go func(cgID uint64) {
if err := p.metaServer.ExternalManager.ClearNetClass(cgID); err != nil {
klog.Errorf("[NetworkDynamicPolicy.removePod] delete net class failed, cgID: %v, err: %v", cgID, err)
return
}
}(cgID)
}

return nil
}

func (p *DynamicPolicy) getNetClassID(pod *v1.Pod) (uint32, error) {
isPodLevelNetClassExist, classID, err := qos.GetPodNetClassID(pod)
if err != nil {
return 0, err
}
if isPodLevelNetClassExist {
return classID, nil
}

qosClass, err := p.qosConfig.GetQoSLevelForPod(pod)
if err != nil {
return 0, err
}
return p.getNetClassIDByQoS(qosClass), nil
}

func (p *DynamicPolicy) getNetClassIDByQoS(qosClass string) uint32 {
p.RLock()
defer p.RUnlock()

return p.netClassMap[qosClass]
}
20 changes: 7 additions & 13 deletions pkg/metaserver/external/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,16 @@ package external
import (
"context"

"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
"github.com/kubewharf/katalyst-core/pkg/metaserver/external/cgroupid"
"github.com/kubewharf/katalyst-core/pkg/util/external/network"
"github.com/kubewharf/katalyst-core/pkg/util/external/rdt"
)

// ExternalManager contains a set of managers that execute configurations beyond the OCI spec.
type ExternalManager interface {
Run(ctx context.Context)

GetCgroupIDForContainer(podUID, containerID string) (uint64, error)
ListCgroupIDsForPod(podUID string) ([]uint64, error)
cgroupid.CgroupIDManager
network.NetworkManager
rdt.RDTManager

ApplyNetClass(podUID, containerId string, data *common.NetClsData) error
ClearNetClass(cgroupID uint64) error

CheckSupportRDT() (bool, error)
InitRDT() error
ApplyTasks(clos string, tasks []string) error
ApplyCAT(clos string, cat map[int]int) error
ApplyMBA(clos string, mba map[int]int) error
Run(ctx context.Context)
}
24 changes: 20 additions & 4 deletions pkg/metaserver/external/manager_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,42 @@ package external

import (
"context"
"sync"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metaserver/external/cgroupid"
"github.com/kubewharf/katalyst-core/pkg/util/external/network"
"github.com/kubewharf/katalyst-core/pkg/util/external/rdt"
)

type unsupportedExternalManager struct {
var (
initUnsupportedManagerOnce sync.Once
unsupportedManager *unsupportedExternalManagerImpl
)

type unsupportedExternalManagerImpl struct {
start bool
cgroupid.CgroupIDManager

network.NetworkManager
rdt.RDTManager
}

// Run starts an unsupportedExternalManager
func (m *unsupportedExternalManager) Run(_ context.Context) {
// Run starts an unsupportedExternalManagerImpl
func (m *unsupportedExternalManagerImpl) Run(_ context.Context) {

}

// InitExternalManager initializes an externalManagerImpl
func InitExternalManager(podFetcher pod.PodFetcher) ExternalManager {
return &unsupportedExternalManager{}
initUnsupportedManagerOnce.Do(func() {
unsupportedManager = &unsupportedExternalManagerImpl{
start: false,
CgroupIDManager: cgroupid.NewCgroupIDManager(podFetcher),
NetworkManager: network.NewNetworkManager(),
RDTManager: rdt.NewDefaultManager(),
}
})

return unsupportedManager
}
9 changes: 9 additions & 0 deletions pkg/util/cgroup/common/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,12 @@ func GetPodAbsCgroupPath(subsys, podUID string) (string, error) {
func GetContainerAbsCgroupPath(subsys, podUID, containerId string) (string, error) {
return GetKubernetesAnyExistAbsCgroupPath(subsys, path.Join(fmt.Sprintf("%s%s", PodCgroupPathPrefix, podUID), containerId))
}

func IsContainerCgroupExist(podUID, containerID string) (bool, error) {
containerAbsCGPath, err := GetContainerAbsCgroupPath("", podUID, containerID)
if err != nil {
return false, fmt.Errorf("GetContainerAbsCgroupPath failed, err: %v", err)
}

return general.IsPathExists(containerAbsCGPath), nil
}
2 changes: 2 additions & 0 deletions pkg/util/external/rdt/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package rdt

// RDTManager provides methods that control RDT related resources.
// Note: OCI Spec and runC already support the configuration of RDT-related parameters, but CRI and containerd do not yet support it.
// Therefore, we plan to support the configuration of RDT-related parameters through NRI or CRI in the future.
type RDTManager interface {
CheckSupportRDT() (bool, error)
InitRDT() error
Expand Down
13 changes: 13 additions & 0 deletions pkg/util/native/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,16 @@ func DeepCopyPodContainers(pod *v1.Pod) (containers []v1.Container) {
}
return
}

// FilterPodAnnotations returns the needed annotations for the given pod.
func FilterPodAnnotations(filterKeys []string, pod *v1.Pod) map[string]string {
netAttrMap := make(map[string]string)

for _, attrKey := range filterKeys {
if attrVal, ok := pod.GetAnnotations()[attrKey]; ok {
netAttrMap[attrKey] = attrVal
}
}

return netAttrMap
}
39 changes: 39 additions & 0 deletions pkg/util/qos/net_enhancement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2022 The Katalyst Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package qos

import (
"strconv"

v1 "k8s.io/api/core/v1"

"github.com/kubewharf/katalyst-api/pkg/consts"
)

// GetPodNetClassID parses net class id for the given pod.
// if the given pod doesn't specify a class id, the first value returned will be false
func GetPodNetClassID(pod *v1.Pod) (bool, uint32, error) {
classIDStr, ok := pod.GetAnnotations()[consts.PodAnnotationNetClassKey]

if !ok {
return false, 0, nil
}

classID, err := strconv.ParseUint(classIDStr, 10, 64)
if err != nil {
return true, 0, err
}
return true, uint32(classID), nil
}

0 comments on commit 6821446

Please sign in to comment.