diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go index cf4c92d62..a2472af6a 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go @@ -736,13 +736,22 @@ func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration, } } + cpuSetStats, err := cgroupmgr.GetCPUSetWithAbsolutePath(absCGPath) + if err != nil { + return fmt.Errorf("GetCPUSetWithAbsolutePath failed with error: %v", err) + } + mems, err := machine.Parse(cpuSetStats.Mems) + if err != nil { + return fmt.Errorf("parse cpuSetStats failed with error: %v", err) + } + // start a asynchronous work to execute memory offloading err = p.defaultAsyncLimitedWorkers.AddWork( &asyncworker.Work{ Name: memoryOffloadingWorkName, UID: uuid.NewUUID(), Fn: cgroupmgr.MemoryOffloadingWithAbsolutePath, - Params: []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64}, + Params: []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64, mems}, DeliveredAt: time.Now(), }, asyncworker.DuplicateWorkPolicyOverride) if err != nil { diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go index 5d4d410ca..6b336d7e9 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go @@ -608,6 +608,7 @@ func (tmo *transparentMemoryOffloading) GetAdvices() types.InternalMemoryCalcula if tmoEngine.GetConf().EnableSwap { enableSwap = consts.ControlKnobON } + entry := types.ContainerMemoryAdvices{ PodUID: tmoEngine.GetContainerInfo().PodUID, ContainerName: tmoEngine.GetContainerInfo().ContainerName, diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go index 77d58baa5..1cd145d43 100644 --- a/pkg/util/cgroup/manager/cgroup.go +++ b/pkg/util/cgroup/manager/cgroup.go @@ -23,15 +23,31 @@ import ( "math" "os/exec" "path/filepath" + "runtime" "strconv" + "syscall" "time" + "unsafe" + + "k8s.io/klog/v2" "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/asyncworker" + "github.com/kubewharf/katalyst-core/pkg/util/bitmask" "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" "github.com/kubewharf/katalyst-core/pkg/util/eventbus" "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +const ( + MPOL_DEFAULT = iota + MPOL_PREFERRED + MPOL_BIND + MPOL_INTERLEAVE + MPOL_LOCAL + MPOL_MAX ) func ApplyMemoryWithRelativePath(relCgroupPath string, data *common.MemoryData) error { @@ -476,7 +492,71 @@ func DisableSwapMaxWithAbsolutePathRecursive(absCgroupPath string) error { return nil } -func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, nbytes int64) error { +func GetMemPolicy(addr unsafe.Pointer, flags int) (mode int, nodemask bitmask.BitMask, err error) { + var mask uint64 + _, _, errno := syscall.Syscall6(syscall.SYS_GET_MEMPOLICY, + uintptr(unsafe.Pointer(&mode)), uintptr(unsafe.Pointer(&mask)), 64, uintptr(addr), uintptr(flags), 0) + if errno != 0 { + err = errno + } + nodemask = bitmask.NewEmptyBitMask() + bit := 0 + for mask != 0 { + if mask&1 == 1 { + nodemask.Add(bit) + } + mask >>= 1 + bit++ + } + return +} + +func SetMemPolicy(mode int, nodemask bitmask.BitMask) (err error) { + var mask uint64 + for _, bit := range nodemask.GetBits() { + mask |= 1 << uint64(bit) + } + _, _, errno := syscall.Syscall(syscall.SYS_SET_MEMPOLICY, uintptr(mode), uintptr(unsafe.Pointer(&mask)), 64) + if errno != 0 { + err = errno + } + return +} + +func doReclaimMemory(cmd string, mems machine.CPUSet) error { + // When memory is reclaimed by calling memory.reclaim interface and offloaded to zram, + // kernel will allocate additional zram memory at the NUMAs allowed for the caller process to store the compressed memory. + // However, the allowed NUMA nodes of katalyst and business processes may not be consistent, + // which will lead to the allocation of zram memory on NUMAs that are not affinity to the business processes, + // bringing unpredictable problems. Therefore, before reclaiming business memory, + // we have to temporarily set the allowed NUMAs of katalyst same as business workload, and restore it after reclaiming. + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + // save original memory policy + mode, mask, err := GetMemPolicy(nil, 0) + if err != nil { + return err + } + + newMask := bitmask.NewEmptyBitMask() + newMask.Add(mems.ToSliceInt()...) + + if err := SetMemPolicy(MPOL_BIND, newMask); err != nil { + return err + } + + _, err = exec.Command("bash", "-c", cmd).Output() + klog.ErrorS(err, "failed to exec %v", cmd) + + // restore original memory policy + if err := SetMemPolicy(mode, mask); err != nil { + return err + } + return nil +} + +func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, nbytes int64, mems machine.CPUSet) error { startTime := time.Now() var cmd string @@ -493,7 +573,7 @@ func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, return nil } - _, err := exec.Command("bash", "-c", cmd).Output() + err := doReclaimMemory(cmd, mems) _ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{ "absCGPath": absCgroupPath, diff --git a/pkg/util/cgroup/manager/cgroup_test.go b/pkg/util/cgroup/manager/cgroup_test.go index 8d4f661d5..8a3383865 100644 --- a/pkg/util/cgroup/manager/cgroup_test.go +++ b/pkg/util/cgroup/manager/cgroup_test.go @@ -26,15 +26,18 @@ import ( "math" "os" "path/filepath" + "runtime" "testing" "bou.ke/monkey" "github.com/opencontainers/runc/libcontainer/cgroups" "github.com/stretchr/testify/assert" + "github.com/kubewharf/katalyst-core/pkg/util/bitmask" "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" v1 "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager/v1" v2 "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager/v2" + "github.com/kubewharf/katalyst-core/pkg/util/machine" ) func TestManager(t *testing.T) { @@ -60,6 +63,7 @@ func testV2Manager(t *testing.T) { testManager(t, "v2") testSwapMax(t) testMemPressure(t) + testMemoryOffloadingWithAbsolutePath(t) } func testManager(t *testing.T, version string) { @@ -231,3 +235,77 @@ func testMemPressureV1(t *testing.T) { assert.Equal(t, "0", fmt.Sprint(some.Avg60)) assert.Equal(t, "0", fmt.Sprint(some.Avg300)) } + +func TestMemoryPolicy(t *testing.T) { + t.Parallel() + + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + mode, mask, err := GetMemPolicy(nil, 0) + assert.NoError(t, err) + + assert.Equal(t, MPOL_DEFAULT, mode) + + t.Logf("mask: %v", mask) + + mems := machine.NewCPUSet(0) + newMask := bitmask.NewEmptyBitMask() + newMask.Add(mems.ToSliceInt()...) + + err = SetMemPolicy(MPOL_BIND, newMask) + assert.NoError(t, err) + + mode, mask, err = GetMemPolicy(nil, 0) + assert.NoError(t, err) + + assert.Equal(t, MPOL_BIND, mode) + + expectMask, err := bitmask.NewBitMask(0) + assert.NoError(t, err) + + assert.Equal(t, expectMask, mask) + + t.Logf("mask: %v", mask) +} + +func testMemoryOffloadingWithAbsolutePath(t *testing.T) { + defer monkey.UnpatchAll() + monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return true }) + monkey.Patch(GetManager, func() Manager { return v2.NewManager() }) + monkey.Patch(cgroups.ReadFile, func(dir, file string) (string, error) { + f := filepath.Join(dir, file) + tmp, err := ioutil.ReadFile(f) + if err != nil { + return "", err + } + return string(tmp), nil + }) + monkey.Patch(cgroups.WriteFile, func(dir, file, data string) error { + f := filepath.Join(dir, file) + return ioutil.WriteFile(f, []byte(data), 0o700) + }) + + rootDir := os.TempDir() + dir := filepath.Join(rootDir, "tmp") + err := os.Mkdir(dir, 0o700) + assert.NoError(t, err) + + tmpDir, err := ioutil.TempDir(dir, "fake-cgroup") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + monkey.Patch(common.GetCgroupRootPath, func(s string) string { + t.Logf("rootDir=%v", rootDir) + return rootDir + }) + + err = MemoryOffloadingWithAbsolutePath(context.TODO(), tmpDir, 100, machine.NewCPUSet(0)) + assert.NoError(t, err) + + reclaimFile := filepath.Join(tmpDir, "memory.reclaim") + + s, err := ioutil.ReadFile(reclaimFile) + assert.NoError(t, err) + assert.Equal(t, fmt.Sprintf("%v\n", 100), string(s)) +}