feat(qrm): refine healthz
zzzzhhb committed Feb 29, 2024
1 parent 48165f3 commit 6219b51
Showing 20 changed files with 468 additions and 148 deletions.
6 changes: 3 additions & 3 deletions cmd/base/context.go
@@ -266,13 +266,13 @@ func (c *GenericContext) StartInformer(ctx context.Context) {
 // serveHealthZHTTP is used to provide health check for current running components.
 func (c *GenericContext) serveHealthZHTTP(mux *http.ServeMux) {
     mux.HandleFunc(healthZPath, func(w http.ResponseWriter, r *http.Request) {
-        ok, reason := c.healthChecker.CheckHealthy()
+        ok, content := c.healthChecker.CheckHealthy()
         if ok {
             w.WriteHeader(200)
-            _, _ = w.Write([]byte("ok"))
+            _, _ = w.Write([]byte(content))
         } else {
             w.WriteHeader(500)
-            _, _ = w.Write([]byte(reason))
+            _, _ = w.Write([]byte(content))
         }
     })
 }
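With this change the endpoint returns the marshaled check results on both paths, instead of a bare "ok" on success. A minimal sketch of a probe, assuming the handler above is mounted at /healthz (the address is a placeholder):

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    resp, err := http.Get("http://127.0.0.1:9316/healthz") // address is a placeholder
    if err != nil {
        fmt.Println("probe failed:", err)
        return
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    // 200 means every registered check is ready; 500 carries the same
    // marshaled results, so the failing check can be identified from the body.
    fmt.Printf("healthy=%v content=%s\n", resp.StatusCode == http.StatusOK, string(body))
}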
43 changes: 11 additions & 32 deletions cmd/base/healthz.go
@@ -19,10 +19,8 @@ package katalyst_base
 import (
     "context"
     "encoding/json"
-    "time"
 
     "go.uber.org/atomic"
-    "k8s.io/apimachinery/pkg/util/wait"
 
     "github.com/kubewharf/katalyst-core/pkg/util/general"
 )
@@ -39,41 +37,22 @@ func NewHealthzChecker() *HealthzChecker {
     }
 }
 
-func (h *HealthzChecker) Run(ctx context.Context) {
-    go wait.Until(h.check, time.Second*3, ctx.Done())
-}
+func (h *HealthzChecker) Run(_ context.Context) {}
 
 // CheckHealthy returns whether the component is healthy.
 func (h *HealthzChecker) CheckHealthy() (bool, string) {
-    if reason := h.unhealthyReason.Load(); reason != "" {
-        return false, reason
-    }
-
-    return true, ""
-}
-
-// Since readiness check in kubernetes supports to config with graceful seconds both for
-// failed-to-success and success-to-failed, so we don't need to record the detailed state,
-// state transition time, and state lasting period here.
-//
-// for more information about readiness check, please refer to
-// https://github.com/kubernetes/api/blob/9a776fe3a720323e4f706b3ebb462b3dd661634f/core/v1/types.go#L2565
-func (h *HealthzChecker) check() {
-    responses := general.CheckHealthz()
-
-    // every time we call healthz check functions, we will override the healthz map instead of
-    // replacing. If some checks must be ensured to exist, this strategy might not work.
-    unhealthyReasons := make(map[string]general.HealthzCheckResponse)
-    for name, resp := range responses {
-        if resp.State != general.HealthzCheckStateReady {
-            unhealthyReasons[string(name)] = resp
+    results := general.GetRegisterReadinessCheckResult()
+    healthy := true
+    for _, result := range results {
+        if !result.Ready {
+            healthy = false
         }
     }
 
-    if len(unhealthyReasons) > 0 {
-        reasons, _ := json.Marshal(unhealthyReasons)
-        h.unhealthyReason.Store(string(reasons))
-    } else {
-        h.unhealthyReason.Store("")
+    resultBytes, err := json.Marshal(results)
+    if err != nil {
+        general.Errorf("marshal healthz content failed, err: %v", err)
     }
 
+    return healthy, string(resultBytes)
 }
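CheckHealthy now just aggregates whatever readiness results other components have registered through the general package. A minimal sketch of the producer side, using the register/update helpers that appear later in this commit (the check name is hypothetical, and the meaning of the two duration arguments is an assumption):

package example

import (
    "github.com/kubewharf/katalyst-core/pkg/util/general"
)

// hypothetical check name; real callers use consts such as healthzNameLockingFileAcquired
const exampleCheckName = "example_component_ready"

func startExampleComponent(run func() error) {
    // register as not-ready first, so /healthz reports this entry
    // until the component explicitly flips it
    general.RegisterHeartbeatCheck(exampleCheckName, 0, general.HealthzCheckStateNotReady, 0)

    go func() {
        if err := run(); err != nil {
            // maps a non-nil error to a not-ready state with the error as reason
            _ = general.UpdateHealthzStateByError(exampleCheckName, err)
            return
        }
        _ = general.UpdateHealthzState(exampleCheckName, general.HealthzCheckStateReady, "")
    }()
}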
13 changes: 2 additions & 11 deletions cmd/katalyst-agent/app/agent.go
@@ -22,7 +22,6 @@ import (
     "sync"
     "time"
 
-    "go.uber.org/atomic"
     "k8s.io/klog/v2"
 
     katalystbase "github.com/kubewharf/katalyst-core/cmd/base"
@@ -149,19 +148,11 @@ func monitorAgentStart(genericCtx *agent.GenericContext) {
 // any process that wants to enter the main loop should acquire the file lock first.
 func acquireLock(genericCtx *agent.GenericContext, conf *config.Configuration) *general.Flock {
     // register a not-ready state for lock-acquiring when we start
-    state := atomic.NewString(string(general.HealthzCheckStateNotReady))
-    message := atomic.NewString("locking-file is failed to acquire")
-    general.RegisterHealthzCheckRules(healthzNameLockingFileAcquired, func() (general.HealthzCheckResponse, error) {
-        return general.HealthzCheckResponse{
-            State:   general.HealthzCheckState(state.Load()),
-            Message: message.Load(),
-        }, nil
-    })
+    general.RegisterHeartbeatCheck(healthzNameLockingFileAcquired, 0, general.HealthzCheckStateNotReady, 0)
 
     // set a ready state for lock-acquiring when we acquire locking successfully
     defer func() {
-        state.Store(string(general.HealthzCheckStateReady))
-        message.Store("")
+        _ = general.UpdateHealthzState(healthzNameLockingFileAcquired, general.HealthzCheckStateReady, "")
     }()
 
     for {
10 changes: 9 additions & 1 deletion pkg/agent/qrm-plugins/cpu/consts/consts.go
@@ -16,12 +16,20 @@ limitations under the License.
 
 package consts
 
-import "github.com/kubewharf/katalyst-api/pkg/consts"
+import (
+    "github.com/kubewharf/katalyst-api/pkg/consts"
+)
 
 const (
     // CPUResourcePluginPolicyNameDynamic is the name of the dynamic policy.
     CPUResourcePluginPolicyNameDynamic = string(consts.ResourcePluginPolicyNameDynamic)
 
     // CPUResourcePluginPolicyNameNative is the name of the native policy.
     CPUResourcePluginPolicyNameNative = string(consts.ResourcePluginPolicyNameNative)
+
+    CPUPluginDynamicPolicyName = "qrm_cpu_plugin_" + CPUResourcePluginPolicyNameDynamic
+    ClearResidualState         = CPUPluginDynamicPolicyName + "_clear_residual_state"
+    CheckCPUSet                = CPUPluginDynamicPolicyName + "_check_cpuset"
+    SyncCPUIdle                = CPUPluginDynamicPolicyName + "_sync_cpu_idle"
+    CommunicateWithAdvisor     = CPUPluginDynamicPolicyName + "_communicate_with_advisor"
 )
24 changes: 21 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
@@ -66,6 +66,8 @@
     stateCheckPeriod  = 30 * time.Second
     maxResidualTime   = 5 * time.Minute
     syncCPUIdlePeriod = 30 * time.Second
+
+    healthCheckTolerationTimes = 3
 )
 
 var (
@@ -270,8 +272,18 @@ func (p *DynamicPolicy) Start() (err error) {
     go wait.Until(func() {
         _ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw)
     }, time.Second*30, p.stopCh)
-    go wait.Until(p.clearResidualState, stateCheckPeriod, p.stopCh)
-    go wait.Until(p.checkCPUSet, cpusetCheckPeriod, p.stopCh)
+
+    err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.ClearResidualState, general.HealthzCheckStateNotReady,
+        qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.clearResidualState, stateCheckPeriod, healthCheckTolerationTimes)
+    if err != nil {
+        general.Errorf("start %v failed, err: %v", cpuconsts.ClearResidualState, err)
+    }
+
+    err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.CheckCPUSet, general.HealthzCheckStateNotReady,
+        qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.checkCPUSet, cpusetCheckPeriod, healthCheckTolerationTimes)
+    if err != nil {
+        general.Errorf("start %v failed, err: %v", cpuconsts.CheckCPUSet, err)
+    }
 
     // start cpu-idle syncing if needed
     if p.enableSyncingCPUIdle {
@@ -280,7 +292,12 @@
         if p.reclaimRelativeRootCgroupPath == "" {
             return fmt.Errorf("enable syncing cpu idle but not set reclaimed relative root cgroup path in configuration")
         }
-        go wait.Until(p.syncCPUIdle, syncCPUIdlePeriod, p.stopCh)
+
+        err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.SyncCPUIdle, general.HealthzCheckStateNotReady,
+            qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.syncCPUIdle, syncCPUIdlePeriod, healthCheckTolerationTimes)
+        if err != nil {
+            general.Errorf("start %v failed, err: %v", cpuconsts.SyncCPUIdle, err)
+        }
     }
 
     // start cpu-pressure eviction plugin if needed
@@ -338,6 +355,7 @@
         }
     }
 
+    general.RegisterHeartbeatCheck(cpuconsts.CommunicateWithAdvisor, 2*time.Minute, general.HealthzCheckStateNotReady, 2*time.Minute)
     go wait.BackoffUntil(communicateWithCPUAdvisorServer, wait.NewExponentialBackoffManager(800*time.Millisecond,
         30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)
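Each background loop is now registered through periodicalhandler instead of a bare wait.Until, so every loop gets its own healthz entry with failure toleration. A sketch of registering one more handler in the same style; the five-parameter handler signature and the registration call are taken from this diff, while the check name, period, and the periodicalhandler import path are assumptions:

package example

import (
    "time"

    "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/periodicalhandler" // path assumed
    coreconfig "github.com/kubewharf/katalyst-core/pkg/config"
    dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
    "github.com/kubewharf/katalyst-core/pkg/metaserver"
    "github.com/kubewharf/katalyst-core/pkg/metrics"
    "github.com/kubewharf/katalyst-core/pkg/util/general"
)

const exampleCheck = "qrm_cpu_plugin_example_check" // hypothetical

// exampleHandler matches the periodical-handler signature used by this commit.
func exampleHandler(_ *coreconfig.Configuration,
    _ interface{},
    _ *dynamicconfig.DynamicAgentConfiguration,
    _ metrics.MetricEmitter,
    _ *metaserver.MetaServer) {
    // do the periodic work, then report the outcome
    _ = general.UpdateHealthzState(exampleCheck, general.HealthzCheckStateReady, "")
}

func register(groupName string) {
    // starts not-ready; presumably the check only turns unhealthy after
    // healthCheckTolerationTimes (3) consecutive failing periods
    if err := periodicalhandler.RegisterPeriodicalHandlerWithHealthz(exampleCheck,
        general.HealthzCheckStateNotReady, groupName,
        exampleHandler, 30*time.Second, 3); err != nil {
        general.Errorf("register %v failed, err: %v", exampleCheck, err)
    }
}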
@@ -31,6 +31,7 @@ import (
 
     "github.com/kubewharf/katalyst-api/pkg/consts"
     "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc"
+    cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
     "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator"
     advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
     "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
@@ -242,6 +243,7 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error {
         if err != nil {
             general.Errorf("allocate by ListAndWatch response of CPUAdvisorServer failed with error: %v", err)
         }
+        _ = general.UpdateHealthzStateByError(cpuconsts.CommunicateWithAdvisor, err)
     }
 }
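UpdateHealthzStateByError runs on every ListAndWatch iteration, whether err is nil or not, so the advisor check behaves like a heartbeat: Start() registered it with 2-minute windows, which presumably marks the check stale if the loop stops reporting. A hedged sketch of that loop shape (recv, the check name, and the heartbeat interpretation of the duration arguments are assumptions):

package example

import (
    "time"

    "github.com/kubewharf/katalyst-core/pkg/util/general"
)

const lwCheck = "example_communicate_with_advisor" // hypothetical

func runLWLoop(recv func() error, stopCh <-chan struct{}) {
    // assumption: the duration arguments bound how long the check may go
    // without an update, and how long failures are tolerated
    general.RegisterHeartbeatCheck(lwCheck, 2*time.Minute, general.HealthzCheckStateNotReady, 2*time.Minute)

    for {
        select {
        case <-stopCh:
            return
        default:
        }
        // one ListAndWatch round; a nil error flips the check to ready,
        // a non-nil error records the failure reason
        err := recv()
        _ = general.UpdateHealthzStateByError(lwCheck, err)
    }
}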
72 changes: 62 additions & 10 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_async_handler.go
@@ -21,12 +21,17 @@ import (
     "fmt"
     "time"
 
+    v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/util/sets"
 
     "github.com/kubewharf/katalyst-api/pkg/consts"
     "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc"
+    cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
     "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
     "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
+    coreconfig "github.com/kubewharf/katalyst-core/pkg/config"
+    dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
+    "github.com/kubewharf/katalyst-core/pkg/metaserver"
     "github.com/kubewharf/katalyst-core/pkg/metrics"
     cgroupcm "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
     cgroupcmutils "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
@@ -35,8 +40,29 @@ import (
 )
 
 // checkCPUSet emits errors if the cpuset allocation falls into unexpected results
-func (p *DynamicPolicy) checkCPUSet() {
+func (p *DynamicPolicy) checkCPUSet(_ *coreconfig.Configuration,
+    _ interface{},
+    _ *dynamicconfig.DynamicAgentConfiguration,
+    _ metrics.MetricEmitter,
+    _ *metaserver.MetaServer) {
+    general.Infof("exec checkCPUSet")
+    var (
+        err           error
+        invalidCPUSet = false
+        cpuSetOverlap = false
+    )
+
+    defer func() {
+        if err != nil {
+            _ = general.UpdateHealthzStateByError(cpuconsts.CheckCPUSet, err)
+        } else if invalidCPUSet {
+            _ = general.UpdateHealthzState(cpuconsts.CheckCPUSet, general.HealthzCheckStateNotReady, "invalid cpuset exists")
+        } else if cpuSetOverlap {
+            _ = general.UpdateHealthzState(cpuconsts.CheckCPUSet, general.HealthzCheckStateNotReady, "cpuset overlap")
+        } else {
+            _ = general.UpdateHealthzState(cpuconsts.CheckCPUSet, general.HealthzCheckStateReady, "")
+        }
+    }()
 
     podEntries := p.state.GetPodEntries()
     actualCPUSets := make(map[string]map[string]machine.CPUSet)
@@ -59,14 +85,18 @@
                 "podName":       allocationInfo.PodName,
                 "containerName": allocationInfo.ContainerName,
             })
+            var (
+                containerId string
+                cpuSetStats *cgroupcm.CPUSetStats
+            )
 
-            containerId, err := p.metaServer.GetContainerID(podUID, containerName)
+            containerId, err = p.metaServer.GetContainerID(podUID, containerName)
             if err != nil {
                 general.Errorf("get container id of pod: %s container: %s failed with error: %v", podUID, containerName, err)
                 continue
             }
 
-            cpusetStats, err := cgroupcmutils.GetCPUSetForContainer(podUID, containerId)
+            cpuSetStats, err = cgroupcmutils.GetCPUSetForContainer(podUID, containerId)
             if err != nil {
                 general.Errorf("GetCPUSet of pod: %s container: name(%s), id(%s) failed with error: %v",
                     podUID, containerName, containerId, err)
@@ -77,7 +107,7 @@
             if actualCPUSets[podUID] == nil {
                 actualCPUSets[podUID] = make(map[string]machine.CPUSet)
             }
-            actualCPUSets[podUID][containerName] = machine.MustParse(cpusetStats.CPUs)
+            actualCPUSets[podUID][containerName] = machine.MustParse(cpuSetStats.CPUs)
 
             general.Infof("pod: %s/%s, container: %s, state CPUSet: %s, actual CPUSet: %s",
                 allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName,
@@ -89,6 +119,7 @@
         }
 
         if !actualCPUSets[podUID][containerName].Equals(allocationInfo.OriginalAllocationResult) {
+            invalidCPUSet = true
             general.Errorf("pod: %s/%s, container: %s, cpuset invalid",
                 allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
             _ = p.emitter.StoreInt64(util.MetricNameCPUSetInvalid, 1, metrics.MetricTypeNameRaw, tags...)
@@ -99,7 +130,6 @@
     unionDedicatedCPUSet := machine.NewCPUSet()
     unionSharedCPUSet := machine.NewCPUSet()
 
-    var cpuSetOverlap bool
     for podUID, containerEntries := range actualCPUSets {
         for containerName, cset := range containerEntries {
             allocationInfo := podEntries[podUID][containerName]
@@ -139,17 +169,29 @@
 }
 
 // clearResidualState is used to clean residual pods in local state
-func (p *DynamicPolicy) clearResidualState() {
+func (p *DynamicPolicy) clearResidualState(_ *coreconfig.Configuration,
+    _ interface{},
+    _ *dynamicconfig.DynamicAgentConfiguration,
+    _ metrics.MetricEmitter,
+    _ *metaserver.MetaServer) {
+    general.Infof("exec clearResidualState")
+    var (
+        err     error
+        podList []*v1.Pod
+    )
     residualSet := make(map[string]bool)
 
+    defer func() {
+        _ = general.UpdateHealthzStateByError(cpuconsts.ClearResidualState, err)
+    }()
+
     if p.metaServer == nil {
         general.Errorf("nil metaServer")
         return
     }
 
     ctx := context.Background()
-    podList, err := p.metaServer.GetPodList(ctx, nil)
+    podList, err = p.metaServer.GetPodList(ctx, nil)
     if err != nil {
         general.Errorf("get pod list failed: %v", err)
         return
@@ -211,7 +253,8 @@
             delete(podEntries, podUID)
         }
 
-        updatedMachineState, err := generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
+        var updatedMachineState state.NUMANodeMap
+        updatedMachineState, err = generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
         if err != nil {
             general.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
             return
@@ -228,14 +271,23 @@
 }
 
 // syncCPUIdle is used to set cpu idle for reclaimed cores
-func (p *DynamicPolicy) syncCPUIdle() {
+func (p *DynamicPolicy) syncCPUIdle(_ *coreconfig.Configuration,
+    _ interface{},
+    _ *dynamicconfig.DynamicAgentConfiguration,
+    _ metrics.MetricEmitter,
+    _ *metaserver.MetaServer) {
+    general.Infof("exec syncCPUIdle")
+    var err error
+    defer func() {
+        _ = general.UpdateHealthzStateByError(cpuconsts.SyncCPUIdle, err)
+    }()
+
     if !cgroupcm.IsCPUIdleSupported() {
         general.Warningf("cpu idle isn't supported, skip syncing")
         return
     }
 
-    err := cgroupcmutils.ApplyCPUWithRelativePath(p.reclaimRelativeRootCgroupPath, &cgroupcm.CPUData{CpuIdlePtr: &p.enableCPUIdle})
+    err = cgroupcmutils.ApplyCPUWithRelativePath(p.reclaimRelativeRootCgroupPath, &cgroupcm.CPUData{CpuIdlePtr: &p.enableCPUIdle})
     if err != nil {
        general.Errorf("ApplyCPUWithRelativePath in %s with enableCPUIdle: %v failed with error: %v",
            p.reclaimRelativeRootCgroupPath, p.enableCPUIdle, err)
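All three handlers share one shape: outcomes are accumulated in locals assigned with = (not :=, which would shadow them and hide them from the deferred closure), and a single defer maps those locals onto the healthz state, so every return path reports exactly once. A sketch of the bare pattern (doWork and the check name are hypothetical):

package example

import (
    "github.com/kubewharf/katalyst-core/pkg/util/general"
)

func exampleHandlerBody(doWork func() (softFailure bool, err error)) {
    var (
        err        error
        softFailed bool
    )
    defer func() {
        // runs on every return path, so the check is always refreshed
        if err != nil {
            _ = general.UpdateHealthzStateByError("example_check", err)
        } else if softFailed {
            _ = general.UpdateHealthzState("example_check", general.HealthzCheckStateNotReady, "soft failure detected")
        } else {
            _ = general.UpdateHealthzState("example_check", general.HealthzCheckStateReady, "")
        }
    }()

    // assign with =, never :=, so the deferred closure sees the final values
    softFailed, err = doWork()
}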
4 changes: 2 additions & 2 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
@@ -2937,7 +2937,7 @@ func TestClearResidualState(t *testing.T) {
     dynamicPolicy, err := getTestDynamicPolicyWithInitialization(cpuTopology, tmpDir)
     as.Nil(err)
 
-    dynamicPolicy.clearResidualState()
+    dynamicPolicy.clearResidualState(nil, nil, nil, nil, nil)
 }
 
 func TestStart(t *testing.T) {
@@ -2993,7 +2993,7 @@ func TestCheckCPUSet(t *testing.T) {
     dynamicPolicy, err := getTestDynamicPolicyWithInitialization(cpuTopology, tmpDir)
     as.Nil(err)
 
-    dynamicPolicy.checkCPUSet()
+    dynamicPolicy.checkCPUSet(nil, nil, nil, nil, nil)
 }
 
 func TestSchedIdle(t *testing.T) {