Skip to content

Commit

Permalink
1. spd manager supports getting the container memory bandwidth request by spd
Browse files Browse the repository at this point in the history
2. kubelet reporter plugin supports reporting numa memory bandwidth allocatable/capacity and allocation
  • Loading branch information
luomingmeng committed Apr 8, 2024
1 parent c025c06 commit 6aab107
Show file tree
Hide file tree
Showing 23 changed files with 1,452 additions and 171 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/google/cadvisor v0.44.2
github.com/google/uuid v1.3.0
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36
github.com/kubewharf/katalyst-api v0.4.1-0.20240407044918-5d1dfd91ffa4
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36 h1:AUsgMb1EaUbrmUWQU7zAXnZHAOPAH65bx/MELm3qaAQ=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407044918-5d1dfd91ffa4 h1:YUrPtBBTMJFTLK9hgZzwimyBvoHOdcLCfarEh3/9zxw=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407044918-5d1dfd91ffa4/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
115 changes: 111 additions & 4 deletions pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
Expand All @@ -43,6 +45,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
metaserverpod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
Expand Down Expand Up @@ -182,7 +185,13 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
// get zone attributes by allocatable resources
zoneAttributes, err := p.getZoneAttributes(allocatableResources)
if err != nil {
return nil, errors.Wrap(err, "get zone Attributes failed")
return nil, errors.Wrap(err, "get zone attributes failed")
}

// get zone siblings by SiblingNumaMap
zoneSiblings, err := p.getZoneSiblings()
if err != nil {
return nil, errors.Wrap(err, "get zone siblings failed")
}

// initialize a topology zone generator by numa socket zone node map
Expand All @@ -202,7 +211,7 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
return nil, errors.Wrap(err, "get device zone topology failed")
}

return topologyZoneGenerator.GenerateTopologyZoneStatus(zoneAllocations, zoneResources, zoneAttributes), nil
return topologyZoneGenerator.GenerateTopologyZoneStatus(zoneAllocations, zoneResources, zoneAttributes, zoneSiblings), nil
}

// GetTopologyPolicy returns the newest topology policy status
Expand Down Expand Up @@ -407,6 +416,16 @@ func (p *topologyAdapterImpl) getZoneResources(allocatableResources *podresv1.Al
}
}

zoneCapacity, err = p.addNumaMemoryBandwidthResources(zoneCapacity, p.metaServer.SiblingNumaAvgMBWCapacityMap)
if err != nil {
errList = append(errList, err)
}

zoneAllocatable, err = p.addNumaMemoryBandwidthResources(zoneAllocatable, p.metaServer.SiblingNumaAvgMBWAllocatableMap)
if err != nil {
errList = append(errList, err)
}

if len(errList) > 0 {
return nil, utilerrors.NewAggregate(errList)
}
Expand Down Expand Up @@ -468,7 +487,7 @@ func (p *topologyAdapterImpl) getZoneAllocations(podList []*v1.Pod, podResources
}

// aggregates resources in each zone used by all containers of the pod
podAllocated, err := p.aggregateContainerAllocated(podResources.Containers)
podAllocated, err := p.aggregateContainerAllocated(pod.ObjectMeta, podResources.Containers)
if err != nil {
errList = append(errList, fmt.Errorf("pod %s aggregate container allocated failed, %s", podKey, err))
continue
Expand Down Expand Up @@ -599,7 +618,7 @@ func (p *topologyAdapterImpl) getZoneAttributes(allocatableResources *podresv1.A

// aggregateContainerAllocated aggregates resources in each zone used by all containers of a pod and returns a map of zone node to
// container allocated resources.
func (p *topologyAdapterImpl) aggregateContainerAllocated(containers []*podresv1.ContainerResources) (map[util.ZoneNode]*v1.ResourceList, error) {
func (p *topologyAdapterImpl) aggregateContainerAllocated(podMeta metav1.ObjectMeta, containers []*podresv1.ContainerResources) (map[util.ZoneNode]*v1.ResourceList, error) {
var errList []error

podAllocated := make(map[util.ZoneNode]*v1.ResourceList)
Expand All @@ -624,6 +643,14 @@ func (p *topologyAdapterImpl) aggregateContainerAllocated(containers []*podresv1
continue
}

// add container memory bandwidth according to its allocated numa resources
containerAllocated, err = p.addContainerMemoryBandwidth(containerAllocated, podMeta, containerResources.Name)
if err != nil {
errList = append(errList, fmt.Errorf("get container %s memory bandwidth failed: %s",
containerResources.Name, err))
continue
}

for zoneNode, resourceList := range containerAllocated {
if resourceList == nil {
continue
Expand Down Expand Up @@ -836,6 +863,86 @@ func (p *topologyAdapterImpl) generateZoneNode(quantity podresv1.TopologyAwareQu
}
}

// getZoneSiblings converts the meta server's SiblingNumaMap into a map from
// numa zone node to the list of its siblings. Every sibling is reported with
// type TopologyTypeNuma and the decimal string form of the integer yielded by
// ranging over the map entry's value. The returned error is always nil.
func (p *topologyAdapterImpl) getZoneSiblings() (map[util.ZoneNode]util.ZoneSiblings, error) {
	result := make(map[util.ZoneNode]util.ZoneSiblings)

	for numaID, siblingIDs := range p.metaServer.SiblingNumaMap {
		node := util.GenerateNumaZoneNode(numaID)

		// start from a non-nil, empty list so that a numa without any
		// sibling still gets an (empty) entry in the result
		siblingList := make(util.ZoneSiblings, 0)
		for siblingID := range siblingIDs {
			siblingList = append(siblingList, nodev1alpha1.Sibling{
				Type: nodev1alpha1.TopologyTypeNuma,
				Name: strconv.Itoa(siblingID),
			})
		}

		result[node] = siblingList
	}

	return result, nil
}

// addContainerMemoryBandwidth sets the memory bandwidth allocation of a
// container on the numa zones it has been allocated cpu on.
//
// The container spec is looked up by pod UID and container name. When its cpu
// request is zero, zoneAllocated is returned untouched. Otherwise the memory
// bandwidth request obtained from spd is divided evenly (integer division)
// among the eligible numa zones and written into their resource lists in
// place under apiconsts.ResourceMemoryBandwidth.
//
// A zone is eligible when it is of numa type, has a non-nil resource list
// with a non-zero cpu quantity, and its entry in SiblingNumaAvgMBWCapacityMap
// is greater than zero.
//
// On success the same zoneAllocated map is returned; on any lookup failure a
// nil map and the error are returned.
func (p *topologyAdapterImpl) addContainerMemoryBandwidth(zoneAllocated map[util.ZoneNode]*v1.ResourceList, podMeta metav1.ObjectMeta, name string) (map[util.ZoneNode]*v1.ResourceList, error) {
	containerSpec, err := p.metaServer.GetContainerSpec(string(podMeta.UID), name)
	if err != nil {
		return nil, err
	}

	cpuRequest := native.CPUQuantityGetter()(containerSpec.Resources.Requests)
	if cpuRequest.IsZero() {
		return zoneAllocated, nil
	}

	// collect the numa zones that should carry a share of the memory bandwidth
	eligibleNumas := make(map[util.ZoneNode]*v1.ResourceList)
	for node, resources := range zoneAllocated {
		if node.Meta.Type != nodev1alpha1.TopologyTypeNuma || resources == nil {
			continue
		}

		// a numa without cpu allocated does not take part in the split
		if (*resources).Cpu().IsZero() {
			continue
		}

		numaID, idErr := util.GetZoneID(node)
		if idErr != nil {
			return nil, idErr
		}

		// a numa whose avg mbw capacity is not positive has no mbw allocation reported
		if p.metaServer.SiblingNumaAvgMBWCapacityMap[numaID] <= 0 {
			continue
		}

		eligibleNumas[node] = resources
	}

	// only a container with numa-level allocation needs memory bandwidth accounting
	if len(eligibleNumas) == 0 {
		return zoneAllocated, nil
	}

	mbwRequest, err := spd.GetContainerMemoryBandwidthRequest(p.metaServer, podMeta, int(cpuRequest.Value()))
	if err != nil {
		return nil, err
	}

	if mbwRequest <= 0 {
		return zoneAllocated, nil
	}

	// integer division: any remainder of the request is dropped
	mbwPerNuma := mbwRequest / len(eligibleNumas)
	for _, resources := range eligibleNumas {
		(*resources)[apiconsts.ResourceMemoryBandwidth] = *resource.NewQuantity(int64(mbwPerNuma), resource.BinarySI)
	}

	return zoneAllocated, nil
}

// addNumaMemoryBandwidthResources writes the per-numa memory bandwidth given
// in memoryBandwidthMap (numa id -> bandwidth) into zoneResources under
// apiconsts.ResourceMemoryBandwidth. Entries whose bandwidth is not positive
// are ignored. A numa zone that has no resource list yet (missing key or nil
// pointer) gets a fresh empty one. zoneResources is modified in place and
// returned; the returned error is always nil.
func (p *topologyAdapterImpl) addNumaMemoryBandwidthResources(zoneResources map[util.ZoneNode]*v1.ResourceList, memoryBandwidthMap map[int]int64) (map[util.ZoneNode]*v1.ResourceList, error) {
	for numaID, bandwidth := range memoryBandwidthMap {
		if bandwidth > 0 {
			node := util.GenerateNumaZoneNode(numaID)

			// a missing key yields a nil pointer as well, so one check covers both cases
			list := zoneResources[node]
			if list == nil {
				list = &v1.ResourceList{}
				zoneResources[node] = list
			}

			(*list)[apiconsts.ResourceMemoryBandwidth] = *resource.NewQuantity(bandwidth, resource.BinarySI)
		}
	}

	return zoneResources, nil
}

// filterAllocatedPodResourcesList filters pods that have allocated devices or resources
func filterAllocatedPodResourcesList(podResourcesList []*podresv1.PodResources) []*podresv1.PodResources {
allocatedPodResourcesList := make([]*podresv1.PodResources, 0, len(podResourcesList))
Expand Down
Loading

0 comments on commit 6aab107

Please sign in to comment.