Skip to content

Commit

Permalink
feat(qrm): refine healthz
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzhhb committed Feb 26, 2024
1 parent d0d9365 commit e9763ce
Show file tree
Hide file tree
Showing 18 changed files with 385 additions and 125 deletions.
6 changes: 3 additions & 3 deletions cmd/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,13 @@ func (c *GenericContext) StartInformer(ctx context.Context) {
// serveHealthZHTTP is used to provide health check for current running components.
func (c *GenericContext) serveHealthZHTTP(mux *http.ServeMux) {
mux.HandleFunc(healthZPath, func(w http.ResponseWriter, r *http.Request) {
ok, reason := c.healthChecker.CheckHealthy()
ok, content := c.healthChecker.CheckHealthy()
if ok {
w.WriteHeader(200)
_, _ = w.Write([]byte("ok"))
_, _ = w.Write([]byte(content))
} else {
w.WriteHeader(500)
_, _ = w.Write([]byte(reason))
_, _ = w.Write([]byte(content))
}
})
}
Expand Down
43 changes: 11 additions & 32 deletions cmd/base/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package katalyst_base
import (
"context"
"encoding/json"
"time"

"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/kubewharf/katalyst-core/pkg/util/general"
)
Expand All @@ -39,41 +37,22 @@ func NewHealthzChecker() *HealthzChecker {
}
}

func (h *HealthzChecker) Run(ctx context.Context) {
go wait.Until(h.check, time.Second*3, ctx.Done())
}
func (h *HealthzChecker) Run(_ context.Context) {}

// CheckHealthy returns whether the component is healthy.
func (h *HealthzChecker) CheckHealthy() (bool, string) {
if reason := h.unhealthyReason.Load(); reason != "" {
return false, reason
}

return true, ""
}

// Since readiness check in kubernetes supports to config with graceful seconds both for
// failed-to-success and success-to-failed, so we don't need to record the detailed state,
// state transition time, and state lasting period here.
//
// for more information about readiness check, please refer to
// https://github.com/kubernetes/api/blob/9a776fe3a720323e4f706b3ebb462b3dd661634f/core/v1/types.go#L2565
func (h *HealthzChecker) check() {
responses := general.CheckHealthz()

// every time we call healthz check functions, we will override the healthz map instead of
// replacing. If some checks must be ensured to exist, this strategy might not work.
unhealthyReasons := make(map[string]general.HealthzCheckResponse)
for name, resp := range responses {
if resp.State != general.HealthzCheckStateReady {
unhealthyReasons[string(name)] = resp
results := general.GetRegisterReadinessCheckResult()
healthy := true
for _, result := range results {
if !result.Ready {
healthy = false
}
}

if len(unhealthyReasons) > 0 {
reasons, _ := json.Marshal(unhealthyReasons)
h.unhealthyReason.Store(string(reasons))
} else {
h.unhealthyReason.Store("")
resultBytes, err := json.Marshal(results)
if err != nil {
general.Errorf("marshal healthz content failed,err:%v", err)
}

return healthy, string(resultBytes)
}
13 changes: 2 additions & 11 deletions cmd/katalyst-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"go.uber.org/atomic"
"k8s.io/klog/v2"

katalystbase "github.com/kubewharf/katalyst-core/cmd/base"
Expand Down Expand Up @@ -149,19 +148,11 @@ func monitorAgentStart(genericCtx *agent.GenericContext) {
// any process that wants to enter main loop, should acquire file lock firstly.
func acquireLock(genericCtx *agent.GenericContext, conf *config.Configuration) *general.Flock {
// register a not-ready state for lock-acquiring when we start
state := atomic.NewString(string(general.HealthzCheckStateNotReady))
message := atomic.NewString("locking-file is failed to acquire")
general.RegisterHealthzCheckRules(healthzNameLockingFileAcquired, func() (general.HealthzCheckResponse, error) {
return general.HealthzCheckResponse{
State: general.HealthzCheckState(state.Load()),
Message: message.Load(),
}, nil
})
general.RegisterHeartbeatCheck(healthzNameLockingFileAcquired, 0, general.HealthzCheckStateNotReady, 0)

// set a ready state for lock-acquiring when we acquire locking successfully
defer func() {
state.Store(string(general.HealthzCheckStateReady))
message.Store("")
_ = general.UpdateHealthzState(healthzNameLockingFileAcquired, general.HealthzCheckStateReady, "")
}()

for {
Expand Down
10 changes: 9 additions & 1 deletion pkg/agent/qrm-plugins/cpu/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@ limitations under the License.

package consts

import "github.com/kubewharf/katalyst-api/pkg/consts"
import (
"github.com/kubewharf/katalyst-api/pkg/consts"
)

const (
// CPUResourcePluginPolicyNameDynamic is the name of the dynamic policy.
CPUResourcePluginPolicyNameDynamic = string(consts.ResourcePluginPolicyNameDynamic)

// CPUResourcePluginPolicyNameNative is the name of the native policy.
CPUResourcePluginPolicyNameNative = string(consts.ResourcePluginPolicyNameNative)

// HealthCheckDynamicPolicyName is the common prefix shared by every
// health-check name of the dynamic CPU policy declared below.
HealthCheckDynamicPolicyName = "qrm_cpu_plugin_" + CPUResourcePluginPolicyNameDynamic
// HealthCheckClearResidualState is the health-check name under which the
// dynamic policy's clearResidualState routine reports its result.
HealthCheckClearResidualState = HealthCheckDynamicPolicyName + "_clear_residual_state"
// HealthCheckCPUSet is the health-check name under which the dynamic
// policy's checkCPUSet routine reports its result.
HealthCheckCPUSet = HealthCheckDynamicPolicyName + "_check_cpuset"
// HealthCheckSyncCPUIdle is the health-check name under which the dynamic
// policy's syncCPUIdle routine reports its result; it is registered only
// when cpu-idle syncing is enabled.
HealthCheckSyncCPUIdle = HealthCheckDynamicPolicyName + "_sync_cpu_idle"
// HealthCheckCommunicateWithAdvisor is the health-check name for the
// communication with the CPU advisor server; it is updated from the
// ListAndWatch response handling in lwCPUAdvisorServer.
HealthCheckCommunicateWithAdvisor = HealthCheckDynamicPolicyName + "_communicate_with_advisor"
)
6 changes: 6 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,17 @@ func (p *DynamicPolicy) Start() (err error) {
go wait.Until(func() {
_ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw)
}, time.Second*30, p.stopCh)

general.RegisterHeartbeatCheck(cpuconsts.HealthCheckClearResidualState, 3*time.Minute, general.HealthzCheckStateNotReady, 3*time.Minute)
go wait.Until(p.clearResidualState, stateCheckPeriod, p.stopCh)

general.RegisterHeartbeatCheck(cpuconsts.HealthCheckCPUSet, 30*time.Second, general.HealthzCheckStateNotReady, 30*time.Second)
go wait.Until(p.checkCPUSet, cpusetCheckPeriod, p.stopCh)

// start cpu-idle syncing if needed
if p.enableSyncingCPUIdle {
general.Infof("syncCPUIdle enabled")
general.RegisterHeartbeatCheck(cpuconsts.HealthCheckSyncCPUIdle, 3*time.Minute, general.HealthzCheckStateNotReady, 3*time.Minute)

if p.reclaimRelativeRootCgroupPath == "" {
return fmt.Errorf("enable syncing cpu idle but not set reclaiemd relative root cgroup path in configuration")
Expand Down Expand Up @@ -338,6 +343,7 @@ func (p *DynamicPolicy) Start() (err error) {
}
}

general.RegisterHeartbeatCheck(cpuconsts.HealthCheckCommunicateWithAdvisor, 2*time.Minute, general.HealthzCheckStateNotReady, 2*time.Minute)
go wait.BackoffUntil(communicateWithCPUAdvisorServer, wait.NewExponentialBackoffManager(800*time.Millisecond,
30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator"
advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
Expand Down Expand Up @@ -242,6 +243,7 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error {
if err != nil {
general.Errorf("allocate by ListAndWatch response of CPUAdvisorServer failed with error: %v", err)
}
_ = general.UpdateHealthzStateByError(cpuconsts.HealthCheckCommunicateWithAdvisor, err)
}
}

Expand Down
51 changes: 44 additions & 7 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_async_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/metrics"
Expand All @@ -37,6 +39,23 @@ import (
// checkCPUSet emits errors if the cpuset allocation falls into unexpected results
func (p *DynamicPolicy) checkCPUSet() {
general.Infof("exec checkCPUSet")
var (
err error
invalidCPUSet = false
cpuSetOverlap = false
)

defer func() {
if err != nil {
_ = general.UpdateHealthzState(cpuconsts.HealthCheckCPUSet, general.HealthzCheckStateNotReady, err.Error())
} else if invalidCPUSet {
_ = general.UpdateHealthzState(cpuconsts.HealthCheckCPUSet, general.HealthzCheckStateNotReady, "invalid cpuset exists")
} else if cpuSetOverlap {
_ = general.UpdateHealthzState(cpuconsts.HealthCheckCPUSet, general.HealthzCheckStateNotReady, "cpuset overlap")
} else {
_ = general.UpdateHealthzState(cpuconsts.HealthCheckCPUSet, general.HealthzCheckStateReady, "")
}
}()

podEntries := p.state.GetPodEntries()
actualCPUSets := make(map[string]map[string]machine.CPUSet)
Expand All @@ -59,14 +78,18 @@ func (p *DynamicPolicy) checkCPUSet() {
"podName": allocationInfo.PodName,
"containerName": allocationInfo.ContainerName,
})
var (
containerId string
cpuSetStats *cgroupcm.CPUSetStats
)

containerId, err := p.metaServer.GetContainerID(podUID, containerName)
containerId, err = p.metaServer.GetContainerID(podUID, containerName)
if err != nil {
general.Errorf("get container id of pod: %s container: %s failed with error: %v", podUID, containerName, err)
continue
}

cpusetStats, err := cgroupcmutils.GetCPUSetForContainer(podUID, containerId)
cpuSetStats, err = cgroupcmutils.GetCPUSetForContainer(podUID, containerId)
if err != nil {
general.Errorf("GetCPUSet of pod: %s container: name(%s), id(%s) failed with error: %v",
podUID, containerName, containerId, err)
Expand All @@ -77,7 +100,7 @@ func (p *DynamicPolicy) checkCPUSet() {
if actualCPUSets[podUID] == nil {
actualCPUSets[podUID] = make(map[string]machine.CPUSet)
}
actualCPUSets[podUID][containerName] = machine.MustParse(cpusetStats.CPUs)
actualCPUSets[podUID][containerName] = machine.MustParse(cpuSetStats.CPUs)

general.Infof("pod: %s/%s, container: %s, state CPUSet: %s, actual CPUSet: %s",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName,
Expand All @@ -89,6 +112,7 @@ func (p *DynamicPolicy) checkCPUSet() {
}

if !actualCPUSets[podUID][containerName].Equals(allocationInfo.OriginalAllocationResult) {
invalidCPUSet = true
general.Errorf("pod: %s/%s, container: %s, cpuset invalid",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
_ = p.emitter.StoreInt64(util.MetricNameCPUSetInvalid, 1, metrics.MetricTypeNameRaw, tags...)
Expand All @@ -99,7 +123,6 @@ func (p *DynamicPolicy) checkCPUSet() {
unionDedicatedCPUSet := machine.NewCPUSet()
unionSharedCPUSet := machine.NewCPUSet()

var cpuSetOverlap bool
for podUID, containerEntries := range actualCPUSets {
for containerName, cset := range containerEntries {
allocationInfo := podEntries[podUID][containerName]
Expand Down Expand Up @@ -141,15 +164,23 @@ func (p *DynamicPolicy) checkCPUSet() {
// clearResidualState is used to clean residual pods in local state
func (p *DynamicPolicy) clearResidualState() {
general.Infof("exec clearResidualState")
var (
err error
podList []*v1.Pod
)
residualSet := make(map[string]bool)

defer func() {
_ = general.UpdateHealthzStateByError(cpuconsts.HealthCheckClearResidualState, err)
}()

if p.metaServer == nil {
general.Errorf("nil metaServer")
return
}

ctx := context.Background()
podList, err := p.metaServer.GetPodList(ctx, nil)
podList, err = p.metaServer.GetPodList(ctx, nil)
if err != nil {
general.Errorf("get pod list failed: %v", err)
return
Expand Down Expand Up @@ -211,7 +242,8 @@ func (p *DynamicPolicy) clearResidualState() {
delete(podEntries, podUID)
}

updatedMachineState, err := generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
var updatedMachineState state.NUMANodeMap
updatedMachineState, err = generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
if err != nil {
general.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
return
Expand All @@ -230,12 +262,17 @@ func (p *DynamicPolicy) clearResidualState() {
// syncCPUIdle is used to set cpu idle for reclaimed cores
func (p *DynamicPolicy) syncCPUIdle() {
general.Infof("exec syncCPUIdle")
var err error
defer func() {
_ = general.UpdateHealthzStateByError(cpuconsts.HealthCheckSyncCPUIdle, err)
}()

if !cgroupcm.IsCPUIdleSupported() {
general.Warningf("cpu idle isn't unsupported, skip syncing")
return
}

err := cgroupcmutils.ApplyCPUWithRelativePath(p.reclaimRelativeRootCgroupPath, &cgroupcm.CPUData{CpuIdlePtr: &p.enableCPUIdle})
err = cgroupcmutils.ApplyCPUWithRelativePath(p.reclaimRelativeRootCgroupPath, &cgroupcm.CPUData{CpuIdlePtr: &p.enableCPUIdle})
if err != nil {
general.Errorf("ApplyCPUWithRelativePath in %s with enableCPUIdle: %v in failed with error: %v",
p.reclaimRelativeRootCgroupPath, p.enableCPUIdle, err)
Expand Down
35 changes: 35 additions & 0 deletions pkg/agent/qrm-plugins/memory/consts/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package consts

import (
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
)

const (
// MemoryResourcePluginPolicyNameDynamic is the string form of
// apiconsts.ResourcePluginPolicyNameDynamic, i.e. the name of the dynamic policy.
MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic)

// HealthCheckDynamicPolicyName is the common prefix shared by every
// health-check name of the dynamic memory policy declared below.
HealthCheckDynamicPolicyName = "qrm_memory_plugin_" + MemoryResourcePluginPolicyNameDynamic
// The names below are HealthCheckDynamicPolicyName plus a task-specific suffix.
// NOTE(review): the routines reporting under each name are not part of this
// file; presumably each name belongs to the periodic task its suffix
// describes — confirm against the memory dynamic policy.
HealthCheckClearResidualState = HealthCheckDynamicPolicyName + "_clear_residual_state"
HealthCheckMemSet = HealthCheckDynamicPolicyName + "_check_mem_set"
HealthCheckApplyExternalCGParams = HealthCheckDynamicPolicyName + "_apply_external_cg_params"
HealthCheckSetExtraControlKnob = HealthCheckDynamicPolicyName + "_set_extra_control_knob"
HealthCheckOOMPriority = HealthCheckDynamicPolicyName + "_oom_priority"
HealthCheckSetSockMem = HealthCheckDynamicPolicyName + "_set_sock_mem"
HealthCheckCommunicateWithAdvisor = HealthCheckDynamicPolicyName + "_communicate_with_advisor"
HealthCheckDropCache = HealthCheckDynamicPolicyName + "_drop_cache"
)
Loading

0 comments on commit e9763ce

Please sign in to comment.