Skip to content

Commit

Permalink
feat(qrm): refine healthz
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzhhb committed Feb 23, 2024
1 parent d0d9365 commit edf17a0
Show file tree
Hide file tree
Showing 16 changed files with 361 additions and 127 deletions.
6 changes: 3 additions & 3 deletions cmd/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,13 @@ func (c *GenericContext) StartInformer(ctx context.Context) {
// serveHealthZHTTP is used to provide health check for current running components.
func (c *GenericContext) serveHealthZHTTP(mux *http.ServeMux) {
mux.HandleFunc(healthZPath, func(w http.ResponseWriter, r *http.Request) {
ok, reason := c.healthChecker.CheckHealthy()
ok, content := c.healthChecker.CheckHealthy()
if ok {
w.WriteHeader(200)
_, _ = w.Write([]byte("ok"))
_, _ = w.Write([]byte(content))
} else {
w.WriteHeader(500)
_, _ = w.Write([]byte(reason))
_, _ = w.Write([]byte(content))
}
})
}
Expand Down
62 changes: 31 additions & 31 deletions cmd/base/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package katalyst_base
import (
"context"
"encoding/json"
"time"

"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/util/wait"
"time"

"github.com/kubewharf/katalyst-core/pkg/util/general"
)
Expand All @@ -39,41 +37,43 @@ func NewHealthzChecker() *HealthzChecker {
}
}

func (h *HealthzChecker) Run(ctx context.Context) {
go wait.Until(h.check, time.Second*3, ctx.Done())
}
func (h *HealthzChecker) Run(_ context.Context) {}

// CheckHealthy returns whether the component is healthy.
func (h *HealthzChecker) CheckHealthy() (bool, string) {
if reason := h.unhealthyReason.Load(); reason != "" {
return false, reason
}

return true, ""
}
status := general.GetRegisterReadinessCheckStatus()
result := true
forLoop:
for _, checkStatus := range status {
switch checkStatus.Mode {
case general.HealthzCheckModeHeartBeat:
if checkStatus.TimeoutPeriod > 0 && time.Now().Sub(checkStatus.LastUpdateTime) > checkStatus.TimeoutPeriod {
result = false
break forLoop
}

// Since readiness check in kubernetes supports to config with graceful seconds both for
// failed-to-success and success-to-failed, so we don't need to record the detailed state,
// state transition time, and state lasting period here.
//
// for more information about readiness check, please refer to
// https://github.com/kubernetes/api/blob/9a776fe3a720323e4f706b3ebb462b3dd661634f/core/v1/types.go#L2565
func (h *HealthzChecker) check() {
responses := general.CheckHealthz()
if checkStatus.GracePeriod <= 0 && checkStatus.State != general.HealthzCheckStateReady {
result = false
break forLoop
}

// every time we call healthz check functions, we will override the healthz map instead of
// replacing. If some checks must be ensured to exist, this strategy might not work.
unhealthyReasons := make(map[string]general.HealthzCheckResponse)
for name, resp := range responses {
if resp.State != general.HealthzCheckStateReady {
unhealthyReasons[string(name)] = resp
if checkStatus.GracePeriod > 0 && time.Now().Sub(checkStatus.UnhealthyStartTime) > checkStatus.GracePeriod &&
checkStatus.State != general.HealthzCheckStateReady {
result = false
break forLoop
}
case general.HealthzCheckModeReport:
if checkStatus.LatestUnhealthyTime.After(time.Now().Add(-checkStatus.GracePeriod)) {
result = false
break forLoop
}
}
}

if len(unhealthyReasons) > 0 {
reasons, _ := json.Marshal(unhealthyReasons)
h.unhealthyReason.Store(string(reasons))
} else {
h.unhealthyReason.Store("")
statusBytes, err := json.Marshal(status)
if err != nil {
general.Errorf("marshal healthz content failed,err:%v", err)
}

return result, string(statusBytes)
}
13 changes: 2 additions & 11 deletions cmd/katalyst-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"go.uber.org/atomic"
"k8s.io/klog/v2"

katalystbase "github.com/kubewharf/katalyst-core/cmd/base"
Expand Down Expand Up @@ -149,19 +148,11 @@ func monitorAgentStart(genericCtx *agent.GenericContext) {
// any process that wants to enter main loop, should acquire file lock firstly.
func acquireLock(genericCtx *agent.GenericContext, conf *config.Configuration) *general.Flock {
// register a not-ready state for lock-acquiring when we starts
state := atomic.NewString(string(general.HealthzCheckStateNotReady))
message := atomic.NewString("locking-file is failed to acquire")
general.RegisterHealthzCheckRules(healthzNameLockingFileAcquired, func() (general.HealthzCheckResponse, error) {
return general.HealthzCheckResponse{
State: general.HealthzCheckState(state.Load()),
Message: message.Load(),
}, nil
})
general.RegisterHeartbeatCheck(healthzNameLockingFileAcquired, 1*time.Minute, general.HealthzCheckStateNotReady, 1*time.Minute)

// set a ready state for lock-acquiring when we acquire locking successfully
defer func() {
state.Store(string(general.HealthzCheckStateReady))
message.Store("")
_ = general.UpdateHealthzState(healthzNameLockingFileAcquired, general.HealthzCheckStateReady, "")
}()

for {
Expand Down
22 changes: 21 additions & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ const (
stateCheckPeriod = 30 * time.Second
maxResidualTime = 5 * time.Minute
syncCPUIdlePeriod = 30 * time.Second

HealthCheckDynamicPolicyName = qrm.QRMPluginNameCPU + "_" + cpuconsts.CPUResourcePluginPolicyNameDynamic
HealthCheckCPUPressureEviction = HealthCheckDynamicPolicyName + "_cpu_pressure_eviction"
HealthCheckClearResidualState = HealthCheckDynamicPolicyName + "_clear_residual_state"
HealthCheckCPUSet = HealthCheckDynamicPolicyName + "_check_cpuset"
HealthCheckSyncCPUIdle = HealthCheckDynamicPolicyName + "_sync_cpu_idle"
HealthCheckCommunicateWithAdvisor = HealthCheckDynamicPolicyName + "_communicate_with_advisor"
)

var (
Expand Down Expand Up @@ -270,12 +277,17 @@ func (p *DynamicPolicy) Start() (err error) {
go wait.Until(func() {
_ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw)
}, time.Second*30, p.stopCh)

general.RegisterHeartbeatCheck(HealthCheckClearResidualState, 3*time.Minute, general.HealthzCheckStateNotReady, 3*time.Minute)
go wait.Until(p.clearResidualState, stateCheckPeriod, p.stopCh)

general.RegisterHeartbeatCheck(HealthCheckCPUSet, 30*time.Second, general.HealthzCheckStateNotReady, 30*time.Second)
go wait.Until(p.checkCPUSet, cpusetCheckPeriod, p.stopCh)

// start cpu-idle syncing if needed
if p.enableSyncingCPUIdle {
general.Infof("syncCPUIdle enabled")
general.RegisterHeartbeatCheck(HealthCheckSyncCPUIdle, 3*time.Minute, general.HealthzCheckStateNotReady, 3*time.Minute)

if p.reclaimRelativeRootCgroupPath == "" {
return fmt.Errorf("enable syncing cpu idle but not set reclaiemd relative root cgroup path in configuration")
Expand Down Expand Up @@ -314,8 +326,15 @@ func (p *DynamicPolicy) Start() (err error) {
800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)

communicateWithCPUAdvisorServer := func() {
var (
conn *grpc.ClientConn
err error
)
defer func() {
_ = general.UpdateHealthzStateByError(HealthCheckCommunicateWithAdvisor, err)
}()
general.Infof("waiting cpu plugin checkpoint server serving confirmation")
if conn, err := process.Dial(p.cpuPluginSocketAbsPath, 5*time.Second); err != nil {
if conn, err = process.Dial(p.cpuPluginSocketAbsPath, 5*time.Second); err != nil {
general.Errorf("dial check at socket: %s failed with err: %v", p.cpuPluginSocketAbsPath, err)
return
} else {
Expand All @@ -338,6 +357,7 @@ func (p *DynamicPolicy) Start() (err error) {
}
}

general.RegisterHeartbeatCheck(HealthCheckCommunicateWithAdvisor, 2*time.Minute, general.HealthzCheckStateNotReady, 2*time.Minute)
go wait.BackoffUntil(communicateWithCPUAdvisorServer, wait.NewExponentialBackoffManager(800*time.Millisecond,
30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)

Expand Down
50 changes: 43 additions & 7 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_async_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/kubewharf/katalyst-api/pkg/consts"
Expand All @@ -37,6 +38,23 @@ import (
// checkCPUSet emit errors if the memory allocation falls into unexpected results
func (p *DynamicPolicy) checkCPUSet() {
general.Infof("exec checkCPUSet")
var (
err error
invalidCPUSet = false
cpuSetOverlap = false
)

defer func() {
if err != nil {
_ = general.UpdateHealthzState(HealthCheckCPUSet, general.HealthzCheckStateNotReady, err.Error())
} else if invalidCPUSet {
_ = general.UpdateHealthzState(HealthCheckCPUSet, general.HealthzCheckStateNotReady, "invalid cpuset exists")
} else if cpuSetOverlap {
_ = general.UpdateHealthzState(HealthCheckCPUSet, general.HealthzCheckStateNotReady, "cpuset overlap")
} else {
_ = general.UpdateHealthzState(HealthCheckCPUSet, general.HealthzCheckStateReady, "")
}
}()

podEntries := p.state.GetPodEntries()
actualCPUSets := make(map[string]map[string]machine.CPUSet)
Expand All @@ -59,14 +77,18 @@ func (p *DynamicPolicy) checkCPUSet() {
"podName": allocationInfo.PodName,
"containerName": allocationInfo.ContainerName,
})
var (
containerId string
cpuSetStats *cgroupcm.CPUSetStats
)

containerId, err := p.metaServer.GetContainerID(podUID, containerName)
containerId, err = p.metaServer.GetContainerID(podUID, containerName)
if err != nil {
general.Errorf("get container id of pod: %s container: %s failed with error: %v", podUID, containerName, err)
continue
}

cpusetStats, err := cgroupcmutils.GetCPUSetForContainer(podUID, containerId)
cpuSetStats, err = cgroupcmutils.GetCPUSetForContainer(podUID, containerId)
if err != nil {
general.Errorf("GetCPUSet of pod: %s container: name(%s), id(%s) failed with error: %v",
podUID, containerName, containerId, err)
Expand All @@ -77,7 +99,7 @@ func (p *DynamicPolicy) checkCPUSet() {
if actualCPUSets[podUID] == nil {
actualCPUSets[podUID] = make(map[string]machine.CPUSet)
}
actualCPUSets[podUID][containerName] = machine.MustParse(cpusetStats.CPUs)
actualCPUSets[podUID][containerName] = machine.MustParse(cpuSetStats.CPUs)

general.Infof("pod: %s/%s, container: %s, state CPUSet: %s, actual CPUSet: %s",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName,
Expand All @@ -89,6 +111,7 @@ func (p *DynamicPolicy) checkCPUSet() {
}

if !actualCPUSets[podUID][containerName].Equals(allocationInfo.OriginalAllocationResult) {
invalidCPUSet = true
general.Errorf("pod: %s/%s, container: %s, cpuset invalid",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
_ = p.emitter.StoreInt64(util.MetricNameCPUSetInvalid, 1, metrics.MetricTypeNameRaw, tags...)
Expand All @@ -99,7 +122,6 @@ func (p *DynamicPolicy) checkCPUSet() {
unionDedicatedCPUSet := machine.NewCPUSet()
unionSharedCPUSet := machine.NewCPUSet()

var cpuSetOverlap bool
for podUID, containerEntries := range actualCPUSets {
for containerName, cset := range containerEntries {
allocationInfo := podEntries[podUID][containerName]
Expand Down Expand Up @@ -141,15 +163,23 @@ func (p *DynamicPolicy) checkCPUSet() {
// clearResidualState is used to clean residual pods in local state
func (p *DynamicPolicy) clearResidualState() {
general.Infof("exec clearResidualState")
var (
err error
podList []*v1.Pod
)
residualSet := make(map[string]bool)

defer func() {
_ = general.UpdateHealthzStateByError(HealthCheckClearResidualState, err)
}()

if p.metaServer == nil {
general.Errorf("nil metaServer")
return
}

ctx := context.Background()
podList, err := p.metaServer.GetPodList(ctx, nil)
podList, err = p.metaServer.GetPodList(ctx, nil)
if err != nil {
general.Errorf("get pod list failed: %v", err)
return
Expand Down Expand Up @@ -211,7 +241,8 @@ func (p *DynamicPolicy) clearResidualState() {
delete(podEntries, podUID)
}

updatedMachineState, err := generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
var updatedMachineState state.NUMANodeMap
updatedMachineState, err = generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
if err != nil {
general.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
return
Expand All @@ -230,12 +261,17 @@ func (p *DynamicPolicy) clearResidualState() {
// syncCPUIdle is used to set cpu idle for reclaimed cores
func (p *DynamicPolicy) syncCPUIdle() {
general.Infof("exec syncCPUIdle")
var err error
defer func() {
_ = general.UpdateHealthzStateByError(HealthCheckSyncCPUIdle, err)
}()

if !cgroupcm.IsCPUIdleSupported() {
general.Warningf("cpu idle isn't unsupported, skip syncing")
return
}

err := cgroupcmutils.ApplyCPUWithRelativePath(p.reclaimRelativeRootCgroupPath, &cgroupcm.CPUData{CpuIdlePtr: &p.enableCPUIdle})
err = cgroupcmutils.ApplyCPUWithRelativePath(p.reclaimRelativeRootCgroupPath, &cgroupcm.CPUData{CpuIdlePtr: &p.enableCPUIdle})
if err != nil {
general.Errorf("ApplyCPUWithRelativePath in %s with enableCPUIdle: %v in failed with error: %v",
p.reclaimRelativeRootCgroupPath, p.enableCPUIdle, err)
Expand Down
37 changes: 37 additions & 0 deletions pkg/agent/qrm-plugins/memory/consts/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package consts

import (
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent/qrm"
)

const (
MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic)

HealthCheckDynamicPolicyName = qrm.QRMPluginNameMemory + "_" + MemoryResourcePluginPolicyNameDynamic
HealthCheckClearResidualState = HealthCheckDynamicPolicyName + "_clear_residual_state"
HealthCheckMemSet = HealthCheckDynamicPolicyName + "_check_mem_set"
HealthCheckApplyExternalCGParams = HealthCheckDynamicPolicyName + "_apply_external_cg_params"
HealthCheckSetExtraControlKnob = HealthCheckDynamicPolicyName + "_set_extra_control_knob"
HealthCheckSetMemoryMigrate = HealthCheckDynamicPolicyName + "_set_memory_migrate"
HealthCheckOOMPriority = HealthCheckDynamicPolicyName + "_oom_priority"
HealthCheckSetSockMem = HealthCheckDynamicPolicyName + "_set_sock_mem"
HealthCheckCommunicateWithAdvisor = HealthCheckDynamicPolicyName + "_communicate_with_advisor"
HealthCheckDropCache = HealthCheckDynamicPolicyName + "_drop_cache"
)
Loading

0 comments on commit edf17a0

Please sign in to comment.