Merge pull request #663 from cheney-lin/dev/refine_util_based
refine pressure_supression eviction and getUtilBasedHeadroom
nightmeng authored Jul 30, 2024
2 parents df45505 + 3f4179f commit 7d9189e
Showing 7 changed files with 221 additions and 104 deletions.
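
In short, this change threads the MetaServer into the CPU pressure-suppression eviction plugin so it can query reclaim metrics through the metric helper, and computes the pool suppression rate against the measured reclaimed-cores supply (treating a zero supply as an effectively infinite rate) instead of the raw reclaim pool size. The eviction test is reworked into parallel subtests that seed per-case fake metrics, and the headroom assembler drops its private getPoolMetrics in favor of the same reclaim-metrics helper, returning the plain pool-size headroom whenever util-based headroom is disabled or the reclaim pool is absent.
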
@@ -31,6 +31,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
@@ -40,18 +41,20 @@ import (
const EvictionNameSuppression = "cpu-pressure-suppression-plugin"

type CPUPressureSuppression struct {
conf *config.Configuration
state state.ReadonlyState
conf *config.Configuration
state state.ReadonlyState
metaServer *metaserver.MetaServer

lastToleranceTime sync.Map
}

func NewCPUPressureSuppressionEviction(_ metrics.MetricEmitter, _ *metaserver.MetaServer,
func NewCPUPressureSuppressionEviction(_ metrics.MetricEmitter, metaServer *metaserver.MetaServer,
conf *config.Configuration, state state.ReadonlyState,
) (CPUPressureEviction, error) {
return &CPUPressureSuppression{
conf: conf,
state: state,
conf: conf,
state: state,
metaServer: metaServer,
}, nil
}

@@ -90,6 +93,11 @@ func (p *CPUPressureSuppression) GetEvictPods(_ context.Context, request *plugin
return &pluginapi.GetEvictPodsResponse{}, nil
}

reclaimMetrics, err := helper.GetReclaimMetrics(poolCPUSet, p.conf.ReclaimRelativeRootCgroupPath, p.metaServer.MetricsFetcher)
if err != nil {
return nil, fmt.Errorf("get reclaim metrics failed: %s", err)
}

filteredPods := native.FilterPods(request.ActivePods, p.conf.CheckReclaimedQoSForPod)
if len(filteredPods) == 0 {
return &pluginapi.GetEvictPodsResponse{}, nil
@@ -107,13 +115,21 @@ func (p *CPUPressureSuppression) GetEvictPods(_ context.Context, request *plugin
for _, pod := range filteredPods {
totalCPURequest.Add(native.CPUQuantityGetter()(native.SumUpPodRequestResources(pod)))
}
general.Infof("total reclaim cpu request is %v, reclaim pool size is %v", totalCPURequest.String(), poolSize)

general.InfoS("info", "reclaim cpu request", totalCPURequest.String(),
"reclaim pool size", poolSize, "reclaimedCoresSupply", reclaimMetrics.ReclaimedCoresSupply,
"reclaimPoolUsage", reclaimMetrics.PoolCPUUsage, "reclaimedCoresUsage", reclaimMetrics.CgroupCPUUsage)

now := time.Now()
var evictPods []*v1alpha1.EvictPod
for _, pod := range filteredPods {
key := native.GenerateUniqObjectNameKey(pod)
poolSuppressionRate := float64(totalCPURequest.Value()) / float64(poolSize)
poolSuppressionRate := 0.0
if reclaimMetrics.ReclaimedCoresSupply == 0 {
poolSuppressionRate = math.MaxFloat64
} else {
poolSuppressionRate = float64(totalCPURequest.Value()) / reclaimMetrics.ReclaimedCoresSupply
}

if podToleranceRate := p.getPodToleranceRate(pod, dynamicConfig.MaxSuppressionToleranceRate); podToleranceRate < poolSuppressionRate {
last, _ := p.lastToleranceTime.LoadOrStore(key, now)
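
For reference, here is a minimal, self-contained sketch of the refined suppression-rate calculation shown above. The names are simplified from the patch, the zero-supply guard mirrors the new code, and the sample numbers are purely illustrative:

package main

import (
	"fmt"
	"math"
)

// poolSuppressionRate divides the summed CPU requests of reclaimed pods by the
// measured reclaimed-cores supply; previously the raw reclaim pool size was the
// denominator. A zero supply yields an "infinite" rate, so any pod with a finite
// tolerance rate becomes an eviction candidate.
func poolSuppressionRate(totalCPURequestCores, reclaimedCoresSupply float64) float64 {
	if reclaimedCoresSupply == 0 {
		return math.MaxFloat64
	}
	return totalCPURequestCores / reclaimedCoresSupply
}

func main() {
	// 8 cores requested against a supply of 5 cores gives a rate of 1.6;
	// pods whose tolerance rate is below 1.6 are candidates for eviction.
	fmt.Println(poolSuppressionRate(8, 5))
}
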
@@ -36,9 +36,11 @@ import (
evictionpluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"
qrmstate "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/config"
pkgconsts "github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric"
)

const (
@@ -53,6 +55,7 @@ func makeSuppressionEvictionConf(cpuMaxSuppressionToleranceRate float64,
conf.GetDynamicConfiguration().EnableSuppressionEviction = true
conf.GetDynamicConfiguration().MaxSuppressionToleranceRate = cpuMaxSuppressionToleranceRate
conf.GetDynamicConfiguration().MinSuppressionToleranceDuration = cpuMinSuppressionToleranceDuration
conf.ReclaimRelativeRootCgroupPath = "test"
return conf
}

@@ -77,15 +80,7 @@ func TestCPUPressureSuppression_GetEvictPods(t *testing.T) {

as := require.New(t)

cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
as.Nil(err)
conf := makeSuppressionEvictionConf(defaultCPUMaxSuppressionToleranceRate, defaultCPUMinSuppressionToleranceDuration)
metaServer := makeMetaServer(metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}), cpuTopology)
stateImpl, err := makeState(cpuTopology)
as.Nil(err)

plugin, _ := NewCPUPressureSuppressionEviction(metrics.DummyMetrics{}, metaServer, conf, stateImpl)
as.NotNil(plugin)
now := time.Now()

pod1UID := string(uuid.NewUUID())
pod1Name := "pod-1"
@@ -95,6 +90,7 @@ func TestCPUPressureSuppression_GetEvictPods(t *testing.T) {
tests := []struct {
name string
podEntries qrmstate.PodEntries
setFakeMetric func(store *metric.FakeMetricsFetcher)
wantEvictPodUIDSet sets.String
}{
{
@@ -157,6 +153,20 @@ func TestCPUPressureSuppression_GetEvictPods(t *testing.T) {
},
},
wantEvictPodUIDSet: sets.NewString(),
setFakeMetric: func(store *metric.FakeMetricsFetcher) {
store.SetCPUMetric(1, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(3, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(4, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(5, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(6, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(9, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(11, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(12, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(13, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(14, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})

store.SetCgroupMetric("test", pkgconsts.MetricCPUUsageCgroup, utilmetric.MetricData{Value: 5, Time: &now})
},
},
{
name: "over tolerance rate",
@@ -253,72 +263,106 @@ func TestCPUPressureSuppression_GetEvictPods(t *testing.T) {
},
},
wantEvictPodUIDSet: sets.NewString(pod1UID),
setFakeMetric: func(store *metric.FakeMetricsFetcher) {
store.SetCPUMetric(1, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(3, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(4, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(5, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(6, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(9, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(11, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(12, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(13, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(14, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})

store.SetCgroupMetric("test", pkgconsts.MetricCPUUsageCgroup, utilmetric.MetricData{Value: 5, Time: &now})
},
},
}

for _, tt := range tests {
stateImpl, err := makeState(cpuTopology)
as.Nil(err)
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

pods := make([]*v1.Pod, 0, len(tt.podEntries))
cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
as.Nil(err)
conf := makeSuppressionEvictionConf(defaultCPUMaxSuppressionToleranceRate, defaultCPUMinSuppressionToleranceDuration)

for entryName, entries := range tt.podEntries {
for subEntryName, entry := range entries {
stateImpl.SetAllocationInfo(entryName, subEntryName, entry)
metricsFetcher := metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})
store := metricsFetcher.(*metric.FakeMetricsFetcher)

if entries.IsPoolEntry() {
continue
}
metaServer := makeMetaServer(metricsFetcher, cpuTopology)
stateImpl, err := makeState(cpuTopology)
as.Nil(err)

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(entry.PodUid),
Name: entry.PodName,
Namespace: entry.PodNamespace,
Annotations: maputil.CopySS(entry.Annotations),
Labels: maputil.CopySS(entry.Labels),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: entry.ContainerName,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
apiconsts.ReclaimedResourceMilliCPU: *resource.NewQuantity(int64(entry.RequestQuantity*1000), resource.DecimalSI),
},
Limits: v1.ResourceList{
apiconsts.ReclaimedResourceMilliCPU: *resource.NewQuantity(int64(entry.RequestQuantity*1000), resource.DecimalSI),
plugin, _ := NewCPUPressureSuppressionEviction(metrics.DummyMetrics{}, metaServer, conf, stateImpl)
as.NotNil(plugin)

pods := make([]*v1.Pod, 0, len(tt.podEntries))

if tt.setFakeMetric != nil {
tt.setFakeMetric(store)
}

for entryName, entries := range tt.podEntries {
for subEntryName, entry := range entries {
stateImpl.SetAllocationInfo(entryName, subEntryName, entry)

if entries.IsPoolEntry() {
continue
}

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(entry.PodUid),
Name: entry.PodName,
Namespace: entry.PodNamespace,
Annotations: maputil.CopySS(entry.Annotations),
Labels: maputil.CopySS(entry.Labels),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: entry.ContainerName,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
apiconsts.ReclaimedResourceMilliCPU: *resource.NewQuantity(int64(entry.RequestQuantity*1000), resource.DecimalSI),
},
Limits: v1.ResourceList{
apiconsts.ReclaimedResourceMilliCPU: *resource.NewQuantity(int64(entry.RequestQuantity*1000), resource.DecimalSI),
},
},
},
},
},
},
}
}

pods = append(pods, pod)
pods = append(pods, pod)
}
}
}

plugin.(*CPUPressureSuppression).state = stateImpl
plugin.(*CPUPressureSuppression).state = stateImpl

resp, err := plugin.GetEvictPods(context.TODO(), &evictionpluginapi.GetEvictPodsRequest{
ActivePods: pods,
})
assert.NoError(t, err)
assert.NotNil(t, resp)
resp, err := plugin.GetEvictPods(context.TODO(), &evictionpluginapi.GetEvictPodsRequest{
ActivePods: pods,
})
assert.NoError(t, err)
assert.NotNil(t, resp)

time.Sleep(defaultCPUMinSuppressionToleranceDuration)

time.Sleep(defaultCPUMinSuppressionToleranceDuration)
resp, err = plugin.GetEvictPods(context.TODO(), &evictionpluginapi.GetEvictPodsRequest{
ActivePods: pods,
})
assert.NoError(t, err)
assert.NotNil(t, resp)

resp, err = plugin.GetEvictPods(context.TODO(), &evictionpluginapi.GetEvictPodsRequest{
ActivePods: pods,
evictPodUIDSet := sets.String{}
for _, pod := range resp.EvictPods {
evictPodUIDSet.Insert(string(pod.Pod.GetUID()))
}
assert.Equal(t, tt.wantEvictPodUIDSet, evictPodUIDSet)
})
assert.NoError(t, err)
assert.NotNil(t, resp)

evictPodUIDSet := sets.String{}
for _, pod := range resp.EvictPods {
evictPodUIDSet.Insert(string(pod.Pod.GetUID()))
}
assert.Equal(t, tt.wantEvictPodUIDSet, evictPodUIDSet)
}
}
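
The test refactor above follows a common Go pattern: each table entry carries a hook that seeds a fake metrics store, and every case builds its own fixtures inside a parallel subtest so cases cannot interfere. A generic, stripped-down sketch of that pattern (not the katalyst test itself; the names below are invented for illustration):

package example

import "testing"

// fakeStore stands in for a per-test metrics sink such as metric.FakeMetricsFetcher.
type fakeStore struct {
	values map[string]float64
}

func (s *fakeStore) Set(key string, v float64) { s.values[key] = v }

func TestTableDrivenWithMetricHook(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name          string
		setFakeMetric func(store *fakeStore) // per-case metric seeding, like setFakeMetric in the diff
		want          float64
	}{
		{
			name:          "half utilised",
			setFakeMetric: func(s *fakeStore) { s.Set("cpu.usage.ratio", 0.5) },
			want:          0.5,
		},
	}

	for _, tt := range tests {
		tt := tt // capture the range variable before the parallel subtest
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			// Each subtest builds its own store so parallel cases cannot
			// interfere (mirroring the per-case metaServer, state, and
			// plugin construction in the refactored test).
			store := &fakeStore{values: map[string]float64{}}
			if tt.setFakeMetric != nil {
				tt.setFakeMetric(store)
			}

			if got := store.values["cpu.usage.ratio"]; got != tt.want {
				t.Fatalf("got %v, want %v", got, tt.want)
			}
		})
	}
}
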
@@ -29,12 +29,11 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
pkgconsts "github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
)

type HeadroomAssemblerCommon struct {
@@ -104,18 +103,9 @@ func (ha *HeadroomAssemblerCommon) GetHeadroom() (resource.Quantity, error) {
emptyNUMAs = emptyNUMAs.Difference(r.GetBindingNumas())
}

reclaimPoolUtil := 0.0

// add non binding reclaim pool size
reclaimPoolInfo, ok := ha.metaReader.GetPoolInfo(state.PoolNameReclaim)
if ok && reclaimPoolInfo != nil {

reclaimedMetrics, err := ha.getPoolMetrics(state.PoolNameReclaim)
if err != nil {
return resource.Quantity{}, err
}
reclaimPoolUtil = reclaimedMetrics.coreAvgUtil

reclaimPoolInfo, reclaimPoolExist := ha.metaReader.GetPoolInfo(state.PoolNameReclaim)
if reclaimPoolExist && reclaimPoolInfo != nil {
reclaimPoolNUMAs := machine.GetCPUAssignmentNUMAs(reclaimPoolInfo.TopologyAwareAssignments)

sharedCoresHeadroom := 0.0
@@ -150,31 +140,15 @@ func (ha *HeadroomAssemblerCommon) GetHeadroom() (resource.Quantity, error) {
general.InfoS("[qosaware-cpu] headroom assembled", "headroomTotal", headroomTotal, "backoffRetries",
ha.backoffRetries, "util based enabled", dynamicConfig.CPUUtilBasedConfiguration.Enable)

// if util based cpu headroom disable, just return total reclaim pool size as headroom
if !dynamicConfig.CPUUtilBasedConfiguration.Enable {
// if util based cpu headroom disable or reclaim pool not existed, just return total reclaim pool size as headroom
if !dynamicConfig.CPUUtilBasedConfiguration.Enable || !reclaimPoolExist || reclaimPoolInfo == nil {
return *resource.NewQuantity(int64(headroomTotal), resource.DecimalSI), nil
}

return ha.getUtilBasedHeadroom(dynamicConfig, int(headroomTotal), reclaimPoolUtil)
}

type poolMetrics struct {
coreAvgUtil float64
poolSize int
}

// getPoolMetrics get reclaimed pool metrics, including the average utilization of each core in
// the reclaimed pool and the size of the pool
func (ha *HeadroomAssemblerCommon) getPoolMetrics(poolName string) (*poolMetrics, error) {
reclaimedInfo, ok := ha.metaReader.GetPoolInfo(poolName)
if !ok {
return nil, fmt.Errorf("failed get reclaim pool info")
reclaimMetrics, err := helper.GetReclaimMetrics(reclaimPoolInfo.TopologyAwareAssignments.MergeCPUSet(), ha.conf.ReclaimRelativeRootCgroupPath, ha.metaServer.MetricsFetcher)
if err != nil {
return resource.Quantity{}, err
}

cpuSet := reclaimedInfo.TopologyAwareAssignments.MergeCPUSet()
m := ha.metaServer.AggregateCoreMetric(cpuSet, pkgconsts.MetricCPUUsageRatio, metric.AggregatorAvg)
return &poolMetrics{
coreAvgUtil: m.Value,
poolSize: cpuSet.Size(),
}, nil
return ha.getUtilBasedHeadroom(dynamicConfig, reclaimMetrics)
}
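
And a rough sketch of the resulting GetHeadroom control flow. This is heavily simplified; the struct and helpers below are stand-ins for illustration, not the real katalyst-core types:

package main

import "fmt"

// reclaimMetrics is a stand-in for the values the reclaim-metrics helper
// reports; only the field used here is modelled.
type reclaimMetrics struct {
	ReclaimedCoresSupply float64
}

// getHeadroom mirrors the refined branching: when util-based headroom is
// disabled or no reclaim pool exists, the assembled total is returned as-is;
// otherwise the headroom is derived from the measured reclaim metrics
// (utilBasedHeadroom stands in for the real getUtilBasedHeadroom).
func getHeadroom(utilBasedEnabled, reclaimPoolExists bool, headroomTotal float64, m reclaimMetrics) float64 {
	if !utilBasedEnabled || !reclaimPoolExists {
		return headroomTotal
	}
	return utilBasedHeadroom(m)
}

func utilBasedHeadroom(m reclaimMetrics) float64 {
	// Placeholder: the real implementation scales headroom by reclaim pool
	// utilization; here the supply is simply passed through.
	return m.ReclaimedCoresSupply
}

func main() {
	m := reclaimMetrics{ReclaimedCoresSupply: 6}
	fmt.Println(getHeadroom(false, true, 10, m)) // util-based disabled -> 10
	fmt.Println(getHeadroom(true, true, 10, m))  // util-based enabled  -> 6
}
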
