Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

report numa sibling and memory bandwidth #533

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions cmd/katalyst-agent/app/options/metaserver/metaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ const (
defaultConfigCheckpointGraceTime = 2 * time.Hour
)

const defaultServiceProfileCacheTTL = 1 * time.Minute
const (
defaultServiceProfileSkipCorruptionError = true
defaultServiceProfileCacheTTL = 1 * time.Minute
)

const defaultMetricInsurancePeriod = 0 * time.Second

Expand Down Expand Up @@ -70,7 +73,8 @@ type MetaServerOptions struct {
ConfigCheckpointGraceTime time.Duration

// configurations for spd
ServiceProfileCacheTTL time.Duration
ServiceProfileSkipCorruptionError bool
ServiceProfileCacheTTL time.Duration

// configurations for metric-fetcher
MetricInsurancePeriod time.Duration
Expand Down Expand Up @@ -100,7 +104,8 @@ func NewMetaServerOptions() *MetaServerOptions {
ConfigSkipFailedInitialization: defaultConfigSkipFailedInitialization,
ConfigCheckpointGraceTime: defaultConfigCheckpointGraceTime,

ServiceProfileCacheTTL: defaultServiceProfileCacheTTL,
ServiceProfileSkipCorruptionError: defaultServiceProfileSkipCorruptionError,
ServiceProfileCacheTTL: defaultServiceProfileCacheTTL,

MetricInsurancePeriod: defaultMetricInsurancePeriod,
MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet},
Expand Down Expand Up @@ -136,6 +141,8 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs.DurationVar(&o.ConfigCheckpointGraceTime, "config-checkpoint-grace-time", o.ConfigCheckpointGraceTime,
"The grace time of meta server config checkpoint")

fs.BoolVar(&o.ServiceProfileSkipCorruptionError, "service-profile-skip-corruption-error", o.ServiceProfileSkipCorruptionError,
"Whether to skip corruption error when loading spd checkpoint")
fs.DurationVar(&o.ServiceProfileCacheTTL, "service-profile-cache-ttl", o.ServiceProfileCacheTTL,
"The ttl of service profile manager cache remote spd")

Expand Down Expand Up @@ -171,6 +178,7 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error
c.ConfigSkipFailedInitialization = o.ConfigSkipFailedInitialization
c.ConfigCheckpointGraceTime = o.ConfigCheckpointGraceTime

c.ServiceProfileSkipCorruptionError = o.ServiceProfileSkipCorruptionError
c.ServiceProfileCacheTTL = o.ServiceProfileCacheTTL

c.MetricInsurancePeriod = o.MetricInsurancePeriod
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/google/cadvisor v0.44.2
github.com/google/uuid v1.3.0
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36
github.com/kubewharf/katalyst-api v0.4.1-0.20240407044918-5d1dfd91ffa4
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36 h1:AUsgMb1EaUbrmUWQU7zAXnZHAOPAH65bx/MELm3qaAQ=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407044918-5d1dfd91ffa4 h1:YUrPtBBTMJFTLK9hgZzwimyBvoHOdcLCfarEh3/9zxw=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407044918-5d1dfd91ffa4/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
Expand All @@ -43,6 +45,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
metaserverpod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
Expand Down Expand Up @@ -182,7 +185,13 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
// get zone attributes by allocatable resources
zoneAttributes, err := p.getZoneAttributes(allocatableResources)
if err != nil {
return nil, errors.Wrap(err, "get zone Attributes failed")
return nil, errors.Wrap(err, "get zone attributes failed")
}

// get zone siblings by SiblingNumaMap
zoneSiblings, err := p.getZoneSiblings()
if err != nil {
return nil, errors.Wrap(err, "get zone siblings failed")
}

// initialize a topology zone generator by numa socket zone node map
Expand All @@ -202,7 +211,7 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
return nil, errors.Wrap(err, "get device zone topology failed")
}

return topologyZoneGenerator.GenerateTopologyZoneStatus(zoneAllocations, zoneResources, zoneAttributes), nil
return topologyZoneGenerator.GenerateTopologyZoneStatus(zoneAllocations, zoneResources, zoneAttributes, zoneSiblings), nil
}

// GetTopologyPolicy returns the newest topology policy status
Expand Down Expand Up @@ -407,6 +416,16 @@ func (p *topologyAdapterImpl) getZoneResources(allocatableResources *podresv1.Al
}
}

zoneCapacity, err = p.addNumaMemoryBandwidthResources(zoneCapacity, p.metaServer.SiblingNumaAvgMBWCapacityMap)
if err != nil {
errList = append(errList, err)
}

zoneAllocatable, err = p.addNumaMemoryBandwidthResources(zoneAllocatable, p.metaServer.SiblingNumaAvgMBWAllocatableMap)
if err != nil {
errList = append(errList, err)
}

if len(errList) > 0 {
return nil, utilerrors.NewAggregate(errList)
}
Expand Down Expand Up @@ -468,7 +487,7 @@ func (p *topologyAdapterImpl) getZoneAllocations(podList []*v1.Pod, podResources
}

// aggregates resources in each zone used by all containers of the pod
podAllocated, err := p.aggregateContainerAllocated(podResources.Containers)
podAllocated, err := p.aggregateContainerAllocated(pod.ObjectMeta, podResources.Containers)
if err != nil {
errList = append(errList, fmt.Errorf("pod %s aggregate container allocated failed, %s", podKey, err))
continue
Expand Down Expand Up @@ -599,7 +618,7 @@ func (p *topologyAdapterImpl) getZoneAttributes(allocatableResources *podresv1.A

// aggregateContainerAllocated aggregates resources in each zone used by all containers of a pod and returns a map of zone node to
// container allocated resources.
func (p *topologyAdapterImpl) aggregateContainerAllocated(containers []*podresv1.ContainerResources) (map[util.ZoneNode]*v1.ResourceList, error) {
func (p *topologyAdapterImpl) aggregateContainerAllocated(podMeta metav1.ObjectMeta, containers []*podresv1.ContainerResources) (map[util.ZoneNode]*v1.ResourceList, error) {
var errList []error

podAllocated := make(map[util.ZoneNode]*v1.ResourceList)
Expand All @@ -624,6 +643,14 @@ func (p *topologyAdapterImpl) aggregateContainerAllocated(containers []*podresv1
continue
}

// add container memory bandwidth according to its allocated numa resources
containerAllocated, err = p.addContainerMemoryBandwidth(containerAllocated, podMeta, containerResources.Name)
if err != nil {
errList = append(errList, fmt.Errorf("get container %s memory bandwidth failed: %s",
containerResources.Name, err))
continue
}

for zoneNode, resourceList := range containerAllocated {
if resourceList == nil {
continue
Expand Down Expand Up @@ -836,6 +863,86 @@ func (p *topologyAdapterImpl) generateZoneNode(quantity podresv1.TopologyAwareQu
}
}

// getZoneSiblings converts the meta server's SiblingNumaMap into a map keyed by
// numa zone node. Every id in SiblingNumaMap gets an entry (an empty, non-nil
// list when it has no siblings); each sibling is reported with the numa
// topology type and its id rendered as a decimal string.
//
// The returned error is always nil.
func (p *topologyAdapterImpl) getZoneSiblings() (map[util.ZoneNode]util.ZoneSiblings, error) {
	result := make(map[util.ZoneNode]util.ZoneSiblings)

	for numaID, siblingIDs := range p.metaServer.SiblingNumaMap {
		node := util.GenerateNumaZoneNode(numaID)

		// start from an empty list so that a numa without siblings is still present in the result
		entries := make(util.ZoneSiblings, 0)
		// NOTE(review): single-variable range yields the keys of the sibling collection;
		// if it is a map/set the resulting order is not stable between calls — confirm
		// whether callers sort it.
		for siblingID := range siblingIDs {
			entries = append(entries, nodev1alpha1.Sibling{
				Type: nodev1alpha1.TopologyTypeNuma,
				Name: strconv.Itoa(siblingID),
			})
		}

		result[node] = entries
	}

	return result, nil
}

// addContainerMemoryBandwidth records the memory bandwidth allocated to a container
// on each numa zone it occupies.
//
// The container spec is looked up by pod UID and container name. If the container
// has no cpu request, zoneAllocated is returned untouched. Otherwise the numa zones
// that both have cpu allocated to the container and a positive entry in
// SiblingNumaAvgMBWCapacityMap are selected, the container's memory bandwidth
// request is obtained from spd.GetContainerMemoryBandwidthRequest, and that request
// is split evenly (integer division) across the selected zones.
//
// The resource lists referenced by zoneAllocated are updated in place and the same
// map is returned. On any lookup failure a nil map and the error are returned.
func (p *topologyAdapterImpl) addContainerMemoryBandwidth(zoneAllocated map[util.ZoneNode]*v1.ResourceList, podMeta metav1.ObjectMeta, name string) (map[util.ZoneNode]*v1.ResourceList, error) {
	containerSpec, err := p.metaServer.GetContainerSpec(string(podMeta.UID), name)
	if err != nil {
		return nil, err
	}

	// containers without a cpu request never get memory bandwidth accounted
	cpuRequest := native.CPUQuantityGetter()(containerSpec.Resources.Requests)
	if cpuRequest.IsZero() {
		return zoneAllocated, nil
	}

	// collect the numa zones eligible for memory bandwidth accounting
	eligible := make(map[util.ZoneNode]*v1.ResourceList)
	for node, resources := range zoneAllocated {
		if node.Meta.Type != nodev1alpha1.TopologyTypeNuma || resources == nil {
			continue
		}

		// the container must actually hold cpu on this numa
		if (*resources).Cpu().CmpInt64(0) <= 0 {
			continue
		}

		id, idErr := util.GetZoneID(node)
		if idErr != nil {
			return nil, idErr
		}

		// a numa whose avg mbw capacity is zero is excluded from mbw allocation
		if p.metaServer.SiblingNumaAvgMBWCapacityMap[id] > 0 {
			eligible[node] = resources
		}
	}

	// only numa allocated containers need to consider memory bandwidth
	if len(eligible) == 0 {
		return zoneAllocated, nil
	}

	request, err := spd.GetContainerMemoryBandwidthRequest(p.metaServer, podMeta, int(cpuRequest.Value()))
	if err != nil {
		return nil, err
	}
	if request <= 0 {
		return zoneAllocated, nil
	}

	// spread the request evenly over the eligible numa zones
	perNuma := int64(request / len(eligible))
	for _, resources := range eligible {
		(*resources)[apiconsts.ResourceMemoryBandwidth] = *resource.NewQuantity(perNuma, resource.BinarySI)
	}

	return zoneAllocated, nil
}

// addNumaMemoryBandwidthResources sets the memory bandwidth resource of each numa
// zone from memoryBandwidthMap (numa id -> memory bandwidth).
//
// Entries whose memory bandwidth is not positive are skipped. For every remaining
// numa id the corresponding numa zone node's resource list gets its
// apiconsts.ResourceMemoryBandwidth entry set (overwriting any previous value); a
// resource list is created when the zone has none yet.
//
// zoneResources is updated in place and returned. A nil zoneResources map, or an
// existing entry that points at a nil resource list, is allocated on demand
// instead of panicking on the map write. The returned error is always nil.
func (p *topologyAdapterImpl) addNumaMemoryBandwidthResources(zoneResources map[util.ZoneNode]*v1.ResourceList, memoryBandwidthMap map[int]int64) (map[util.ZoneNode]*v1.ResourceList, error) {
	for id, memoryBandwidth := range memoryBandwidthMap {
		if memoryBandwidth <= 0 {
			continue
		}

		// allocate lazily so a nil input with nothing to add is still returned as nil
		if zoneResources == nil {
			zoneResources = make(map[util.ZoneNode]*v1.ResourceList)
		}

		numaZoneNode := util.GenerateNumaZoneNode(id)
		// a missing key yields a nil pointer, so one nil check covers both cases
		res := zoneResources[numaZoneNode]
		if res == nil {
			res = &v1.ResourceList{}
			zoneResources[numaZoneNode] = res
		} else if *res == nil {
			// pointer to a nil map: writing through it would panic
			*res = v1.ResourceList{}
		}

		(*res)[apiconsts.ResourceMemoryBandwidth] = *resource.NewQuantity(memoryBandwidth, resource.BinarySI)
	}

	return zoneResources, nil
}

// filterAllocatedPodResourcesList is to filter pods that have allocated devices or Resources
func filterAllocatedPodResourcesList(podResourcesList []*podresv1.PodResources) []*podresv1.PodResources {
allocatedPodResourcesList := make([]*podresv1.PodResources, 0, len(podResourcesList))
Expand Down
Loading
Loading