Skip to content

Commit

Permalink
feat(qrm): refine healthz (#482)
Browse files Browse the repository at this point in the history
* feat(qrm): refine healthz
  • Loading branch information
zzzzhhb authored Mar 13, 2024
1 parent 4912a09 commit 65db018
Show file tree
Hide file tree
Showing 24 changed files with 483 additions and 152 deletions.
16 changes: 11 additions & 5 deletions cmd/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func NewGenericContext(

// add profiling and health check http paths listening on generic endpoint
serveProfilingHTTP(mux)
c.serveHealthZHTTP(mux)
c.serveHealthZHTTP(mux, genericConf.EnableHealthzCheck)

return c, nil
}
Expand Down Expand Up @@ -264,15 +264,21 @@ func (c *GenericContext) StartInformer(ctx context.Context) {
}

// serveHealthZHTTP is used to provide health check for current running components.
func (c *GenericContext) serveHealthZHTTP(mux *http.ServeMux) {
func (c *GenericContext) serveHealthZHTTP(mux *http.ServeMux, enableHealthzCheck bool) {
mux.HandleFunc(healthZPath, func(w http.ResponseWriter, r *http.Request) {
ok, reason := c.healthChecker.CheckHealthy()
if !enableHealthzCheck {
w.WriteHeader(200)
_, _ = w.Write([]byte("healthz check is disabled"))
return
}

ok, content := c.healthChecker.CheckHealthy()
if ok {
w.WriteHeader(200)
_, _ = w.Write([]byte("ok"))
_, _ = w.Write([]byte(content))
} else {
w.WriteHeader(500)
_, _ = w.Write([]byte(reason))
_, _ = w.Write([]byte(content))
}
})
}
Expand Down
43 changes: 11 additions & 32 deletions cmd/base/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package katalyst_base
import (
"context"
"encoding/json"
"time"

"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/kubewharf/katalyst-core/pkg/util/general"
)
Expand All @@ -39,41 +37,22 @@ func NewHealthzChecker() *HealthzChecker {
}
}

func (h *HealthzChecker) Run(ctx context.Context) {
go wait.Until(h.check, time.Second*3, ctx.Done())
}
func (h *HealthzChecker) Run(_ context.Context) {}

// CheckHealthy returns whether the component is healthy.
func (h *HealthzChecker) CheckHealthy() (bool, string) {
if reason := h.unhealthyReason.Load(); reason != "" {
return false, reason
}

return true, ""
}

// Since the readiness check in kubernetes supports configuring graceful thresholds for both
// failed-to-success and success-to-failed transitions, we don't need to record the detailed state,
// state transition time, and state lasting period here.
//
// for more information about readiness check, please refer to
// https://github.com/kubernetes/api/blob/9a776fe3a720323e4f706b3ebb462b3dd661634f/core/v1/types.go#L2565
func (h *HealthzChecker) check() {
responses := general.CheckHealthz()

// every time we call healthz check functions, we will override the healthz map instead of
// replacing. If some checks must be ensured to exist, this strategy might not work.
unhealthyReasons := make(map[string]general.HealthzCheckResponse)
for name, resp := range responses {
if resp.State != general.HealthzCheckStateReady {
unhealthyReasons[string(name)] = resp
results := general.GetRegisterReadinessCheckResult()
healthy := true
for _, result := range results {
if !result.Ready {
healthy = false
}
}

if len(unhealthyReasons) > 0 {
reasons, _ := json.Marshal(unhealthyReasons)
h.unhealthyReason.Store(string(reasons))
} else {
h.unhealthyReason.Store("")
resultBytes, err := json.Marshal(results)
if err != nil {
general.Errorf("marshal healthz content failed,err:%v", err)
}

return healthy, string(resultBytes)
}
6 changes: 5 additions & 1 deletion cmd/base/options/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (

// GenericOptions holds the configurations for multi components.
type GenericOptions struct {
DryRun bool
DryRun bool
EnableHealthzCheck bool

MasterURL string
KubeConfig string
Expand All @@ -53,6 +54,7 @@ type GenericOptions struct {
func NewGenericOptions() *GenericOptions {
return &GenericOptions{
DryRun: false,
EnableHealthzCheck: false,
TransformedInformerForPod: false,
GenericEndpoint: ":9316",
qosOptions: NewQoSOptions(),
Expand All @@ -75,6 +77,7 @@ func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) {
})

fs.BoolVar(&o.DryRun, "dry-run", o.DryRun, "A bool to enable and disable dry-run.")
fs.BoolVar(&o.EnableHealthzCheck, "enable-healthz-check", o.EnableHealthzCheck, "A bool to enable and disable healthz check.")

fs.BoolVar(&o.TransformedInformerForPod, "transformed-informer-pod", o.TransformedInformerForPod,
"whether we should enable the ability of transformed informer for pods")
Expand All @@ -100,6 +103,7 @@ func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) {
// ApplyTo fills up config with options
func (o *GenericOptions) ApplyTo(c *generic.GenericConfiguration) error {
c.DryRun = o.DryRun
c.EnableHealthzCheck = o.EnableHealthzCheck

c.TransformedInformerForPod = o.TransformedInformerForPod

Expand Down
13 changes: 2 additions & 11 deletions cmd/katalyst-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"go.uber.org/atomic"
"k8s.io/klog/v2"

katalystbase "github.com/kubewharf/katalyst-core/cmd/base"
Expand Down Expand Up @@ -149,19 +148,11 @@ func monitorAgentStart(genericCtx *agent.GenericContext) {
// any process that wants to enter the main loop should acquire the file lock first.
func acquireLock(genericCtx *agent.GenericContext, conf *config.Configuration) *general.Flock {
// register a not-ready state for lock-acquiring when we start
state := atomic.NewString(string(general.HealthzCheckStateNotReady))
message := atomic.NewString("locking-file is failed to acquire")
general.RegisterHealthzCheckRules(healthzNameLockingFileAcquired, func() (general.HealthzCheckResponse, error) {
return general.HealthzCheckResponse{
State: general.HealthzCheckState(state.Load()),
Message: message.Load(),
}, nil
})
general.RegisterHeartbeatCheck(healthzNameLockingFileAcquired, 0, general.HealthzCheckStateNotReady, 0)

// set a ready state for lock-acquiring once we acquire the lock successfully
defer func() {
state.Store(string(general.HealthzCheckStateReady))
message.Store("")
_ = general.UpdateHealthzState(healthzNameLockingFileAcquired, general.HealthzCheckStateReady, "")
}()

for {
Expand Down
10 changes: 9 additions & 1 deletion pkg/agent/qrm-plugins/cpu/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@ limitations under the License.

package consts

import "github.com/kubewharf/katalyst-api/pkg/consts"
import (
"github.com/kubewharf/katalyst-api/pkg/consts"
)

const (
// CPUResourcePluginPolicyNameDynamic is the name of the dynamic policy.
CPUResourcePluginPolicyNameDynamic = string(consts.ResourcePluginPolicyNameDynamic)

// CPUResourcePluginPolicyNameNative is the name of the native policy.
CPUResourcePluginPolicyNameNative = string(consts.ResourcePluginPolicyNameNative)

CPUPluginDynamicPolicyName = "qrm_cpu_plugin_" + CPUResourcePluginPolicyNameDynamic
ClearResidualState = CPUPluginDynamicPolicyName + "_clear_residual_state"
CheckCPUSet = CPUPluginDynamicPolicyName + "_check_cpuset"
SyncCPUIdle = CPUPluginDynamicPolicyName + "_sync_cpu_idle"
CommunicateWithAdvisor = CPUPluginDynamicPolicyName + "_communicate_with_advisor"
)
24 changes: 21 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ const (
stateCheckPeriod = 30 * time.Second
maxResidualTime = 5 * time.Minute
syncCPUIdlePeriod = 30 * time.Second

healthCheckTolerationTimes = 3
)

var (
Expand Down Expand Up @@ -272,8 +274,18 @@ func (p *DynamicPolicy) Start() (err error) {
go wait.Until(func() {
_ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw)
}, time.Second*30, p.stopCh)
go wait.Until(p.clearResidualState, stateCheckPeriod, p.stopCh)
go wait.Until(p.checkCPUSet, cpusetCheckPeriod, p.stopCh)

err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.ClearResidualState, general.HealthzCheckStateNotReady,
qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.clearResidualState, stateCheckPeriod, healthCheckTolerationTimes)
if err != nil {
general.Errorf("start %v failed,err:%v", cpuconsts.ClearResidualState, err)
}

err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.CheckCPUSet, general.HealthzCheckStateNotReady,
qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.checkCPUSet, cpusetCheckPeriod, healthCheckTolerationTimes)
if err != nil {
general.Errorf("start %v failed,err:%v", cpuconsts.CheckCPUSet, err)
}

// start cpu-idle syncing if needed
if p.enableSyncingCPUIdle {
Expand All @@ -282,7 +294,12 @@ func (p *DynamicPolicy) Start() (err error) {
if p.reclaimRelativeRootCgroupPath == "" {
return fmt.Errorf("enable syncing cpu idle but not set reclaiemd relative root cgroup path in configuration")
}
go wait.Until(p.syncCPUIdle, syncCPUIdlePeriod, p.stopCh)

err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.SyncCPUIdle, general.HealthzCheckStateNotReady,
qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.syncCPUIdle, syncCPUIdlePeriod, healthCheckTolerationTimes)
if err != nil {
general.Errorf("start %v failed,err:%v", cpuconsts.SyncCPUIdle, err)
}
}

// start cpu-pressure eviction plugin if needed
Expand Down Expand Up @@ -340,6 +357,7 @@ func (p *DynamicPolicy) Start() (err error) {
}
}

general.RegisterHeartbeatCheck(cpuconsts.CommunicateWithAdvisor, 2*time.Minute, general.HealthzCheckStateNotReady, 2*time.Minute)
go wait.BackoffUntil(communicateWithCPUAdvisorServer, wait.NewExponentialBackoffManager(800*time.Millisecond,
30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator"
advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
Expand Down Expand Up @@ -242,6 +243,7 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error {
if err != nil {
general.Errorf("allocate by ListAndWatch response of CPUAdvisorServer failed with error: %v", err)
}
_ = general.UpdateHealthzStateByError(cpuconsts.CommunicateWithAdvisor, err)
}
}

Expand Down
Loading

0 comments on commit 65db018

Please sign in to comment.