feat(qrm): use async worker to set extra cg mem limit to avoid thread getting stuck
csfldf committed Apr 29, 2024
1 parent f39c3e0 commit effa978
Showing 7 changed files with 82 additions and 24 deletions.
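At a high level, the commit moves the memory-limit write for extra cgroups out of the advisor-response handler and into the plugin's async worker queue, so a write that blocks in the kernel no longer stalls the handler. The following is a minimal, self-contained sketch of that pattern using only the Go standard library; it is illustrative only and is not katalyst's asyncworker implementation (the `limitWork` type and the fake path are invented for the example).

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// limitWork models one deferred cgroup write: the target interface file and the limit to set.
type limitWork struct {
	path   string
	nbytes int64
}

// runWorker drains the queue in its own goroutine, so a write that blocks
// (e.g. memory.max waiting on reclaim) never stalls the component that enqueued it.
func runWorker(queue <-chan limitWork) {
	for w := range queue {
		start := time.Now()
		err := os.WriteFile(w.path, []byte(strconv.FormatInt(w.nbytes, 10)), 0o644)
		fmt.Printf("set %s=%d in %v (err=%v)\n", w.path, w.nbytes, time.Since(start), err)
	}
}

func main() {
	queue := make(chan limitWork, 16)
	go runWorker(queue)

	// The handler only enqueues and returns immediately, which mirrors AddWork in the diff below.
	queue <- limitWork{path: "/tmp/fake-memory.max", nbytes: 2 << 30}

	time.Sleep(100 * time.Millisecond) // give the toy worker a moment to finish before exiting
}
```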
16 changes: 9 additions & 7 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
@@ -58,12 +58,14 @@ import (
const (
MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic)

memoryPluginStateFileName = "memory_plugin_state"
memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers"
memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache"
memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page"

dropCacheTimeoutSeconds = 30
memoryPluginStateFileName = "memory_plugin_state"
memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers"
memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache"
memoryPluginAsyncWorkTopicSetExtraCGMemLimit = "qrm_memory_plugin_set_extra_mem_limit"
memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page"

dropCacheTimeoutSeconds = 30
setExtraCGMemLimitTimeoutSeconds = 60
)

const (
@@ -229,7 +231,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
}

memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryLimitInBytes,
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorMemoryLimitInBytes))
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryLimitInBytes))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyCPUSetMems,
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache,
@@ -229,7 +229,7 @@ func (p *DynamicPolicy) handleAdvisorResp(advisorResp *advisorsvc.ListAndWatchRe
return nil
}

func handleAdvisorMemoryLimitInBytes(
func (p *DynamicPolicy) handleAdvisorMemoryLimitInBytes(
_ *config.Configuration,
_ interface{},
_ *dynamicconfig.DynamicAgentConfiguration,
@@ -245,13 +245,16 @@ func handleAdvisorMemoryLimitInBytes(
}

if calculationInfo.CgroupPath != "" {
err = cgroupmgr.ApplyMemoryWithRelativePath(calculationInfo.CgroupPath, &cgroupcommon.MemoryData{
LimitInBytes: calculatedLimitInBytesInt64,
})
setExtraCGMemLimitWorkName := util.GetAsyncWorkNameByPrefix(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicSetExtraCGMemLimit)
err = p.asyncWorkers.AddWork(setExtraCGMemLimitWorkName,
&asyncworker.Work{
Fn: cgroupmgr.SetExtraCGMemLimitWithTimeoutAndRelCGPath,
Params: []interface{}{calculationInfo.CgroupPath, setExtraCGMemLimitTimeoutSeconds, calculatedLimitInBytesInt64},
DeliveredAt: time.Now()}, asyncworker.DuplicateWorkPolicyOverride)

if err != nil {
return fmt.Errorf("apply %s: %d to cgroup: %s failed with error: %v",
memoryadvisor.ControlKnobKeyMemoryLimitInBytes, calculatedLimitInBytesInt64,
calculationInfo.CgroupPath, err)
return fmt.Errorf("add work: %s pod: %s container: %s failed with error: %v",
setExtraCGMemLimitWorkName, entryName, subEntryName, err)
}

_ = emitter.StoreInt64(util.MetricNameMemoryHandleAdvisorMemoryLimit, calculatedLimitInBytesInt64,
4 changes: 3 additions & 1 deletion pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
@@ -146,6 +146,8 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, machi
consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresHintHandler,
}

policyImplement.asyncWorkers = asyncworker.NewAsyncWorkers(memoryPluginAsyncWorkersName, policyImplement.emitter)

return policyImplement, nil
}

@@ -2115,7 +2117,7 @@ func TestHandleAdvisorResp(t *testing.T) {
}

memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryLimitInBytes,
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorMemoryLimitInBytes))
memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorMemoryLimitInBytes))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyCPUSetMems,
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache,
4 changes: 4 additions & 0 deletions pkg/agent/qrm-plugins/util/util.go
@@ -315,6 +315,10 @@ func GetContainerAsyncWorkName(podUID, containerName, topic string) string {
return strings.Join([]string{podUID, containerName, topic}, asyncworker.WorkNameSeperator)
}

func GetAsyncWorkNameByPrefix(prefix, topic string) string {
return strings.Join([]string{prefix, topic}, asyncworker.WorkNameSeperator)
}

func GetKubeletReservedQuantity(resourceName string, klConfig *kubeletconfigv1beta1.KubeletConfiguration) (resource.Quantity, bool, error) {
if klConfig == nil {
return resource.MustParse("0"), false, fmt.Errorf("nil klConfig")
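One point worth noting about the new GetAsyncWorkNameByPrefix helper above: the work name is just the prefix and topic joined with the worker package's separator, so repeated limit updates for the same cgroup path map to the same work name and, with DuplicateWorkPolicyOverride, the newest update supersedes a still-queued one. A rough standalone illustration follows; the actual separator comes from asyncworker.WorkNameSeperator, and "/" below is only an assumption for the demo.

```go
package main

import (
	"fmt"
	"strings"
)

// workNameSeparator is a hypothetical stand-in for asyncworker.WorkNameSeperator;
// the real separator value may differ.
const workNameSeparator = "/"

func getAsyncWorkNameByPrefix(prefix, topic string) string {
	return strings.Join([]string{prefix, topic}, workNameSeparator)
}

func main() {
	// The same cgroup path and topic always produce the same work name, so with
	// DuplicateWorkPolicyOverride a newer limit update replaces a still-queued one.
	name := getAsyncWorkNameByPrefix("kubepods/burstable", "qrm_memory_plugin_set_extra_mem_limit")
	fmt.Println(name) // kubepods/burstable/qrm_memory_plugin_set_extra_mem_limit
}
```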
6 changes: 3 additions & 3 deletions pkg/util/asyncworker/async_workers.go
@@ -59,7 +59,7 @@ func (aws *AsyncWorkers) AddWork(workName string, work *Work, policy DuplicateWo
"AsyncWorkers", aws.name, "workName", workName)
status = &workStatus{}
aws.workStatuses[workName] = status
} else if policy == DuplicateWorkPolicyDiscard {
} else if status.IsWorking() && policy == DuplicateWorkPolicyDiscard {
general.InfoS("work %v already exists, discard new work", workName)
return nil
}
@@ -267,8 +267,8 @@ func (aws *AsyncWorkers) WorkExists(workName string) bool {
aws.workLock.Lock()
defer aws.workLock.Unlock()

_, hasRunningWork := aws.workStatuses[workName]
if hasRunningWork {
status, hasRunningWork := aws.workStatuses[workName]
if hasRunningWork && status.IsWorking() {
return true
}

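The asyncworker change above narrows the discard policy so it only applies while a work item with the same name is still running; a finished entry no longer blocks new work, and WorkExists reflects the same distinction. Below is a toy sketch of that decision; the type and field names are invented for illustration and are not the package's real ones.

```go
package main

import "fmt"

// workStatus is a toy model of the per-name status; names are illustrative.
type workStatus struct {
	working bool // true while the work function is still executing
}

type duplicatePolicy int

const (
	policyDiscard duplicatePolicy = iota
	policyOverride
)

// shouldAccept reports whether new work with an already-known name is accepted.
func shouldAccept(status *workStatus, policy duplicatePolicy) bool {
	if status != nil && status.working && policy == policyDiscard {
		return false // an in-flight duplicate is discarded
	}
	return true // unknown name, finished status, or override policy: accept
}

func main() {
	fmt.Println(shouldAccept(&workStatus{working: true}, policyDiscard))  // false
	fmt.Println(shouldAccept(&workStatus{working: false}, policyDiscard)) // true: finished work no longer blocks
	fmt.Println(shouldAccept(&workStatus{working: true}, policyOverride)) // true
}
```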
57 changes: 52 additions & 5 deletions pkg/util/cgroup/manager/cgroup.go
@@ -257,12 +257,12 @@ func GetCPUSetForContainer(podUID, containerId string) (*common.CPUSetStats, err
}

func DropCacheWithTimeoutForContainer(ctx context.Context, podUID, containerId string, timeoutSecs int, nbytes int64) error {
cpusetAbsCGPath, err := common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, podUID, containerId)
memoryAbsCGPath, err := common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, podUID, containerId)
if err != nil {
return fmt.Errorf("GetContainerAbsCgroupPath failed with error: %v", err)
}

err = DropCacheWithTimeoutWithRelativePath(timeoutSecs, cpusetAbsCGPath, nbytes)
err = DropCacheWithTimeoutAndAbsCGPath(timeoutSecs, memoryAbsCGPath, nbytes)
_ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{
"podUID": podUID,
"containerID": containerId,
@@ -271,13 +271,13 @@ func DropCacheWithTimeoutForContainer(ctx context.Context, podUID, containerId s
return err
}

func DropCacheWithTimeoutWithRelativePath(timeoutSecs int, absCgroupPath string, nbytes int64) error {
func DropCacheWithTimeoutAndAbsCGPath(timeoutSecs int, absCgroupPath string, nbytes int64) error {
startTime := time.Now()

var cmd string
if common.CheckCgroup2UnifiedMode() {
if nbytes == 0 {
general.Infof("[DropCacheWithTimeoutWithRelativePath] skip drop cache on %s since nbytes is zero", absCgroupPath)
general.Infof("[DropCacheWithTimeoutAndAbsCGPath] skip drop cache on %s since nbytes is zero", absCgroupPath)
return nil
}
//cgv2
@@ -290,7 +290,54 @@ func DropCacheWithTimeoutWithRelativePath(timeoutSecs int, absCgroupPath string,
_, err := exec.Command("bash", "-c", cmd).Output()

delta := time.Since(startTime).Seconds()
general.Infof("[DropCacheWithTimeoutWithRelativePath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath)
general.Infof("[DropCacheWithTimeoutAndAbsCGPath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath)

// if this command times out, a non-nil error will be returned,
// but we only return the error if the command failed before the timeout was reached
if err != nil && int(delta) < timeoutSecs {
return err
}

return nil
}

func SetExtraCGMemLimitWithTimeoutAndRelCGPath(ctx context.Context, relCgroupPath string, timeoutSecs int, nbytes int64) error {
memoryAbsCGPath := common.GetAbsCgroupPath(common.CgroupSubsysMemory, relCgroupPath)

err := SetExtraCGMemLimitWithTimeoutAndAbsCGPath(timeoutSecs, memoryAbsCGPath, nbytes)
_ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{
"relCgroupPath": relCgroupPath,
"succeeded": fmt.Sprintf("%v", err == nil),
})...)
return err
}

func SetExtraCGMemLimitWithTimeoutAndAbsCGPath(timeoutSecs int, absCgroupPath string, nbytes int64) error {
if nbytes == 0 {
return fmt.Errorf("invalid memory limit nbytes: %d", nbytes)
}

startTime := time.Now()

var interfacePath string
if common.CheckCgroup2UnifiedMode() {
//cgv2
interfacePath = filepath.Join(absCgroupPath, "memory.max")
} else {
//cgv1
interfacePath = filepath.Join(absCgroupPath, "memory.limit_in_bytes")
}

cmd := fmt.Sprintf("timeout %d echo %d > %s", timeoutSecs, nbytes, interfacePath)

_, err := exec.Command("bash", "-c", cmd).Output()

delta := time.Since(startTime).Seconds()
general.Infof("[SetExtraCGMemLimitWithTimeoutAndAbsCGPath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath)

// if this command times out, a non-nil error will be returned,
// but we only return the error if the command failed before the timeout was reached
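For the new setter, the shell command follows the same `timeout <secs> echo <bytes> > <interface file>` shape as the existing drop-cache helper, and the error handling likewise treats a timeout as a soft failure: the error is surfaced only when the command failed before the deadline. A rough standalone sketch of that decision follows; the helper name and the path are made up for the example.

```go
package main

import (
	"fmt"
	"os/exec"
	"time"
)

// setMemLimitWithTimeout is an illustrative stand-in for the new helper: it writes the
// limit through a bash redirect guarded by timeout, and swallows errors caused by the
// timeout itself (mirroring the "return error only if it happened before the deadline" rule).
func setMemLimitWithTimeout(timeoutSecs int, interfacePath string, nbytes int64) error {
	start := time.Now()

	cmd := fmt.Sprintf("timeout %d echo %d > %s", timeoutSecs, nbytes, interfacePath)
	_, err := exec.Command("bash", "-c", cmd).Output()

	elapsed := time.Since(start).Seconds()
	if err != nil && int(elapsed) < timeoutSecs {
		return err // failed for a reason other than hitting the timeout
	}
	return nil // success, or a timeout that we deliberately do not propagate
}

func main() {
	// /tmp/fake-memory.max stands in for a real cgroup interface file in this demo.
	fmt.Println(setMemLimitWithTimeout(1, "/tmp/fake-memory.max", 2<<30))
}
```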
2 changes: 1 addition & 1 deletion pkg/util/cgroup/manager/cgroup_test.go
@@ -81,7 +81,7 @@ func testManager(t *testing.T, version string) {
_, _ = GetTasksWithAbsolutePath("/")

_ = DropCacheWithTimeoutForContainer(context.Background(), "fake-pod", "fake-container", 1, 0)
_ = DropCacheWithTimeoutWithRelativePath(1, "/test", 0)
_ = DropCacheWithTimeoutAndAbsCGPath(1, "/test", 0)
}

func testNetCls(t *testing.T, version string) {