feat(qrm): implement allowing share_cores cpuset overlap with reclaimed_cores #597

Merged: 6 commits, Jul 3, 2024
@@ -27,6 +27,7 @@ import (
const (
defaultEnableLoadEviction = false
defaultLoadUpperBoundRatio = 1.8
defaultLoadLowerBoundRatio = 1.0
defaultLoadThresholdMetPercentage = 0.8
defaultLoadMetricSize = 10
defaultLoadEvictionCoolDownTime = 300 * time.Second
@@ -40,6 +41,7 @@ const (
type CPUPressureEvictionOptions struct {
EnableLoadEviction bool
LoadUpperBoundRatio float64
LoadLowerBoundRatio float64
LoadThresholdMetPercentage float64
LoadMetricRingSize int
LoadEvictionCoolDownTime time.Duration
@@ -54,6 +56,7 @@ func NewCPUPressureEvictionOptions() *CPUPressureEvictionOptions {
return &CPUPressureEvictionOptions{
EnableLoadEviction: defaultEnableLoadEviction,
LoadUpperBoundRatio: defaultLoadUpperBoundRatio,
LoadLowerBoundRatio: defaultLoadLowerBoundRatio,
LoadThresholdMetPercentage: defaultLoadThresholdMetPercentage,
LoadMetricRingSize: defaultLoadMetricSize,
LoadEvictionCoolDownTime: defaultLoadEvictionCoolDownTime,
@@ -71,8 +74,11 @@ func (o *CPUPressureEvictionOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs.BoolVar(&o.EnableLoadEviction, "eviction-load-enable", o.EnableLoadEviction,
"set true to enable cpu load eviction")
fs.Float64Var(&o.LoadUpperBoundRatio, "eviction-load-upper-bound-ratio", o.LoadUpperBoundRatio,
"multiply the target cpuset pool size by this ration to get the load upper bound. "+
"multiply the target cpuset pool size by this ratio to get the load upper bound. "+
"if the load of the target cpuset pool is greater than the load upper bound repeatedly, the eviction will be triggered")
fs.Float64Var(&o.LoadLowerBoundRatio, "eviction-load-lower-bound-ratio", o.LoadLowerBoundRatio,
"multiply the target cpuset pool size by this ratio to get the load lower bound. "+
"if the load of the target cpuset pool is greater than the load lower bound repeatedly, the node taint will be triggered")
fs.Float64Var(&o.LoadThresholdMetPercentage, "eviction-load-threshold-met-percentage", o.LoadThresholdMetPercentage,
"the ratio between the times metric value over the bound value and the metric ring size is greater than this percentage "+
", the eviction or node taint will be triggered")
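Taken together, the three flags describe a per-pool thresholding scheme: both bounds are derived from the pool size, and a bound counts as met once the share of ring samples above it exceeds the met percentage. A minimal sketch of that logic (illustrative only; the function and parameter names are not identifiers from this PR):

// thresholdMet sketches how the ratios combine for one pool's load ring.
func thresholdMet(poolSize int, samples []float64, upperRatio, lowerRatio, metPercentage float64) (taint, evict bool) {
	upperBound := float64(poolSize) * upperRatio // repeated breaches trigger eviction
	lowerBound := float64(poolSize) * lowerRatio // repeated breaches trigger the node taint
	overLower, overUpper := 0, 0
	for _, v := range samples {
		if v > lowerBound {
			overLower++
		}
		if v > upperBound {
			overUpper++
		}
	}
	taint = float64(overLower)/float64(len(samples)) > metPercentage
	evict = float64(overUpper)/float64(len(samples)) > metPercentage
	return taint, evict
}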
@@ -95,6 +101,7 @@ func (o *CPUPressureEvictionOptions) AddFlags(fss *cliflag.NamedFlagSets) {
func (o *CPUPressureEvictionOptions) ApplyTo(c *eviction.CPUPressureEvictionConfiguration) error {
c.EnableLoadEviction = o.EnableLoadEviction
c.LoadUpperBoundRatio = o.LoadUpperBoundRatio
c.LoadLowerBoundRatio = o.LoadLowerBoundRatio
c.LoadThresholdMetPercentage = o.LoadThresholdMetPercentage
c.LoadMetricRingSize = o.LoadMetricRingSize
c.LoadEvictionCoolDownTime = o.LoadEvictionCoolDownTime
@@ -64,11 +64,24 @@ func (a *cpuAccumulator) freeCPUsInNUMANode(numaID int) []int {
return a.cpuDetails.CPUsInNUMANodes(numaID).ToSliceInt()
}

// freeCoresInNUMANode returns free core ids in specified NUMA
// freeCPUsInNUMANodeReversely returns free cpu ids in specified NUMA,
// and those returned cpu slices have already been sorted reversely
func (a *cpuAccumulator) freeCPUsInNUMANodeReversely(numaID int) []int {
return a.cpuDetails.CPUsInNUMANodes(numaID).ToSliceIntReversely()
}

// freeCoresInNUMANode returns free core ids in specified NUMA,
// and those returned cpu slices have already been sorted
func (a *cpuAccumulator) freeCoresInNUMANode(numaID int) []int {
return a.cpuDetails.CoresInNUMANodes(numaID).Filter(a.isCoreFree).ToSliceInt()
}

// freeCoresInNUMANodeReversely returns free core ids in specified NUMA,
// and those returned cpu slices have already been sorted reversely
func (a *cpuAccumulator) freeCoresInNUMANodeReversely(numaID int) []int {
return a.cpuDetails.CoresInNUMANodes(numaID).Filter(a.isCoreFree).ToSliceIntReversely()
}

// isSocketFree returns true if the supplied socket is fully available
func (a *cpuAccumulator) isSocketFree(socketID int) bool {
return a.cpuDetails.CPUsInSockets(socketID).Size() == a.getTopology().CPUsPerSocket()
@@ -301,7 +314,7 @@ successful:
}

// TakeHTByNUMABalance tries to make the allocated cpu spread on different
// sockets, and it uses cpu HT as the basic allocation unit
// NUMAs, and it uses cpu HT as the basic allocation unit
func TakeHTByNUMABalance(info *machine.KatalystMachineInfo, availableCPUs machine.CPUSet,
cpuRequirement int,
) (machine.CPUSet, machine.CPUSet, error) {
@@ -338,3 +351,55 @@ failed:
successful:
return acc.result.Clone(), availableCPUs.Difference(acc.result), nil
}

// TakeByNUMABalanceReversely tries to make the allocated cpu reversely spread on different
// NUMAs, and it uses cpu Cores as the basic allocation unit
func TakeByNUMABalanceReversely(info *machine.KatalystMachineInfo, availableCPUs machine.CPUSet,
cpuRequirement int,
) (machine.CPUSet, machine.CPUSet, error) {
var err error
acc := newCPUAccumulator(info, availableCPUs, cpuRequirement)
if acc.isSatisfied() {
goto successful
}

for {
if acc.isFailed() {
err = fmt.Errorf("not enough cpus available to satisfy request")
goto failed
}

numaLoop:
for _, s := range info.CPUDetails.NUMANodes().ToSliceInt() {
if acc.needs(acc.getTopology().CPUsPerCore()) && len(acc.freeCores()) > 0 {
for _, c := range acc.freeCoresInNUMANodeReversely(s) {
acc.take(acc.getDetails().CPUsInCores(c))
if acc.isSatisfied() {
goto successful
} else {
continue numaLoop
}
}
continue
}

for _, c := range acc.freeCPUsInNUMANodeReversely(s) {
if acc.needs(1) {
acc.take(machine.NewCPUSet(c))
}
if acc.isSatisfied() {
goto successful
} else {
break
}
}
}
}
failed:
if err == nil {
err = errors.New("failed to allocate cpus")
}
return availableCPUs, availableCPUs, err
successful:
return acc.result.Clone(), availableCPUs.Difference(acc.result), nil
}
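A hedged usage sketch of the new helper (variable names are illustrative, and CPUSet.String() is assumed to exist as in the upstream cpuset type): it spreads the allocation across NUMA nodes like TakeHTByNUMABalance, but walks each node's free cores and CPUs from the highest ids down.

// carve 4 CPUs out of the currently free set, preferring the highest-numbered cores per NUMA node
allocated, remaining, err := TakeByNUMABalanceReversely(machineInfo, availableCPUs, 4)
if err != nil {
	return err // hypothetical caller simply propagates the error
}
general.Infof("reversed NUMA-balanced take: got %s, left %s", allocated.String(), remaining.String())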
@@ -325,6 +325,8 @@ func (p *CPUPressureLoadEviction) collectMetrics(_ context.Context) {
return
}

allowSharedCoresOverlapReclaimedCores := p.state.GetAllowSharedCoresOverlapReclaimedCores()

pod2Pool := getPodPoolMapFunc(p.metaServer.MetaAgent, p.state)
p.clearExpiredMetricsHistory(pod2Pool)

@@ -388,7 +390,7 @@ func (p *CPUPressureLoadEviction) collectMetrics(_ context.Context) {
general.Warningf("metric: %s hasn't pool metric collecting handler, use default handler", metricName)
handler = p.collectPoolMetricDefault
}
handler(dynamicConfig, underPressure, metricName, poolsMetric[poolName][metricName], poolName, poolEntry.PoolSize, collectTime)
handler(dynamicConfig, underPressure, metricName, poolsMetric[poolName][metricName], poolName, poolEntry.PoolSize, collectTime, allowSharedCoresOverlapReclaimedCores)
}
}
}
@@ -440,20 +442,22 @@ func (p *CPUPressureLoadEviction) accumulateSharedPoolsLimit() int {
// collectPoolLoad is specifically used for cpu-load in pool level,
// and its upper-bound and lower-bound are calculated by pool size.
func (p *CPUPressureLoadEviction) collectPoolLoad(dynamicConfig *dynamic.Configuration, pressureByPoolSize bool,
metricName string, metricValue float64, poolName string, poolSize int, collectTime int64,
metricName string, metricValue float64, poolName string,
poolSize int, collectTime int64, allowSharedCoresOverlapReclaimedCores bool,
) {
snapshot := &MetricSnapshot{
Info: MetricInfo{
Name: metricName,
Value: metricValue,
UpperBound: float64(poolSize) * dynamicConfig.LoadUpperBoundRatio,
LowerBound: float64(poolSize),
LowerBound: float64(poolSize) * dynamicConfig.LoadLowerBoundRatio,
},
Time: collectTime,
}

useAdvisedThreshold := p.checkPressureWithAdvisedThreshold()
if useAdvisedThreshold {

if useAdvisedThreshold && !allowSharedCoresOverlapReclaimedCores {
// it must not be triggered when it's healthy
lowerBound := metricValue + 1
upperBound := lowerBound * dynamicConfig.LoadUpperBoundRatio
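Put differently, when shared cores may overlap reclaimed cores the advised-threshold override is skipped and the pool-size-derived bounds stay in effect. A condensed sketch of the resulting bound selection (illustrative, not a verbatim excerpt of this function):

upper := float64(poolSize) * dynamicConfig.LoadUpperBoundRatio
lower := float64(poolSize) * dynamicConfig.LoadLowerBoundRatio
if useAdvisedThreshold && !allowSharedCoresOverlapReclaimedCores {
	// advised path: pin the lower bound just above the current value so a healthy
	// pool cannot trip it, then scale the upper bound from that
	lower = metricValue + 1
	upper = lower * dynamicConfig.LoadUpperBoundRatio
}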
@@ -483,7 +487,7 @@ func (p *CPUPressureLoadEviction) collectPoolLoad(dynamicConfig *dynamic.Configu
// collectPoolMetricDefault is a common collect in pool level,
// and its upper-bound and lower-bound are not defined.
func (p *CPUPressureLoadEviction) collectPoolMetricDefault(dynamicConfig *dynamic.Configuration, _ bool,
metricName string, metricValue float64, poolName string, _ int, collectTime int64,
metricName string, metricValue float64, poolName string, _ int, collectTime int64, allowSharedCoresOverlapReclaimedCores bool,
) {
snapshot := &MetricSnapshot{
Info: MetricInfo{
@@ -54,7 +54,8 @@ func (se SubEntries) IsPoolEntry() bool {
type Entries map[string]SubEntries

type PoolMetricCollectHandler func(dynamicConfig *dynamic.Configuration, poolsUnderPressure bool,
metricName string, metricValue float64, poolName string, poolSize int, collectTime int64)
metricName string, metricValue float64, poolName string,
poolSize int, collectTime int64, allowSharedCoresOverlapReclaimedCores bool)
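Any collector wired into the dispatch loop above now receives the overlap flag as its last argument; a hedged sketch of a conforming handler (not part of this PR, using general.Infof as seen elsewhere in the plugin):

var debugHandler PoolMetricCollectHandler = func(dynamicConfig *dynamic.Configuration, poolsUnderPressure bool,
	metricName string, metricValue float64, poolName string,
	poolSize int, collectTime int64, allowSharedCoresOverlapReclaimedCores bool) {
	general.Infof("pool %s: %s=%v (size=%d, overlapAllowed=%v)",
		poolName, metricName, metricValue, poolSize, allowSharedCoresOverlapReclaimedCores)
}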

func (ring *MetricRing) Sum() float64 {
ring.RLock()
@@ -53,6 +53,7 @@ const (
defaultMetricRingSize = 1
defaultCPUPressureEvictionPodGracePeriodSeconds = -1
defaultLoadUpperBoundRatio = 1.8
defaultLoadLowerBoundRatio = 1.0
defaultLoadThresholdMetPercentage = 0.8
defaultReservedForAllocate = "4"
defaultReservedForReclaim = "4"
@@ -72,13 +73,14 @@ func makeMetaServer(metricsFetcher metrictypes.MetricsFetcher, cpuTopology *mach
return metaServer
}

func makeConf(metricRingSize int, gracePeriod int64, loadUpperBoundRatio,
func makeConf(metricRingSize int, gracePeriod int64, loadUpperBoundRatio, loadLowerBoundRatio,
loadThresholdMetPercentage float64, reservedForReclaim, reservedForAllocate string, reservedForSystem int,
) *config.Configuration {
conf := config.NewConfiguration()
conf.GetDynamicConfiguration().EnableLoadEviction = true
conf.GetDynamicConfiguration().LoadMetricRingSize = metricRingSize
conf.GetDynamicConfiguration().LoadUpperBoundRatio = loadUpperBoundRatio
conf.GetDynamicConfiguration().LoadLowerBoundRatio = loadLowerBoundRatio
conf.GetDynamicConfiguration().LoadThresholdMetPercentage = loadThresholdMetPercentage
conf.GetDynamicConfiguration().CPUPressureEvictionConfiguration.GracePeriod = gracePeriod
conf.GetDynamicConfiguration().ReservedResourceForAllocate = v1.ResourceList{
@@ -113,7 +115,9 @@ func TestNewCPUPressureLoadEviction(t *testing.T) {
cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
as.Nil(err)
conf := makeConf(defaultMetricRingSize, int64(defaultCPUPressureEvictionPodGracePeriodSeconds),
defaultLoadUpperBoundRatio, defaultLoadThresholdMetPercentage, defaultReservedForReclaim, defaultReservedForAllocate, defaultReservedForSystem)
defaultLoadUpperBoundRatio, defaultLoadLowerBoundRatio,
defaultLoadThresholdMetPercentage, defaultReservedForReclaim,
defaultReservedForAllocate, defaultReservedForSystem)
metaServer := makeMetaServer(metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}), cpuTopology)
stateImpl, err := makeState(cpuTopology)
as.Nil(err)
@@ -132,7 +136,9 @@ func TestThresholdMet(t *testing.T) {
cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
as.Nil(err)
conf := makeConf(defaultMetricRingSize, int64(defaultCPUPressureEvictionPodGracePeriodSeconds),
defaultLoadUpperBoundRatio, defaultLoadThresholdMetPercentage, defaultReservedForReclaim, defaultReservedForAllocate, defaultReservedForSystem)
defaultLoadUpperBoundRatio, defaultLoadLowerBoundRatio,
defaultLoadThresholdMetPercentage, defaultReservedForReclaim,
defaultReservedForAllocate, defaultReservedForSystem)
metaServer := makeMetaServer(metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}), cpuTopology)
stateImpl, err := makeState(cpuTopology)
as.Nil(err)
@@ -398,8 +404,8 @@ func TestThresholdMet(t *testing.T) {
as.Nil(err)
as.NotNil(t, metResp)

as.Equal(tt.wantMetType, metResp.MetType)
as.Equal(tt.wantEvictionScope, metResp.EvictionScope)
as.Equalf(tt.wantMetType, metResp.MetType, "case: %s", tt.name)
as.Equalf(tt.wantEvictionScope, metResp.EvictionScope, "case: %s", tt.name)
if tt.wantCondition != nil && metResp.Condition != nil {
as.Equal(*(tt.wantCondition), *(metResp.Condition))
} else {
@@ -416,7 +422,9 @@ func TestGetTopEvictionPods(t *testing.T) {
cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
as.Nil(err)
conf := makeConf(defaultMetricRingSize, int64(defaultCPUPressureEvictionPodGracePeriodSeconds),
defaultLoadUpperBoundRatio, defaultLoadThresholdMetPercentage, defaultReservedForReclaim, defaultReservedForAllocate, defaultReservedForSystem)
defaultLoadUpperBoundRatio, defaultLoadLowerBoundRatio,
defaultLoadThresholdMetPercentage, defaultReservedForReclaim,
defaultReservedForAllocate, defaultReservedForSystem)
metaServer := makeMetaServer(metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}), cpuTopology)
stateImpl, err := makeState(cpuTopology)
as.Nil(err)
@@ -1576,7 +1584,9 @@ func TestCPUPressureLoadEviction_collectMetrics(t *testing.T) {
cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
as.Nil(err)
conf := makeConf(defaultMetricRingSize, int64(defaultCPUPressureEvictionPodGracePeriodSeconds),
defaultLoadUpperBoundRatio, defaultLoadThresholdMetPercentage, tt.reservedCPUForReclaim, tt.reservedCPUForAllocate, tt.reservedCPUForSystem)
defaultLoadUpperBoundRatio, defaultLoadLowerBoundRatio,
defaultLoadThresholdMetPercentage, tt.reservedCPUForReclaim,
tt.reservedCPUForAllocate, tt.reservedCPUForSystem)
conf.GetDynamicConfiguration().EnableReclaim = tt.enableReclaim
stateImpl, err := makeState(cpuTopology)
as.Nil(err)
pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go (2 additions & 1 deletion)
@@ -82,7 +82,7 @@ func GetReadonlyState() (state.ReadonlyState, error) {
defer readonlyStateLock.RUnlock()

if readonlyState == nil {
return nil, fmt.Errorf("readonlyState isn't setted")
return nil, fmt.Errorf("readonlyState isn't set")
}
return readonlyState, nil
}
@@ -273,6 +273,7 @@ func (p *DynamicPolicy) Start() (err error) {
general.Infof("is already started")
return nil
}

p.stopCh = make(chan struct{})

go wait.Until(func() {
@@ -242,6 +242,7 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error {
if err != nil {
general.Errorf("allocate by ListAndWatch response of CPUAdvisorServer failed with error: %v", err)
}

_ = general.UpdateHealthzStateByError(cpuconsts.CommunicateWithAdvisor, err)

if p.advisorMonitor != nil && err == nil {
@@ -281,6 +282,14 @@ func (p *DynamicPolicy) allocateByCPUAdvisor(resp *advisorapi.ListAndWatchRespon
return fmt.Errorf("applyBlocks failed with error: %v", applyErr)
}

curAllowSharedCoresOverlapReclaimedCores := p.state.GetAllowSharedCoresOverlapReclaimedCores()

if curAllowSharedCoresOverlapReclaimedCores != resp.AllowSharedCoresOverlapReclaimedCores {
general.Infof("set allowSharedCoresOverlapReclaimedCores from %v to %v",
curAllowSharedCoresOverlapReclaimedCores, resp.AllowSharedCoresOverlapReclaimedCores)
p.state.SetAllowSharedCoresOverlapReclaimedCores(resp.AllowSharedCoresOverlapReclaimedCores)
}

return nil
}
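The overlap flag thus flows from the advisor's ListAndWatch response into the plugin's checkpointed state, where consumers such as the load-eviction plugin read it back via GetAllowSharedCoresOverlapReclaimedCores. A minimal sketch of that round trip using only the accessors shown in this diff (the surrounding code is hypothetical):

// writer side: persist the advisor's latest decision only when it changes
if p.state.GetAllowSharedCoresOverlapReclaimedCores() != resp.AllowSharedCoresOverlapReclaimedCores {
	p.state.SetAllowSharedCoresOverlapReclaimedCores(resp.AllowSharedCoresOverlapReclaimedCores)
}

// reader side: branch on the persisted value, e.g. to relax load-eviction bounds
if p.state.GetAllowSharedCoresOverlapReclaimedCores() {
	// shared_cores and reclaimed_cores cpusets may overlap; treat pool load accordingly
}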
