diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
index 2d4680bf81..ae6e17d9fb 100644
--- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
+++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
@@ -58,12 +58,14 @@ import (
 const (
 	MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic)
 
-	memoryPluginStateFileName           = "memory_plugin_state"
-	memoryPluginAsyncWorkersName        = "qrm_memory_plugin_async_workers"
-	memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache"
-	memoryPluginAsyncWorkTopicMovePage  = "qrm_memory_plugin_move_page"
-
-	dropCacheTimeoutSeconds = 30
+	memoryPluginStateFileName                    = "memory_plugin_state"
+	memoryPluginAsyncWorkersName                 = "qrm_memory_plugin_async_workers"
+	memoryPluginAsyncWorkTopicDropCache          = "qrm_memory_plugin_drop_cache"
+	memoryPluginAsyncWorkTopicSetExtraCGMemLimit = "qrm_memory_plugin_set_extra_mem_limit"
+	memoryPluginAsyncWorkTopicMovePage           = "qrm_memory_plugin_move_page"
+
+	dropCacheTimeoutSeconds          = 30
+	setExtraCGMemLimitTimeoutSeconds = 60
 )
 
 const (
@@ -229,7 +231,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
 	}
 
 	memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryLimitInBytes,
-		memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorMemoryLimitInBytes))
+		memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryLimitInBytes))
 	memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyCPUSetMems,
 		memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems))
 	memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache,
diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go
index 889247ee31..c1e81b1fa4 100644
--- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go
+++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go
@@ -229,7 +229,7 @@ func (p *DynamicPolicy) handleAdvisorResp(advisorResp *advisorsvc.ListAndWatchRe
 	return nil
 }
 
-func handleAdvisorMemoryLimitInBytes(
+func (p *DynamicPolicy) handleAdvisorMemoryLimitInBytes(
 	_ *config.Configuration,
 	_ interface{},
 	_ *dynamicconfig.DynamicAgentConfiguration,
@@ -245,13 +245,16 @@ func handleAdvisorMemoryLimitInBytes(
 	}
 
 	if calculationInfo.CgroupPath != "" {
-		err = cgroupmgr.ApplyMemoryWithRelativePath(calculationInfo.CgroupPath, &cgroupcommon.MemoryData{
-			LimitInBytes: calculatedLimitInBytesInt64,
-		})
+		setExtraCGMemLimitWorkName := util.GetAsyncWorkNameByPrefix(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicSetExtraCGMemLimit)
+		err = p.asyncWorkers.AddWork(setExtraCGMemLimitWorkName,
+			&asyncworker.Work{
+				Fn:          cgroupmgr.SetExtraCGMemLimitWithTimeoutAndRelCGPath,
+				Params:      []interface{}{calculationInfo.CgroupPath, setExtraCGMemLimitTimeoutSeconds, calculatedLimitInBytesInt64},
+				DeliveredAt: time.Now()}, asyncworker.DuplicateWorkPolicyOverride)
+
 		if err != nil {
-			return fmt.Errorf("apply %s: %d to cgroup: %s failed with error: %v",
-				memoryadvisor.ControlKnobKeyMemoryLimitInBytes, calculatedLimitInBytesInt64,
-				calculationInfo.CgroupPath, err)
+			return fmt.Errorf("add work: %s pod: %s container: %s failed with error: %v",
+				setExtraCGMemLimitWorkName, entryName, subEntryName, err)
 		}
 
 		_ = emitter.StoreInt64(util.MetricNameMemoryHandleAdvisorMemoryLimit, calculatedLimitInBytesInt64,
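
For context on the pattern the handler now follows: instead of writing the memory limit synchronously inside the advisor callback, the update is queued onto the plugin's `asyncWorkers` and applied in the background. The sketch below illustrates that enqueue pattern; only the `NewAsyncWorkers`/`AddWork`/`Work`/`DuplicateWorkPolicyOverride` identifiers are taken from this patch, while `metrics.DummyMetrics` as the emitter and the assumption that `AddWork` dispatches the work on its own are unverified here.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/kubewharf/katalyst-core/pkg/metrics"
	"github.com/kubewharf/katalyst-core/pkg/util/asyncworker"
)

// setLimit stands in for cgroupmgr.SetExtraCGMemLimitWithTimeoutAndRelCGPath;
// the framework injects ctx, and Params supplies the remaining arguments.
func setLimit(ctx context.Context, relCgroupPath string, timeoutSecs int, nbytes int64) error {
	fmt.Printf("would set a %d-byte limit under %s within %ds\n", nbytes, relCgroupPath, timeoutSecs)
	return nil
}

func main() {
	workers := asyncworker.NewAsyncWorkers("example_workers", metrics.DummyMetrics{})

	// One work name per cgroup path and topic, so repeated advisor updates for
	// the same cgroup override each other and only the latest limit is applied.
	workName := "kubepods/burstable" + asyncworker.WorkNameSeperator + "set_extra_mem_limit"
	err := workers.AddWork(workName, &asyncworker.Work{
		Fn:          setLimit,
		Params:      []interface{}{"kubepods/burstable", 60, int64(8 << 30)},
		DeliveredAt: time.Now(),
	}, asyncworker.DuplicateWorkPolicyOverride)
	if err != nil {
		fmt.Println("enqueue failed:", err)
	}

	time.Sleep(time.Second) // give the background worker a moment to run
}
```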
diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
index 913a8ebd98..f16c790c10 100644
--- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
+++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
@@ -146,6 +146,8 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, machi
 		consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresHintHandler,
 	}
 
+	policyImplement.asyncWorkers = asyncworker.NewAsyncWorkers(memoryPluginAsyncWorkersName, policyImplement.emitter)
+
 	return policyImplement, nil
 }
 
@@ -2115,7 +2117,7 @@ func TestHandleAdvisorResp(t *testing.T) {
 	}
 
 	memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryLimitInBytes,
-		memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorMemoryLimitInBytes))
+		memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorMemoryLimitInBytes))
 	memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyCPUSetMems,
 		memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems))
 	memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache,
diff --git a/pkg/agent/qrm-plugins/util/util.go b/pkg/agent/qrm-plugins/util/util.go
index f1ecf53289..24ed68c299 100644
--- a/pkg/agent/qrm-plugins/util/util.go
+++ b/pkg/agent/qrm-plugins/util/util.go
@@ -315,6 +315,10 @@ func GetContainerAsyncWorkName(podUID, containerName, topic string) string {
 	return strings.Join([]string{podUID, containerName, topic}, asyncworker.WorkNameSeperator)
 }
 
+func GetAsyncWorkNameByPrefix(prefix, topic string) string {
+	return strings.Join([]string{prefix, topic}, asyncworker.WorkNameSeperator)
+}
+
 func GetKubeletReservedQuantity(resourceName string, klConfig *kubeletconfigv1beta1.KubeletConfiguration) (resource.Quantity, bool, error) {
 	if klConfig == nil {
 		return resource.MustParse("0"), false, fmt.Errorf("nil klConfig")
diff --git a/pkg/util/asyncworker/async_workers.go b/pkg/util/asyncworker/async_workers.go
index f0df4ee25e..a38c327712 100644
--- a/pkg/util/asyncworker/async_workers.go
+++ b/pkg/util/asyncworker/async_workers.go
@@ -59,7 +59,7 @@ func (aws *AsyncWorkers) AddWork(workName string, work *Work, policy DuplicateWo
 			"AsyncWorkers", aws.name, "workName", workName)
 		status = &workStatus{}
 		aws.workStatuses[workName] = status
-	} else if policy == DuplicateWorkPolicyDiscard {
+	} else if status.IsWorking() && policy == DuplicateWorkPolicyDiscard {
 		general.InfoS("work %v already exists, discard new work", workName)
 		return nil
 	}
@@ -267,8 +267,8 @@ func (aws *AsyncWorkers) WorkExists(workName string) bool {
 	aws.workLock.Lock()
 	defer aws.workLock.Unlock()
 
-	_, hasRunningWork := aws.workStatuses[workName]
-	if hasRunningWork {
+	status, hasRunningWork := aws.workStatuses[workName]
+	if hasRunningWork && status.IsWorking() {
 		return true
 	}
 
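
The `async_workers.go` fix above changes duplicate handling from "a status entry exists" to "a status entry exists *and* is still working", so a stale, finished status no longer blocks resubmission. The sketch below illustrates the intended semantics with the patched code; it assumes a finished work's status is retained but reports `IsWorking() == false`, and that an empty `Params` list is valid for an `Fn` that takes only a context.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/kubewharf/katalyst-core/pkg/metrics"
	"github.com/kubewharf/katalyst-core/pkg/util/asyncworker"
)

// slowWork simulates a long-running cgroup operation.
func slowWork(ctx context.Context) error {
	time.Sleep(2 * time.Second)
	return nil
}

func main() {
	workers := asyncworker.NewAsyncWorkers("discard_demo", metrics.DummyMetrics{})
	work := &asyncworker.Work{Fn: slowWork, Params: []interface{}{}, DeliveredAt: time.Now()}

	_ = workers.AddWork("job", work, asyncworker.DuplicateWorkPolicyDiscard)
	fmt.Println(workers.WorkExists("job")) // true: still executing

	// While the first submission runs, a Discard duplicate is dropped.
	_ = workers.AddWork("job", work, asyncworker.DuplicateWorkPolicyDiscard)

	time.Sleep(3 * time.Second)
	// After it finishes, the stale status no longer counts as "working",
	// so WorkExists is false and a new submission is accepted again.
	fmt.Println(workers.WorkExists("job")) // false
	_ = workers.AddWork("job", work, asyncworker.DuplicateWorkPolicyDiscard)
	time.Sleep(3 * time.Second)
}
```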
diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go
index fd401f4982..d05c8e7b89 100644
--- a/pkg/util/cgroup/manager/cgroup.go
+++ b/pkg/util/cgroup/manager/cgroup.go
@@ -257,12 +257,12 @@ func GetCPUSetForContainer(podUID, containerId string) (*common.CPUSetStats, err
 }
 
 func DropCacheWithTimeoutForContainer(ctx context.Context, podUID, containerId string, timeoutSecs int, nbytes int64) error {
-	cpusetAbsCGPath, err := common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, podUID, containerId)
+	memoryAbsCGPath, err := common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, podUID, containerId)
 	if err != nil {
 		return fmt.Errorf("GetContainerAbsCgroupPath failed with error: %v", err)
 	}
 
-	err = DropCacheWithTimeoutWithRelativePath(timeoutSecs, cpusetAbsCGPath, nbytes)
+	err = DropCacheWithTimeoutAndAbsCGPath(timeoutSecs, memoryAbsCGPath, nbytes)
 	_ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{
 		"podUID":      podUID,
 		"containerID": containerId,
@@ -271,13 +271,13 @@ func DropCacheWithTimeoutForContainer(ctx context.Context, podUID, containerId s
 	return err
 }
 
-func DropCacheWithTimeoutWithRelativePath(timeoutSecs int, absCgroupPath string, nbytes int64) error {
+func DropCacheWithTimeoutAndAbsCGPath(timeoutSecs int, absCgroupPath string, nbytes int64) error {
 	startTime := time.Now()
 
 	var cmd string
 	if common.CheckCgroup2UnifiedMode() {
 		if nbytes == 0 {
-			general.Infof("[DropCacheWithTimeoutWithRelativePath] skip drop cache on %s since nbytes is zero", absCgroupPath)
+			general.Infof("[DropCacheWithTimeoutAndAbsCGPath] skip drop cache on %s since nbytes is zero", absCgroupPath)
 			return nil
 		}
 		//cgv2
@@ -290,7 +290,51 @@ func DropCacheWithTimeoutWithRelativePath(timeoutSecs int, absCgroupPath string,
 	_, err := exec.Command("bash", "-c", cmd).Output()
 
 	delta := time.Since(startTime).Seconds()
-	general.Infof("[DropCacheWithTimeoutWithRelativePath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath)
+	general.Infof("[DropCacheWithTimeoutAndAbsCGPath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath)
+
+	// if this command times out, a non-nil error will be returned,
+	// but we should return an error only when it occurred without a timeout
+	if err != nil && int(delta) < timeoutSecs {
+		return err
+	}
+
+	return nil
+}
+
+func SetExtraCGMemLimitWithTimeoutAndRelCGPath(ctx context.Context, relCgroupPath string, timeoutSecs int, nbytes int64) error {
+	memoryAbsCGPath := common.GetAbsCgroupPath(common.CgroupSubsysMemory, relCgroupPath)
+
+	err := SetExtraCGMemLimitWithTimeoutAndAbsCGPath(timeoutSecs, memoryAbsCGPath, nbytes)
+	_ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{
+		"relCgroupPath": relCgroupPath,
+		"succeeded":     fmt.Sprintf("%v", err == nil),
+	})...)
+
+	return err
+}
+
+func SetExtraCGMemLimitWithTimeoutAndAbsCGPath(timeoutSecs int, absCgroupPath string, nbytes int64) error {
+	// nbytes == 0 is rejected up front, so no separate skip branch is needed below
+	if nbytes == 0 {
+		return fmt.Errorf("invalid memory limit nbytes: %d", nbytes)
+	}
+
+	startTime := time.Now()
+
+	var interfacePath string
+	if common.CheckCgroup2UnifiedMode() {
+		//cgv2
+		interfacePath = filepath.Join(absCgroupPath, "memory.max")
+	} else {
+		//cgv1
+		interfacePath = filepath.Join(absCgroupPath, "memory.limit_in_bytes")
+	}
+
+	cmd := fmt.Sprintf("timeout %d echo %d > %s", timeoutSecs, nbytes, interfacePath)
+
+	_, err := exec.Command("bash", "-c", cmd).Output()
+
+	delta := time.Since(startTime).Seconds()
+	general.Infof("[SetExtraCGMemLimitWithTimeoutAndAbsCGPath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath)
 
 	// if this command times out, a non-nil error will be returned,
 	// but we should return an error only when it occurred without a timeout
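
For readers unfamiliar with the two interfaces touched above: on cgroup v2 the memory limit lives in `memory.max`, on v1 in `memory.limit_in_bytes`. The standalone sketch below performs the same write with a direct file write instead of shelling out to `bash`; the unified-mode probe via `/sys/fs/cgroup/cgroup.controllers` and the example path are assumptions standing in for `common.CheckCgroup2UnifiedMode` and a real container cgroup.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeMemLimit mirrors SetExtraCGMemLimitWithTimeoutAndAbsCGPath's interface
// selection, but writes the file directly rather than via `bash -c "echo ..."`.
func writeMemLimit(absCgroupPath string, nbytes int64) error {
	if nbytes == 0 {
		return fmt.Errorf("invalid memory limit nbytes: %d", nbytes)
	}

	// cgroup v1 layout by default; switch to v2's memory.max when the
	// unified hierarchy is mounted.
	iface := filepath.Join(absCgroupPath, "memory.limit_in_bytes")
	if _, err := os.Stat("/sys/fs/cgroup/cgroup.controllers"); err == nil {
		iface = filepath.Join(absCgroupPath, "memory.max")
	}

	return os.WriteFile(iface, []byte(fmt.Sprintf("%d", nbytes)), 0o644)
}

func main() {
	// Hypothetical path; requires a real memory cgroup and root privileges.
	if err := writeMemLimit("/sys/fs/cgroup/memory/kubepods/burstable", 8<<30); err != nil {
		fmt.Println("write failed:", err)
	}
}
```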
diff --git a/pkg/util/cgroup/manager/cgroup_test.go b/pkg/util/cgroup/manager/cgroup_test.go
index 188691a8b7..409a1fa531 100644
--- a/pkg/util/cgroup/manager/cgroup_test.go
+++ b/pkg/util/cgroup/manager/cgroup_test.go
@@ -81,7 +81,7 @@ func testManager(t *testing.T, version string) {
 	_, _ = GetTasksWithAbsolutePath("/")
 
 	_ = DropCacheWithTimeoutForContainer(context.Background(), "fake-pod", "fake-container", 1, 0)
-	_ = DropCacheWithTimeoutWithRelativePath(1, "/test", 0)
+	_ = DropCacheWithTimeoutAndAbsCGPath(1, "/test", 0)
 }
 
 func testNetCls(t *testing.T, version string) {
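
Both the drop-cache and set-limit helpers share the same error policy: a command killed by `timeout(1)` exits non-zero (hence a non-nil error), but the write is treated as best-effort and only failures that happened inside the time budget are surfaced. A self-contained sketch of that policy follows; the function name is illustrative, not from the patch.

```go
package main

import (
	"fmt"
	"os/exec"
	"time"
)

// runWithTimeoutTolerance returns an error only when the command failed
// before exhausting its budget; being killed by `timeout` is swallowed.
func runWithTimeoutTolerance(cmd string, timeoutSecs int) error {
	start := time.Now()
	_, err := exec.Command("bash", "-c", fmt.Sprintf("timeout %d %s", timeoutSecs, cmd)).Output()
	delta := time.Since(start).Seconds()
	if err != nil && int(delta) < timeoutSecs {
		return err // real failure, not a timeout
	}
	return nil // success, or a tolerated timeout
}

func main() {
	fmt.Println(runWithTimeoutTolerance("sleep 5", 1)) // <nil>: killed at the budget, tolerated
	fmt.Println(runWithTimeoutTolerance("false", 5))   // exit status 1: a real failure
}
```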