refine pressure_suppression eviction
When allow_shared_cores_overlap_reclaimed = true, the workloads in the reclaim pool include both shared_cores and reclaimed_cores, so the actual resource supply for reclaimed_cores should be the pool's idle CPU plus the reclaim cgroup's usage. GetEvictPods of pressure_suppression is adapted accordingly.

Signed-off-by: linzhecheng <linzhecheng@bytedance.com>
cheney-lin committed Jul 30, 2024
1 parent aabfc53 commit 3f4179f
Showing 2 changed files with 124 additions and 64 deletions.
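
For reference, a minimal sketch of the supply calculation described in the commit message, using hypothetical names (poolSize, poolCPUUsage, reclaimCgroupCPUUsage); in the actual change these values are read via helper.GetReclaimMetrics.

```go
package main

import "fmt"

// reclaimedCoresSupply sketches the idea from the commit message: when
// shared_cores and reclaimed_cores overlap in the reclaim pool, the CPU
// actually available to reclaimed_cores is the pool's idle capacity plus the
// usage already attributed to the reclaim cgroup. All names are illustrative.
func reclaimedCoresSupply(poolSize, poolCPUUsage, reclaimCgroupCPUUsage float64) float64 {
	idle := poolSize - poolCPUUsage
	if idle < 0 {
		idle = 0
	}
	return idle + reclaimCgroupCPUUsage
}

func main() {
	// 10 pool CPUs, 6 cores busy in total, 2 of them used by reclaimed workloads:
	// supply = (10 - 6) + 2 = 6 cores available to reclaimed_cores.
	fmt.Println(reclaimedCoresSupply(10, 6, 2))
}
```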
@@ -31,6 +31,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
@@ -40,18 +41,20 @@ import (
const EvictionNameSuppression = "cpu-pressure-suppression-plugin"

type CPUPressureSuppression struct {
conf *config.Configuration
state state.ReadonlyState
conf *config.Configuration
state state.ReadonlyState
metaServer *metaserver.MetaServer

lastToleranceTime sync.Map
}

func NewCPUPressureSuppressionEviction(_ metrics.MetricEmitter, _ *metaserver.MetaServer,
func NewCPUPressureSuppressionEviction(_ metrics.MetricEmitter, metaServer *metaserver.MetaServer,
conf *config.Configuration, state state.ReadonlyState,
) (CPUPressureEviction, error) {
return &CPUPressureSuppression{
conf: conf,
state: state,
conf: conf,
state: state,
metaServer: metaServer,
}, nil
}

@@ -90,6 +93,11 @@ func (p *CPUPressureSuppression) GetEvictPods(_ context.Context, request *plugin
return &pluginapi.GetEvictPodsResponse{}, nil
}

reclaimMetrics, err := helper.GetReclaimMetrics(poolCPUSet, p.conf.ReclaimRelativeRootCgroupPath, p.metaServer.MetricsFetcher)
if err != nil {
return nil, fmt.Errorf("get reclaim metrics failed: %s", err)
}

filteredPods := native.FilterPods(request.ActivePods, p.conf.CheckReclaimedQoSForPod)
if len(filteredPods) == 0 {
return &pluginapi.GetEvictPodsResponse{}, nil
@@ -107,13 +115,21 @@ func (p *CPUPressureSuppression) GetEvictPods(_ context.Context, request *plugin
for _, pod := range filteredPods {
totalCPURequest.Add(native.CPUQuantityGetter()(native.SumUpPodRequestResources(pod)))
}
general.Infof("total reclaim cpu request is %v, reclaim pool size is %v", totalCPURequest.String(), poolSize)

general.InfoS("info", "reclaim cpu request", totalCPURequest.String(),
"reclaim pool size", poolSize, "reclaimedCoresSupply", reclaimMetrics.ReclaimedCoresSupply,
"reclaimPoolUsage", reclaimMetrics.PoolCPUUsage, "reclaimedCoresUsage", reclaimMetrics.CgroupCPUUsage)

now := time.Now()
var evictPods []*v1alpha1.EvictPod
for _, pod := range filteredPods {
key := native.GenerateUniqObjectNameKey(pod)
poolSuppressionRate := float64(totalCPURequest.Value()) / float64(poolSize)
poolSuppressionRate := 0.0
if reclaimMetrics.ReclaimedCoresSupply == 0 {
poolSuppressionRate = math.MaxFloat64
} else {
poolSuppressionRate = float64(totalCPURequest.Value()) / reclaimMetrics.ReclaimedCoresSupply
}

if podToleranceRate := p.getPodToleranceRate(pod, dynamicConfig.MaxSuppressionToleranceRate); podToleranceRate < poolSuppressionRate {
last, _ := p.lastToleranceTime.LoadOrStore(key, now)
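For reference, a minimal sketch of how the new check in GetEvictPods above combines these pieces, with plain float inputs standing in for the plugin's metric and quantity types: the pool suppression rate is the total reclaimed CPU request divided by the reclaimed-cores supply, and a pod becomes an eviction candidate when its tolerance rate falls below that rate.

```go
package main

import (
	"fmt"
	"math"
)

// shouldEvict mirrors the condition added above: a zero supply means the pool
// is fully suppressed, otherwise the suppression rate is request over supply.
func shouldEvict(totalCPURequest, reclaimedCoresSupply, podToleranceRate float64) bool {
	poolSuppressionRate := math.MaxFloat64
	if reclaimedCoresSupply != 0 {
		poolSuppressionRate = totalCPURequest / reclaimedCoresSupply
	}
	return podToleranceRate < poolSuppressionRate
}

func main() {
	fmt.Println(shouldEvict(15, 6, 2.0)) // 15/6 = 2.5 > 2.0 tolerance -> evict candidate
	fmt.Println(shouldEvict(10, 6, 2.0)) // 10/6 ≈ 1.67 <= 2.0 -> keep
}
```

The hunks after this sketch are from the plugin's test file (TestCPUPressureSuppression_GetEvictPods).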
@@ -36,9 +36,11 @@ import (
evictionpluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"
qrmstate "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/config"
pkgconsts "github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric"
)

const (
@@ -53,6 +55,7 @@ func makeSuppressionEvictionConf(cpuMaxSuppressionToleranceRate float64,
conf.GetDynamicConfiguration().EnableSuppressionEviction = true
conf.GetDynamicConfiguration().MaxSuppressionToleranceRate = cpuMaxSuppressionToleranceRate
conf.GetDynamicConfiguration().MinSuppressionToleranceDuration = cpuMinSuppressionToleranceDuration
conf.ReclaimRelativeRootCgroupPath = "test"
return conf
}

@@ -77,15 +80,7 @@ func TestCPUPressureSuppression_GetEvictPods(t *testing.T) {

as := require.New(t)

cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
as.Nil(err)
conf := makeSuppressionEvictionConf(defaultCPUMaxSuppressionToleranceRate, defaultCPUMinSuppressionToleranceDuration)
metaServer := makeMetaServer(metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}), cpuTopology)
stateImpl, err := makeState(cpuTopology)
as.Nil(err)

plugin, _ := NewCPUPressureSuppressionEviction(metrics.DummyMetrics{}, metaServer, conf, stateImpl)
as.NotNil(plugin)
now := time.Now()

pod1UID := string(uuid.NewUUID())
pod1Name := "pod-1"
@@ -95,6 +90,7 @@ func TestCPUPressureSuppression_GetEvictPods(t *testing.T) {
tests := []struct {
name string
podEntries qrmstate.PodEntries
setFakeMetric func(store *metric.FakeMetricsFetcher)
wantEvictPodUIDSet sets.String
}{
{
@@ -157,6 +153,20 @@ func TestCPUPressureSuppression_GetEvictPods(t *testing.T) {
},
},
wantEvictPodUIDSet: sets.NewString(),
setFakeMetric: func(store *metric.FakeMetricsFetcher) {
store.SetCPUMetric(1, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(3, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(4, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(5, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(6, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(9, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(11, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(12, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(13, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(14, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})

store.SetCgroupMetric("test", pkgconsts.MetricCPUUsageCgroup, utilmetric.MetricData{Value: 5, Time: &now})
},
},
{
name: "over tolerance rate",
@@ -253,72 +263,106 @@ func TestCPUPressureSuppression_GetEvictPods(t *testing.T) {
},
},
wantEvictPodUIDSet: sets.NewString(pod1UID),
setFakeMetric: func(store *metric.FakeMetricsFetcher) {
store.SetCPUMetric(1, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(3, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(4, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(5, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(6, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(9, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(11, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(12, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(13, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})
store.SetCPUMetric(14, pkgconsts.MetricCPUUsageRatio, utilmetric.MetricData{Value: 0.5, Time: &now})

store.SetCgroupMetric("test", pkgconsts.MetricCPUUsageCgroup, utilmetric.MetricData{Value: 5, Time: &now})
},
},
}

for _, tt := range tests {
stateImpl, err := makeState(cpuTopology)
as.Nil(err)
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

pods := make([]*v1.Pod, 0, len(tt.podEntries))
cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
as.Nil(err)
conf := makeSuppressionEvictionConf(defaultCPUMaxSuppressionToleranceRate, defaultCPUMinSuppressionToleranceDuration)

for entryName, entries := range tt.podEntries {
for subEntryName, entry := range entries {
stateImpl.SetAllocationInfo(entryName, subEntryName, entry)
metricsFetcher := metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})
store := metricsFetcher.(*metric.FakeMetricsFetcher)

if entries.IsPoolEntry() {
continue
}
metaServer := makeMetaServer(metricsFetcher, cpuTopology)
stateImpl, err := makeState(cpuTopology)
as.Nil(err)

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(entry.PodUid),
Name: entry.PodName,
Namespace: entry.PodNamespace,
Annotations: maputil.CopySS(entry.Annotations),
Labels: maputil.CopySS(entry.Labels),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: entry.ContainerName,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
apiconsts.ReclaimedResourceMilliCPU: *resource.NewQuantity(int64(entry.RequestQuantity*1000), resource.DecimalSI),
},
Limits: v1.ResourceList{
apiconsts.ReclaimedResourceMilliCPU: *resource.NewQuantity(int64(entry.RequestQuantity*1000), resource.DecimalSI),
plugin, _ := NewCPUPressureSuppressionEviction(metrics.DummyMetrics{}, metaServer, conf, stateImpl)
as.NotNil(plugin)

pods := make([]*v1.Pod, 0, len(tt.podEntries))

if tt.setFakeMetric != nil {
tt.setFakeMetric(store)
}

for entryName, entries := range tt.podEntries {
for subEntryName, entry := range entries {
stateImpl.SetAllocationInfo(entryName, subEntryName, entry)

if entries.IsPoolEntry() {
continue
}

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(entry.PodUid),
Name: entry.PodName,
Namespace: entry.PodNamespace,
Annotations: maputil.CopySS(entry.Annotations),
Labels: maputil.CopySS(entry.Labels),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: entry.ContainerName,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
apiconsts.ReclaimedResourceMilliCPU: *resource.NewQuantity(int64(entry.RequestQuantity*1000), resource.DecimalSI),
},
Limits: v1.ResourceList{
apiconsts.ReclaimedResourceMilliCPU: *resource.NewQuantity(int64(entry.RequestQuantity*1000), resource.DecimalSI),
},
},
},
},
},
},
}
}

pods = append(pods, pod)
pods = append(pods, pod)
}
}
}

plugin.(*CPUPressureSuppression).state = stateImpl
plugin.(*CPUPressureSuppression).state = stateImpl

resp, err := plugin.GetEvictPods(context.TODO(), &evictionpluginapi.GetEvictPodsRequest{
ActivePods: pods,
})
assert.NoError(t, err)
assert.NotNil(t, resp)
resp, err := plugin.GetEvictPods(context.TODO(), &evictionpluginapi.GetEvictPodsRequest{
ActivePods: pods,
})
assert.NoError(t, err)
assert.NotNil(t, resp)

time.Sleep(defaultCPUMinSuppressionToleranceDuration)

time.Sleep(defaultCPUMinSuppressionToleranceDuration)
resp, err = plugin.GetEvictPods(context.TODO(), &evictionpluginapi.GetEvictPodsRequest{
ActivePods: pods,
})
assert.NoError(t, err)
assert.NotNil(t, resp)

resp, err = plugin.GetEvictPods(context.TODO(), &evictionpluginapi.GetEvictPodsRequest{
ActivePods: pods,
evictPodUIDSet := sets.String{}
for _, pod := range resp.EvictPods {
evictPodUIDSet.Insert(string(pod.Pod.GetUID()))
}
assert.Equal(t, tt.wantEvictPodUIDSet, evictPodUIDSet)
})
assert.NoError(t, err)
assert.NotNil(t, resp)

evictPodUIDSet := sets.String{}
for _, pod := range resp.EvictPods {
evictPodUIDSet.Insert(string(pod.Pod.GetUID()))
}
assert.Equal(t, tt.wantEvictPodUIDSet, evictPodUIDSet)
}
}
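
As a sanity check on the fake metrics above (assuming the reclaim pool in this test is the ten CPUs given usage ratio 0.5, and the "test" cgroup reports 5 cores of usage), the supply the test exercises works out to 10 cores:

```go
package main

import "fmt"

func main() {
	// Assumed from the test setup: 10 pool CPUs at 50% usage each,
	// plus 5 cores of usage attributed to the reclaim cgroup.
	poolSize := 10.0
	poolUsage := 10 * 0.5 // 5 cores busy inside the pool
	cgroupUsage := 5.0

	fmt.Println((poolSize - poolUsage) + cgroupUsage) // 10 cores of reclaimed_cores supply
}
```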
