support service profiling manager
luomingmeng committed May 22, 2023
1 parent 0be9a13 commit 9130205
Showing 13 changed files with 927 additions and 230 deletions.
2 changes: 2 additions & 0 deletions cmd/katalyst-agent/app/options/global/metaserver.go
@@ -30,6 +30,7 @@ const (
defaultCustomNodeConfigCacheTTL = 15 * time.Second
defaultServiceProfileCacheTTL = 15 * time.Second
defaultConfigCacheTTL = 15 * time.Second
defaultConfigDisableDynamic = false
defaultConfigSkipFailedInitialization = true
defaultConfigCheckpointGraceTime = 2 * time.Hour
)
@@ -73,6 +74,7 @@ func NewMetaServerOptions() *MetaServerOptions {
CustomNodeConfigCacheTTL: defaultCustomNodeConfigCacheTTL,
ServiceProfileCacheTTL: defaultServiceProfileCacheTTL,
ConfigCacheTTL: defaultConfigCacheTTL,
ConfigDisableDynamic: defaultConfigDisableDynamic,
ConfigSkipFailedInitialization: defaultConfigSkipFailedInitialization,
ConfigCheckpointGraceTime: defaultConfigCheckpointGraceTime,
KubeletReadOnlyPort: defaultKubeletReadOnlyPort,
102 changes: 100 additions & 2 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor_test.go
@@ -27,7 +27,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

"github.com/kubewharf/katalyst-api/pkg/consts"
@@ -41,6 +43,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)
@@ -56,7 +59,7 @@ func generateTestConfiguration(t *testing.T, checkpointDir, stateFileDir string)
return conf
}

func newTestCPUResourceAdvisor(t *testing.T, checkpointDir, stateFileDir string) (*cpuResourceAdvisor, metacache.MetaCache) {
func newTestCPUResourceAdvisor(t *testing.T, pods []*v1.Pod, checkpointDir, stateFileDir string) (*cpuResourceAdvisor, metacache.MetaCache) {
conf := generateTestConfiguration(t, checkpointDir, stateFileDir)

metaCache, err := metacache.NewMetaCacheImp(conf, metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}))
@@ -76,6 +79,9 @@ func newTestCPUResourceAdvisor(t *testing.T, checkpointDir, stateFileDir string)
KatalystMachineInfo: &machine.KatalystMachineInfo{
CPUTopology: cpuTopology,
},
PodFetcher: &pod.PodFetcherStub{
PodList: pods,
},
}

cra := NewCPUResourceAdvisor(conf, struct{}{}, metaCache, metaServer, nil)
@@ -109,6 +115,7 @@ func TestUpdate(t *testing.T) {
name string
pools map[string]*types.PoolInfo
containers []*types.ContainerInfo
pods []*v1.Pod
reclaimEnabled bool
wantInternalCalculationResult InternalCalculationResult
wantHeadroom resource.Quantity
@@ -177,6 +184,15 @@
1: machine.MustParse("25"),
}, 4),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
},
wantInternalCalculationResult: InternalCalculationResult{
map[string]map[int]resource.Quantity{
state.PoolNameReserve: {-1: *resource.NewQuantity(2, resource.DecimalSI)},
@@ -220,6 +236,15 @@
1: machine.MustParse("25-47,72-95"),
}, 96),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
},
wantInternalCalculationResult: InternalCalculationResult{
map[string]map[int]resource.Quantity{
state.PoolNameReserve: {-1: *resource.NewQuantity(2, resource.DecimalSI)},
@@ -265,6 +290,22 @@
1: machine.MustParse("25-26,72-73"),
}, 4),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "default",
UID: "uid2",
},
},
},
wantInternalCalculationResult: InternalCalculationResult{
map[string]map[int]resource.Quantity{
state.PoolNameReserve: {
@@ -315,6 +356,22 @@
1: machine.MustParse("25-26,72-73"),
}, 6),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "default",
UID: "uid2",
},
},
},
wantInternalCalculationResult: InternalCalculationResult{
map[string]map[int]resource.Quantity{
state.PoolNameReserve: {
@@ -363,6 +420,15 @@
0: machine.MustParse("1-23,48-71"),
}, 48),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
},
wantInternalCalculationResult: InternalCalculationResult{
map[string]map[int]resource.Quantity{
state.PoolNameReserve: {
@@ -426,6 +492,22 @@
1: machine.MustParse("25-47,72-95"),
}, 96),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "default",
UID: "uid2",
},
},
},
wantInternalCalculationResult: InternalCalculationResult{
map[string]map[int]resource.Quantity{
state.PoolNameReserve: {-1: *resource.NewQuantity(2, resource.DecimalSI)},
@@ -486,6 +568,22 @@
1: machine.MustParse("26"),
}, 4),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "default",
UID: "uid2",
},
},
},
wantInternalCalculationResult: InternalCalculationResult{
map[string]map[int]resource.Quantity{
state.PoolNameReserve: {-1: *resource.NewQuantity(2, resource.DecimalSI)},
@@ -508,7 +606,7 @@
require.NoError(t, err)
defer os.RemoveAll(sfDir)

advisor, metaCache := newTestCPUResourceAdvisor(t, ckDir, sfDir)
advisor, metaCache := newTestCPUResourceAdvisor(t, tt.pods, ckDir, sfDir)
advisor.startTime = time.Now().Add(-types.StartUpPeriod * 2)
advisor.conf.ReclaimedResourceConfiguration.SetEnableReclaim(tt.reclaimEnabled)

@@ -17,6 +17,8 @@ limitations under the License.
package provisionpolicy

import (
"context"

"k8s.io/klog/v2"

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
@@ -43,17 +45,23 @@ func NewPolicyCanonical(regionName string, _ *config.Configuration, _ interface{

func (p *PolicyCanonical) estimationCPUUsage() (cpuEstimation float64, containerCnt uint, err error) {
for podUID, containerSet := range p.podSet {
enableReclaim, err := helper.PodEnableReclaim(context.Background(), p.metaServer, podUID, p.essentials.EnableReclaim)
if err != nil {
return 0, 0, err
}

for containerName := range containerSet {
ci, ok := p.metaReader.GetContainerInfo(podUID, containerName)
if !ok || ci == nil {
klog.Errorf("[qosaware-cpu-provision] illegal container info of %v/%v", podUID, containerName)
continue
}

containerEstimation, err := helper.EstimateContainerCPUUsage(ci, p.metaReader, p.essentials.EnableReclaim)
containerEstimation, err := helper.EstimateContainerCPUUsage(ci, p.metaReader, enableReclaim)
if err != nil {
return 0, 0, err
}

// FIXME: the metric server doesn't support reporting cpu usage at numa granularity,
// so we split cpu usage evenly across the container's binding numas.
if p.bindingNumas.Size() > 0 {
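As the FIXME in this hunk notes, per-NUMA CPU usage is not reported by the metric server yet, so the container's estimate is simply divided evenly over its binding NUMA nodes. A minimal standalone sketch of that even split (the function name and arguments below are illustrative, not part of this commit):

package main

import "fmt"

// splitCPUUsageAcrossNumas spreads a container's estimated CPU usage evenly
// over its binding NUMA nodes, mirroring the fallback described in the FIXME.
func splitCPUUsageAcrossNumas(containerEstimation float64, numaCount int) float64 {
	if numaCount <= 0 {
		// no binding NUMA information; keep the whole estimate as-is
		return containerEstimation
	}
	return containerEstimation / float64(numaCount)
}

func main() {
	// a container estimated at 4 cores and bound to 2 NUMA nodes
	// contributes 2 cores of usage to each of them
	fmt.Println(splitCPUUsageAcrossNumas(4.0, 2))
}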
56 changes: 56 additions & 0 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/helper/helper.go
@@ -0,0 +1,56 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helper

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/errors"

"github.com/kubewharf/katalyst-core/pkg/metaserver"
)

// PodEnableReclaim checks whether the pod can be reclaimed:
// if the node does not enable reclaim, it returns false directly;
// if the node enables reclaim, it also checks whether the pod is degraded.
func PodEnableReclaim(ctx context.Context, metaServer *metaserver.MetaServer,
podUID string, nodeEnableReclaim bool) (bool, error) {
if !nodeEnableReclaim {
return false, nil
}

if metaServer == nil {
return false, fmt.Errorf("metaServer is nil")
}

pod, err := metaServer.GetPod(ctx, podUID)
if err != nil {
return false, err
}

// check whether the pod is degraded
degraded, err := metaServer.ServiceBusinessPerformanceDegraded(ctx, pod)
if err != nil && !errors.IsNotFound(err) {
return false, err
} else if err != nil {
return true, nil
}

// if the pod is degraded, it cannot be reclaimed
return !degraded, nil
}
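For context, a caller-side sketch of how the new helper gates reclaim per pod, in the spirit of the provision-policy change above. Only helper.PodEnableReclaim, its signature, and the metaserver package come from this commit; the surrounding function and its arguments are hypothetical:

package example

import (
	"context"

	"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/helper"
	"github.com/kubewharf/katalyst-core/pkg/metaserver"
)

// reclaimablePods filters pod UIDs down to those that may use reclaimed
// resources, consulting the per-pod service-profiling check.
func reclaimablePods(ctx context.Context, ms *metaserver.MetaServer,
	podUIDs []string, nodeEnableReclaim bool) ([]string, error) {
	eligible := make([]string, 0, len(podUIDs))
	for _, uid := range podUIDs {
		ok, err := helper.PodEnableReclaim(ctx, ms, uid, nodeEnableReclaim)
		if err != nil {
			return nil, err
		}
		if ok {
			eligible = append(eligible, uid)
		}
	}
	return eligible, nil
}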
@@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

"github.com/kubewharf/katalyst-api/pkg/consts"
@@ -43,6 +44,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)
@@ -88,7 +90,7 @@ func generateTestConfiguration(t *testing.T, checkpointDir, stateFileDir string)
return conf
}

func newTestMemoryAdvisor(t *testing.T, checkpointDir, stateFileDir string) (*memoryResourceAdvisor, metacache.MetaCache) {
func newTestMemoryAdvisor(t *testing.T, pods []*v1.Pod, checkpointDir, stateFileDir string) (*memoryResourceAdvisor, metacache.MetaCache) {
conf := generateTestConfiguration(t, checkpointDir, stateFileDir)

metaCache, err := metacache.NewMetaCacheImp(conf, metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}))
@@ -106,6 +108,9 @@ func newTestMemoryAdvisor(t *testing.T, checkpointDir, stateFileDir string) (*me
MemoryCapacity: 1000 << 30,
},
},
PodFetcher: &pod.PodFetcherStub{
PodList: pods,
},
}

mra := NewMemoryResourceAdvisor(conf, struct{}{}, metaCache, metaServer, nil)
@@ -119,6 +124,7 @@ func TestUpdate(t *testing.T) {
name string
pools map[string]*types.PoolInfo
containers []*types.ContainerInfo
pods []*v1.Pod
wantHeadroom resource.Quantity
reclaimedEnable bool
}{
@@ -180,6 +186,15 @@
1: machine.MustParse("25"),
}, 0),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
},
wantHeadroom: *resource.NewQuantity(988<<30, resource.DecimalSI),
},
{
@@ -216,6 +231,15 @@
1: machine.MustParse("25"),
}, 200<<30),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
},
wantHeadroom: *resource.NewQuantity(796<<30, resource.DecimalSI),
},
}
@@ -230,7 +254,7 @@
require.NoError(t, err)
defer os.RemoveAll(sfDir)

advisor, metaCache := newTestMemoryAdvisor(t, ckDir, sfDir)
advisor, metaCache := newTestMemoryAdvisor(t, tt.pods, ckDir, sfDir)
advisor.startTime = time.Now().Add(-startUpPeriod * 2)
advisor.conf.ReclaimedResourceConfiguration.SetEnableReclaim(tt.reclaimedEnable)
_, _ = advisor.GetChannels()