Skip to content

Commit

Permalink
Merge pull request #728 from h-w-chen/dev/pap-integration-bug-fix
Browse files Browse the repository at this point in the history
fix (sysadvisor): pap - fixes of defects identified in integration validation
  • Loading branch information
xu282934741 authored Dec 3, 2024
2 parents f60ad7d + 07840c5 commit e092a7c
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ func (p ruleBasedPowerStrategy) RecommendAction(actualWatt int,
return action.PowerAction{Op: spec.InternalOpNoop, Arg: 0}
}

if ttl <= time.Minute*2 {
// whatever valid alert, power capping should do in short of 2 minutes
return action.PowerAction{Op: spec.InternalOpFreqCap, Arg: desiredWatt}
}

op := internalOp
if spec.InternalOpAuto == op {
op = p.autoAction(actualWatt, desiredWatt, ttl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,31 @@ func Test_ruleBasedPowerStrategy_RecommendAction(t *testing.T) {
actualWatt: 99,
desiredWatt: 88,
alert: spec.PowerAlertP1,
internalOp: spec.InternalOpThrottle,
internalOp: spec.InternalOpAuto,
ttl: time.Second * 30,
},
want: action.PowerAction{
Op: spec.InternalOpFreqCap,
Arg: 88,
},
},
{
name: "approaching deadline no freq capping is indicated otherwise only",
fields: fields{
coefficient: exponentialDecay{},
},
args: args{
actualWatt: 99,
desiredWatt: 88,
alert: spec.PowerAlertP1,
internalOp: spec.InternalOpEvict,
ttl: time.Second * 30,
},
want: action.PowerAction{
Op: spec.InternalOpEvict,
Arg: 12,
},
},
{
name: "having a lot of time usually leads to evict a very little portion",
fields: fields{coefficient: exponentialDecay{b: math.E / 2}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"sync"

"github.com/pkg/errors"
Expand Down Expand Up @@ -204,6 +205,10 @@ func newPowerCapServiceSuite(conf *config.Configuration, emitter metrics.MetricE
return nil, nil, errors.Wrap(err, "failed to clean up the residue file")
}

if err := os.MkdirAll(filepath.Dir(socketPath), 0o755); err != nil {
return nil, nil, errors.Wrap(err, "failed to create folders to unix sock file")
}

sock, err := net.Listen("unix", socketPath)
if err != nil {
return nil, nil, fmt.Errorf("%v listen %s failed: %v", powerCapSvc.Name(), socketPath, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/kubewharf/katalyst-api/pkg/plugins/registration"
"github.com/kubewharf/katalyst-api/pkg/plugins/skeleton"

pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/evictor"
Expand All @@ -43,24 +42,47 @@ const (
var errPowerPressureEvictionPluginUnavailable = errors.New("power pressure eviction plugin is unavailable")

type powerPressureEvictServer struct {
plugin *powerPressureEvictPlugin
service *skeleton.PluginRegistrationWrapper
}

func (p *powerPressureEvictServer) Init() error {
return p.plugin.Init()
}

func (p *powerPressureEvictServer) Evict(ctx context.Context, pods []*v1.Pod) error {
return p.plugin.Evict(ctx, pods)
}

func (p *powerPressureEvictServer) Start() error {
if err := p.service.Start(); err != nil {
return errors.Wrap(err, "failed to start power pressure eviction plugin server")
}
return nil
}

func (p *powerPressureEvictServer) Stop() error {
return p.service.Stop()
}

type powerPressureEvictPlugin struct {
mutex sync.RWMutex
started bool
evicts map[types.UID]*v1.Pod
service *skeleton.PluginRegistrationWrapper
}

func (p *powerPressureEvictServer) Init() error {
func (p *powerPressureEvictPlugin) Init() error {
return nil
}

// reset method clears all pending eviction requests not fetched by remote client
func (p *powerPressureEvictServer) reset(ctx context.Context) {
func (p *powerPressureEvictPlugin) reset(ctx context.Context) {
p.evicts = make(map[types.UID]*v1.Pod)
}

// Evict method puts request to evict pods in the pool; it will be sent out to plugin client via the eviction protocol
// the real eviction will be done by the (remote) eviction manager where the plugin client is registered with
func (p *powerPressureEvictServer) Evict(ctx context.Context, pods []*v1.Pod) error {
func (p *powerPressureEvictPlugin) Evict(ctx context.Context, pods []*v1.Pod) error {
p.mutex.Lock()
defer p.mutex.Unlock()

Expand All @@ -80,7 +102,7 @@ func (p *powerPressureEvictServer) Evict(ctx context.Context, pods []*v1.Pod) er
return nil
}

func (p *powerPressureEvictServer) evictPod(ctx context.Context, pod *v1.Pod) error {
func (p *powerPressureEvictPlugin) evictPod(ctx context.Context, pod *v1.Pod) error {
if pod == nil {
return errors.New("unexpected nil pod")
}
Expand All @@ -89,56 +111,43 @@ func (p *powerPressureEvictServer) evictPod(ctx context.Context, pod *v1.Pod) er
return nil
}

func (p *powerPressureEvictServer) Name() string {
func (p *powerPressureEvictPlugin) Name() string {
return EvictionPluginNameNodePowerPressure
}

func (p *powerPressureEvictServer) Start() error {
func (p *powerPressureEvictPlugin) Start() error {
p.mutex.Lock()
defer p.mutex.Unlock()

if p.started {
general.InfofV(6, "pap: power pressure eviction server already started")
return nil
}

if err := p.service.Start(); err != nil {
return errors.Wrap(err, "failed to start power pressure eviction plugin server")
}
p.started = true
return nil
}

func (p *powerPressureEvictServer) Stop() error {
func (p *powerPressureEvictPlugin) Stop() error {
p.mutex.Lock()
defer p.mutex.Unlock()

if !p.started {
general.InfofV(6, "pap: power pressure eviction server already stopped")
return nil
}

p.started = false
return p.service.Stop()
p.reset(context.Background())
return nil
}

func (p *powerPressureEvictServer) GetToken(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.GetTokenResponse, error) {
func (p *powerPressureEvictPlugin) GetToken(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.GetTokenResponse, error) {
return &pluginapi.GetTokenResponse{Token: ""}, nil
}

func (p *powerPressureEvictServer) ThresholdMet(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.ThresholdMetResponse, error) {
func (p *powerPressureEvictPlugin) ThresholdMet(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.ThresholdMetResponse, error) {
return &pluginapi.ThresholdMetResponse{}, nil
}

func (p *powerPressureEvictServer) GetTopEvictionPods(ctx context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) {
func (p *powerPressureEvictPlugin) GetTopEvictionPods(ctx context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) {
return &pluginapi.GetTopEvictionPodsResponse{}, nil
}

// GetEvictPods is called from a remote evict plugin client to get evict candidates
// In the current eviction manager framework, plugins are expected to implement either GetEvictPods or GetTopEvictionPods + ThresholdMet;
// the former allows the plugin to explicitly specify force and soft eviction candidates, which suits this plugin's use case.
// Adequate to implement only GetEvictPods and simply let GetTopEvictionPods and ThresholdMet return default responses.
func (p *powerPressureEvictServer) GetEvictPods(ctx context.Context, request *pluginapi.GetEvictPodsRequest) (*pluginapi.GetEvictPodsResponse, error) {
func (p *powerPressureEvictPlugin) GetEvictPods(ctx context.Context, request *pluginapi.GetEvictPodsRequest) (*pluginapi.GetEvictPodsResponse, error) {
general.InfofV(6, "pap: evict: GetEvictPods request with %d active pods", len(request.GetActivePods()))
activePods := map[types.UID]struct{}{}
for _, pod := range request.GetActivePods() {
Expand Down Expand Up @@ -168,14 +177,14 @@ func (p *powerPressureEvictServer) GetEvictPods(ctx context.Context, request *pl
return &pluginapi.GetEvictPodsResponse{EvictPods: evictPods}, nil
}

func newPowerPressureEvictServer() *powerPressureEvictServer {
return &powerPressureEvictServer{
func newPowerPressureEvictPlugin() *powerPressureEvictPlugin {
return &powerPressureEvictPlugin{
evicts: make(map[types.UID]*v1.Pod),
}
}

func NewPowerPressureEvictionPlugin(conf *config.Configuration, emitter metrics.MetricEmitter) (evictor.PodEvictor, error) {
plugin := newPowerPressureEvictServer()
func NewPowerPressureEvictionServer(conf *config.Configuration, emitter metrics.MetricEmitter) (evictor.PodEvictor, error) {
plugin := newPowerPressureEvictPlugin()
regWrapper, err := skeleton.NewRegistrationPluginWrapper(plugin,
[]string{conf.PluginRegistrationDir}, // unix socket dirs
func(key string, value int64) {
Expand All @@ -188,6 +197,9 @@ func NewPowerPressureEvictionPlugin(conf *config.Configuration, emitter metrics.
return nil, errors.Wrap(err, "failed to register pap power pressure eviction plugin")
}

plugin.service = regWrapper
return plugin, nil
server := &powerPressureEvictServer{
plugin: plugin,
service: regWrapper,
}
return server, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net"
"os"
"sort"
"testing"

Expand All @@ -34,13 +35,15 @@ import (
evictionv1apha1 "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/evictor"
"github.com/kubewharf/katalyst-core/pkg/config"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
)

func setupGrpcServer(ctx context.Context) (evictor.PodEvictor, evictionv1apha1.EvictionPluginClient, func()) {
buffer := 101024 * 1024
lis := bufconn.Listen(buffer)

server := &powerPressureEvictServer{}
server := &powerPressureEvictPlugin{}
baseServer := grpc.NewServer()
evictionv1apha1.RegisterEvictionPluginServer(baseServer, server)
server.started = true
Expand Down Expand Up @@ -178,7 +181,7 @@ func Test_powerPressureEvictPluginServer_GetEvictPods(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
p := &powerPressureEvictServer{
p := &powerPressureEvictPlugin{
evicts: tt.fields.evicts,
}
got, err := p.GetEvictPods(tt.args.ctx, tt.args.request)
Expand All @@ -194,3 +197,23 @@ func Test_powerPressureEvictPluginServer_GetEvictPods(t *testing.T) {
})
}
}

func Test_powerPressureEvictServer_Start_Stop(t *testing.T) {
t.Parallel()
tmpDir, err := os.MkdirTemp("", "*")
assert.NoError(t, err)

dummyConf := config.NewConfiguration()
dummyConf.PluginRegistrationDir = tmpDir
dummyEmitter := metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{})
plugin, err := NewPowerPressureEvictionServer(dummyConf, dummyEmitter)
assert.NoError(t, err)

err = plugin.Start()
assert.NoError(t, err)

err = plugin.Stop()
assert.NoError(t, err)

_ = os.RemoveAll(tmpDir)
}
2 changes: 1 addition & 1 deletion pkg/agent/sysadvisor/plugin/poweraware/power_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewPowerAwarePlugin(
if conf.DisablePowerPressureEvict {
podEvictor = evictor.NewNoopPodEvictor()
} else {
if podEvictor, err = evictserver.NewPowerPressureEvictionPlugin(conf, emitter); err != nil {
if podEvictor, err = evictserver.NewPowerPressureEvictionServer(conf, emitter); err != nil {
return nil, errors.Wrap(err, "pap: failed to create power aware plugin")
}
}
Expand Down

0 comments on commit e092a7c

Please sign in to comment.