Skip to content

Commit

Permalink
Merge pull request #541 from caohe/validate-pod-resources
Browse files Browse the repository at this point in the history
feat(reporter): validate specific resources in pod resources response
  • Loading branch information
luomingmeng authored Apr 22, 2024
2 parents bd179d0 + 1a91730 commit 0c698ae
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 13 deletions.
9 changes: 9 additions & 0 deletions cmd/katalyst-agent/app/options/reporter/kubelet_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package reporter

import (
v1 "k8s.io/api/core/v1"
cliflag "k8s.io/component-base/cli/flag"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

Expand All @@ -28,6 +29,7 @@ type KubeletPluginOptions struct {
KubeletResourcePluginPaths []string
EnableReportTopologyPolicy bool
ResourceNameToZoneTypeMap map[string]string
NeedValidationResources []string
}

func NewKubeletPluginOptions() *KubeletPluginOptions {
Expand All @@ -40,6 +42,10 @@ func NewKubeletPluginOptions() *KubeletPluginOptions {
},
EnableReportTopologyPolicy: false,
ResourceNameToZoneTypeMap: make(map[string]string),
NeedValidationResources: []string{
string(v1.ResourceCPU),
string(v1.ResourceMemory),
},
}
}

Expand All @@ -54,13 +60,16 @@ func (o *KubeletPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"whether to report topology policy")
fs.StringToStringVar(&o.ResourceNameToZoneTypeMap, "resource-name-to-zone-type-map", o.ResourceNameToZoneTypeMap,
"a map that stores the mapping relationship between resource names to zone types in KCNR (e.g. nvidia.com/gpu=GPU,...)")
fs.StringSliceVar(&o.NeedValidationResources, "need-validation-resources", o.NeedValidationResources,
"resources need to be validated")
}

func (o *KubeletPluginOptions) ApplyTo(c *reporter.KubeletPluginConfiguration) error {
c.PodResourcesServerEndpoints = o.PodResourcesServerEndpoints
c.KubeletResourcePluginPaths = o.KubeletResourcePluginPaths
c.EnableReportTopologyPolicy = o.EnableReportTopologyPolicy
c.ResourceNameToZoneTypeMap = o.ResourceNameToZoneTypeMap
c.NeedValidationResources = o.NeedValidationResources

return nil
}
3 changes: 2 additions & 1 deletion pkg/agent/resourcemanager/fetcher/kubelet/kubeletplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func NewKubeletReporterPlugin(emitter metrics.MetricEmitter, metaServer *metaser

topologyStatusAdapter, err := topology.NewPodResourcesServerTopologyAdapter(metaServer, conf.QoSConfiguration,
conf.PodResourcesServerEndpoints, conf.KubeletResourcePluginPaths, conf.ResourceNameToZoneTypeMap,
nil, p.getNumaInfo, topology.GenericPodResourcesFilter(conf.QoSConfiguration), podresources.GetV1Client)
nil, p.getNumaInfo, topology.GenericPodResourcesFilter(conf.QoSConfiguration), podresources.GetV1Client,
conf.NeedValidationResources)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,16 @@ type topologyAdapterImpl struct {

// resourceNameToZoneTypeMap is a map that stores the mapping relationship between resource names to zone types for device zones
resourceNameToZoneTypeMap map[string]string

// needValidationResources is the resources needed to be validated
needValidationResources []string
}

// NewPodResourcesServerTopologyAdapter creates a topology adapter which uses pod resources server
func NewPodResourcesServerTopologyAdapter(metaServer *metaserver.MetaServer, qosConf *generic.QoSConfiguration, endpoints []string,
kubeletResourcePluginPaths []string, resourceNameToZoneTypeMap map[string]string, skipDeviceNames sets.String,
numaInfoGetter NumaInfoGetter, podResourcesFilter PodResourcesFilter, getClientFunc podresources.GetClientFunc) (Adapter, error) {
func NewPodResourcesServerTopologyAdapter(metaServer *metaserver.MetaServer, qosConf *generic.QoSConfiguration,
endpoints []string, kubeletResourcePluginPaths []string, resourceNameToZoneTypeMap map[string]string,
skipDeviceNames sets.String, numaInfoGetter NumaInfoGetter, podResourcesFilter PodResourcesFilter,
getClientFunc podresources.GetClientFunc, needValidationResources []string) (Adapter, error) {
numaInfo, err := numaInfoGetter()
if err != nil {
return nil, fmt.Errorf("failed to get numa info: %s", err)
Expand All @@ -125,6 +129,7 @@ func NewPodResourcesServerTopologyAdapter(metaServer *metaserver.MetaServer, qos
getClientFunc: getClientFunc,
podResourcesFilter: podResourcesFilter,
resourceNameToZoneTypeMap: resourceNameToZoneTypeMap,
needValidationResources: needValidationResources,
}, nil
}

Expand Down Expand Up @@ -158,7 +163,7 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
}

// validate pod Resources server response to make sure report topology status is correct
if err = validatePodResourcesServerResponse(allocatableResources, listPodResourcesResponse); err != nil {
if err = p.validatePodResourcesServerResponse(allocatableResources, listPodResourcesResponse); err != nil {
return nil, errors.Wrap(err, "validate pod Resources server response failed")
}

Expand Down Expand Up @@ -286,14 +291,21 @@ func (p *topologyAdapterImpl) Run(ctx context.Context, handler func()) error {

// validatePodResourcesServerResponse validate pod resources server response, if the resource is empty,
// maybe the kubelet or qrm plugin is restarting
func validatePodResourcesServerResponse(allocatableResourcesResponse *podresv1.AllocatableResourcesResponse,
listPodResourcesResponse *podresv1.ListPodResourcesResponse) error {
if allocatableResourcesResponse == nil {
return fmt.Errorf("allocatable Resources response is nil")
}
func (p *topologyAdapterImpl) validatePodResourcesServerResponse(allocatableResourcesResponse *podresv1.
AllocatableResourcesResponse, listPodResourcesResponse *podresv1.ListPodResourcesResponse) error {
if len(p.needValidationResources) > 0 {
if allocatableResourcesResponse == nil {
return fmt.Errorf("allocatable resources response is nil")
}

if len(allocatableResourcesResponse.Resources) == 0 {
return fmt.Errorf("allocatable topology aware Resources is empty")
allocResSet := sets.NewString()
for _, res := range allocatableResourcesResponse.Resources {
allocResSet.Insert(res.ResourceName)
}

if !allocResSet.HasAll(p.needValidationResources...) {
return fmt.Errorf("allocatable resources response doen't contain all the resources that need to be validated")
}
}

if listPodResourcesResponse == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2967,7 +2967,7 @@ func Test_podResourcesServerTopologyAdapterImpl_Run(t *testing.T) {
notifier := make(chan struct{}, 1)
p, _ := NewPodResourcesServerTopologyAdapter(testMetaServer, generic.NewQoSConfiguration(),
endpoints, kubeletResourcePluginPath, nil,
nil, getNumaInfo, nil, podresources.GetV1Client)
nil, getNumaInfo, nil, podresources.GetV1Client, []string{"cpu", "memory"})
err = p.Run(ctx, func() {})
assert.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/reporter/kubelet_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type KubeletPluginConfiguration struct {
KubeletResourcePluginPaths []string
EnableReportTopologyPolicy bool
ResourceNameToZoneTypeMap map[string]string
NeedValidationResources []string
}

func NewKubeletPluginConfiguration() *KubeletPluginConfiguration {
Expand Down

0 comments on commit 0c698ae

Please sign in to comment.