Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(reporter): validate specific resources in pod resources response #541

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/katalyst-agent/app/options/reporter/kubelet_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package reporter

import (
v1 "k8s.io/api/core/v1"
cliflag "k8s.io/component-base/cli/flag"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

Expand All @@ -28,6 +29,7 @@ type KubeletPluginOptions struct {
KubeletResourcePluginPaths []string
EnableReportTopologyPolicy bool
ResourceNameToZoneTypeMap map[string]string
NeedValidationResources []string
}

func NewKubeletPluginOptions() *KubeletPluginOptions {
Expand All @@ -40,6 +42,10 @@ func NewKubeletPluginOptions() *KubeletPluginOptions {
},
EnableReportTopologyPolicy: false,
ResourceNameToZoneTypeMap: make(map[string]string),
NeedValidationResources: []string{
string(v1.ResourceCPU),
string(v1.ResourceMemory),
},
}
}

Expand All @@ -54,13 +60,16 @@ func (o *KubeletPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"whether to report topology policy")
fs.StringToStringVar(&o.ResourceNameToZoneTypeMap, "resource-name-to-zone-type-map", o.ResourceNameToZoneTypeMap,
"a map that stores the mapping relationship between resource names to zone types in KCNR (e.g. nvidia.com/gpu=GPU,...)")
fs.StringSliceVar(&o.NeedValidationResources, "need-validation-resources", o.NeedValidationResources,
"resources need to be validated")
}

// ApplyTo copies every kubelet reporter plugin option held by o onto the
// given configuration c. Slice and map fields are assigned directly rather
// than cloned, so c shares them with o afterwards. It always returns nil.
func (o *KubeletPluginOptions) ApplyTo(c *reporter.KubeletPluginConfiguration) error {
	// Endpoints and plugin paths.
	c.KubeletResourcePluginPaths = o.KubeletResourcePluginPaths
	c.PodResourcesServerEndpoints = o.PodResourcesServerEndpoints

	// Topology reporting settings.
	c.ResourceNameToZoneTypeMap = o.ResourceNameToZoneTypeMap
	c.EnableReportTopologyPolicy = o.EnableReportTopologyPolicy

	// Resources to validate.
	c.NeedValidationResources = o.NeedValidationResources
	return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func NewKubeletReporterPlugin(emitter metrics.MetricEmitter, metaServer *metaser

topologyStatusAdapter, err := topology.NewPodResourcesServerTopologyAdapter(metaServer, conf.QoSConfiguration,
conf.PodResourcesServerEndpoints, conf.KubeletResourcePluginPaths, conf.ResourceNameToZoneTypeMap,
nil, p.getNumaInfo, topology.GenericPodResourcesFilter(conf.QoSConfiguration), podresources.GetV1Client)
nil, p.getNumaInfo, topology.GenericPodResourcesFilter(conf.QoSConfiguration), podresources.GetV1Client,
conf.NeedValidationResources)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,16 @@ type topologyAdapterImpl struct {

// resourceNameToZoneTypeMap maps resource names to zone types for device zones
resourceNameToZoneTypeMap map[string]string

// needValidationResources lists the resource names that must be validated
needValidationResources []string
}

// NewPodResourcesServerTopologyAdapter creates a topology adapter which uses pod resources server
func NewPodResourcesServerTopologyAdapter(metaServer *metaserver.MetaServer, qosConf *generic.QoSConfiguration, endpoints []string,
kubeletResourcePluginPaths []string, resourceNameToZoneTypeMap map[string]string, skipDeviceNames sets.String,
numaInfoGetter NumaInfoGetter, podResourcesFilter PodResourcesFilter, getClientFunc podresources.GetClientFunc) (Adapter, error) {
func NewPodResourcesServerTopologyAdapter(metaServer *metaserver.MetaServer, qosConf *generic.QoSConfiguration,
endpoints []string, kubeletResourcePluginPaths []string, resourceNameToZoneTypeMap map[string]string,
skipDeviceNames sets.String, numaInfoGetter NumaInfoGetter, podResourcesFilter PodResourcesFilter,
getClientFunc podresources.GetClientFunc, needValidationResources []string) (Adapter, error) {
numaInfo, err := numaInfoGetter()
if err != nil {
return nil, fmt.Errorf("failed to get numa info: %s", err)
Expand All @@ -125,6 +129,7 @@ func NewPodResourcesServerTopologyAdapter(metaServer *metaserver.MetaServer, qos
getClientFunc: getClientFunc,
podResourcesFilter: podResourcesFilter,
resourceNameToZoneTypeMap: resourceNameToZoneTypeMap,
needValidationResources: needValidationResources,
}, nil
}

Expand Down Expand Up @@ -158,7 +163,7 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
}

// validate the pod resources server response to make sure the reported topology status is correct
if err = validatePodResourcesServerResponse(allocatableResources, listPodResourcesResponse); err != nil {
if err = p.validatePodResourcesServerResponse(allocatableResources, listPodResourcesResponse); err != nil {
return nil, errors.Wrap(err, "validate pod Resources server response failed")
}

Expand Down Expand Up @@ -286,14 +291,21 @@ func (p *topologyAdapterImpl) Run(ctx context.Context, handler func()) error {

// validatePodResourcesServerResponse validates the pod resources server response; if the resources are empty,
// the kubelet or qrm plugin may be restarting
func validatePodResourcesServerResponse(allocatableResourcesResponse *podresv1.AllocatableResourcesResponse,
listPodResourcesResponse *podresv1.ListPodResourcesResponse) error {
if allocatableResourcesResponse == nil {
return fmt.Errorf("allocatable Resources response is nil")
}
func (p *topologyAdapterImpl) validatePodResourcesServerResponse(allocatableResourcesResponse *podresv1.
AllocatableResourcesResponse, listPodResourcesResponse *podresv1.ListPodResourcesResponse) error {
if len(p.needValidationResources) > 0 {
if allocatableResourcesResponse == nil {
return fmt.Errorf("allocatable resources response is nil")
}

if len(allocatableResourcesResponse.Resources) == 0 {
return fmt.Errorf("allocatable topology aware Resources is empty")
allocResSet := sets.NewString()
for _, res := range allocatableResourcesResponse.Resources {
allocResSet.Insert(res.ResourceName)
}

if !allocResSet.HasAll(p.needValidationResources...) {
return fmt.Errorf("allocatable resources response doen't contain all the resources that need to be validated")
}
}

if listPodResourcesResponse == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2956,7 +2956,7 @@ func Test_podResourcesServerTopologyAdapterImpl_Run(t *testing.T) {
notifier := make(chan struct{}, 1)
p, _ := NewPodResourcesServerTopologyAdapter(testMetaServer, generic.NewQoSConfiguration(),
endpoints, kubeletResourcePluginPath, nil,
nil, getNumaInfo, nil, podresources.GetV1Client)
nil, getNumaInfo, nil, podresources.GetV1Client, []string{"cpu", "memory"})
err = p.Run(ctx, func() {})
assert.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/reporter/kubelet_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type KubeletPluginConfiguration struct {
KubeletResourcePluginPaths []string
EnableReportTopologyPolicy bool
ResourceNameToZoneTypeMap map[string]string
NeedValidationResources []string
}

func NewKubeletPluginConfiguration() *KubeletPluginConfiguration {
Expand Down
Loading