Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (sysadvisor): pap - fixes of defects identified in integration validation #728

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ func (p ruleBasedPowerStrategy) RecommendAction(actualWatt int,
return action.PowerAction{Op: spec.InternalOpNoop, Arg: 0}
}

if ttl <= time.Minute*2 {
// whatever valid alert, power capping should do in short of 2 minutes
return action.PowerAction{Op: spec.InternalOpFreqCap, Arg: desiredWatt}
}

op := internalOp
if spec.InternalOpAuto == op {
op = p.autoAction(actualWatt, desiredWatt, ttl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,31 @@ func Test_ruleBasedPowerStrategy_RecommendAction(t *testing.T) {
actualWatt: 99,
desiredWatt: 88,
alert: spec.PowerAlertP1,
internalOp: spec.InternalOpThrottle,
internalOp: spec.InternalOpAuto,
ttl: time.Second * 30,
},
want: action.PowerAction{
Op: spec.InternalOpFreqCap,
Arg: 88,
},
},
{
name: "approaching deadline no freq capping is indicated otherwise only",
fields: fields{
coefficient: exponentialDecay{},
},
args: args{
actualWatt: 99,
desiredWatt: 88,
alert: spec.PowerAlertP1,
internalOp: spec.InternalOpEvict,
ttl: time.Second * 30,
},
want: action.PowerAction{
Op: spec.InternalOpEvict,
Arg: 12,
},
},
{
name: "having a lot of time usually leads to evict a very little portion",
fields: fields{coefficient: exponentialDecay{b: math.E / 2}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"sync"

"github.com/pkg/errors"
Expand Down Expand Up @@ -204,6 +205,10 @@ func newPowerCapServiceSuite(conf *config.Configuration, emitter metrics.MetricE
return nil, nil, errors.Wrap(err, "failed to clean up the residue file")
}

if err := os.MkdirAll(filepath.Dir(socketPath), 0o755); err != nil {
return nil, nil, errors.Wrap(err, "failed to create folders to unix sock file")
}

sock, err := net.Listen("unix", socketPath)
if err != nil {
return nil, nil, fmt.Errorf("%v listen %s failed: %v", powerCapSvc.Name(), socketPath, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/kubewharf/katalyst-api/pkg/plugins/registration"
"github.com/kubewharf/katalyst-api/pkg/plugins/skeleton"

pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/evictor"
Expand All @@ -43,24 +42,47 @@ const (
var errPowerPressureEvictionPluginUnavailable = errors.New("power pressure eviction plugin is unavailable")

type powerPressureEvictServer struct {
plugin *powerPressureEvictPlugin
gary-lgy marked this conversation as resolved.
Show resolved Hide resolved
service *skeleton.PluginRegistrationWrapper
}

func (p *powerPressureEvictServer) Init() error {
return p.plugin.Init()
}

func (p *powerPressureEvictServer) Evict(ctx context.Context, pods []*v1.Pod) error {
return p.plugin.Evict(ctx, pods)
}

func (p *powerPressureEvictServer) Start() error {
if err := p.service.Start(); err != nil {
return errors.Wrap(err, "failed to start power pressure eviction plugin server")
}
return nil
}

func (p *powerPressureEvictServer) Stop() error {
return p.service.Stop()
}

type powerPressureEvictPlugin struct {
mutex sync.RWMutex
started bool
evicts map[types.UID]*v1.Pod
service *skeleton.PluginRegistrationWrapper
}

func (p *powerPressureEvictServer) Init() error {
func (p *powerPressureEvictPlugin) Init() error {
return nil
}

// reset method clears all pending eviction requests not fetched by remote client
func (p *powerPressureEvictServer) reset(ctx context.Context) {
func (p *powerPressureEvictPlugin) reset(ctx context.Context) {
p.evicts = make(map[types.UID]*v1.Pod)
}

// Evict method puts request to evict pods in the pool; it will be sent out to plugin client via the eviction protocol
// the real eviction will be done by the (remote) eviction manager where the plugin client is registered with
func (p *powerPressureEvictServer) Evict(ctx context.Context, pods []*v1.Pod) error {
func (p *powerPressureEvictPlugin) Evict(ctx context.Context, pods []*v1.Pod) error {
p.mutex.Lock()
defer p.mutex.Unlock()

Expand All @@ -80,7 +102,7 @@ func (p *powerPressureEvictServer) Evict(ctx context.Context, pods []*v1.Pod) er
return nil
}

func (p *powerPressureEvictServer) evictPod(ctx context.Context, pod *v1.Pod) error {
func (p *powerPressureEvictPlugin) evictPod(ctx context.Context, pod *v1.Pod) error {
if pod == nil {
return errors.New("unexpected nil pod")
}
Expand All @@ -89,56 +111,43 @@ func (p *powerPressureEvictServer) evictPod(ctx context.Context, pod *v1.Pod) er
return nil
}

func (p *powerPressureEvictServer) Name() string {
func (p *powerPressureEvictPlugin) Name() string {
return EvictionPluginNameNodePowerPressure
}

func (p *powerPressureEvictServer) Start() error {
func (p *powerPressureEvictPlugin) Start() error {
p.mutex.Lock()
defer p.mutex.Unlock()

if p.started {
general.InfofV(6, "pap: power pressure eviction server already started")
return nil
}

if err := p.service.Start(); err != nil {
return errors.Wrap(err, "failed to start power pressure eviction plugin server")
}
p.started = true
return nil
}

func (p *powerPressureEvictServer) Stop() error {
func (p *powerPressureEvictPlugin) Stop() error {
p.mutex.Lock()
defer p.mutex.Unlock()

if !p.started {
general.InfofV(6, "pap: power pressure eviction server already stopped")
return nil
}

p.started = false
return p.service.Stop()
p.reset(context.Background())
return nil
}

func (p *powerPressureEvictServer) GetToken(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.GetTokenResponse, error) {
func (p *powerPressureEvictPlugin) GetToken(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.GetTokenResponse, error) {
return &pluginapi.GetTokenResponse{Token: ""}, nil
}

func (p *powerPressureEvictServer) ThresholdMet(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.ThresholdMetResponse, error) {
func (p *powerPressureEvictPlugin) ThresholdMet(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.ThresholdMetResponse, error) {
return &pluginapi.ThresholdMetResponse{}, nil
}

func (p *powerPressureEvictServer) GetTopEvictionPods(ctx context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) {
func (p *powerPressureEvictPlugin) GetTopEvictionPods(ctx context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) {
return &pluginapi.GetTopEvictionPodsResponse{}, nil
}

// GetEvictPods is called from a remote evict plugin client to get evict candidates
// In the current eviction manager framework, plugins are expected to implement either GetEvictPods or GetTopEvictionPods + ThresholdMet;
// the former allows the plugin to explicitly specify force and soft eviction candidates, which suits this plugin's use case.
// Adequate to implement only GetEvictPods and simply let GetTopEvictionPods and ThresholdMet return default responses.
func (p *powerPressureEvictServer) GetEvictPods(ctx context.Context, request *pluginapi.GetEvictPodsRequest) (*pluginapi.GetEvictPodsResponse, error) {
func (p *powerPressureEvictPlugin) GetEvictPods(ctx context.Context, request *pluginapi.GetEvictPodsRequest) (*pluginapi.GetEvictPodsResponse, error) {
general.InfofV(6, "pap: evict: GetEvictPods request with %d active pods", len(request.GetActivePods()))
activePods := map[types.UID]struct{}{}
for _, pod := range request.GetActivePods() {
Expand Down Expand Up @@ -168,14 +177,14 @@ func (p *powerPressureEvictServer) GetEvictPods(ctx context.Context, request *pl
return &pluginapi.GetEvictPodsResponse{EvictPods: evictPods}, nil
}

func newPowerPressureEvictServer() *powerPressureEvictServer {
return &powerPressureEvictServer{
func newPowerPressureEvictPlugin() *powerPressureEvictPlugin {
return &powerPressureEvictPlugin{
evicts: make(map[types.UID]*v1.Pod),
}
}

func NewPowerPressureEvictionPlugin(conf *config.Configuration, emitter metrics.MetricEmitter) (evictor.PodEvictor, error) {
plugin := newPowerPressureEvictServer()
func NewPowerPressureEvictionServer(conf *config.Configuration, emitter metrics.MetricEmitter) (evictor.PodEvictor, error) {
plugin := newPowerPressureEvictPlugin()
regWrapper, err := skeleton.NewRegistrationPluginWrapper(plugin,
[]string{conf.PluginRegistrationDir}, // unix socket dirs
func(key string, value int64) {
Expand All @@ -188,6 +197,9 @@ func NewPowerPressureEvictionPlugin(conf *config.Configuration, emitter metrics.
return nil, errors.Wrap(err, "failed to register pap power pressure eviction plugin")
}

plugin.service = regWrapper
return plugin, nil
server := &powerPressureEvictServer{
plugin: plugin,
service: regWrapper,
}
return server, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net"
"os"
"sort"
"testing"

Expand All @@ -34,13 +35,15 @@ import (
evictionv1apha1 "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/evictor"
"github.com/kubewharf/katalyst-core/pkg/config"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
)

func setupGrpcServer(ctx context.Context) (evictor.PodEvictor, evictionv1apha1.EvictionPluginClient, func()) {
buffer := 101024 * 1024
lis := bufconn.Listen(buffer)

server := &powerPressureEvictServer{}
server := &powerPressureEvictPlugin{}
baseServer := grpc.NewServer()
evictionv1apha1.RegisterEvictionPluginServer(baseServer, server)
server.started = true
Expand Down Expand Up @@ -178,7 +181,7 @@ func Test_powerPressureEvictPluginServer_GetEvictPods(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
p := &powerPressureEvictServer{
p := &powerPressureEvictPlugin{
evicts: tt.fields.evicts,
}
got, err := p.GetEvictPods(tt.args.ctx, tt.args.request)
Expand All @@ -194,3 +197,23 @@ func Test_powerPressureEvictPluginServer_GetEvictPods(t *testing.T) {
})
}
}

func Test_powerPressureEvictServer_Start_Stop(t *testing.T) {
t.Parallel()
tmpDir, err := os.MkdirTemp("", "*")
assert.NoError(t, err)

dummyConf := config.NewConfiguration()
dummyConf.PluginRegistrationDir = tmpDir
dummyEmitter := metricspool.DummyMetricsEmitterPool.GetDefaultMetricsEmitter(metricspool.DummyMetricsEmitterPool{})
plugin, err := NewPowerPressureEvictionServer(dummyConf, dummyEmitter)
assert.NoError(t, err)

err = plugin.Start()
assert.NoError(t, err)

err = plugin.Stop()
assert.NoError(t, err)

_ = os.RemoveAll(tmpDir)
}
2 changes: 1 addition & 1 deletion pkg/agent/sysadvisor/plugin/poweraware/power_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewPowerAwarePlugin(
if conf.DisablePowerPressureEvict {
podEvictor = evictor.NewNoopPodEvictor()
} else {
if podEvictor, err = evictserver.NewPowerPressureEvictionPlugin(conf, emitter); err != nil {
if podEvictor, err = evictserver.NewPowerPressureEvictionServer(conf, emitter); err != nil {
return nil, errors.Wrap(err, "pap: failed to create power aware plugin")
}
}
Expand Down