Skip to content

Commit

Permalink
kubelet reporter plugin support report numa memory bandwidth allocata…
Browse files Browse the repository at this point in the history
…ble/capacity and allocation
  • Loading branch information
luomingmeng committed Apr 7, 2024
1 parent d3caa5a commit 4e3e5ff
Show file tree
Hide file tree
Showing 8 changed files with 681 additions and 40 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/luomingmeng/katalyst-api v0.0.0-20240328174006-ecab364c1822
github.com/kubewharf/katalyst-api => github.com/luomingmeng/katalyst-api v0.0.0-20240407025853-4aa8ae470e78
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,8 @@ github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0U
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/lpabon/godbc v0.1.1/go.mod h1:Jo9QV0cf3U6jZABgiJ2skINAXb9j8m51r07g4KI92ZA=
github.com/luomingmeng/katalyst-api v0.0.0-20240328174006-ecab364c1822 h1:7CzD7Cai8mKvnC5qmrIztm8wvB32cM88BUE643A/QVs=
github.com/luomingmeng/katalyst-api v0.0.0-20240328174006-ecab364c1822/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/luomingmeng/katalyst-api v0.0.0-20240407025853-4aa8ae470e78 h1:8CULQ7g+6GQjYGgPN6fumH1080Lb6BzHAJ2F8mvBAmw=
github.com/luomingmeng/katalyst-api v0.0.0-20240407025853-4aa8ae470e78/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
105 changes: 101 additions & 4 deletions pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
Expand All @@ -43,6 +45,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
metaserverpod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
Expand Down Expand Up @@ -182,7 +185,13 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
// get zone attributes by allocatable resources
zoneAttributes, err := p.getZoneAttributes(allocatableResources)
if err != nil {
return nil, errors.Wrap(err, "get zone Attributes failed")
return nil, errors.Wrap(err, "get zone attributes failed")
}

// get zone siblings by SiblingNumaMap
zoneSiblings, err := p.getZoneSiblings()
if err != nil {
return nil, errors.Wrap(err, "get zone siblings failed")
}

// initialize a topology zone generator by numa socket zone node map
Expand All @@ -202,7 +211,7 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
return nil, errors.Wrap(err, "get device zone topology failed")
}

return topologyZoneGenerator.GenerateTopologyZoneStatus(zoneAllocations, zoneResources, zoneAttributes), nil
return topologyZoneGenerator.GenerateTopologyZoneStatus(zoneAllocations, zoneResources, zoneAttributes, zoneSiblings), nil
}

// GetTopologyPolicy return newest topology policy status
Expand Down Expand Up @@ -407,6 +416,16 @@ func (p *topologyAdapterImpl) getZoneResources(allocatableResources *podresv1.Al
}
}

zoneCapacity, err = p.addNumaMemoryBandwidthResources(zoneCapacity, p.metaServer.SiblingNumaMBWCapacityMap)
if err != nil {
errList = append(errList, err)
}

zoneAllocatable, err = p.addNumaMemoryBandwidthResources(zoneAllocatable, p.metaServer.SiblingNumaMBWAllocatableMap)
if err != nil {
errList = append(errList, err)
}

if len(errList) > 0 {
return nil, utilerrors.NewAggregate(errList)
}
Expand Down Expand Up @@ -468,7 +487,7 @@ func (p *topologyAdapterImpl) getZoneAllocations(podList []*v1.Pod, podResources
}

// aggregates resources in each zone used by all containers of the pod
podAllocated, err := p.aggregateContainerAllocated(podResources.Containers)
podAllocated, err := p.aggregateContainerAllocated(pod.ObjectMeta, podResources.Containers)
if err != nil {
errList = append(errList, fmt.Errorf("pod %s aggregate container allocated failed, %s", podKey, err))
continue
Expand Down Expand Up @@ -599,7 +618,7 @@ func (p *topologyAdapterImpl) getZoneAttributes(allocatableResources *podresv1.A

// aggregateContainerAllocated aggregates resources in each zone used by all containers of a pod and returns a map of zone node to
// container allocated resources.
func (p *topologyAdapterImpl) aggregateContainerAllocated(containers []*podresv1.ContainerResources) (map[util.ZoneNode]*v1.ResourceList, error) {
func (p *topologyAdapterImpl) aggregateContainerAllocated(podMeta metav1.ObjectMeta, containers []*podresv1.ContainerResources) (map[util.ZoneNode]*v1.ResourceList, error) {
var errList []error

podAllocated := make(map[util.ZoneNode]*v1.ResourceList)
Expand All @@ -624,6 +643,14 @@ func (p *topologyAdapterImpl) aggregateContainerAllocated(containers []*podresv1
continue
}

// add container memory bandwidth according to its allocated numa resources
containerAllocated, err = p.addContainerMemoryBandwidth(containerAllocated, podMeta, containerResources.Name)
if err != nil {
errList = append(errList, fmt.Errorf("get container %s memory bandwidth failed: %s",
containerResources.Name, err))
continue
}

for zoneNode, resourceList := range containerAllocated {
if resourceList == nil {
continue
Expand Down Expand Up @@ -836,6 +863,76 @@ func (p *topologyAdapterImpl) generateZoneNode(quantity podresv1.TopologyAwareQu
}
}

// getZoneSiblings converts the meta server's SiblingNumaMap into a map from
// NUMA zone node to the list of its sibling zones.
//
// For every id in SiblingNumaMap a zone node is generated with
// util.GenerateNumaZoneNode, and each value obtained by ranging over that
// id's siblings is reported as a Sibling of type TopologyTypeNuma whose Name
// is the decimal string of that value. An id without siblings still gets an
// empty, non-nil list.
//
// NOTE(review): the order of the generated siblings follows Go's range order
// over the underlying collection; if that collection is a map/set the order is
// unspecified between calls — confirm whether consumers need a stable order.
//
// The returned error is always nil in the current implementation.
func (p *topologyAdapterImpl) getZoneSiblings() (map[util.ZoneNode]util.ZoneSiblings, error) {
	result := make(map[util.ZoneNode]util.ZoneSiblings)

	for numaID, siblingSet := range p.metaServer.SiblingNumaMap {
		// start from an empty (non-nil) list so that zones without siblings
		// are still present in the result
		entries := make(util.ZoneSiblings, 0)
		for siblingID := range siblingSet {
			entry := nodev1alpha1.Sibling{
				Type: nodev1alpha1.TopologyTypeNuma,
				Name: strconv.Itoa(siblingID),
			}
			entries = append(entries, entry)
		}

		result[util.GenerateNumaZoneNode(numaID)] = entries
	}

	return result, nil
}

// addContainerMemoryBandwidth attaches a memory bandwidth quantity to the NUMA
// zones on which the given container has CPU allocated.
//
// The container spec is looked up through the meta server by pod UID and
// container name; its CPU request (as an integer Value()) is passed to
// spd.GetContainerMemoryBandwidthRequest to obtain the container's total
// memory bandwidth request. That total is divided (integer division) by the
// number of NUMA-type zones in zoneAllocated whose resource list is non-nil and
// carries a non-zero CPU quantity, and the per-NUMA share is stored under
// apiconsts.ResourceMemoryBandwidth in each of those resource lists.
//
// The resource lists referenced by zoneAllocated are modified in place and the
// same map is returned. zoneAllocated is returned untouched when the CPU
// request is zero, when the bandwidth request is not positive, or when no NUMA
// zone qualifies. If either lookup fails, a nil map and the error are returned.
func (p *topologyAdapterImpl) addContainerMemoryBandwidth(zoneAllocated map[util.ZoneNode]*v1.ResourceList, podMeta metav1.ObjectMeta, name string) (map[util.ZoneNode]*v1.ResourceList, error) {
	containerSpec, err := p.metaServer.GetContainerSpec(string(podMeta.UID), name)
	if err != nil {
		return nil, err
	}

	requestedCPU := native.CPUQuantityGetter()(containerSpec.Resources.Requests)
	if requestedCPU.IsZero() {
		// without a cpu request there is nothing to derive bandwidth from
		return zoneAllocated, nil
	}

	totalBandwidth, err := spd.GetContainerMemoryBandwidthRequest(p.metaServer, podMeta, int(requestedCPU.Value()))
	if err != nil {
		return nil, err
	}
	if totalBandwidth <= 0 {
		return zoneAllocated, nil
	}

	// gather the resource lists of numa zones that actually hold cpu for
	// this container; only those share the bandwidth request
	var cpuNumaLists []*v1.ResourceList
	for node, list := range zoneAllocated {
		if node.Meta.Type != nodev1alpha1.TopologyTypeNuma || list == nil {
			continue
		}
		if (*list).Cpu().IsZero() {
			continue
		}
		cpuNumaLists = append(cpuNumaLists, list)
	}
	if len(cpuNumaLists) == 0 {
		return zoneAllocated, nil
	}

	// spread the request evenly; integer division drops any remainder
	perNuma := totalBandwidth / len(cpuNumaLists)
	for _, list := range cpuNumaLists {
		(*list)[apiconsts.ResourceMemoryBandwidth] = *resource.NewQuantity(int64(perNuma), resource.BinarySI)
	}

	return zoneAllocated, nil
}

// addNumaMemoryBandwidthResources sets the memory bandwidth resource of each
// NUMA zone from a map of NUMA id to memory bandwidth.
//
// For every entry of memoryBandwidthMap with a positive value, the NUMA zone
// node is generated with util.GenerateNumaZoneNode and its resource list gets
// apiconsts.ResourceMemoryBandwidth set to that value (BinarySI format),
// overwriting any previous quantity. Entries with a value <= 0 are skipped.
//
// Missing pieces are created on demand so the write can never panic: a nil
// zoneResources map is allocated, a missing or nil resource list pointer is
// replaced by a fresh list, and a pointer to a nil resource list is
// initialized in place. Existing resource lists are modified in place.
//
// It returns the (possibly newly allocated) zoneResources map; callers must
// use the returned value. The returned error is always nil in the current
// implementation.
func (p *topologyAdapterImpl) addNumaMemoryBandwidthResources(zoneResources map[util.ZoneNode]*v1.ResourceList, memoryBandwidthMap map[int]int64) (map[util.ZoneNode]*v1.ResourceList, error) {
	for id, memoryBandwidth := range memoryBandwidthMap {
		if memoryBandwidth <= 0 {
			continue
		}

		// allocate lazily so that a nil input stays nil when there is
		// nothing to add, while still avoiding a write to a nil map
		if zoneResources == nil {
			zoneResources = make(map[util.ZoneNode]*v1.ResourceList)
		}

		numaZoneNode := util.GenerateNumaZoneNode(id)
		res := zoneResources[numaZoneNode]
		switch {
		case res == nil:
			// zone is absent or holds a nil pointer
			res = &v1.ResourceList{}
			zoneResources[numaZoneNode] = res
		case *res == nil:
			// pointer is valid but the list itself was never initialized
			*res = v1.ResourceList{}
		}

		(*res)[apiconsts.ResourceMemoryBandwidth] = *resource.NewQuantity(memoryBandwidth, resource.BinarySI)
	}
	return zoneResources, nil
}

// filterAllocatedPodResourcesList is to filter pods that have allocated devices or Resources
func filterAllocatedPodResourcesList(podResourcesList []*podresv1.PodResources) []*podresv1.PodResources {
allocatedPodResourcesList := make([]*podresv1.PodResources, 0, len(podResourcesList))
Expand Down
Loading

0 comments on commit 4e3e5ff

Please sign in to comment.