Skip to content

Commit

Permalink
feat(qrm): use state to cache AllowSharedCoresOverlapReclaimedCores
Browse files Browse the repository at this point in the history
  • Loading branch information
csfldf committed Jun 3, 2024
1 parent d4cd4a9 commit 780d19a
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ type CPUPressureLoadEviction struct {

systemReservedCPUs machine.CPUSet

//allowSharedCoresOverlapReclaimedCores bool
//allowSharedCoresOverlapReclaimedCoresLock sync.RWMutex
// allowSharedCoresOverlapReclaimedCores bool
// allowSharedCoresOverlapReclaimedCoresLock sync.RWMutex
}

func NewCPUPressureLoadEviction(emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer,
Expand Down Expand Up @@ -339,12 +339,7 @@ func (p *CPUPressureLoadEviction) collectMetrics(_ context.Context) {
return
}

allowSharedCoresOverlapReclaimedCores, err := state.GetAllowSharedCoresOverlapReclaimedCores()

if err != nil {
general.Errorf("GetAllowSharedCoresOverlapReclaimedCores failed with error: %v", err)
return
}
allowSharedCoresOverlapReclaimedCores := p.state.GetAllowSharedCoresOverlapReclaimedCores()

pod2Pool := getPodPoolMapFunc(p.metaServer.MetaAgent, p.state)
p.clearExpiredMetricsHistory(pod2Pool)
Expand Down Expand Up @@ -462,7 +457,8 @@ func (p *CPUPressureLoadEviction) accumulateSharedPoolsLimit() int {
// and its upper-bound and lower-bound are calculated by pool size.
func (p *CPUPressureLoadEviction) collectPoolLoad(dynamicConfig *dynamic.Configuration, pressureByPoolSize bool,
metricName string, metricValue float64, poolName string,
poolSize int, collectTime int64, allowSharedCoresOverlapReclaimedCores bool) {
poolSize int, collectTime int64, allowSharedCoresOverlapReclaimedCores bool,
) {
snapshot := &MetricSnapshot{
Info: MetricInfo{
Name: metricName,
Expand Down
32 changes: 14 additions & 18 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,20 @@ type DynamicPolicy struct {

// those are parsed from configurations
// todo if we want to use dynamic configuration, we'd better not use self-defined conf
enableCPUAdvisor bool
reservedCPUs machine.CPUSet
cpuAdvisorSocketAbsPath string
cpuPluginSocketAbsPath string
extraStateFileAbsPath string
enableCPUIdle bool
enableSyncingCPUIdle bool
reclaimRelativeRootCgroupPath string
qosConfig *generic.QoSConfiguration
dynamicConfig *dynamicconfig.DynamicAgentConfiguration
podDebugAnnoKeys []string
transitionPeriod time.Duration
cpuNUMAHintPreferPolicy string
cpuNUMAHintPreferLowThreshold float64
allowSharedCoresOverlapReclaimedCores bool
enableCPUAdvisor bool
reservedCPUs machine.CPUSet
cpuAdvisorSocketAbsPath string
cpuPluginSocketAbsPath string
extraStateFileAbsPath string
enableCPUIdle bool
enableSyncingCPUIdle bool
reclaimRelativeRootCgroupPath string
qosConfig *generic.QoSConfiguration
dynamicConfig *dynamicconfig.DynamicAgentConfiguration
podDebugAnnoKeys []string
transitionPeriod time.Duration
cpuNUMAHintPreferPolicy string
cpuNUMAHintPreferLowThreshold float64
}

func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
Expand Down Expand Up @@ -275,9 +274,6 @@ func (p *DynamicPolicy) Start() (err error) {
return nil
}

p.allowSharedCoresOverlapReclaimedCores = p.dynamicConfig.GetDynamicConfiguration().AllowSharedCoresOverlapReclaimedCores
state.SetAllowSharedCoresOverlapReclaimedCores(p.allowSharedCoresOverlapReclaimedCores)

p.stopCh = make(chan struct{})

go wait.Until(func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,12 @@ func (p *DynamicPolicy) allocateByCPUAdvisor(resp *advisorapi.ListAndWatchRespon
return fmt.Errorf("applyBlocks failed with error: %v", applyErr)
}

if p.allowSharedCoresOverlapReclaimedCores != resp.AllowSharedCoresOverlapReclaimedCores {
curAllowSharedCoresOverlapReclaimedCores := p.state.GetAllowSharedCoresOverlapReclaimedCores()

if curAllowSharedCoresOverlapReclaimedCores != resp.AllowSharedCoresOverlapReclaimedCores {
general.Infof("set allowSharedCoresOverlapReclaimedCores from %v to %v",
p.allowSharedCoresOverlapReclaimedCores, resp.AllowSharedCoresOverlapReclaimedCores)
p.allowSharedCoresOverlapReclaimedCores = resp.AllowSharedCoresOverlapReclaimedCores
state.SetAllowSharedCoresOverlapReclaimedCores(p.allowSharedCoresOverlapReclaimedCores)
curAllowSharedCoresOverlapReclaimedCores, resp.AllowSharedCoresOverlapReclaimedCores)
p.state.SetAllowSharedCoresOverlapReclaimedCores(resp.AllowSharedCoresOverlapReclaimedCores)
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1060,14 +1060,14 @@ func (p *DynamicPolicy) generateNUMABindingPoolsCPUSetInPlace(poolsCPUSet map[st

var tErr error
var leftCPUs machine.CPUSet
if numaPoolsTotalQuantity <= availableSize && enableReclaim && !p.allowSharedCoresOverlapReclaimedCores {
if numaPoolsTotalQuantity <= availableSize && enableReclaim && !p.state.GetAllowSharedCoresOverlapReclaimedCores() {
leftCPUs, tErr = p.takeCPUsForPoolsInPlace(numaPoolsToQuantityMap, poolsCPUSet, numaAvailableCPUs)
if tErr != nil {
return originalAvailableCPUSet, fmt.Errorf("allocate cpus for numa_binding pools in NUMA: %d failed with error: %v",
numaID, tErr)
}
} else {
// numaPoolsTotalQuantity > availableSize || !enableReclaim || p.allowSharedCoresOverlapReclaimedCores
// numaPoolsTotalQuantity > availableSize || !enableReclaim || p.state.GetAllowSharedCoresOverlapReclaimedCores()
// both allocate all numaAvailableCPUs proportionally
leftCPUs, tErr = p.generateProportionalPoolsCPUSetInPlace(numaPoolsToQuantityMap, poolsCPUSet, numaAvailableCPUs)

Expand Down Expand Up @@ -1210,7 +1210,7 @@ func (p *DynamicPolicy) generatePoolsAndIsolation(poolsQuantityMap map[string]ma
// deal with reclaim pool
poolsCPUSet[state.PoolNameReclaim] = poolsCPUSet[state.PoolNameReclaim].Union(availableCPUs)

if !p.allowSharedCoresOverlapReclaimedCores {
if !p.state.GetAllowSharedCoresOverlapReclaimedCores() {
if poolsCPUSet[state.PoolNameReclaim].IsEmpty() {
// for reclaimed pool, we must make them exist when the node isn't in hybrid mode even if cause overlap
allAvailableCPUs := p.machineInfo.CPUDetails.CPUs().Difference(p.reservedCPUs)
Expand All @@ -1232,7 +1232,7 @@ func (p *DynamicPolicy) generatePoolsAndIsolation(poolsQuantityMap map[string]ma
state.PoolNameReclaim, poolsCPUSet[state.PoolNameReclaim].String())
}
} else {
// p.allowSharedCoresOverlapReclaimedCores == true
// p.state.GetAllowSharedCoresOverlapReclaimedCores() == true
for poolName, cset := range poolsCPUSet {
if ratio, found := reclaimOverlapShareRatio[poolName]; found && ratio > 0 {

Expand Down Expand Up @@ -1511,7 +1511,7 @@ func (p *DynamicPolicy) doAndCheckPutAllocationInfo(allocationInfo *state.Alloca
}

func (p *DynamicPolicy) getReclaimOverlapShareRatio(entries state.PodEntries) (map[string]float64, error) {
if !p.allowSharedCoresOverlapReclaimedCores {
if !p.state.GetAllowSharedCoresOverlapReclaimedCores() {
return nil, nil
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (
var _ checkpointmanager.Checkpoint = &CPUPluginCheckpoint{}

// CPUPluginCheckpoint is the serialized form of the cpu plugin state that is
// persisted through the checkpoint manager and restored on agent restart.
type CPUPluginCheckpoint struct {
	PolicyName   string      `json:"policyName"`
	MachineState NUMANodeMap `json:"machineState"`
	PodEntries   PodEntries  `json:"pod_entries"`
	// AllowSharedCoresOverlapReclaimedCores caches whether shared-cores pools
	// may overlap reclaimed cores, so the setting survives restarts.
	AllowSharedCoresOverlapReclaimedCores bool `json:"allow_shared_cores_overlap_reclaimed_cores"`
	// Checksum guards the integrity of the persisted payload.
	Checksum checksum.Checksum `json:"checksum"`
	// NOTE(review): json tag style is mixed (camelCase vs snake_case); changing
	// the tags would invalidate existing checkpoints, so they are left as-is.
}

func NewCPUPluginCheckpoint() *CPUPluginCheckpoint {
Expand Down
32 changes: 0 additions & 32 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/global_status.go

This file was deleted.

2 changes: 2 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ type reader interface {
GetMachineState() NUMANodeMap
GetPodEntries() PodEntries
GetAllocationInfo(podUID string, containerName string) *AllocationInfo
GetAllowSharedCoresOverlapReclaimedCores() bool
}

// writer is used to store information into local states,
Expand All @@ -704,6 +705,7 @@ type writer interface {
SetMachineState(numaNodeMap NUMANodeMap)
SetPodEntries(podEntries PodEntries)
SetAllocationInfo(podUID string, containerName string, allocationInfo *AllocationInfo)
SetAllowSharedCoresOverlapReclaimedCores(allowSharedCoresOverlapReclaimedCores bool)

Delete(podUID string, containerName string)
ClearState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (sc *stateCheckpoint) restoreState(topology *machine.CPUTopology) error {

sc.cache.SetMachineState(generatedMachineState)
sc.cache.SetPodEntries(checkpoint.PodEntries)
sc.cache.SetAllowSharedCoresOverlapReclaimedCores(checkpoint.AllowSharedCoresOverlapReclaimedCores)

if !reflect.DeepEqual(generatedMachineState, checkpoint.MachineState) {
klog.Warningf("[cpu_plugin] machine state changed: generatedMachineState: %s; checkpointMachineState: %s",
Expand Down Expand Up @@ -128,6 +129,7 @@ func (sc *stateCheckpoint) storeState() error {
checkpoint.PolicyName = sc.policyName
checkpoint.MachineState = sc.cache.GetMachineState()
checkpoint.PodEntries = sc.cache.GetPodEntries()
checkpoint.AllowSharedCoresOverlapReclaimedCores = sc.cache.GetAllowSharedCoresOverlapReclaimedCores()

err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil {
Expand Down Expand Up @@ -187,10 +189,28 @@ func (sc *stateCheckpoint) SetPodEntries(podEntries PodEntries) {
sc.cache.SetPodEntries(podEntries)
err := sc.storeState()
if err != nil {
klog.ErrorS(err, "[cpu_plugin] store pod entries to checkpoint error", "err")
klog.ErrorS(err, "[cpu_plugin] store pod entries to checkpoint error")
}
}

// SetAllowSharedCoresOverlapReclaimedCores updates the cached flag in memory
// and immediately persists the full state to the checkpoint file, so the
// value survives agent restarts.
func (sc *stateCheckpoint) SetAllowSharedCoresOverlapReclaimedCores(allowSharedCoresOverlapReclaimedCores bool) {
	sc.Lock()
	defer sc.Unlock()

	sc.cache.SetAllowSharedCoresOverlapReclaimedCores(allowSharedCoresOverlapReclaimedCores)
	// Persistence failures are logged rather than returned, keeping this
	// setter's signature consistent with the other writers in this file.
	if err := sc.storeState(); err != nil {
		klog.ErrorS(err, "[cpu_plugin] store allowSharedCoresOverlapReclaimedCores to checkpoint error")
	}
}

// GetAllowSharedCoresOverlapReclaimedCores reads the cached flag from the
// in-memory state under a read lock; the checkpoint file is not consulted.
func (sc *stateCheckpoint) GetAllowSharedCoresOverlapReclaimedCores() bool {
	sc.RLock()
	defer sc.RUnlock()

	allow := sc.cache.GetAllowSharedCoresOverlapReclaimedCores()
	return allow
}

func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
sc.Lock()
defer sc.Unlock()
Expand Down
24 changes: 21 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ type cpuPluginState struct {

cpuTopology *machine.CPUTopology

podEntries PodEntries
machineState NUMANodeMap
socketTopology map[int]string
podEntries PodEntries
machineState NUMANodeMap
allowSharedCoresOverlapReclaimedCores bool
socketTopology map[int]string
}

var _ State = &cpuPluginState{}
Expand Down Expand Up @@ -121,6 +122,23 @@ func (s *cpuPluginState) SetPodEntries(podEntries PodEntries) {
"podEntries", podEntries.String())
}

// SetAllowSharedCoresOverlapReclaimedCores records whether shared-cores pools
// are allowed to overlap reclaimed cores, logging the update for audit.
func (s *cpuPluginState) SetAllowSharedCoresOverlapReclaimedCores(allowSharedCoresOverlapReclaimedCores bool) {
	s.Lock()
	defer s.Unlock()

	s.allowSharedCoresOverlapReclaimedCores = allowSharedCoresOverlapReclaimedCores

	klog.InfoS("[cpu_plugin] Updated allowSharedCoresOverlapReclaimedCores",
		"allowSharedCoresOverlapReclaimedCores", allowSharedCoresOverlapReclaimedCores)
}

// GetAllowSharedCoresOverlapReclaimedCores returns whether shared-cores pools
// may overlap reclaimed cores, guarded by the state's read lock.
func (s *cpuPluginState) GetAllowSharedCoresOverlapReclaimedCores() bool {
	s.RLock()
	defer s.RUnlock()

	return s.allowSharedCoresOverlapReclaimedCores
}

func (s *cpuPluginState) Delete(podUID string, containerName string) {
s.Lock()
defer s.Unlock()
Expand Down

0 comments on commit 780d19a

Please sign in to comment.