Skip to content

Commit

Permalink
refactor: make kernel allocate zram memory at allowed NUMAs of worklo…
Browse files Browse the repository at this point in the history
…ad process

Signed-off-by: linzhecheng <linzhecheng@bytedance.com>
  • Loading branch information
cheney-lin committed Sep 12, 2024
1 parent f975841 commit 1c50599
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -736,13 +736,22 @@ func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration,
}
}

cpuSetStats, err := cgroupmgr.GetCPUSetWithAbsolutePath(absCGPath)
if err != nil {
return fmt.Errorf("GetCPUSetWithAbsolutePath failed with error: %v", err)
}
mems, err := machine.Parse(cpuSetStats.Mems)
if err != nil {
return fmt.Errorf("parse cpuSetStats failed with error: %v", err)
}

// start a asynchronous work to execute memory offloading
err = p.defaultAsyncLimitedWorkers.AddWork(
&asyncworker.Work{
Name: memoryOffloadingWorkName,
UID: uuid.NewUUID(),
Fn: cgroupmgr.MemoryOffloadingWithAbsolutePath,
Params: []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64},
Params: []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64, mems},
DeliveredAt: time.Now(),
}, asyncworker.DuplicateWorkPolicyOverride)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ func (tmo *transparentMemoryOffloading) GetAdvices() types.InternalMemoryCalcula
if tmoEngine.GetConf().EnableSwap {
enableSwap = consts.ControlKnobON
}

entry := types.ContainerMemoryAdvices{
PodUID: tmoEngine.GetContainerInfo().PodUID,
ContainerName: tmoEngine.GetContainerInfo().ContainerName,
Expand Down
84 changes: 82 additions & 2 deletions pkg/util/cgroup/manager/cgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,31 @@ import (
"math"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"syscall"
"time"
"unsafe"

"k8s.io/klog/v2"

"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/asyncworker"
"github.com/kubewharf/katalyst-core/pkg/util/bitmask"
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
"github.com/kubewharf/katalyst-core/pkg/util/eventbus"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

const (
MPOL_DEFAULT = iota
MPOL_PREFERRED
MPOL_BIND
MPOL_INTERLEAVE
MPOL_LOCAL
MPOL_MAX
)

func ApplyMemoryWithRelativePath(relCgroupPath string, data *common.MemoryData) error {
Expand Down Expand Up @@ -476,7 +492,71 @@ func DisableSwapMaxWithAbsolutePathRecursive(absCgroupPath string) error {
return nil
}

func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, nbytes int64) error {
func GetMemPolicy(addr unsafe.Pointer, flags int) (mode int, nodemask bitmask.BitMask, err error) {
var mask uint64
_, _, errno := syscall.Syscall6(syscall.SYS_GET_MEMPOLICY,
uintptr(unsafe.Pointer(&mode)), uintptr(unsafe.Pointer(&mask)), 64, uintptr(addr), uintptr(flags), 0)
if errno != 0 {
err = errno
}
nodemask = bitmask.NewEmptyBitMask()
bit := 0
for mask != 0 {
if mask&1 == 1 {
nodemask.Add(bit)
}
mask >>= 1
bit++
}
return
}

func SetMemPolicy(mode int, nodemask bitmask.BitMask) (err error) {
var mask uint64
for _, bit := range nodemask.GetBits() {
mask |= 1 << uint64(bit)
}
_, _, errno := syscall.Syscall(syscall.SYS_SET_MEMPOLICY, uintptr(mode), uintptr(unsafe.Pointer(&mask)), 64)
if errno != 0 {
err = errno
}
return
}

func doReclaimMemory(cmd string, mems machine.CPUSet) error {
// When memory is reclaimed by calling memory.reclaim interface and offloaded to zram,
// kernel will allocate additional zram memory at the NUMAs allowed for the caller process to store the compressed memory.
// However, the allowed NUMA nodes of katalyst and business processes may not be consistent,
// which will lead to the allocation of zram memory on NUMAs that are not affinity to the business processes,
// bringing unpredictable problems. Therefore, before reclaiming business memory,
// we have to temporarily set the allowed NUMAs of katalyst same as business workload, and restore it after reclaiming.
runtime.LockOSThread()
defer runtime.UnlockOSThread()

// save original memory policy
mode, mask, err := GetMemPolicy(nil, 0)
if err != nil {
return err
}

newMask := bitmask.NewEmptyBitMask()
newMask.Add(mems.ToSliceInt()...)

if err := SetMemPolicy(MPOL_BIND, newMask); err != nil {
return err
}

_, err = exec.Command("bash", "-c", cmd).Output()
klog.ErrorS(err, "failed to exec %v", cmd)

// restore original memory policy
if err := SetMemPolicy(mode, mask); err != nil {
return err
}
return nil
}

func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, nbytes int64, mems machine.CPUSet) error {
startTime := time.Now()

var cmd string
Expand All @@ -493,7 +573,7 @@ func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string,
return nil
}

_, err := exec.Command("bash", "-c", cmd).Output()
err := doReclaimMemory(cmd, mems)

_ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{
"absCGPath": absCgroupPath,
Expand Down

0 comments on commit 1c50599

Please sign in to comment.