diff --git a/cmd/base/context.go b/cmd/base/context.go index f13d3dcc0..905e2ee46 100644 --- a/cmd/base/context.go +++ b/cmd/base/context.go @@ -205,7 +205,7 @@ func NewGenericContext( // add profiling and health check http paths listening on generic endpoint serveProfilingHTTP(mux) - c.serveHealthZHTTP(mux) + c.serveHealthZHTTP(mux, genericConf.EnableHealthzCheck) return c, nil } @@ -264,15 +264,21 @@ func (c *GenericContext) StartInformer(ctx context.Context) { } // serveHealthZHTTP is used to provide health check for current running components. -func (c *GenericContext) serveHealthZHTTP(mux *http.ServeMux) { +func (c *GenericContext) serveHealthZHTTP(mux *http.ServeMux, enableHealthzCheck bool) { mux.HandleFunc(healthZPath, func(w http.ResponseWriter, r *http.Request) { - ok, reason := c.healthChecker.CheckHealthy() + if !enableHealthzCheck { + w.WriteHeader(200) + _, _ = w.Write([]byte("healthz check is disabled")) + return + } + + ok, content := c.healthChecker.CheckHealthy() if ok { w.WriteHeader(200) - _, _ = w.Write([]byte("ok")) + _, _ = w.Write([]byte(content)) } else { w.WriteHeader(500) - _, _ = w.Write([]byte(reason)) + _, _ = w.Write([]byte(content)) } }) } diff --git a/cmd/base/healthz.go b/cmd/base/healthz.go index 783f6f895..51ec15aa6 100644 --- a/cmd/base/healthz.go +++ b/cmd/base/healthz.go @@ -19,10 +19,8 @@ package katalyst_base import ( "context" "encoding/json" - "time" "go.uber.org/atomic" - "k8s.io/apimachinery/pkg/util/wait" "github.com/kubewharf/katalyst-core/pkg/util/general" ) @@ -39,41 +37,22 @@ func NewHealthzChecker() *HealthzChecker { } } -func (h *HealthzChecker) Run(ctx context.Context) { - go wait.Until(h.check, time.Second*3, ctx.Done()) -} +func (h *HealthzChecker) Run(_ context.Context) {} // CheckHealthy returns whether the component is healthy. func (h *HealthzChecker) CheckHealthy() (bool, string) { - if reason := h.unhealthyReason.Load(); reason != "" { - return false, reason - } - - return true, "" -} - -// Since readiness check in kubernetes supports to config with graceful seconds both for -// failed-to-success and success-to-failed, so we don't need to record the detailed state, -// state transition time, and state lasting period here. -// -// for more information about readiness check, please refer to -// https://github.com/kubernetes/api/blob/9a776fe3a720323e4f706b3ebb462b3dd661634f/core/v1/types.go#L2565 -func (h *HealthzChecker) check() { - responses := general.CheckHealthz() - - // every time we call healthz check functions, we will override the healthz map instead of - // replacing. If some checks must be ensured to exist, this strategy might not work. 
- unhealthyReasons := make(map[string]general.HealthzCheckResponse) - for name, resp := range responses { - if resp.State != general.HealthzCheckStateReady { - unhealthyReasons[string(name)] = resp + results := general.GetRegisterReadinessCheckResult() + healthy := true + for _, result := range results { + if !result.Ready { + healthy = false } } - if len(unhealthyReasons) > 0 { - reasons, _ := json.Marshal(unhealthyReasons) - h.unhealthyReason.Store(string(reasons)) - } else { - h.unhealthyReason.Store("") + resultBytes, err := json.Marshal(results) + if err != nil { + general.Errorf("marshal healthz content failed, err: %v", err) } + + return healthy, string(resultBytes) } diff --git a/cmd/base/options/generic.go b/cmd/base/options/generic.go index b78240d1b..af9e239db 100644 --- a/cmd/base/options/generic.go +++ b/cmd/base/options/generic.go @@ -31,7 +31,8 @@ import ( // GenericOptions holds the configurations for multi components. type GenericOptions struct { - DryRun bool + DryRun bool + EnableHealthzCheck bool MasterURL string KubeConfig string @@ -53,6 +54,7 @@ type GenericOptions struct { func NewGenericOptions() *GenericOptions { return &GenericOptions{ DryRun: false, + EnableHealthzCheck: false, TransformedInformerForPod: false, GenericEndpoint: ":9316", qosOptions: NewQoSOptions(), @@ -75,6 +77,7 @@ func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) { }) fs.BoolVar(&o.DryRun, "dry-run", o.DryRun, "A bool to enable and disable dry-run.") + fs.BoolVar(&o.EnableHealthzCheck, "enable-healthz-check", o.EnableHealthzCheck, "A bool to enable and disable healthz check.") fs.BoolVar(&o.TransformedInformerForPod, "transformed-informer-pod", o.TransformedInformerForPod, "whether we should enable the ability of transformed informer for pods") @@ -100,6 +103,7 @@ func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) { // ApplyTo fills up config with options func (o *GenericOptions) ApplyTo(c *generic.GenericConfiguration) error { c.DryRun = o.DryRun + c.EnableHealthzCheck = o.EnableHealthzCheck c.TransformedInformerForPod = o.TransformedInformerForPod diff --git a/cmd/katalyst-agent/app/agent.go b/cmd/katalyst-agent/app/agent.go index 91d900a1d..88e9fdc5d 100644 --- a/cmd/katalyst-agent/app/agent.go +++ b/cmd/katalyst-agent/app/agent.go @@ -22,7 +22,6 @@ import ( "sync" "time" - "go.uber.org/atomic" "k8s.io/klog/v2" katalystbase "github.com/kubewharf/katalyst-core/cmd/base" @@ -149,19 +148,11 @@ func monitorAgentStart(genericCtx *agent.GenericContext) { // any process that wants to enter the main loop should acquire the file lock first.
func acquireLock(genericCtx *agent.GenericContext, conf *config.Configuration) *general.Flock { // register a not-ready state for lock-acquiring when we start - state := atomic.NewString(string(general.HealthzCheckStateNotReady)) - message := atomic.NewString("locking-file is failed to acquire") - general.RegisterHealthzCheckRules(healthzNameLockingFileAcquired, func() (general.HealthzCheckResponse, error) { - return general.HealthzCheckResponse{ - State: general.HealthzCheckState(state.Load()), - Message: message.Load(), - }, nil - }) + general.RegisterHeartbeatCheck(healthzNameLockingFileAcquired, 0, general.HealthzCheckStateNotReady, 0) // set a ready state for lock-acquiring when we acquire locking successfully defer func() { - state.Store(string(general.HealthzCheckStateReady)) - message.Store("") + _ = general.UpdateHealthzState(healthzNameLockingFileAcquired, general.HealthzCheckStateReady, "") }() for { diff --git a/pkg/agent/qrm-plugins/cpu/consts/consts.go b/pkg/agent/qrm-plugins/cpu/consts/consts.go index 00592be3e..c3af07923 100644 --- a/pkg/agent/qrm-plugins/cpu/consts/consts.go +++ b/pkg/agent/qrm-plugins/cpu/consts/consts.go @@ -16,7 +16,9 @@ limitations under the License. package consts -import "github.com/kubewharf/katalyst-api/pkg/consts" +import ( + "github.com/kubewharf/katalyst-api/pkg/consts" +) const ( // CPUResourcePluginPolicyNameDynamic is the name of the dynamic policy. @@ -24,4 +26,10 @@ const ( // CPUResourcePluginPolicyNameNative is the name of the native policy. CPUResourcePluginPolicyNameNative = string(consts.ResourcePluginPolicyNameNative) + + CPUPluginDynamicPolicyName = "qrm_cpu_plugin_" + CPUResourcePluginPolicyNameDynamic + ClearResidualState = CPUPluginDynamicPolicyName + "_clear_residual_state" + CheckCPUSet = CPUPluginDynamicPolicyName + "_check_cpuset" + SyncCPUIdle = CPUPluginDynamicPolicyName + "_sync_cpu_idle" + CommunicateWithAdvisor = CPUPluginDynamicPolicyName + "_communicate_with_advisor" ) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go index 8c8f5c616..af38e9e94 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go @@ -66,6 +66,8 @@ const ( stateCheckPeriod = 30 * time.Second maxResidualTime = 5 * time.Minute syncCPUIdlePeriod = 30 * time.Second + + healthCheckTolerationTimes = 3 ) var ( @@ -270,8 +272,18 @@ func (p *DynamicPolicy) Start() (err error) { go wait.Until(func() { _ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw) }, time.Second*30, p.stopCh) - go wait.Until(p.clearResidualState, stateCheckPeriod, p.stopCh) - go wait.Until(p.checkCPUSet, cpusetCheckPeriod, p.stopCh) + + err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.ClearResidualState, general.HealthzCheckStateNotReady, + qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.clearResidualState, stateCheckPeriod, healthCheckTolerationTimes) + if err != nil { + general.Errorf("start %v failed, err: %v", cpuconsts.ClearResidualState, err) + } + + err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.CheckCPUSet, general.HealthzCheckStateNotReady, + qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.checkCPUSet, cpusetCheckPeriod, healthCheckTolerationTimes) + if err != nil { + general.Errorf("start %v failed, err: %v", cpuconsts.CheckCPUSet, err) + } // start cpu-idle syncing if needed if p.enableSyncingCPUIdle { @@ -280,7 +292,12 @@ func (p *DynamicPolicy) Start() (err error) { if
p.reclaimRelativeRootCgroupPath == "" { return fmt.Errorf("enable syncing cpu idle but not set reclaimed relative root cgroup path in configuration") } - go wait.Until(p.syncCPUIdle, syncCPUIdlePeriod, p.stopCh) + + err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(cpuconsts.SyncCPUIdle, general.HealthzCheckStateNotReady, + qrm.QRMCPUPluginPeriodicalHandlerGroupName, p.syncCPUIdle, syncCPUIdlePeriod, healthCheckTolerationTimes) + if err != nil { + general.Errorf("start %v failed, err: %v", cpuconsts.SyncCPUIdle, err) + } } // start cpu-pressure eviction plugin if needed @@ -338,6 +355,7 @@ func (p *DynamicPolicy) Start() (err error) { } } + general.RegisterHeartbeatCheck(cpuconsts.CommunicateWithAdvisor, 2*time.Minute, general.HealthzCheckStateNotReady, 2*time.Minute) go wait.BackoffUntil(communicateWithCPUAdvisorServer, wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go index f850cac52..953ff9202 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go @@ -31,6 +31,7 @@ import ( "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc" + cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator" advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" @@ -242,6 +243,7 @@ func (p *DynamicPolicy) lwCPUAdvisorServer(stopCh <-chan struct{}) error { if err != nil { general.Errorf("allocate by ListAndWatch response of CPUAdvisorServer failed with error: %v", err) } + _ = general.UpdateHealthzStateByError(cpuconsts.CommunicateWithAdvisor, err) } } diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_async_handler.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_async_handler.go index 5d941b84c..4f332135d 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_async_handler.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_async_handler.go @@ -21,12 +21,17 @@ import ( "fmt" "time" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc" + cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" cgroupcm "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" cgroupcmutils "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" @@ -35,8 +40,29 @@ import ( ) // checkCPUSet emits errors if the cpuset allocation falls into unexpected results -func (p *DynamicPolicy) checkCPUSet() { +func (p *DynamicPolicy) checkCPUSet(_ *coreconfig.Configuration, + _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + _ metrics.MetricEmitter, +
_ *metaserver.MetaServer) { general.Infof("exec checkCPUSet") + var ( + err error + invalidCPUSet = false + cpuSetOverlap = false + ) + + defer func() { + if err != nil { + _ = general.UpdateHealthzStateByError(cpuconsts.CheckCPUSet, err) + } else if invalidCPUSet { + _ = general.UpdateHealthzState(cpuconsts.CheckCPUSet, general.HealthzCheckStateNotReady, "invalid cpuset exists") + } else if cpuSetOverlap { + _ = general.UpdateHealthzState(cpuconsts.CheckCPUSet, general.HealthzCheckStateNotReady, "cpuset overlap") + } else { + _ = general.UpdateHealthzState(cpuconsts.CheckCPUSet, general.HealthzCheckStateReady, "") + } + }() podEntries := p.state.GetPodEntries() actualCPUSets := make(map[string]map[string]machine.CPUSet) @@ -59,14 +85,18 @@ func (p *DynamicPolicy) checkCPUSet() { "podName": allocationInfo.PodName, "containerName": allocationInfo.ContainerName, }) + var ( + containerId string + cpuSetStats *cgroupcm.CPUSetStats + ) - containerId, err := p.metaServer.GetContainerID(podUID, containerName) + containerId, err = p.metaServer.GetContainerID(podUID, containerName) if err != nil { general.Errorf("get container id of pod: %s container: %s failed with error: %v", podUID, containerName, err) continue } - cpusetStats, err := cgroupcmutils.GetCPUSetForContainer(podUID, containerId) + cpuSetStats, err = cgroupcmutils.GetCPUSetForContainer(podUID, containerId) if err != nil { general.Errorf("GetCPUSet of pod: %s container: name(%s), id(%s) failed with error: %v", podUID, containerName, containerId, err) @@ -77,7 +107,7 @@ func (p *DynamicPolicy) checkCPUSet() { if actualCPUSets[podUID] == nil { actualCPUSets[podUID] = make(map[string]machine.CPUSet) } - actualCPUSets[podUID][containerName] = machine.MustParse(cpusetStats.CPUs) + actualCPUSets[podUID][containerName] = machine.MustParse(cpuSetStats.CPUs) general.Infof("pod: %s/%s, container: %s, state CPUSet: %s, actual CPUSet: %s", allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, @@ -89,6 +119,7 @@ func (p *DynamicPolicy) checkCPUSet() { } if !actualCPUSets[podUID][containerName].Equals(allocationInfo.OriginalAllocationResult) { + invalidCPUSet = true general.Errorf("pod: %s/%s, container: %s, cpuset invalid", allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName) _ = p.emitter.StoreInt64(util.MetricNameCPUSetInvalid, 1, metrics.MetricTypeNameRaw, tags...) 
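Every handler converted above follows the same registration-plus-deferred-report shape, so a condensed sketch of the pattern may be easier to scan than the hunks. This is illustrative only: `demo_check`, `demo_group`, `demoHandler` and `doWork` are hypothetical names, while the signatures of `RegisterPeriodicalHandlerWithHealthz`, `UpdateHealthzStateByError` and the five-parameter handler shape are assumed to be exactly as introduced in this patch.

package demo

import (
	"time"

	"github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/periodicalhandler"
	coreconfig "github.com/kubewharf/katalyst-core/pkg/config"
	dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
	"github.com/kubewharf/katalyst-core/pkg/metaserver"
	"github.com/kubewharf/katalyst-core/pkg/metrics"
	"github.com/kubewharf/katalyst-core/pkg/util/general"
)

const demoCheck = "demo_check" // hypothetical rule name

func doWork() error { return nil } // hypothetical workload

// demoHandler mirrors the handler signature used by checkCPUSet and friends above.
func demoHandler(_ *coreconfig.Configuration, _ interface{},
	_ *dynamicconfig.DynamicAgentConfiguration, _ metrics.MetricEmitter,
	_ *metaserver.MetaServer) {
	var err error
	// Report exactly once per run: a nil err marks the rule Ready, a non-nil
	// err marks it NotReady with err.Error() as the message.
	defer func() {
		_ = general.UpdateHealthzStateByError(demoCheck, err)
	}()
	err = doWork()
}

func register() {
	// Both the heartbeat timeout and the toleration window come out as
	// tolerationTimes*interval (90s here), so the rule only turns unhealthy
	// after roughly three consecutive failed or missed periods.
	if err := periodicalhandler.RegisterPeriodicalHandlerWithHealthz(demoCheck,
		general.HealthzCheckStateNotReady, "demo_group", demoHandler,
		30*time.Second, 3); err != nil {
		general.Errorf("start %v failed, err: %v", demoCheck, err)
	}
}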
@@ -99,7 +130,6 @@ func (p *DynamicPolicy) checkCPUSet() { unionDedicatedCPUSet := machine.NewCPUSet() unionSharedCPUSet := machine.NewCPUSet() - var cpuSetOverlap bool for podUID, containerEntries := range actualCPUSets { for containerName, cset := range containerEntries { allocationInfo := podEntries[podUID][containerName] @@ -139,17 +169,29 @@ } // clearResidualState is used to clean residual pods in local state -func (p *DynamicPolicy) clearResidualState() { +func (p *DynamicPolicy) clearResidualState(_ *coreconfig.Configuration, + _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + _ metrics.MetricEmitter, + _ *metaserver.MetaServer) { general.Infof("exec clearResidualState") + var ( + err error + podList []*v1.Pod + ) residualSet := make(map[string]bool) + defer func() { + _ = general.UpdateHealthzStateByError(cpuconsts.ClearResidualState, err) + }() + if p.metaServer == nil { general.Errorf("nil metaServer") return } ctx := context.Background() - podList, err := p.metaServer.GetPodList(ctx, nil) + podList, err = p.metaServer.GetPodList(ctx, nil) if err != nil { general.Errorf("get pod list failed: %v", err) return @@ -211,7 +253,8 @@ func (p *DynamicPolicy) clearResidualState() { delete(podEntries, podUID) } - updatedMachineState, err := generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries) + var updatedMachineState state.NUMANodeMap + updatedMachineState, err = generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries) if err != nil { general.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) return @@ -228,14 +271,23 @@ } // syncCPUIdle is used to set cpu idle for reclaimed cores -func (p *DynamicPolicy) syncCPUIdle() { +func (p *DynamicPolicy) syncCPUIdle(_ *coreconfig.Configuration, + _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + _ metrics.MetricEmitter, + _ *metaserver.MetaServer) { general.Infof("exec syncCPUIdle") + var err error + defer func() { + _ = general.UpdateHealthzStateByError(cpuconsts.SyncCPUIdle, err) + }() + if !cgroupcm.IsCPUIdleSupported() { general.Warningf("cpu idle isn't supported, skip syncing") return } - err := cgroupcmutils.ApplyCPUWithRelativePath(p.reclaimRelativeRootCgroupPath, &cgroupcm.CPUData{CpuIdlePtr: &p.enableCPUIdle}) + err = cgroupcmutils.ApplyCPUWithRelativePath(p.reclaimRelativeRootCgroupPath, &cgroupcm.CPUData{CpuIdlePtr: &p.enableCPUIdle}) if err != nil { general.Errorf("ApplyCPUWithRelativePath in %s with enableCPUIdle: %v failed with error: %v", p.reclaimRelativeRootCgroupPath, p.enableCPUIdle, err) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index 51eae5b07..e263316f4 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -2937,7 +2937,7 @@ func TestClearResidualState(t *testing.T) { dynamicPolicy, err := getTestDynamicPolicyWithInitialization(cpuTopology, tmpDir) as.Nil(err) - dynamicPolicy.clearResidualState() + dynamicPolicy.clearResidualState(nil, nil, nil, nil, nil) } func TestStart(t *testing.T) { @@ -2993,7 +2993,7 @@ func TestCheckCPUSet(t *testing.T) { dynamicPolicy, err := getTestDynamicPolicyWithInitialization(cpuTopology, tmpDir) as.Nil(err) - dynamicPolicy.checkCPUSet() + dynamicPolicy.checkCPUSet(nil, nil, nil, nil, nil) } func TestSchedIdle(t *testing.T) { diff --git
a/pkg/agent/qrm-plugins/memory/consts/consts.go b/pkg/agent/qrm-plugins/memory/consts/consts.go new file mode 100644 index 000000000..c2cfba521 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/consts/consts.go @@ -0,0 +1,35 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package consts + +import ( + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" +) + +const ( + MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic) + + MemoryPluginDynamicPolicyName = "qrm_memory_plugin_" + MemoryResourcePluginPolicyNameDynamic + ClearResidualState = MemoryPluginDynamicPolicyName + "_clear_residual_state" + CheckMemSet = MemoryPluginDynamicPolicyName + "_check_mem_set" + ApplyExternalCGParams = MemoryPluginDynamicPolicyName + "_apply_external_cg_params" + SetExtraControlKnob = MemoryPluginDynamicPolicyName + "_set_extra_control_knob" + OOMPriority = MemoryPluginDynamicPolicyName + "_oom_priority" + SetSockMem = MemoryPluginDynamicPolicyName + "_set_sock_mem" + CommunicateWithAdvisor = MemoryPluginDynamicPolicyName + "_communicate_with_advisor" + DropCache = MemoryPluginDynamicPolicyName + "_drop_cache" +) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom/const.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom/const.go index 444860084..5fd4a2b5a 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom/const.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom/const.go @@ -18,5 +18,4 @@ package oom const ( ClearResidualOOMPriorityPeriodicalHandlerName = "clearResidualOOMPriority" - SyncOOMPriorityPriorityPeriodicalHandlerName = "syncOOMPriority" ) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index 24e08ef1f..75c856dcb 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -36,6 +36,7 @@ import ( "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent/qrm" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" + memconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" @@ -74,6 +75,9 @@ const ( setExtraControlKnobsPeriod = 5 * time.Second clearOOMPriorityPeriod = 1 * time.Hour syncOOMPriorityPeriod = 5 * time.Second + + healthCheckTolerationTimes = 3 + dropCacheGracePeriod = 60 * time.Second ) var ( @@ -151,7 +155,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration v1.ResourceMemory: reservedMemory, } stateImpl, err := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, memoryPluginStateFileName, - MemoryResourcePluginPolicyNameDynamic, 
agentCtx.CPUTopology, agentCtx.MachineInfo, resourcesReservedMemory, conf.SkipMemoryStateCorruption) + memconsts.MemoryResourcePluginPolicyNameDynamic, agentCtx.CPUTopology, agentCtx.MachineInfo, resourcesReservedMemory, conf.SkipMemoryStateCorruption) if err != nil { return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", err) } @@ -172,7 +176,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{ Key: util.QRMPluginPolicyTagName, - Val: MemoryResourcePluginPolicyNameDynamic, + Val: memconsts.MemoryResourcePluginPolicyNameDynamic, }) policyImplement := &DynamicPolicy{ @@ -186,7 +190,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration residualHitMap: make(map[string]int64), enhancementHandlers: make(util.ResourceEnhancementHandlerMap), extraStateFileAbsPath: conf.ExtraStateFileAbsPath, - name: fmt.Sprintf("%s_%s", agentName, MemoryResourcePluginPolicyNameDynamic), + name: fmt.Sprintf("%s_%s", agentName, memconsts.MemoryResourcePluginPolicyNameDynamic), podDebugAnnoKeys: conf.PodDebugAnnoKeys, asyncWorkers: asyncworker.NewAsyncWorkers(memoryPluginAsyncWorkersName, wrappedEmitter), enableSettingMemoryMigrate: conf.EnableSettingMemoryMigrate, @@ -236,6 +240,10 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration return true, &agent.PluginWrapper{GenericPlugin: pluginWrapper}, nil } +func (p *DynamicPolicy) registerControlKnobHandlerCheckRules() { + general.RegisterReportCheck(memconsts.DropCache, dropCacheGracePeriod) +} + func (p *DynamicPolicy) Start() (err error) { general.Infof("called") @@ -257,13 +265,36 @@ func (p *DynamicPolicy) Start() (err error) { } p.stopCh = make(chan struct{}) + p.registerControlKnobHandlerCheckRules() go wait.Until(func() { _ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw) }, time.Second*30, p.stopCh) - go wait.Until(p.clearResidualState, stateCheckPeriod, p.stopCh) - go wait.Until(p.checkMemorySet, memsetCheckPeriod, p.stopCh) - go wait.Until(p.applyExternalCgroupParams, applyCgroupPeriod, p.stopCh) - go wait.Until(p.setExtraControlKnobByConfigs, setExtraControlKnobsPeriod, p.stopCh) + + err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(memconsts.ClearResidualState, + general.HealthzCheckStateNotReady, qrm.QRMMemoryPluginPeriodicalHandlerGroupName, + p.clearResidualState, stateCheckPeriod, healthCheckTolerationTimes) + if err != nil { + general.Errorf("start %v failed, err: %v", memconsts.ClearResidualState, err) + } + + err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(memconsts.CheckMemSet, general.HealthzCheckStateNotReady, + qrm.QRMMemoryPluginPeriodicalHandlerGroupName, p.checkMemorySet, memsetCheckPeriod, healthCheckTolerationTimes) + if err != nil { + general.Errorf("start %v failed, err: %v", memconsts.CheckMemSet, err) + } + + err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(memconsts.ApplyExternalCGParams, general.HealthzCheckStateNotReady, + qrm.QRMMemoryPluginPeriodicalHandlerGroupName, p.applyExternalCgroupParams, applyCgroupPeriod, healthCheckTolerationTimes) + if err != nil { + general.Errorf("start %v failed, err: %v", memconsts.ApplyExternalCGParams, err) + } + + err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(memconsts.SetExtraControlKnob, general.HealthzCheckStateNotReady, + qrm.QRMMemoryPluginPeriodicalHandlerGroupName, 
p.setExtraControlKnobByConfigs, setExtraControlKnobsPeriod, healthCheckTolerationTimes) + if err != nil { + general.Errorf("start %v failed, err: %v", memconsts.SetExtraControlKnob, err) + } + err = p.asyncWorkers.Start(p.stopCh) if err != nil { general.Errorf("start async worker failed, err: %v", err) @@ -284,8 +315,8 @@ func (p *DynamicPolicy) Start() (err error) { general.Infof("register clearResidualOOMPriority failed, err=%v", err) } - err = periodicalhandler.RegisterPeriodicalHandler(qrm.QRMMemoryPluginPeriodicalHandlerGroupName, - oom.SyncOOMPriorityPriorityPeriodicalHandlerName, p.syncOOMPriority, syncOOMPriorityPeriod) + err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(memconsts.OOMPriority, general.HealthzCheckStateNotReady, + qrm.QRMMemoryPluginPeriodicalHandlerGroupName, p.syncOOMPriority, syncOOMPriorityPeriod, healthCheckTolerationTimes) if err != nil { general.Infof("register syncOOMPriority failed, err=%v", err) } @@ -293,8 +324,9 @@ func (p *DynamicPolicy) Start() (err error) { if p.enableSettingSockMem { general.Infof("setSockMem enabled") - err := periodicalhandler.RegisterPeriodicalHandler(qrm.QRMMemoryPluginPeriodicalHandlerGroupName, - sockmem.EnableSetSockMemPeriodicalHandlerName, sockmem.SetSockMemLimit, 60*time.Second) + err := periodicalhandler.RegisterPeriodicalHandlerWithHealthz(memconsts.SetSockMem, + general.HealthzCheckStateNotReady, qrm.QRMMemoryPluginPeriodicalHandlerGroupName, + sockmem.SetSockMemLimit, 60*time.Second, healthCheckTolerationTimes) if err != nil { general.Infof("setSockMem failed, err=%v", err) } @@ -346,6 +378,8 @@ func (p *DynamicPolicy) Start() (err error) { } } + general.RegisterHeartbeatCheck(memconsts.CommunicateWithAdvisor, 2*time.Minute, general.HealthzCheckStateNotReady, + 2*time.Minute) go wait.BackoffUntil(communicateWithMemoryAdvisorServer, wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go index 957d63124..889247ee3 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go @@ -33,9 +33,9 @@ import ( maputil "k8s.io/kubernetes/pkg/util/maps" apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" - "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" + memconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" @@ -105,6 +105,7 @@ func (p *DynamicPolicy) lwMemoryAdvisorServer(stopCh <-chan struct{}) error { if err != nil { general.Errorf("handle ListAndWatch response of MemoryAdvisorServer failed with error: %v", err) } + _ = general.UpdateHealthzStateByError(memconsts.CommunicateWithAdvisor, err) } } @@ -290,6 +291,12 @@ func (p *DynamicPolicy) handleAdvisorDropCache( metaServer *metaserver.MetaServer, entryName, subEntryName string, calculationInfo *advisorsvc.CalculationInfo, podResourceEntries state.PodResourceEntries) error { + var ( + err error + ) + defer func() { + _ = general.UpdateHealthzStateByError(memconsts.DropCache, err) + }() 
dropCache := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnobKeyDropCache)] dropCacheBool, err := strconv.ParseBool(dropCache) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_async_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_async_handler.go index 32451f65b..faf143534 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_async_handler.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_async_handler.go @@ -24,11 +24,13 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "github.com/kubewharf/katalyst-api/pkg/consts" apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" + memconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" @@ -95,8 +97,19 @@ func setExtraControlKnobByConfigForAllocationInfo(allocationInfo *state.Allocati } } -func (p *DynamicPolicy) setExtraControlKnobByConfigs() { +func (p *DynamicPolicy) setExtraControlKnobByConfigs(_ *coreconfig.Configuration, + _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + _ metrics.MetricEmitter, + _ *metaserver.MetaServer) { general.Infof("called") + var ( + err error + podList []*v1.Pod + ) + defer func() { + _ = general.UpdateHealthzStateByError(memconsts.SetExtraControlKnob, err) + }() if p.metaServer == nil { general.Errorf("nil metaServer") @@ -106,7 +119,7 @@ func (p *DynamicPolicy) setExtraControlKnobByConfigs() { return } - podList, err := p.metaServer.GetPodList(context.Background(), nil) + podList, err = p.metaServer.GetPodList(context.Background(), nil) if err != nil { general.Errorf("get pod list failed, err: %v", err) return @@ -139,7 +152,8 @@ func (p *DynamicPolicy) setExtraControlKnobByConfigs() { } } - resourcesMachineState, err := state.GenerateMachineStateFromPodEntries(p.state.GetMachineInfo(), podResourceEntries, p.state.GetReservedMemory()) + var resourcesMachineState state.NUMANodeResourcesMap + resourcesMachineState, err = state.GenerateMachineStateFromPodEntries(p.state.GetMachineInfo(), podResourceEntries, p.state.GetReservedMemory()) if err != nil { general.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) return @@ -149,9 +163,18 @@ func (p *DynamicPolicy) setExtraControlKnobByConfigs() { p.state.SetMachineState(resourcesMachineState) } -func (p *DynamicPolicy) applyExternalCgroupParams() { +func (p *DynamicPolicy) applyExternalCgroupParams(_ *coreconfig.Configuration, + _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + _ metrics.MetricEmitter, + _ *metaserver.MetaServer) { general.Infof("called") + var err error + defer func() { + _ = general.UpdateHealthzStateByError(memconsts.ApplyExternalCGParams, err) + }() + podEntries := p.state.GetPodResourceEntries()[v1.ResourceMemory] for podUID, containerEntries := range podEntries { @@ -160,7 +183,8 @@ func (p *DynamicPolicy) applyExternalCgroupParams() { continue } - containerID, err := p.metaServer.GetContainerID(podUID, containerName) + var containerID string + containerID, err = p.metaServer.GetContainerID(podUID, containerName) if err != nil { general.Errorf("get container id of pod: %s/%s container: %s failed with error: %v", 
allocationInfo.PodNamespace, allocationInfo.PodName, @@ -196,7 +220,7 @@ func (p *DynamicPolicy) applyExternalCgroupParams() { "cgroupSubsysName", entry.CgroupSubsysName, "cgroupIfaceName", cgroupIfaceName) - err := cgroupmgr.ApplyUnifiedDataForContainer(podUID, containerID, entry.CgroupSubsysName, cgroupIfaceName, entry.ControlKnobValue) + err = cgroupmgr.ApplyUnifiedDataForContainer(podUID, containerID, entry.CgroupSubsysName, cgroupIfaceName, entry.ControlKnobValue) if err != nil { general.ErrorS(err, "ApplyUnifiedDataForContainer failed", @@ -214,8 +238,29 @@ } // checkMemorySet emits errors if the memory allocation falls into unexpected results -func (p *DynamicPolicy) checkMemorySet() { +func (p *DynamicPolicy) checkMemorySet(_ *coreconfig.Configuration, + _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + _ metrics.MetricEmitter, + _ *metaserver.MetaServer) { general.Infof("called") + var ( + err error + invalidMemSet = false + memorySetOverlap = false + ) + + defer func() { + if err != nil { + _ = general.UpdateHealthzStateByError(memconsts.CheckMemSet, err) + } else if invalidMemSet { + _ = general.UpdateHealthzState(memconsts.CheckMemSet, general.HealthzCheckStateNotReady, "invalid mem set exists") + } else if memorySetOverlap { + _ = general.UpdateHealthzState(memconsts.CheckMemSet, general.HealthzCheckStateNotReady, "mem set overlap") + } else { + _ = general.UpdateHealthzState(memconsts.CheckMemSet, general.HealthzCheckStateReady, "") + } + }() podEntries := p.state.GetPodResourceEntries()[v1.ResourceMemory] actualMemorySets := make(map[string]map[string]machine.CPUSet) @@ -240,13 +285,17 @@ "containerName": allocationInfo.ContainerName, }) - containerID, err := p.metaServer.GetContainerID(podUID, containerName) + var ( + containerID string + cpusetStats *common.CPUSetStats + ) + containerID, err = p.metaServer.GetContainerID(podUID, containerName) if err != nil { general.Errorf("get container id of pod: %s container: %s failed with error: %v", podUID, containerName, err) continue } - cpusetStats, err := cgroupmgr.GetCPUSetForContainer(podUID, containerID) + cpusetStats, err = cgroupmgr.GetCPUSetForContainer(podUID, containerID) if err != nil { general.Errorf("GetMemorySet of pod: %s container: name(%s), id(%s) failed with error: %v", podUID, containerName, containerID, err) @@ -269,6 +318,7 @@ } if !actualMemorySets[podUID][containerName].Equals(allocationInfo.NumaAllocationResult) { + invalidMemSet = true general.Errorf("pod: %s/%s, container: %s, memset invalid", allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName) _ = p.emitter.StoreInt64(util.MetricNameMemSetInvalid, 1, metrics.MetricTypeNameRaw, tags...)
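The memory policy also exercises the second, event-driven mode: handleAdvisorDropCache above reports through a rule registered with `RegisterReportCheck` rather than a heartbeat. Below is a minimal sketch of that report-mode flow; `demo_report_check`, `onEvent` and `handleEvent` are hypothetical, and the `RegisterReportCheck`/`UpdateHealthzStateByError` signatures are assumed to be as defined in this patch.

package demo

import (
	"time"

	"github.com/kubewharf/katalyst-core/pkg/util/general"
)

const demoReportCheck = "demo_report_check" // hypothetical rule name

func registerDemoReport() {
	// Report-mode rules start Ready and expect no heartbeat; 60s here plays
	// the role dropCacheGracePeriod plays for the DropCache rule above.
	general.RegisterReportCheck(demoReportCheck, 60*time.Second)
}

func handleEvent() error { return nil } // hypothetical event-handling logic

// onEvent stands in for an event-driven path like handleAdvisorDropCache.
func onEvent() {
	var err error
	defer func() {
		// A non-nil err stamps LatestUnhealthyTime and turns the rule NotReady;
		// once no new failure is reported within the auto-recover period, the
		// rule reads Ready again without any explicit reset.
		_ = general.UpdateHealthzStateByError(demoReportCheck, err)
	}()
	err = handleEvent()
}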
@@ -280,7 +330,6 @@ func (p *DynamicPolicy) checkMemorySet() { unionDedicatedActualMemorySet := machine.NewCPUSet() unionSharedActualMemorySet := machine.NewCPUSet() - var memorySetOverlap bool for podUID, containerEntries := range actualMemorySets { for containerName, cset := range containerEntries { allocationInfo := podEntries[podUID][containerName] @@ -333,12 +382,24 @@ func (p *DynamicPolicy) checkMemorySet() { } // clearResidualState is used to clean residual pods in local state -func (p *DynamicPolicy) clearResidualState() { +func (p *DynamicPolicy) clearResidualState(_ *coreconfig.Configuration, + _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + _ metrics.MetricEmitter, + _ *metaserver.MetaServer) { general.Infof("called") + var ( + err error + podList []*v1.Pod + ) residualSet := make(map[string]bool) + defer func() { + _ = general.UpdateHealthzStateByError(memconsts.ClearResidualState, err) + }() + ctx := context.Background() - podList, err := p.metaServer.GetPodList(ctx, nil) + podList, err = p.metaServer.GetPodList(ctx, nil) if err != nil { general.Infof("get pod list failed: %v", err) return @@ -418,6 +479,7 @@ func (p *DynamicPolicy) clearResidualState() { func (p *DynamicPolicy) setMemoryMigrate() { general.Infof("called") p.RLock() + // TODO update healthz check podResourceEntries := p.state.GetPodResourceEntries() podEntries := podResourceEntries[v1.ResourceMemory] @@ -581,6 +643,15 @@ func (p *DynamicPolicy) clearResidualOOMPriority(conf *coreconfig.Configuration, func (p *DynamicPolicy) syncOOMPriority(conf *coreconfig.Configuration, _ interface{}, _ *dynamicconfig.DynamicAgentConfiguration, emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer) { + var ( + updateBPFMapErr []error + err error + ) + + defer func() { + _ = general.UpdateHealthzStateByError(memconsts.OOMPriority, errors.NewAggregate(append(updateBPFMapErr, err))) + }() + if p.oomPriorityMap == nil { general.Infof("oom priority bpf has not been initialized yet") return @@ -593,7 +664,8 @@ func (p *DynamicPolicy) syncOOMPriority(conf *coreconfig.Configuration, } ctx := context.Background() - podList, err := metaServer.GetPodList(ctx, native.PodIsActive) + var podList []*v1.Pod + podList, err = metaServer.GetPodList(ctx, native.PodIsActive) if err != nil { general.Infof("get pod list failed: %v", err) return @@ -665,6 +737,7 @@ func (p *DynamicPolicy) syncOOMPriority(conf *coreconfig.Configuration, err := p.oomPriorityMap.Put(cgID, int64(oomPriority)) if err != nil { general.Errorf("update oom pinned map failed: %v", err) + updateBPFMapErr = append(updateBPFMapErr, err) _ = emitter.StoreInt64(util.MetricNameMemoryOOMPriorityUpdateFailed, 1, metrics.MetricTypeNameRaw, metrics.ConvertMapToTags(map[string]string{ "cg_id": strconv.FormatUint(cgID, 10), @@ -702,7 +775,8 @@ func (p *DynamicPolicy) syncOOMPriority(conf *coreconfig.Configuration, } } - resourcesMachineState, err := state.GenerateMachineStateFromPodEntries(p.state.GetMachineInfo(), podResourceEntries, p.state.GetReservedMemory()) + var resourcesMachineState state.NUMANodeResourcesMap + resourcesMachineState, err = state.GenerateMachineStateFromPodEntries(p.state.GetMachineInfo(), podResourceEntries, p.state.GetReservedMemory()) if err != nil { general.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) return diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index f563a8cc2..24c305288 100644 --- 
a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -52,6 +52,7 @@ import ( appagent "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" + memconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" @@ -118,7 +119,7 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, machi }) stateImpl, err := state.NewCheckpointState(stateFileDirectory, memoryPluginStateFileName, - MemoryResourcePluginPolicyNameDynamic, topology, machineInfo, resourcesReservedMemory, false) + memconsts.MemoryResourcePluginPolicyNameDynamic, topology, machineInfo, resourcesReservedMemory, false) if err != nil { return nil, fmt.Errorf("NewCheckpointState failed with error: %v", err) } @@ -199,7 +200,7 @@ func TestCheckMemorySet(t *testing.T) { PodFetcher: &pod.PodFetcherStub{}, }, } - dynamicPolicy.checkMemorySet() + dynamicPolicy.checkMemorySet(nil, nil, nil, nil, nil) } func TestClearResidualState(t *testing.T) { @@ -224,7 +225,7 @@ func TestClearResidualState(t *testing.T) { PodFetcher: &pod.PodFetcherStub{}, }, } - dynamicPolicy.clearResidualState() + dynamicPolicy.clearResidualState(nil, nil, nil, nil, nil) } func TestSetMemoryMigrate(t *testing.T) { @@ -2491,7 +2492,7 @@ func TestSetExtraControlKnobByConfigs(t *testing.T) { dynamicPolicy.state.SetPodResourceEntries(podResourceEntries) dynamicPolicy.state.SetMachineState(resourcesMachineState) - dynamicPolicy.setExtraControlKnobByConfigs() + dynamicPolicy.setExtraControlKnobByConfigs(nil, nil, nil, nil, nil) expectedAllocationInfo := &state.AllocationInfo{ PodUid: pod1UID, @@ -2576,7 +2577,7 @@ func TestNewAndStartDynamicPolicy(t *testing.T) { StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ QRMPluginsConfiguration: &qrmconfig.QRMPluginsConfiguration{ MemoryQRMPluginConfig: &qrmconfig.MemoryQRMPluginConfig{ - PolicyName: MemoryResourcePluginPolicyNameDynamic, + PolicyName: memconsts.MemoryResourcePluginPolicyNameDynamic, ReservedMemoryGB: 4, SkipMemoryStateCorruption: true, EnableSettingMemoryMigrate: false, diff --git a/pkg/agent/qrm-plugins/memory/handlers/sockmem/const.go b/pkg/agent/qrm-plugins/memory/handlers/sockmem/const.go index 677946982..4291b74ad 100644 --- a/pkg/agent/qrm-plugins/memory/handlers/sockmem/const.go +++ b/pkg/agent/qrm-plugins/memory/handlers/sockmem/const.go @@ -16,8 +16,6 @@ limitations under the License. 
package sockmem -const EnableSetSockMemPeriodicalHandlerName = "SetSockMem" - const ( // Constants for global tcpmem ratio globalTCPMemRatioMin float64 = 20.0 // min ratio for host tcp mem: 20% diff --git a/pkg/agent/qrm-plugins/memory/handlers/sockmem/sockmem_linux.go b/pkg/agent/qrm-plugins/memory/handlers/sockmem/sockmem_linux.go index 577f1657a..b230c928a 100644 --- a/pkg/agent/qrm-plugins/memory/handlers/sockmem/sockmem_linux.go +++ b/pkg/agent/qrm-plugins/memory/handlers/sockmem/sockmem_linux.go @@ -23,7 +23,9 @@ import ( "context" "golang.org/x/sys/unix" + "k8s.io/apimachinery/pkg/util/errors" + memconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/consts" coreconfig "github.com/kubewharf/katalyst-core/pkg/config" dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" coreconsts "github.com/kubewharf/katalyst-core/pkg/consts" @@ -100,6 +102,13 @@ func SetSockMemLimit(conf *coreconfig.Configuration, emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer) { general.Infof("called") + var ( + errList []error + ) + defer func() { + _ = general.UpdateHealthzStateByError(memconsts.SetSockMem, errors.NewAggregate(errList)) + }() + if conf == nil { general.Errorf("nil extraConf") return @@ -157,6 +166,7 @@ func SetSockMemLimit(conf *coreconfig.Configuration, podList, err := metaServer.GetPodList(context.Background(), native.PodIsActive) if err != nil { + errList = append(errList, err) general.Errorf("get pod list failed, err: %v", err) return } @@ -171,17 +181,22 @@ func SetSockMemLimit(conf *coreconfig.Configuration, memLimit, err := helper.GetPodMetric(metaServer.MetricsFetcher, emitter, pod, coreconsts.MetricMemLimitContainer, -1) if err != nil { + errList = append(errList, err) general.Infof("memory limit not found:%v..\n", podUID) continue } memTCPLimit, err := helper.GetPodMetric(metaServer.MetricsFetcher, emitter, pod, coreconsts.MetricMemTCPLimitContainer, -1) if err != nil { + errList = append(errList, err) general.Infof("memory tcp.limit not found:%v..\n", podUID) continue } - _ = setCg1TCPMem(emitter, podUID, containerID, int64(memLimit), int64(memTCPLimit), &sockMemConfig) + err = setCg1TCPMem(emitter, podUID, containerID, int64(memLimit), int64(memTCPLimit), &sockMemConfig) + if err != nil { + errList = append(errList, err) + } } } } diff --git a/pkg/agent/qrm-plugins/memory/memory.go b/pkg/agent/qrm-plugins/memory/memory.go index f70cf2af9..4cbc47a22 100644 --- a/pkg/agent/qrm-plugins/memory/memory.go +++ b/pkg/agent/qrm-plugins/memory/memory.go @@ -18,9 +18,10 @@ package memory import ( "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy" ) func init() { - qrm.RegisterMemoryPolicyInitializer(dynamicpolicy.MemoryResourcePluginPolicyNameDynamic, dynamicpolicy.NewDynamicPolicy) + qrm.RegisterMemoryPolicyInitializer(consts.MemoryResourcePluginPolicyNameDynamic, dynamicpolicy.NewDynamicPolicy) } diff --git a/pkg/agent/resourcemanager/fetcher/manager.go b/pkg/agent/resourcemanager/fetcher/manager.go index 4ed7b657a..61343fcf8 100644 --- a/pkg/agent/resourcemanager/fetcher/manager.go +++ b/pkg/agent/resourcemanager/fetcher/manager.go @@ -52,6 +52,8 @@ const reporterManagerCheckpoint = "reporter_manager_checkpoint" const healthzNameReporterFetcherReady = "ReporterFetcherReady" +const healthzGracePeriodMultiplier = 3 + // ReporterPluginManager is used to manage 
in-tree or out-tree reporter plugin registrations and // get report content from these plugins to aggregate them into the Reporter Manager type ReporterPluginManager struct { @@ -217,7 +219,9 @@ func (m *ReporterPluginManager) Run(ctx context.Context) { klog.Infof("reporter plugin manager started") m.reporter.Run(ctx) - general.RegisterHealthzCheckRules(healthzNameReporterFetcherReady, m.healthz) + general.RegisterHeartbeatCheck(healthzNameReporterFetcherReady, m.reconcilePeriod*healthzGracePeriodMultiplier, + general.HealthzCheckStateReady, m.reconcilePeriod*healthzGracePeriodMultiplier) + go wait.UntilWithContext(ctx, m.healthz, m.reconcilePeriod) } func (m *ReporterPluginManager) isVersionCompatibleWithPlugin(versions []string) bool { diff --git a/pkg/agent/resourcemanager/fetcher/manager_healthz.go b/pkg/agent/resourcemanager/fetcher/manager_healthz.go index 47bafdd1d..a0aa943ea 100644 --- a/pkg/agent/resourcemanager/fetcher/manager_healthz.go +++ b/pkg/agent/resourcemanager/fetcher/manager_healthz.go @@ -17,6 +17,7 @@ limitations under the License. package fetcher import ( + "context" "fmt" "strings" "time" @@ -48,10 +49,7 @@ func (m *ReporterPluginManager) healthzSyncLoop() { // // during starting period, the results may last unhealthy for a while, and // the caller functions should handle this situation. -func (m *ReporterPluginManager) healthz() (general.HealthzCheckResponse, error) { - response := general.HealthzCheckResponse{ - State: general.HealthzCheckStateReady, - } +func (m *ReporterPluginManager) healthz(_ context.Context) { now := time.Now() var unHealthy []string @@ -63,9 +61,9 @@ func (m *ReporterPluginManager) healthz() (general.HealthzCheckResponse, error) } if len(unHealthy) != 0 { - response.State = general.HealthzCheckStateNotReady - response.Message = fmt.Sprintf("the following checks timeout: %s", strings.Join(unHealthy, ",")) + _ = general.UpdateHealthzState(healthzNameReporterFetcherReady, general.HealthzCheckStateNotReady, + fmt.Sprintf("the following checks timeout: %s", strings.Join(unHealthy, ","))) + } else { + _ = general.UpdateHealthzState(healthzNameReporterFetcherReady, general.HealthzCheckStateReady, "") } - - return response, nil } diff --git a/pkg/agent/resourcemanager/fetcher/manager_test.go b/pkg/agent/resourcemanager/fetcher/manager_test.go index 276256375..fec141e39 100644 --- a/pkg/agent/resourcemanager/fetcher/manager_test.go +++ b/pkg/agent/resourcemanager/fetcher/manager_test.go @@ -197,10 +197,10 @@ func TestHealthz(t *testing.T) { t.Fatalf("timeout while waiting for manager update") } - results := general.CheckHealthz() + results := general.GetRegisterReadinessCheckResult() for name, response := range results { - if reporterFetcherRules.Has(string(name)) { - require.Equal(t, response.State, general.HealthzCheckStateReady) + if healthzNameReporterFetcherReady == string(name) { + require.True(t, response.Ready) } } diff --git a/pkg/agent/utilcomponent/periodicalhandler/periodical_handler.go b/pkg/agent/utilcomponent/periodicalhandler/periodical_handler.go index cd984288a..1cfc91fff 100644 --- a/pkg/agent/utilcomponent/periodicalhandler/periodical_handler.go +++ b/pkg/agent/utilcomponent/periodicalhandler/periodical_handler.go @@ -116,6 +116,12 @@ func (phm *PeriodicalHandlerManager) Run(ctx context.Context) { var handlerCtxs = make(map[string]map[string]*HandlerCtx) var handlerMtx sync.Mutex +func RegisterPeriodicalHandlerWithHealthz(handlerName string, initState general.HealthzCheckState, groupName string, + handler Handler, interval 
time.Duration, tolerationTimes int64) (err error) { + general.RegisterHeartbeatCheck(handlerName, time.Duration(tolerationTimes)*interval, initState, time.Duration(tolerationTimes)*interval) + return RegisterPeriodicalHandler(groupName, handlerName, handler, interval) +} + func RegisterPeriodicalHandler(groupName, handlerName string, handler Handler, interval time.Duration) (err error) { if groupName == "" || handlerName == "" { return fmt.Errorf("empty groupName: %s or handlerName: %s", groupName, handlerName) diff --git a/pkg/config/generic/generic_base.go b/pkg/config/generic/generic_base.go index f811071d1..e301687ac 100644 --- a/pkg/config/generic/generic_base.go +++ b/pkg/config/generic/generic_base.go @@ -25,6 +25,8 @@ import ( type GenericConfiguration struct { DryRun bool + EnableHealthzCheck bool + // for some cases, we may need to enable the ability of transformed informer TransformedInformerForPod bool diff --git a/pkg/util/general/healthz.go b/pkg/util/general/healthz.go index 39f94db5e..67d7f457a 100644 --- a/pkg/util/general/healthz.go +++ b/pkg/util/general/healthz.go @@ -19,11 +19,11 @@ package general import ( "fmt" "sync" - - "k8s.io/klog/v2" + "time" ) -var healthzCheckRules sync.Map +var healthzCheckMap = make(map[HealthzCheckName]*healthzCheckStatus) +var healthzCheckLock sync.RWMutex // HealthzCheckName describes which rule name for this check type HealthzCheckName string @@ -31,9 +31,49 @@ type HealthzCheckName string // HealthzCheckState describes the checking results type HealthzCheckState string -type HealthzCheckResponse struct { - State HealthzCheckState `json:"state"` - Message string `json:"message"` +type HealthzCheckMode string + +type HealthzCheckResult struct { + Ready bool `json:"ready"` + Message string `json:"message"` +} + +type healthzCheckStatus struct { + State HealthzCheckState `json:"state"` + Message string `json:"message"` + LastUpdateTime time.Time `json:"lastUpdateTime"` + + Mode HealthzCheckMode `json:"mode"` + + // in HealthzCheckModeHeartBeat mode, when LastUpdateTime is not updated for more than TimeoutPeriod, we consider this rule failed. + // 0 or negative value means no need to check the LastUpdateTime. + TimeoutPeriod time.Duration `json:"timeoutPeriod"` + UnhealthyStartTime time.Time `json:"unhealthyStartTime"` + // in HealthzCheckModeHeartBeat mode, when current State is not HealthzCheckStateReady, and it lasts more than + // TolerationPeriod, we consider this rule failed. 0 or negative value means no need to check the UnhealthyStartTime. + TolerationPeriod time.Duration `json:"tolerationPeriod"` + + LatestUnhealthyTime time.Time `json:"latestUnhealthyTime"` + // in HealthzCheckModeReport mode, when LatestUnhealthyTime is not earlier than AutoRecoverPeriod ago, we consider this rule + // failed.
+ AutoRecoverPeriod time.Duration `json:"autoRecoverPeriod"` + mutex sync.RWMutex +} + +func (h *healthzCheckStatus) update(state HealthzCheckState, message string) { + h.mutex.Lock() + defer h.mutex.Unlock() + + now := time.Now() + h.Message = message + h.LastUpdateTime = now + if h.State == HealthzCheckStateReady && state != HealthzCheckStateReady { + h.UnhealthyStartTime = now + } + if state != HealthzCheckStateReady { + h.LatestUnhealthyTime = now + } + h.State = state } const ( @@ -41,49 +81,106 @@ HealthzCheckStateNotReady HealthzCheckState = "NotReady" HealthzCheckStateUnknown HealthzCheckState = "Unknown" HealthzCheckStateFailed HealthzCheckState = "Failed" + + InitMessage = "Init" + + // HealthzCheckModeHeartBeat in this mode, caller should update the check status regularly like a heartbeat, once + // the heartbeat stops for more than TimeoutPeriod or the state is not HealthzCheckStateReady for more than TolerationPeriod, + // this rule will be considered unhealthy. + HealthzCheckModeHeartBeat HealthzCheckMode = "heartbeat" + // HealthzCheckModeReport in this mode, caller only reports the failed state when the function does not work well. + // when the LatestUnhealthyTime is not earlier than the AutoRecoverPeriod ago, we consider this rule unhealthy. + // if caller doesn't report new failed state for more than AutoRecoverPeriod, we consider the exception recovered. + HealthzCheckModeReport HealthzCheckMode = "report" ) // HealthzCheckFunc defined as a common function to define whether the corresponding component is healthy. -type HealthzCheckFunc func() (HealthzCheckResponse, error) +type HealthzCheckFunc func() (healthzCheckStatus, error) + +func RegisterHeartbeatCheck(name string, timeout time.Duration, initState HealthzCheckState, tolerationPeriod time.Duration) { + healthzCheckLock.Lock() + defer healthzCheckLock.Unlock() + + healthzCheckMap[HealthzCheckName(name)] = &healthzCheckStatus{ + State: initState, + Message: InitMessage, + LastUpdateTime: time.Now(), + TimeoutPeriod: timeout, + TolerationPeriod: tolerationPeriod, + Mode: HealthzCheckModeHeartBeat, + } +} + +func RegisterReportCheck(name string, autoRecoverPeriod time.Duration) { + healthzCheckLock.Lock() + defer healthzCheckLock.Unlock() + + healthzCheckMap[HealthzCheckName(name)] = &healthzCheckStatus{ + State: HealthzCheckStateReady, + Message: InitMessage, + AutoRecoverPeriod: autoRecoverPeriod, + Mode: HealthzCheckModeReport, + } +} -// RegisterHealthzCheckRules supports to register healthz check functions.
-func RegisterHealthzCheckRules(name HealthzCheckName, f HealthzCheckFunc) { - healthzCheckRules.Store(name, f) +func UpdateHealthzStateByError(name string, err error) error { + if err != nil { + return UpdateHealthzState(name, HealthzCheckStateNotReady, err.Error()) + } else { + return UpdateHealthzState(name, HealthzCheckStateReady, "") + } } -func getRegisterReadinessCheckRules() map[HealthzCheckName]HealthzCheckFunc { - rules := make(map[HealthzCheckName]HealthzCheckFunc) - healthzCheckRules.Range(func(key, value interface{}) bool { - rules[key.(HealthzCheckName)] = value.(HealthzCheckFunc) - return true - }) - return rules +func UpdateHealthzState(name string, state HealthzCheckState, message string) error { + healthzCheckLock.RLock() + defer healthzCheckLock.RUnlock() + + status, ok := healthzCheckMap[HealthzCheckName(name)] + if !ok { + Errorf("check rule %v not found", name) + return fmt.Errorf("check rule %v not found", name) + } + status.update(state, message) + return nil } -// CheckHealthz walks through the registered healthz functions to provide an insight about -// the running states of current process. -// if functions failed, returns HealthzCheckStateFailed as the returned state. -func CheckHealthz() map[HealthzCheckName]HealthzCheckResponse { - rules := getRegisterReadinessCheckRules() - results := make(map[HealthzCheckName]HealthzCheckResponse) - - wg := sync.WaitGroup{} - wg.Add(1) - for name := range rules { - ruleName := name - go func() { - defer wg.Done() - response, err := rules[ruleName]() - if err != nil { - message := fmt.Sprintf("failed to perform healthz check for %v: %v", ruleName, err) - klog.Errorf(message) - - response.State = HealthzCheckStateFailed - response.Message = message +func GetRegisterReadinessCheckResult() map[HealthzCheckName]HealthzCheckResult { + healthzCheckLock.RLock() + defer healthzCheckLock.RUnlock() + + results := make(map[HealthzCheckName]HealthzCheckResult) + for name, checkStatus := range healthzCheckMap { + func() { + checkStatus.mutex.RLock() + defer checkStatus.mutex.RUnlock() + + ready := true + message := checkStatus.Message + switch checkStatus.Mode { + case HealthzCheckModeHeartBeat: + if checkStatus.TimeoutPeriod > 0 && time.Now().Sub(checkStatus.LastUpdateTime) > checkStatus.TimeoutPeriod { + ready = false + message = fmt.Sprintf("the status has not been updated for more than %v, last update time is %v", checkStatus.TimeoutPeriod, checkStatus.LastUpdateTime) + } + + if checkStatus.TolerationPeriod <= 0 && checkStatus.State != HealthzCheckStateReady { + ready = false + } + + if checkStatus.TolerationPeriod > 0 && time.Now().Sub(checkStatus.UnhealthyStartTime) > checkStatus.TolerationPeriod && + checkStatus.State != HealthzCheckStateReady { + ready = false + } + case HealthzCheckModeReport: + if checkStatus.LatestUnhealthyTime.After(time.Now().Add(-checkStatus.AutoRecoverPeriod)) { + ready = false + } + } + results[name] = HealthzCheckResult{ + Ready: ready, + Message: message, } - results[ruleName] = response }() } - wg.Wait() return results }
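Taken together, the exported surface is small: register a rule once, update it from the owning code path, and let the HTTP handler aggregate. A minimal end-to-end sketch follows; the rule name "demo" and the durations are made up, and the three functions are used exactly as defined above.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/kubewharf/katalyst-core/pkg/util/general"
)

func main() {
	// Heartbeat rule: stale after 3s without an update, and NotReady states
	// are tolerated for up to 3s before the rule is reported unhealthy.
	general.RegisterHeartbeatCheck("demo", 3*time.Second,
		general.HealthzCheckStateNotReady, 3*time.Second)
	_ = general.UpdateHealthzState("demo", general.HealthzCheckStateReady, "")

	// This aggregation is what serveHealthZHTTP now writes as the response
	// body: HTTP 200 when every rule is Ready, HTTP 500 with the same JSON
	// content otherwise.
	results := general.GetRegisterReadinessCheckResult()
	body, _ := json.Marshal(results)
	fmt.Println(string(body))
}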