feat(qrm): use async worker to set extra cg mem limit to avoid thread stucking #563

Merged
1 commit, merged May 6, 2024
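The change in a nutshell: writing a new value into a container's memory limit file can block for a long time (for example while the kernel reclaims memory down to the new limit), so instead of applying the extra cgroup memory limit inline, the advisor handler now queues it on the plugin's async workers and relies on DuplicateWorkPolicyDiscard to drop repeated requests for the same cgroup while one write is in flight. Below is a minimal standalone sketch of that dispatch pattern, assuming a much-simplified worker (plain Go, not the katalyst asyncworker API; all paths and names are placeholders):

package main

import (
	"fmt"
	"os"
	"sync"
)

// asyncWorkers is a deliberately tiny stand-in for katalyst's asyncworker
// package: at most one job runs per work name, and a duplicate submission is
// discarded while the previous job with the same name is still in flight
// (mirroring the DuplicateWorkPolicyDiscard behaviour used in this PR).
type asyncWorkers struct {
	mu      sync.Mutex
	running map[string]bool
	wg      sync.WaitGroup
}

func newAsyncWorkers() *asyncWorkers {
	return &asyncWorkers{running: map[string]bool{}}
}

func (w *asyncWorkers) addWork(name string, fn func() error) {
	w.mu.Lock()
	if w.running[name] {
		w.mu.Unlock()
		fmt.Printf("work %q is still running, discarding duplicate\n", name)
		return
	}
	w.running[name] = true
	w.mu.Unlock()

	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		defer func() {
			w.mu.Lock()
			delete(w.running, name)
			w.mu.Unlock()
		}()
		if err := fn(); err != nil {
			fmt.Printf("work %q failed: %v\n", name, err)
		}
	}()
}

// setMemoryLimit writes a limit to a cgroup interface file. On a loaded node
// this write can stall while the kernel reclaims memory, which is why the
// handler hands it off to a worker instead of doing it on its own goroutine.
func setMemoryLimit(interfacePath string, limitBytes int64) error {
	return os.WriteFile(interfacePath, []byte(fmt.Sprintf("%d", limitBytes)), 0o644)
}

func main() {
	workers := newAsyncWorkers()
	// Hypothetical work name and paths, used only for illustration.
	workName := "kubepods/burstable/pod-example/qrm_memory_plugin_set_extra_mem_limit"
	workers.addWork(workName, func() error {
		return setMemoryLimit("/tmp/fake-memory.limit_in_bytes", 512<<20)
	})
	workers.addWork(workName, func() error { return nil }) // discarded if the first write is still running
	workers.wg.Wait()
}

In the diff below, the real handler builds the work name from calculationInfo.CgroupPath via GetAsyncWorkNameByPrefix and registers cgroupmgr.SetExtraCGMemLimitWithTimeoutAndRelCGPath, with a 60-second budget, as the work function.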
18 changes: 10 additions & 8 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
@@ -58,13 +58,15 @@ import (
const (
MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic)

memoryPluginStateFileName = "memory_plugin_state"
memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers"
memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache"
memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page"
memoryPluginAsyncWorkTopicMemoryOffloading = "qrm_memory_plugin_mem_offload"

dropCacheTimeoutSeconds = 30
memoryPluginStateFileName = "memory_plugin_state"
memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers"
memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache"
memoryPluginAsyncWorkTopicSetExtraCGMemLimit = "qrm_memory_plugin_set_extra_mem_limit"
memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page"
memoryPluginAsyncWorkTopicMemoryOffloading = "qrm_memory_plugin_mem_offload"

dropCacheTimeoutSeconds = 30
setExtraCGMemLimitTimeoutSeconds = 60
)

const (
@@ -230,7 +232,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
}

memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryLimitInBytes,
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorMemoryLimitInBytes))
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryLimitInBytes))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyCPUSetMems,
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache,
@@ -231,7 +231,7 @@ func (p *DynamicPolicy) handleAdvisorResp(advisorResp *advisorsvc.ListAndWatchRe
return nil
}

func handleAdvisorMemoryLimitInBytes(
func (p *DynamicPolicy) handleAdvisorMemoryLimitInBytes(
_ *config.Configuration,
_ interface{},
_ *dynamicconfig.DynamicAgentConfiguration,
@@ -247,13 +247,16 @@ func handleAdvisorMemoryLimitInBytes(
}

if calculationInfo.CgroupPath != "" {
err = cgroupmgr.ApplyMemoryWithRelativePath(calculationInfo.CgroupPath, &cgroupcommon.MemoryData{
LimitInBytes: calculatedLimitInBytesInt64,
})
setExtraCGMemLimitWorkName := util.GetAsyncWorkNameByPrefix(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicSetExtraCGMemLimit)
err = p.asyncWorkers.AddWork(setExtraCGMemLimitWorkName,
&asyncworker.Work{
Fn: cgroupmgr.SetExtraCGMemLimitWithTimeoutAndRelCGPath,
Params: []interface{}{calculationInfo.CgroupPath, setExtraCGMemLimitTimeoutSeconds, calculatedLimitInBytesInt64},
DeliveredAt: time.Now()}, asyncworker.DuplicateWorkPolicyDiscard)

if err != nil {
return fmt.Errorf("apply %s: %d to cgroup: %s failed with error: %v",
memoryadvisor.ControlKnobKeyMemoryLimitInBytes, calculatedLimitInBytesInt64,
calculationInfo.CgroupPath, err)
return fmt.Errorf("add work: %s pod: %s container: %s failed with error: %v",
setExtraCGMemLimitWorkName, entryName, subEntryName, err)
}

_ = emitter.StoreInt64(util.MetricNameMemoryHandleAdvisorMemoryLimit, calculatedLimitInBytesInt64,
4 changes: 3 additions & 1 deletion pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
@@ -147,6 +147,8 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, machi
consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresHintHandler,
}

policyImplement.asyncWorkers = asyncworker.NewAsyncWorkers(memoryPluginAsyncWorkersName, policyImplement.emitter)

return policyImplement, nil
}

@@ -2129,7 +2131,7 @@ func TestHandleAdvisorResp(t *testing.T) {
}

memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryLimitInBytes,
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorMemoryLimitInBytes))
memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorMemoryLimitInBytes))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyCPUSetMems,
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache,
4 changes: 4 additions & 0 deletions pkg/agent/qrm-plugins/util/util.go
@@ -319,6 +319,10 @@ func GetCgroupAsyncWorkName(cgroup, topic string) string {
return strings.Join([]string{cgroup, topic}, asyncworker.WorkNameSeperator)
}

func GetAsyncWorkNameByPrefix(prefix, topic string) string {
return strings.Join([]string{prefix, topic}, asyncworker.WorkNameSeperator)
}

func GetKubeletReservedQuantity(resourceName string, klConfig *kubeletconfigv1beta1.KubeletConfiguration) (resource.Quantity, bool, error) {
if klConfig == nil {
return resource.MustParse("0"), false, fmt.Errorf("nil klConfig")
6 changes: 3 additions & 3 deletions pkg/util/asyncworker/async_workers.go
@@ -59,7 +59,7 @@ func (aws *AsyncWorkers) AddWork(workName string, work *Work, policy DuplicateWo
"AsyncWorkers", aws.name, "workName", workName)
status = &workStatus{}
aws.workStatuses[workName] = status
} else if policy == DuplicateWorkPolicyDiscard {
} else if status.IsWorking() && policy == DuplicateWorkPolicyDiscard {
general.InfoS("work %v already exists, discard new work", workName)
return nil
}
@@ -267,8 +267,8 @@ func (aws *AsyncWorkers) WorkExists(workName string) bool {
aws.workLock.Lock()
defer aws.workLock.Unlock()

_, hasRunningWork := aws.workStatuses[workName]
if hasRunningWork {
status, hasRunningWork := aws.workStatuses[workName]
if hasRunningWork && status.IsWorking() {
return true
}

57 changes: 52 additions & 5 deletions pkg/util/cgroup/manager/cgroup.go
@@ -259,12 +259,12 @@ func GetCPUSetForContainer(podUID, containerId string) (*common.CPUSetStats, err
}

func DropCacheWithTimeoutForContainer(ctx context.Context, podUID, containerId string, timeoutSecs int, nbytes int64) error {
cpusetAbsCGPath, err := common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, podUID, containerId)
memoryAbsCGPath, err := common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, podUID, containerId)
if err != nil {
return fmt.Errorf("GetContainerAbsCgroupPath failed with error: %v", err)
}

err = DropCacheWithTimeoutWithRelativePath(timeoutSecs, cpusetAbsCGPath, nbytes)
err = DropCacheWithTimeoutAndAbsCGPath(timeoutSecs, memoryAbsCGPath, nbytes)
_ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{
"podUID": podUID,
"containerID": containerId,
@@ -273,13 +273,13 @@ func DropCacheWithTimeoutForContainer(ctx context.Context, podUID, containerId s
return err
}

func DropCacheWithTimeoutWithRelativePath(timeoutSecs int, absCgroupPath string, nbytes int64) error {
func DropCacheWithTimeoutAndAbsCGPath(timeoutSecs int, absCgroupPath string, nbytes int64) error {
startTime := time.Now()

var cmd string
if common.CheckCgroup2UnifiedMode() {
if nbytes == 0 {
general.Infof("[DropCacheWithTimeoutWithRelativePath] skip drop cache on %s since nbytes is zero", absCgroupPath)
general.Infof("[DropCacheWithTimeoutAndAbsCGPath] skip drop cache on %s since nbytes is zero", absCgroupPath)
return nil
}
//cgv2
@@ -292,7 +292,54 @@ func DropCacheWithTimeoutWithRelativePath(timeoutSecs int, absCgroupPath string,
_, err := exec.Command("bash", "-c", cmd).Output()

delta := time.Since(startTime).Seconds()
general.Infof("[DropCacheWithTimeoutWithRelativePath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath)
general.Infof("[DropCacheWithTimeoutAndAbsCGPath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath)

// if this command times out, a non-nil error will be returned,
// but we should only return an error if the command failed before the timeout elapsed
if err != nil && int(delta) < timeoutSecs {
return err
}

return nil
}

func SetExtraCGMemLimitWithTimeoutAndRelCGPath(ctx context.Context, relCgroupPath string, timeoutSecs int, nbytes int64) error {
memoryAbsCGPath := common.GetAbsCgroupPath(common.CgroupSubsysMemory, relCgroupPath)

err := SetExtraCGMemLimitWithTimeoutAndAbsCGPath(timeoutSecs, memoryAbsCGPath, nbytes)
_ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{
"relCgroupPath": relCgroupPath,
"succeeded": fmt.Sprintf("%v", err == nil),
})...)
return err
}

func SetExtraCGMemLimitWithTimeoutAndAbsCGPath(timeoutSecs int, absCgroupPath string, nbytes int64) error {
if nbytes == 0 {
return fmt.Errorf("invalid memory limit nbytes: %d", nbytes)
}

startTime := time.Now()

var interfacePath string
if common.CheckCgroup2UnifiedMode() {
if nbytes == 0 {
general.Infof("[SetExtraCGMemLimitWithTimeoutAndAbsCGPath] skip drop cache on %s since nbytes is zero", absCgroupPath)
return nil
}
//cgv2
interfacePath = filepath.Join(absCgroupPath, "memory.max")
} else {
//cgv1
interfacePath = filepath.Join(absCgroupPath, "memory.limit_in_bytes")
}

cmd := fmt.Sprintf("timeout %d echo %d > %s", timeoutSecs, nbytes, interfacePath)

_, err := exec.Command("bash", "-c", cmd).Output()

delta := time.Since(startTime).Seconds()
general.Infof("[SetExtraCGMemLimitWithTimeoutAndAbsCGPath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath)

// if this command times out, a non-nil error will be returned,
// but we should only return an error if the command failed before the timeout elapsed
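Both DropCacheWithTimeoutAndAbsCGPath and SetExtraCGMemLimitWithTimeoutAndAbsCGPath share the convention described in the comment above: an error is surfaced only when the command fails before its timeout budget is exhausted, so a write that simply takes too long is logged and dropped rather than propagated. A minimal standalone sketch of that convention follows (an assumed simplification, not katalyst code; it needs bash and coreutils timeout on the host):

package main

import (
	"fmt"
	"os/exec"
	"time"
)

// runWithSoftTimeout wraps a shell command in `timeout` and returns an error
// only when the command failed before its time budget ran out; reaching the
// timeout is treated as best-effort and ignored, mirroring the helpers above.
func runWithSoftTimeout(shellCmd string, timeoutSecs int) error {
	start := time.Now()
	wrapped := fmt.Sprintf("timeout %d %s", timeoutSecs, shellCmd)
	_, err := exec.Command("bash", "-c", wrapped).Output()
	if err != nil && int(time.Since(start).Seconds()) < timeoutSecs {
		return err
	}
	return nil
}

func main() {
	// "sleep 2" with a 1-second budget: `timeout` kills the command, but the
	// resulting error is swallowed because the budget was exhausted.
	if err := runWithSoftTimeout("sleep 2", 1); err != nil {
		fmt.Println("failed before timeout:", err)
	} else {
		fmt.Println("completed, or timed out and ignored")
	}
}

The effect is that immediate failures, such as a bad cgroup path, still surface as errors, while a stuck write only costs one async worker its timeout budget.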
2 changes: 1 addition & 1 deletion pkg/util/cgroup/manager/cgroup_test.go
@@ -90,7 +90,7 @@ func testManager(t *testing.T, version string) {
_, _ = GetTasksWithAbsolutePath("/")

_ = DropCacheWithTimeoutForContainer(context.Background(), "fake-pod", "fake-container", 1, 0)
_ = DropCacheWithTimeoutWithRelativePath(1, "/test", 0)
_ = DropCacheWithTimeoutAndAbsCGPath(1, "/test", 0)
}

func testNetCls(t *testing.T, version string) {