Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sysadvisor): add healthz check #520

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/agent/evictionmanager/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

var (
Expand Down Expand Up @@ -89,6 +90,10 @@ func (m *EvictionManger) getNodeTaintsFromConditions() []v1.Taint {
}

func (m *EvictionManger) reportConditionsAsNodeTaints(ctx context.Context) {
var err error
defer func() {
_ = general.UpdateHealthzStateByError(reportTaintHealthCheckName, err)
}()
node, err := m.metaGetter.GetNode(ctx)

if err != nil {
Expand All @@ -111,6 +116,7 @@ func (m *EvictionManger) reportConditionsAsNodeTaints(ctx context.Context) {

if !controllerutil.SwapNodeControllerTaint(ctx, m.genericClient.KubeClient, taintsToAdd, taintsToDel, node) {
klog.Errorf("failed to swap taints")
err = fmt.Errorf("failed to swap taints")
}

return
Expand Down
43 changes: 36 additions & 7 deletions pkg/agent/evictionmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
//nolint
"github.com/golang/protobuf/proto"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/events"
Expand Down Expand Up @@ -72,6 +73,11 @@ const (
UserUnknown = "unknown"

MetricsPodLabelPrefix = "pod"

evictionManagerHealthCheckName = "eviction_manager_sync"
reportTaintHealthCheckName = "eviction_manager_report_taint"
syncTolerationTurns = 3
reportTaintToleration = 15 * time.Second
)

// LatestCNRGetter returns the latest CNR resources.
Expand Down Expand Up @@ -207,7 +213,10 @@ func (m *EvictionManger) getEvictionPlugins(genericClient *client.GenericClientS
func (m *EvictionManger) Run(ctx context.Context) {
general.Infof(" run with podKiller %v", m.podKiller.Name())
defer general.Infof(" started")

general.RegisterHeartbeatCheck(evictionManagerHealthCheckName, syncTolerationTurns*m.conf.EvictionManagerSyncPeriod,
general.HealthzCheckStateNotReady, syncTolerationTurns*m.conf.EvictionManagerSyncPeriod)
general.RegisterHeartbeatCheck(reportTaintHealthCheckName, reportTaintToleration,
general.HealthzCheckStateNotReady, reportTaintToleration)
m.podKiller.Start(ctx)
for _, endpoint := range m.endpoints {
endpoint.Start()
Expand All @@ -220,6 +229,11 @@ func (m *EvictionManger) Run(ctx context.Context) {
}

func (m *EvictionManger) sync(ctx context.Context) {
var err error
defer func() {
_ = general.UpdateHealthzStateByError(evictionManagerHealthCheckName, err)
}()

activePods, err := m.metaGetter.GetPodList(ctx, native.PodIsActive)
if err != nil {
general.Errorf("failed to list pods from metaServer: %v", err)
Expand All @@ -233,14 +247,25 @@ func (m *EvictionManger) sync(ctx context.Context) {
general.Infof(" currently, there are %v candidate pods", len(pods))
_ = m.emitter.StoreInt64(MetricsNameCandidatePodCNT, int64(len(pods)), metrics.MetricTypeNameRaw)

collector := m.collectEvictionResult(pods)
errList := make([]error, 0)
collector, collectErr := m.collectEvictionResult(pods)
if collectErr != nil {
errList = append(errList, collectErr)
}

m.doEvict(collector.getSoftEvictPods(), collector.getForceEvictPods())
evictErr := m.doEvict(collector.getSoftEvictPods(), collector.getForceEvictPods())
if evictErr != nil {
errList = append(errList, evictErr)
}
if len(errList) > 0 {
err = errors.NewAggregate(errList)
}
}

func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespCollector {
func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) (*evictionRespCollector, error) {
dynamicConfig := m.conf.GetDynamicConfiguration()
collector := newEvictionRespCollector(dynamicConfig.DryRun, m.conf, m.emitter)
var errList []error

m.endpointLock.RLock()
for pluginName, ep := range m.endpoints {
Expand All @@ -252,6 +277,7 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl
})
if err != nil {
general.Errorf(" calling GetEvictPods of plugin: %s failed with error: %v", pluginName, err)
errList = append(errList, err)
} else if getEvictResp == nil {
general.Errorf(" calling GetEvictPods of plugin: %s and getting nil resp", pluginName)
} else {
Expand All @@ -262,6 +288,7 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl
metResp, err := ep.ThresholdMet(context.Background())
if err != nil {
general.Errorf(" calling ThresholdMet of plugin: %s failed with error: %v", pluginName, err)
errList = append(errList, err)
continue
} else if metResp == nil {
general.Errorf(" calling ThresholdMet of plugin: %s and getting nil resp", pluginName)
Expand Down Expand Up @@ -310,6 +337,7 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl
m.endpointLock.RUnlock()
if err != nil {
general.Errorf(" calling GetTopEvictionPods of plugin: %s failed with error: %v", pluginName, err)
errList = append(errList, err)
continue
} else if resp == nil {
general.Errorf(" calling GetTopEvictionPods of plugin: %s and getting nil resp", pluginName)
Expand All @@ -322,10 +350,10 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl
collector.collectTopEvictionPods(dynamicConfig.DryRun, pluginName, threshold, resp)
}

return collector
return collector, errors.NewAggregate(errList)
}

func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.RuledEvictPod) {
func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.RuledEvictPod) error {
softEvictPods = filterOutCandidatePodsWithForcePods(softEvictPods, forceEvictPods)
bestSuitedCandidate := m.getEvictPodFromCandidates(softEvictPods)
if bestSuitedCandidate != nil && bestSuitedCandidate.Pod != nil {
Expand All @@ -346,13 +374,14 @@ func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.
err := m.killWithRules(rpList)
if err != nil {
general.Errorf(" got err: %v in EvictPods", err)
return
return err
}

general.Infof(" evict %d pods in evictionmanager", len(rpList))
_ = m.emitter.StoreInt64(MetricsNameVictimPodCNT, int64(len(rpList)), metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "type", Val: "total"})
metricPodsToEvict(m.emitter, rpList, m.conf.GenericConfiguration.QoSConfiguration, m.conf.GenericEvictionConfiguration.PodMetricLabels)
return nil
}

// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/evictionmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mgr.conf.GetDynamicConfiguration().DryRun = tt.dryrun

collector := mgr.collectEvictionResult(pods)
collector, _ := mgr.collectEvictionResult(pods)
gotForceEvictPods := sets.String{}
gotSoftEvictPods := sets.String{}
gotConditions := sets.String{}
Expand Down
26 changes: 18 additions & 8 deletions pkg/agent/evictionmanager/plugin/memory/system_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
EvictionPluginNameSystemMemoryPressure = "system-memory-pressure-eviction-plugin"
EvictionScopeSystemMemory = "SystemMemory"
evictionConditionMemoryPressure = "MemoryPressure"
systemMemoryPressureHealthCheck = "system_memory_pressure_eviction_detect"
syncTolerationTurns = 3
)

func NewSystemPressureEvictionPlugin(_ *client.GenericClientSet, _ events.EventRecorder,
Expand Down Expand Up @@ -99,6 +101,8 @@ func (s *SystemPressureEvictionPlugin) Name() string {
}

// Start registers the plugin's heartbeat health check and then launches the
// periodic system-pressure detection loop in a background goroutine.
//
// The check is registered under systemMemoryPressureHealthCheck with an
// initial state of HealthzCheckStateNotReady. Both duration arguments are
// syncTolerationTurns multiplied by the plugin's sync period.
// NOTE(review): the exact meaning of the two duration arguments is defined by
// general.RegisterHeartbeatCheck, which is not visible here — confirm.
func (s *SystemPressureEvictionPlugin) Start() {
	// Allow several sync periods to elapse before the check is affected.
	tolerance := syncTolerationTurns * s.syncPeriod
	general.RegisterHeartbeatCheck(
		systemMemoryPressureHealthCheck,
		tolerance,
		general.HealthzCheckStateNotReady,
		tolerance,
	)

	// The loop runs on context.TODO(), so no cancellation signal reaches it
	// through the context.
	loopCtx := context.TODO()
	go wait.UntilWithContext(loopCtx, s.detectSystemPressures, s.syncPeriod)
}

Expand Down Expand Up @@ -137,12 +141,16 @@ func (s *SystemPressureEvictionPlugin) ThresholdMet(_ context.Context) (*plugina
func (s *SystemPressureEvictionPlugin) detectSystemPressures(_ context.Context) {
s.Lock()
defer s.Unlock()
var err error
defer func() {
_ = general.UpdateHealthzStateByError(systemMemoryPressureHealthCheck, err)
}()

s.isUnderSystemPressure = false
s.systemAction = actionNoop

s.detectSystemWatermarkPressure()
s.detectSystemKswapdStealPressure()
err = s.detectSystemWatermarkPressure()
err = s.detectSystemKswapdStealPressure()

switch s.systemAction {
case actionReclaimedEviction:
Expand All @@ -162,15 +170,15 @@ func (s *SystemPressureEvictionPlugin) detectSystemPressures(_ context.Context)
}
}

func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() {
func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() error {
free, total, scaleFactor, err := helper.GetWatermarkMetrics(s.metaServer.MetricsFetcher, s.emitter, nonExistNumaID)
if err != nil {
_ = s.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyNumaID: strconv.Itoa(nonExistNumaID),
})...)
general.Errorf("failed to getWatermarkMetrics for system, err: %v", err)
return
return err
}

thresholdMinimum := float64(s.dynamicConfig.GetDynamicConfiguration().SystemFreeMemoryThresholdMinimum)
Expand All @@ -184,9 +192,10 @@ func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() {
s.isUnderSystemPressure = true
s.systemAction = actionReclaimedEviction
}
return nil
}

func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() error {
kswapdSteal, err := helper.GetNodeMetricWithTime(s.metaServer.MetricsFetcher, s.emitter, consts.MetricMemKswapdstealSystem)
if err != nil {
s.kswapdStealPreviousCycle = kswapdStealPreviousCycleMissing
Expand All @@ -196,12 +205,12 @@ func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
metricsTagKeyNumaID: strconv.Itoa(nonExistNumaID),
})...)
general.Errorf("failed to getSystemKswapdStealMetrics, err: %v", err)
return
return err
}

if kswapdSteal.Time.Equal(s.kswapdStealPreviousCycleTime) {
general.Warningf("getSystemKswapdStealMetrics get same result as last round,skip current round")
return
return nil
}

dynamicConfig := s.dynamicConfig.GetDynamicConfiguration()
Expand Down Expand Up @@ -229,7 +238,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
s.kswapdStealPreviousCycleTime = *(kswapdSteal.Time)
if kswapdStealPreviousCycle == kswapdStealPreviousCycleMissing {
general.Warningf("kswapd steal of the previous cycle is missing")
return
return nil
}

if (kswapdSteal.Value-kswapdStealPreviousCycle)/(kswapdSteal.Time.Sub(kswapdStealPreviousCycleTime)).Seconds() >= float64(dynamicConfig.SystemKswapdRateThreshold) {
Expand All @@ -249,6 +258,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
s.systemAction = actionEviction
}
}
return nil
}

func (s *SystemPressureEvictionPlugin) GetTopEvictionPods(_ context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) {
Expand Down
35 changes: 21 additions & 14 deletions pkg/agent/resourcemanager/fetcher/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (

const reporterManagerCheckpoint = "reporter_manager_checkpoint"

const healthzNameReporterFetcherReady = "ReporterFetcherReady"
const reporterFetcherHealthCheckName = "reporter_fetcher_sync"

const healthzGracePeriodMultiplier = 3

Expand Down Expand Up @@ -220,14 +220,12 @@ func (m *ReporterPluginManager) DeRegisterPlugin(pluginName string) {

// Run starts the reporter plugin manager
func (m *ReporterPluginManager) Run(ctx context.Context) {
general.RegisterHeartbeatCheck(reporterFetcherHealthCheckName, m.reconcilePeriod*healthzGracePeriodMultiplier,
general.HealthzCheckStateReady, m.reconcilePeriod*healthzGracePeriodMultiplier)
go wait.UntilWithContext(ctx, m.syncFunc, m.reconcilePeriod)

klog.Infof("reporter plugin manager started")
m.reporter.Run(ctx)

general.RegisterHeartbeatCheck(healthzNameReporterFetcherReady, m.reconcilePeriod*healthzGracePeriodMultiplier,
general.HealthzCheckStateReady, m.reconcilePeriod*healthzGracePeriodMultiplier)
go wait.UntilWithContext(ctx, m.healthz, m.reconcilePeriod)
}

func (m *ReporterPluginManager) isVersionCompatibleWithPlugin(versions []string) bool {
Expand Down Expand Up @@ -294,9 +292,9 @@ func (m *ReporterPluginManager) runEndpoint(pluginName string, e plugin.Endpoint
// and the manager can read it from Endpoint cache to obtain content changes initiative
func (m *ReporterPluginManager) genericCallback(pluginName string, _ *v1alpha1.GetReportContentResponse) {
klog.Infof("genericCallback")
// get report content from each healthy Endpoint from cache, the lastly response
// get report content from each healthy Endpoint from cache, the last response
// from this plugin has been already stored to its Endpoint cache before this callback called
reportResponses := m.getReportContent(true)
reportResponses, _ := m.getReportContent(true)

err := m.pushContents(context.Background(), reportResponses)
if err != nil {
Expand All @@ -318,6 +316,11 @@ func (m *ReporterPluginManager) pushContents(ctx context.Context, reportResponse
// genericSync periodically calls the Get function to obtain content changes
func (m *ReporterPluginManager) genericSync(ctx context.Context) {
klog.Infof("genericSync")
errList := make([]error, 0)
defer func() {
_ = general.UpdateHealthzStateByError(reporterFetcherHealthCheckName, errors.NewAggregate(errList))
}()

begin := time.Now()
defer func() {
costs := time.Since(begin)
Expand All @@ -329,15 +332,17 @@ func (m *ReporterPluginManager) genericSync(ctx context.Context) {
m.clearUnhealthyPlugin()

// get report content from each healthy Endpoint directly
reportResponses := m.getReportContent(false)

err := m.pushContents(ctx, reportResponses)
reportResponses, err := m.getReportContent(false)
if err != nil {
errList = append(errList, err)
}

pushErr := m.pushContents(ctx, reportResponses)
if pushErr != nil {
_ = m.emitter.StoreInt64("reporter_plugin_sync_push_failed", 1, metrics.MetricTypeNameCount)
klog.Errorf("report plugin failed with error: %v", err)
errList = append(errList, pushErr)
}

m.healthzSyncLoop()
}

// clearUnhealthyPlugin is to clear stopped plugins from cache which exceeded grace period
Expand All @@ -360,8 +365,9 @@ func (m *ReporterPluginManager) clearUnhealthyPlugin() {

// getReportContent is to get reportContent from plugins. If cacheFirst is true,
// use plugin cache (when it is not nil), otherwise we call plugin directly.
func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1alpha1.GetReportContentResponse {
func (m *ReporterPluginManager) getReportContent(cacheFirst bool) (map[string]*v1alpha1.GetReportContentResponse, error) {
reportResponses := make(map[string]*v1alpha1.GetReportContentResponse)
errList := make([]error, 0)

begin := time.Now()
m.mutex.Lock()
Expand Down Expand Up @@ -395,6 +401,7 @@ func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1
klog.InfoS("GetReportContent", "costs", epCosts, "pluginName", pluginName)
_ = m.emitter.StoreInt64(metricsNameGetContentPluginCost, epCosts.Microseconds(), metrics.MetricTypeNameRaw, []metrics.MetricTag{{Key: "plugin", Val: pluginName}}...)
if err != nil {
errList = append(errList, err)
s, _ := status.FromError(err)
_ = m.emitter.StoreInt64("reporter_plugin_get_content_failed", 1, metrics.MetricTypeNameCount, []metrics.MetricTag{
{Key: "code", Val: s.Code().String()},
Expand All @@ -409,7 +416,7 @@ func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1
reportResponses[pluginName] = resp
}

return reportResponses
return reportResponses, errors.NewAggregate(errList)
}

func (m *ReporterPluginManager) writeCheckpoint(reportResponses map[string]*v1alpha1.GetReportContentResponse) error {
Expand Down
Loading
Loading