Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(qrm): refine healthz #482

Merged
merged 2 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions cmd/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func NewGenericContext(

// add profiling and health check http paths listening on generic endpoint
serveProfilingHTTP(mux)
c.serveHealthZHTTP(mux)
c.serveHealthZHTTP(mux, genericConf.EnableHealthzCheck)

return c, nil
}
Expand Down Expand Up @@ -264,15 +264,21 @@ func (c *GenericContext) StartInformer(ctx context.Context) {
}

// serveHealthZHTTP is used to provide health check for current running components.
func (c *GenericContext) serveHealthZHTTP(mux *http.ServeMux) {
func (c *GenericContext) serveHealthZHTTP(mux *http.ServeMux, enableHealthzCheck bool) {
mux.HandleFunc(healthZPath, func(w http.ResponseWriter, r *http.Request) {
ok, reason := c.healthChecker.CheckHealthy()
if !enableHealthzCheck {
w.WriteHeader(200)
_, _ = w.Write([]byte("healthz check is disabled"))
return
}

ok, content := c.healthChecker.CheckHealthy()
if ok {
w.WriteHeader(200)
_, _ = w.Write([]byte("ok"))
_, _ = w.Write([]byte(content))
} else {
w.WriteHeader(500)
_, _ = w.Write([]byte(reason))
_, _ = w.Write([]byte(content))
}
})
}
Expand Down
43 changes: 11 additions & 32 deletions cmd/base/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package katalyst_base
import (
"context"
"encoding/json"
"time"

"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/kubewharf/katalyst-core/pkg/util/general"
)
Expand All @@ -39,41 +37,22 @@ func NewHealthzChecker() *HealthzChecker {
}
}

func (h *HealthzChecker) Run(ctx context.Context) {
go wait.Until(h.check, time.Second*3, ctx.Done())
}
// Run is a no-op: it ignores the given context and starts no background work.
// NOTE(review): presumably retained so that existing callers of Run keep compiling — confirm.
func (h *HealthzChecker) Run(_ context.Context) {}

// CheckHealthy returns whether the component is healthy.
func (h *HealthzChecker) CheckHealthy() (bool, string) {
if reason := h.unhealthyReason.Load(); reason != "" {
return false, reason
}

return true, ""
}

// Since the readiness check in kubernetes can be configured with graceful seconds for both
// failed-to-success and success-to-failed transitions, we don't need to record the detailed state,
// state transition time, or state lasting period here.
//
// for more information about readiness check, please refer to
// https://github.com/kubernetes/api/blob/9a776fe3a720323e4f706b3ebb462b3dd661634f/core/v1/types.go#L2565
func (h *HealthzChecker) check() {
responses := general.CheckHealthz()

// every time we call healthz check functions, we will override the healthz map instead of
// replacing. If some checks must be ensured to exist, this strategy might not work.
unhealthyReasons := make(map[string]general.HealthzCheckResponse)
for name, resp := range responses {
if resp.State != general.HealthzCheckStateReady {
unhealthyReasons[string(name)] = resp
results := general.GetRegisterReadinessCheckResult()
healthy := true
for _, result := range results {
if !result.Ready {
healthy = false
}
}

if len(unhealthyReasons) > 0 {
reasons, _ := json.Marshal(unhealthyReasons)
h.unhealthyReason.Store(string(reasons))
} else {
h.unhealthyReason.Store("")
resultBytes, err := json.Marshal(results)
if err != nil {
general.Errorf("marshal healthz content failed,err:%v", err)
}

return healthy, string(resultBytes)
}
6 changes: 5 additions & 1 deletion cmd/base/options/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (

// GenericOptions holds the configurations for multi components.
type GenericOptions struct {
DryRun bool
DryRun bool
EnableHealthzCheck bool

MasterURL string
KubeConfig string
Expand All @@ -53,6 +54,7 @@ type GenericOptions struct {
func NewGenericOptions() *GenericOptions {
return &GenericOptions{
DryRun: false,
EnableHealthzCheck: false,
TransformedInformerForPod: false,
GenericEndpoint: ":9316",
qosOptions: NewQoSOptions(),
Expand All @@ -75,6 +77,7 @@ func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) {
})

fs.BoolVar(&o.DryRun, "dry-run", o.DryRun, "A bool to enable and disable dry-run.")
fs.BoolVar(&o.EnableHealthzCheck, "enable-healthz-check", o.EnableHealthzCheck, "A bool to enable and disable healthz check.")

fs.BoolVar(&o.TransformedInformerForPod, "transformed-informer-pod", o.TransformedInformerForPod,
"whether we should enable the ability of transformed informer for pods")
Expand All @@ -100,6 +103,7 @@ func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) {
// ApplyTo fills up config with options
func (o *GenericOptions) ApplyTo(c *generic.GenericConfiguration) error {
c.DryRun = o.DryRun
c.EnableHealthzCheck = o.EnableHealthzCheck

c.TransformedInformerForPod = o.TransformedInformerForPod

Expand Down
13 changes: 2 additions & 11 deletions cmd/katalyst-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"go.uber.org/atomic"
"k8s.io/klog/v2"

katalystbase "github.com/kubewharf/katalyst-core/cmd/base"
Expand Down Expand Up @@ -149,19 +148,11 @@ func monitorAgentStart(genericCtx *agent.GenericContext) {
// any process that wants to enter the main loop must acquire the file lock first.
func acquireLock(genericCtx *agent.GenericContext, conf *config.Configuration) *general.Flock {
// register a not-ready state for lock-acquiring when we start
state := atomic.NewString(string(general.HealthzCheckStateNotReady))
message := atomic.NewString("locking-file is failed to acquire")
general.RegisterHealthzCheckRules(healthzNameLockingFileAcquired, func() (general.HealthzCheckResponse, error) {
return general.HealthzCheckResponse{
State: general.HealthzCheckState(state.Load()),
Message: message.Load(),
}, nil
})
general.RegisterHeartbeatCheck(healthzNameLockingFileAcquired, 0, general.HealthzCheckStateNotReady, 0)

// set a ready state for lock-acquiring once the lock has been acquired successfully
defer func() {
state.Store(string(general.HealthzCheckStateReady))
message.Store("")
_ = general.UpdateHealthzState(healthzNameLockingFileAcquired, general.HealthzCheckStateReady, "")
}()

for {
Expand Down
10 changes: 9 additions & 1 deletion pkg/agent/qrm-plugins/cpu/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@ limitations under the License.

package consts

import "github.com/kubewharf/katalyst-api/pkg/consts"
import (
"github.com/kubewharf/katalyst-api/pkg/consts"
)

const (
// CPUResourcePluginPolicyNameDynamic is the name of the dynamic policy.
CPUResourcePluginPolicyNameDynamic = string(consts.ResourcePluginPolicyNameDynamic)

// CPUResourcePluginPolicyNameNative is the name of the native policy.
CPUResourcePluginPolicyNameNative = string(consts.ResourcePluginPolicyNameNative)

// CPUPluginDynamicPolicyName is "qrm_cpu_plugin_" followed by the dynamic policy name;
// it is the common prefix of the check names declared below.
CPUPluginDynamicPolicyName = "qrm_cpu_plugin_" + CPUResourcePluginPolicyNameDynamic
// ClearResidualState is the healthz check name for the periodical residual-state clearing handler.
ClearResidualState = CPUPluginDynamicPolicyName + "_clear_residual_state"
// CheckCPUSet is the healthz check name for the periodical cpuset checking handler.
CheckCPUSet = CPUPluginDynamicPolicyName + "_check_cpuset"
// SyncCPUIdle is the healthz check name for the periodical cpu-idle syncing handler.
SyncCPUIdle = CPUPluginDynamicPolicyName + "_sync_cpu_idle"
// CommunicateWithAdvisor is the healthz heartbeat check name for communication with the CPU advisor server.
CommunicateWithAdvisor = CPUPluginDynamicPolicyName + "_communicate_with_advisor"
)
24 changes: 21 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ const (
stateCheckPeriod = 30 * time.Second
maxResidualTime = 5 * time.Minute
syncCPUIdlePeriod = 30 * time.Second

healthCheckTolerationTimes = 3
)

var (
Expand Down Expand Up @@ -270,8 +272,18 @@ func (p *DynamicPolicy) Start() (err error) {
go wait.Until(func() {
_ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw)
}, time.Second*30, p.stopCh)
go wait.Until(p.clearResidualState, stateCheckPeriod, p.stopCh)
go wait.Until(p.checkCPUSet, cpusetCheckPeriod, p.stopCh)

err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.ClearResidualState, general.HealthzCheckStateNotReady,
qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.clearResidualState, stateCheckPeriod, healthCheckTolerationTimes)
if err != nil {
general.Errorf("start %v failed,err:%v", cpuconsts.ClearResidualState, err)
}

err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.CheckCPUSet, general.HealthzCheckStateNotReady,
qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.checkCPUSet, cpusetCheckPeriod, healthCheckTolerationTimes)
if err != nil {
general.Errorf("start %v failed,err:%v", cpuconsts.CheckCPUSet, err)
}

// start cpu-idle syncing if needed
if p.enableSyncingCPUIdle {
Expand All @@ -280,7 +292,12 @@ func (p *DynamicPolicy) Start() (err error) {
if p.reclaimRelativeRootCgroupPath == "" {
return fmt.Errorf("enable syncing cpu idle but not set reclaiemd relative root cgroup path in configuration")
}
go wait.Until(p.syncCPUIdle, syncCPUIdlePeriod, p.stopCh)

err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.SyncCPUIdle, general.HealthzCheckStateNotReady,
qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.syncCPUIdle, syncCPUIdlePeriod, healthCheckTolerationTimes)
if err != nil {
general.Errorf("start %v failed,err:%v", cpuconsts.SyncCPUIdle, err)
}
}

// start cpu-pressure eviction plugin if needed
Expand Down Expand Up @@ -338,6 +355,7 @@ func (p *DynamicPolicy) Start() (err error) {
}
}

general.RegisterHeartbeatCheck(cpuconsts.CommunicateWithAdvisor, 2*time.Minute, general.HealthzCheckStateNotReady, 2*time.Minute)
go wait.BackoffUntil(communicateWithCPUAdvisorServer, wait.NewExponentialBackoffManager(800*time.Millisecond,
30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator"
advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
Expand Down Expand Up @@ -242,6 +243,7 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error {
if err != nil {
general.Errorf("allocate by ListAndWatch response of CPUAdvisorServer failed with error: %v", err)
}
_ = general.UpdateHealthzStateByError(cpuconsts.CommunicateWithAdvisor, err)
}
}

Expand Down
Loading
Loading