Skip to content

Commit

Permalink
enhance memory plugin with advisor (#136)
Browse files Browse the repository at this point in the history
* fix(qrm): fix cpu allocation handlers log

* feat(qrm): enhance memory plugin with advisor

* fix(qrm): use NUMA balance strategy for blocks with fake NUMA id

* test(qrm): add unit-tests for memory plugin with advisor

* fix(qrm): fix cpu/memory state struct clone semantic

* refine(qrm): refine according to comments for memory plugin with advisor

* fix(test): fix write/read race for testing

---------

Co-authored-by: shaowei.wayne <shaowei.wayne@bytedance.com>
  • Loading branch information
csfldf and waynepeking348 authored Jul 18, 2023
1 parent ad3aff5 commit de23e44
Show file tree
Hide file tree
Showing 31 changed files with 1,912 additions and 133 deletions.
5 changes: 3 additions & 2 deletions cmd/katalyst-agent/app/options/global/qrmadvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (

// QRMAdvisorOptions holds the configurations for both qrm plugins and sys advisor qrm servers
type QRMAdvisorOptions struct {
CPUAdvisorSocketAbsPath string
CPUPluginSocketAbsPath string
CPUAdvisorSocketAbsPath string
CPUPluginSocketAbsPath string

MemoryAdvisorSocketAbsPath string
}

Expand Down
10 changes: 5 additions & 5 deletions cmd/katalyst-agent/app/options/qrm/cpu_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

type CPUOptions struct {
PolicyName string
EnableSysAdvisor bool
EnableCPUAdvisor bool
ReservedCPUCores int
SkipCPUStateCorruption bool
EnableCPUPressureEviction bool
Expand All @@ -35,7 +35,7 @@ type CPUOptions struct {
func NewCPUOptions() *CPUOptions {
return &CPUOptions{
PolicyName: "dynamic",
EnableSysAdvisor: false,
EnableCPUAdvisor: false,
ReservedCPUCores: 0,
SkipCPUStateCorruption: false,
EnableCPUPressureEviction: false,
Expand All @@ -49,8 +49,8 @@ func (o *CPUOptions) AddFlags(fss *cliflag.NamedFlagSets) {

fs.StringVar(&o.PolicyName, "cpu-resource-plugin-policy",
o.PolicyName, "The policy cpu resource plugin should use")
fs.BoolVar(&o.EnableSysAdvisor, "cpu-resource-plugin-advisor",
o.EnableSysAdvisor, "Whether cpu resource plugin should enable sys-advisor")
fs.BoolVar(&o.EnableCPUAdvisor, "cpu-resource-plugin-advisor",
o.EnableCPUAdvisor, "Whether cpu resource plugin should enable sys-advisor")
fs.IntVar(&o.ReservedCPUCores, "cpu-resource-plugin-reserved",
o.ReservedCPUCores, "The total cores cpu resource plugin should reserve")
fs.BoolVar(&o.SkipCPUStateCorruption, "skip-cpu-state-corruption",
Expand All @@ -66,7 +66,7 @@ func (o *CPUOptions) AddFlags(fss *cliflag.NamedFlagSets) {

func (o *CPUOptions) ApplyTo(conf *qrmconfig.CPUQRMPluginConfig) error {
conf.PolicyName = o.PolicyName
conf.EnableSysAdvisor = o.EnableSysAdvisor
conf.EnableCPUAdvisor = o.EnableCPUAdvisor
conf.ReservedCPUCores = o.ReservedCPUCores
conf.SkipCPUStateCorruption = o.SkipCPUStateCorruption
conf.EnableCPUPressureEviction = o.EnableCPUPressureEviction
Expand Down
9 changes: 9 additions & 0 deletions cmd/katalyst-agent/app/options/qrm/memory_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type MemoryOptions struct {
ReservedMemoryGB uint64
SkipMemoryStateCorruption bool
EnableSettingMemoryMigrate bool
EnableMemoryAdvisor bool
ExtraControlKnobConfigFile string
}

func NewMemoryOptions() *MemoryOptions {
Expand All @@ -35,6 +37,7 @@ func NewMemoryOptions() *MemoryOptions {
ReservedMemoryGB: 0,
SkipMemoryStateCorruption: false,
EnableSettingMemoryMigrate: false,
EnableMemoryAdvisor: false,
}
}

Expand All @@ -49,11 +52,17 @@ func (o *MemoryOptions) AddFlags(fss *cliflag.NamedFlagSets) {
o.SkipMemoryStateCorruption, "if set true, we will skip memory state corruption")
fs.BoolVar(&o.EnableSettingMemoryMigrate, "enable-setting-memory-migrate",
o.EnableSettingMemoryMigrate, "if set true, we will enable cpuset.memory_migrate for containers not numa_binding")
fs.BoolVar(&o.EnableMemoryAdvisor, "memory-resource-plugin-advisor",
o.EnableMemoryAdvisor, "Whether memory resource plugin should enable sys-advisor")
fs.StringVar(&o.ExtraControlKnobConfigFile, "memory-extra-control-knob-config-file",
o.ExtraControlKnobConfigFile, "the absolute path of extra control knob config file")
}
// ApplyTo copies every option held by o into the memory QRM plugin
// configuration conf. It performs plain field assignments only and
// always returns nil.
func (o *MemoryOptions) ApplyTo(conf *qrmconfig.MemoryQRMPluginConfig) error {
	// advisor and extra control knob settings
	conf.ExtraControlKnobConfigFile = o.ExtraControlKnobConfigFile
	conf.EnableMemoryAdvisor = o.EnableMemoryAdvisor

	// state handling and memory migration switches
	conf.EnableSettingMemoryMigrate = o.EnableSettingMemoryMigrate
	conf.SkipMemoryStateCorruption = o.SkipMemoryStateCorruption

	// policy selection and reservation
	conf.ReservedMemoryGB = o.ReservedMemoryGB
	conf.PolicyName = o.PolicyName

	return nil
}
55 changes: 55 additions & 0 deletions pkg/agent/qrm-plugins/commonstate/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package commonstate

import "github.com/kubewharf/katalyst-core/pkg/util/general"

// ControlKnobInfo shows common types of control knobs:
// 1. applied by cgroup manager according to entryName, subEntryName, cgroupIfaceName and cgroupSubsysName
// 2. applied by QRM framework according to ociPropertyName
//
// there may be new types of control knobs;
// we won't modify this struct to identify them,
// instead we will register a custom per-control-knob executor to deal with them.
//
// All fields are serialized to JSON under the snake_case names given in their tags.
type ControlKnobInfo struct {
// ControlKnobValue is the value of the control knob, carried as a string.
ControlKnobValue string `json:"control_knob_value"`

// for control knobs applied by cgroup manager
// according to entryName, subEntryName, cgroupIfaceName and cgroupSubsysName

// CgroupVersionToIfaceName presumably maps a cgroup version to the cgroup
// interface name used for this knob under that version — confirm against callers.
CgroupVersionToIfaceName map[string]string `json:"cgroup_version_to_iface_name"`
// CgroupSubsysName is the name of the cgroup subsystem this knob belongs to.
CgroupSubsysName string `json:"cgroup_subsys_name"`

// for control knobs applied by QRM framework according to ociPropertyName

// OciPropertyName is the OCI property name used by the QRM framework for this knob.
OciPropertyName string `json:"oci_property_name"`
}

// Clone returns a copy of cki. The string fields are copied by value and
// CgroupVersionToIfaceName is passed through general.DeepCopyMap instead of
// being shared with the receiver.
func (cki ControlKnobInfo) Clone() ControlKnobInfo {
	// cki is received by value, so this assignment already copies the string fields
	cloned := cki
	cloned.CgroupVersionToIfaceName = general.DeepCopyMap(cki.CgroupVersionToIfaceName)
	return cloned
}

// ExtraControlKnobConfigs is a set of ExtraControlKnobConfig entries indexed by a string key
// (presumably the control knob name — confirm against callers).
type ExtraControlKnobConfigs map[string]ExtraControlKnobConfig

// ExtraControlKnobConfig describes one extra control knob. It embeds ControlKnobInfo,
// which is serialized under the "control_knob_info" JSON key.
type ExtraControlKnobConfig struct {
// PodExplicitlyAnnotationKey is presumably the pod annotation key through which
// a pod sets this knob explicitly — confirm against callers.
PodExplicitlyAnnotationKey string `json:"pod_explicitly_annotation_key"`
// QoSLevelToDefaultValue presumably maps a QoS level to the default knob value
// for that level — confirm against callers.
QoSLevelToDefaultValue map[string]string `json:"qos_level_to_default_value"`
ControlKnobInfo `json:"control_knob_info"`
}
38 changes: 38 additions & 0 deletions pkg/agent/qrm-plugins/commonstate/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package commonstate

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
)

// LoadExtraControlKnobConfigs reads the file at extraControlKnobConfigAbsPath
// and decodes its JSON content into an ExtraControlKnobConfigs map.
//
// It returns a nil map and a non-nil error when the file cannot be read or
// when its content cannot be decoded:
//   - the read error is returned as-is, so callers may keep using helpers
//     such as os.IsNotExist that do not unwrap errors;
//   - the decode error is wrapped with the offending path, and the original
//     cause stays reachable through errors.Is / errors.As.
func LoadExtraControlKnobConfigs(extraControlKnobConfigAbsPath string) (ExtraControlKnobConfigs, error) {
	configBytes, err := ioutil.ReadFile(extraControlKnobConfigAbsPath)
	if err != nil {
		// the os-level error already names the path; keep it unwrapped on purpose
		return nil, err
	}

	extraControlKnobConfigs := make(ExtraControlKnobConfigs)
	if err := json.Unmarshal(configBytes, &extraControlKnobConfigs); err != nil {
		// json errors carry no file information, so add the path for diagnosis
		return nil, fmt.Errorf("unmarshal extra control knob config file %q: %w",
			extraControlKnobConfigAbsPath, err)
	}

	return extraControlKnobConfigs, nil
}
10 changes: 5 additions & 5 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type DynamicPolicy struct {

// those are parsed from configurations
// todo if we want to use dynamic configuration, we'd better not use self-defined conf
enableCPUSysAdvisor bool
enableCPUAdvisor bool
reservedCPUs machine.CPUSet
cpuAdvisorSocketAbsPath string
cpuPluginSocketAbsPath string
Expand Down Expand Up @@ -186,7 +186,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
dynamicConfig: conf.DynamicAgentConfiguration,
cpuAdvisorSocketAbsPath: conf.CPUAdvisorSocketAbsPath,
cpuPluginSocketAbsPath: conf.CPUPluginSocketAbsPath,
enableCPUSysAdvisor: conf.CPUQRMPluginConfig.EnableSysAdvisor,
enableCPUAdvisor: conf.CPUQRMPluginConfig.EnableCPUAdvisor,
reservedCPUs: reservedCPUs,
extraStateFileAbsPath: conf.ExtraStateFileAbsPath,
enableSyncingCPUIdle: conf.CPUQRMPluginConfig.EnableSyncingCPUIdle,
Expand Down Expand Up @@ -286,7 +286,7 @@ func (p *DynamicPolicy) Start() (err error) {
}

// pre-check necessary dirs if sys-advisor is enabled
if !p.enableCPUSysAdvisor {
if !p.enableCPUAdvisor {
general.Infof("start dynamic policy cpu plugin without sys-advisor")
return nil
} else if p.cpuAdvisorSocketAbsPath == "" || p.cpuPluginSocketAbsPath == "" {
Expand Down Expand Up @@ -695,7 +695,7 @@ func (p *DynamicPolicy) Allocate(ctx context.Context,
p.Lock()
defer func() {
// calls sys-advisor to inform the latest container
if p.enableCPUSysAdvisor && respErr == nil && req.ContainerType != pluginapi.ContainerType_INIT {
if p.enableCPUAdvisor && respErr == nil && req.ContainerType != pluginapi.ContainerType_INIT {
_, err := p.advisorClient.AddContainer(ctx, &advisorsvc.AddContainerRequest{
PodUid: req.PodUid,
PodNamespace: req.PodNamespace,
Expand Down Expand Up @@ -788,7 +788,7 @@ func (p *DynamicPolicy) RemovePod(ctx context.Context,
}
}()

if p.enableCPUSysAdvisor {
if p.enableCPUAdvisor {
_, err = p.advisorClient.RemovePod(ctx, &advisorsvc.RemovePodRequest{PodUid: req.PodUid})
if err != nil {
return nil, fmt.Errorf("remove pod in QoS aware server failed with error: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/status"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
maputil "k8s.io/kubernetes/pkg/util/maps"

Expand Down Expand Up @@ -204,6 +205,8 @@ func (p *DynamicPolicy) pushCPUAdvisor() error {
// lwCPUAdvisorServer works as a client to connect with cpu-advisor.
// it will wait to receive allocations from cpu-advisor, and perform allocate actions
func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error {
general.Infof("called")

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-stopCh
Expand All @@ -220,7 +223,8 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error {
resp, err := stream.Recv()
if err != nil {
_ = p.emitter.StoreInt64(util.MetricNameLWCPUAdvisorServerFailed, 1, metrics.MetricTypeNameRaw)
return fmt.Errorf("receive ListAndWatch response of CPUAdvisorServer failed with error: %v", err)
return fmt.Errorf("receive ListAndWatch response of CPUAdvisorServer failed with error: %v, grpc code: %v",
err, status.Code(err))
}

err = p.allocateByCPUAdvisor(resp)
Expand Down Expand Up @@ -356,7 +360,10 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
blockID, err)
}

cpuset, err := calculator.TakeByTopology(machineInfo, availableCPUs, blockResult)
// use NUMA balance strategy to avoid changing memset as much as possible
// for blocks with faked NUMA id
var cpuset machine.CPUSet
cpuset, availableCPUs, err = calculator.TakeByNUMABalance(machineInfo, availableCPUs, blockResult)
if err != nil {
return nil, fmt.Errorf("allocate cpuset for non NUMA Aware block: %s failed with error: %v, availableCPUs: %d(%s), blockResult: %d",
blockID, err, availableCPUs.Size(), availableCPUs.String(), blockResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,11 @@ func (p *DynamicPolicy) allocateNumaBindingCPUs(numCPUs int, hint *pluginapi.Top
var err error
alignedCPUs, err = calculator.TakeByTopology(p.machineInfo, alignedAvailableCPUs, numCPUs)

general.ErrorS(err, "take cpu for NUMA not exclusive binding container failed",
"hints", hint.Nodes,
"alignedAvailableCPUs", alignedAvailableCPUs.String())

if err != nil {
general.ErrorS(err, "take cpu for NUMA not exclusive binding container failed",
"hints", hint.Nodes,
"alignedAvailableCPUs", alignedAvailableCPUs.String())

return machine.NewCPUSet(),
fmt.Errorf("take cpu for NUMA not exclusive binding container failed with err: %v", err)
}
Expand Down Expand Up @@ -479,7 +479,7 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntries(allocationInfos
machineState := p.state.GetMachineState()

var poolsQuantityMap map[string]int
if p.enableCPUSysAdvisor {
if p.enableCPUAdvisor {
// if sys advisor is enabled, we believe the pools' ratio that sys advisor indicates
poolsQuantityMap = machine.GetQuantityMap(entries.GetFilteredPoolsCPUSetMap(state.ResidentPools))
} else {
Expand Down Expand Up @@ -521,7 +521,7 @@ func (p *DynamicPolicy) adjustAllocationEntries() error {
// if sys advisor is enabled, we believe the pools' ratio that sys advisor indicates,
// else we do sum(containers req) for each pool to get pools ratio
var poolsQuantityMap map[string]int
if p.enableCPUSysAdvisor {
if p.enableCPUAdvisor {
poolsQuantityMap = machine.GetQuantityMap(entries.GetFilteredPoolsCPUSetMap(state.ResidentPools))
} else {
poolsQuantityMap = state.GetSharedQuantityMapFromPodEntries(entries, nil)
Expand Down Expand Up @@ -572,7 +572,7 @@ func (p *DynamicPolicy) adjustPoolsAndIsolatedEntries(poolsQuantityMap map[strin
// with the intersection of previous reclaim pool and non-ramp-up dedicated_cores numa_binding containers
func (p *DynamicPolicy) reclaimOverlapNUMABinding(poolsCPUSet map[string]machine.CPUSet, entries state.PodEntries) error {
// reclaimOverlapNUMABinding only works with cpu advisor and reclaim enabled
if !(p.enableCPUSysAdvisor && p.dynamicConfig.GetDynamicConfiguration().EnableReclaim) {
if !(p.enableCPUAdvisor && p.dynamicConfig.GetDynamicConfiguration().EnableReclaim) {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (p *DynamicPolicy) clearResidualState() {
}

var rErr error
if p.enableCPUSysAdvisor {
if p.enableCPUAdvisor {
_, rErr = p.advisorClient.RemovePod(ctx, &advisorsvc.RemovePodRequest{
PodUid: podUID,
})
Expand Down
Loading

0 comments on commit de23e44

Please sign in to comment.