Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make kernel allocate zram memory at allowed NUMAs of workload process #685

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -736,13 +736,22 @@ func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration,
}
}

cpuSetStats, err := cgroupmgr.GetCPUSetWithAbsolutePath(absCGPath)
if err != nil {
return fmt.Errorf("GetCPUSetWithAbsolutePath failed with error: %v", err)
}
mems, err := machine.Parse(cpuSetStats.Mems)
if err != nil {
return fmt.Errorf("parse cpuSetStats failed with error: %v", err)
}

// start an asynchronous work to execute memory offloading
err = p.defaultAsyncLimitedWorkers.AddWork(
&asyncworker.Work{
Name: memoryOffloadingWorkName,
UID: uuid.NewUUID(),
Fn: cgroupmgr.MemoryOffloadingWithAbsolutePath,
Params: []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64},
Params: []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64, mems},
DeliveredAt: time.Now(),
}, asyncworker.DuplicateWorkPolicyOverride)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ func (tmo *transparentMemoryOffloading) GetAdvices() types.InternalMemoryCalcula
if tmoEngine.GetConf().EnableSwap {
enableSwap = consts.ControlKnobON
}

entry := types.ContainerMemoryAdvices{
PodUID: tmoEngine.GetContainerInfo().PodUID,
ContainerName: tmoEngine.GetContainerInfo().ContainerName,
Expand Down
84 changes: 82 additions & 2 deletions pkg/util/cgroup/manager/cgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,31 @@ import (
"math"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"syscall"
"time"
"unsafe"

"k8s.io/klog/v2"

"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/asyncworker"
"github.com/kubewharf/katalyst-core/pkg/util/bitmask"
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
"github.com/kubewharf/katalyst-core/pkg/util/eventbus"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

// Memory policy modes accepted by SetMemPolicy and reported by GetMemPolicy.
// The values are consecutive integers starting at zero.
// NOTE(review): these presumably mirror the MPOL_* modes of Linux's
// <linux/mempolicy.h>; newer kernels define additional modes before MPOL_MAX,
// so confirm MPOL_MAX against the targeted kernel before relying on it.
const (
	MPOL_DEFAULT    = 0
	MPOL_PREFERRED  = 1
	MPOL_BIND       = 2
	MPOL_INTERLEAVE = 3
	MPOL_LOCAL      = 4
	MPOL_MAX        = 5
)

func ApplyMemoryWithRelativePath(relCgroupPath string, data *common.MemoryData) error {
Expand Down Expand Up @@ -476,7 +492,71 @@ func DisableSwapMaxWithAbsolutePathRecursive(absCgroupPath string) error {
return nil
}

func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, nbytes int64) error {
// GetMemPolicy wraps the get_mempolicy system call.
//
// addr and flags are forwarded to the kernel unchanged. The node mask buffer
// handed to the kernel is a single uint64 and the maxnode argument is 64, so
// only node IDs that fit in 64 bits can be reported.
//
// It returns the mode written by the kernel, the set bits of the returned mask
// converted to a bitmask.BitMask, and the errno of the system call as err when
// it is non-zero. On failure nodemask is still a non-nil bitmask built from
// whatever the mask buffer contains.
func GetMemPolicy(addr unsafe.Pointer, flags int) (mode int, nodemask bitmask.BitMask, err error) {
	var (
		// The kernel stores the mode as a 4-byte C int. Receiving it in a Go
		// int (8 bytes on 64-bit targets) only yields the right value on
		// little-endian machines, so use an exactly-sized scratch variable.
		rawMode int32
		mask    uint64
	)
	_, _, errno := syscall.Syscall6(syscall.SYS_GET_MEMPOLICY,
		uintptr(unsafe.Pointer(&rawMode)), uintptr(unsafe.Pointer(&mask)), 64, uintptr(addr), uintptr(flags), 0)
	if errno != 0 {
		err = errno
	}
	mode = int(rawMode)

	// Translate the raw mask into a BitMask, one set bit at a time.
	nodemask = bitmask.NewEmptyBitMask()
	for bit := 0; mask != 0; bit, mask = bit+1, mask>>1 {
		if mask&1 == 1 {
			nodemask.Add(bit)
		}
	}
	return mode, nodemask, err
}

// SetMemPolicy wraps the set_mempolicy system call.
//
// The bits reported by nodemask.GetBits() are packed into a single uint64,
// which is passed to the kernel together with mode and a maxnode argument
// of 64. It returns the errno of the system call when it is non-zero, and nil
// otherwise.
func SetMemPolicy(mode int, nodemask bitmask.BitMask) (err error) {
	var packed uint64
	for _, node := range nodemask.GetBits() {
		packed |= 1 << uint64(node)
	}

	if _, _, errno := syscall.Syscall(syscall.SYS_SET_MEMPOLICY, uintptr(mode), uintptr(unsafe.Pointer(&packed)), 64); errno != 0 {
		return errno
	}
	return nil
}

// doReclaimMemory runs cmd through "bash -c" while the memory policy of the
// calling OS thread is temporarily set to MPOL_BIND on the NUMA nodes in mems.
//
// When memory is reclaimed through the memory.reclaim interface and offloaded
// to zram, the kernel allocates the additional zram memory that stores the
// compressed pages on the NUMA nodes allowed for the calling process. The
// allowed NUMA nodes of katalyst and of the business process may differ, which
// would place zram memory on nodes the business process has no affinity to and
// cause unpredictable problems. Therefore the allowed NUMA nodes of the caller
// are temporarily set to those of the business workload and restored once the
// reclaim command has finished.
//
// It returns the error from reading, setting or restoring the memory policy.
// If the policy handling succeeds, it returns the error from executing cmd,
// which is nil on success.
func doReclaimMemory(cmd string, mems machine.CPUSet) error {
	// The policy calls below are issued from this goroutine, so keep it on one
	// OS thread from the initial read until the policy has been restored.
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	// save original memory policy
	mode, mask, err := GetMemPolicy(nil, 0)
	if err != nil {
		return err
	}

	newMask := bitmask.NewEmptyBitMask()
	newMask.Add(mems.ToSliceInt()...)

	if err := SetMemPolicy(MPOL_BIND, newMask); err != nil {
		return err
	}

	// Keep the command's error instead of discarding it, and only log when the
	// command actually failed. The policy must be restored either way, so do
	// not return yet.
	_, execErr := exec.Command("bash", "-c", cmd).Output()
	if execErr != nil {
		klog.ErrorS(execErr, "failed to exec reclaim command", "cmd", cmd)
	}

	// restore original memory policy
	if err := SetMemPolicy(mode, mask); err != nil {
		return err
	}
	return execErr
}

func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, nbytes int64, mems machine.CPUSet) error {
startTime := time.Now()

var cmd string
Expand All @@ -493,7 +573,7 @@ func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string,
return nil
}

_, err := exec.Command("bash", "-c", cmd).Output()
err := doReclaimMemory(cmd, mems)

_ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{
"absCGPath": absCgroupPath,
Expand Down
78 changes: 78 additions & 0 deletions pkg/util/cgroup/manager/cgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@ import (
"math"
"os"
"path/filepath"
"runtime"
"testing"

"bou.ke/monkey"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/stretchr/testify/assert"

"github.com/kubewharf/katalyst-core/pkg/util/bitmask"
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
v1 "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager/v1"
v2 "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager/v2"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

func TestManager(t *testing.T) {
Expand All @@ -60,6 +63,7 @@ func testV2Manager(t *testing.T) {
testManager(t, "v2")
testSwapMax(t)
testMemPressure(t)
testMemoryOffloadingWithAbsolutePath(t)
}

func testManager(t *testing.T, version string) {
Expand Down Expand Up @@ -231,3 +235,77 @@ func testMemPressureV1(t *testing.T) {
assert.Equal(t, "0", fmt.Sprint(some.Avg60))
assert.Equal(t, "0", fmt.Sprint(some.Avg300))
}

// TestMemoryPolicy checks that a policy written with SetMemPolicy is read back
// unchanged by GetMemPolicy: it binds the current thread to node 0 and expects
// GetMemPolicy to report MPOL_BIND with a mask containing only bit 0.
func TestMemoryPolicy(t *testing.T) {
	t.Parallel()

	// Pin the goroutine to one OS thread so that every policy call in this
	// test is issued from the same thread.
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	origMode, origMask, err := GetMemPolicy(nil, 0)
	if !assert.NoError(t, err) {
		return
	}
	// Put the original policy back before the thread is unlocked (deferred
	// calls run in reverse order); otherwise the MPOL_BIND policy set below
	// would stay on the thread for whatever goroutine runs there next.
	defer func() {
		assert.NoError(t, SetMemPolicy(origMode, origMask))
	}()

	assert.Equal(t, MPOL_DEFAULT, origMode)

	t.Logf("mask: %v", origMask)

	mems := machine.NewCPUSet(0)
	newMask := bitmask.NewEmptyBitMask()
	newMask.Add(mems.ToSliceInt()...)

	err = SetMemPolicy(MPOL_BIND, newMask)
	assert.NoError(t, err)

	mode, mask, err := GetMemPolicy(nil, 0)
	assert.NoError(t, err)

	assert.Equal(t, MPOL_BIND, mode)

	expectMask, err := bitmask.NewBitMask(0)
	assert.NoError(t, err)

	assert.Equal(t, expectMask, mask)

	t.Logf("mask: %v", mask)
}

// testMemoryOffloadingWithAbsolutePath calls MemoryOffloadingWithAbsolutePath
// on a temporary directory standing in for a cgroup and expects the
// directory's memory.reclaim file to contain "100\n" afterwards.
func testMemoryOffloadingWithAbsolutePath(t *testing.T) {
	defer monkey.UnpatchAll()

	// Report cgroup v2 unified mode and hand out the v2 manager.
	monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return true })
	monkey.Patch(GetManager, func() Manager { return v2.NewManager() })

	// Replace the cgroups file helpers with plain file I/O on dir/file.
	readPlainFile := func(dir, file string) (string, error) {
		content, readErr := ioutil.ReadFile(filepath.Join(dir, file))
		if readErr != nil {
			return "", readErr
		}
		return string(content), nil
	}
	writePlainFile := func(dir, file, data string) error {
		return ioutil.WriteFile(filepath.Join(dir, file), []byte(data), 0o700)
	}
	monkey.Patch(cgroups.ReadFile, readPlainFile)
	monkey.Patch(cgroups.WriteFile, writePlainFile)

	// Layout: <os temp dir>/tmp/fake-cgroup*, with the OS temp dir acting as
	// the cgroup root.
	cgroupRoot := os.TempDir()
	parentDir := filepath.Join(cgroupRoot, "tmp")
	err := os.Mkdir(parentDir, 0o700)
	assert.NoError(t, err)

	fakeCgroupDir, err := ioutil.TempDir(parentDir, "fake-cgroup")
	assert.NoError(t, err)
	defer os.RemoveAll(parentDir)

	monkey.Patch(common.GetCgroupRootPath, func(s string) string {
		t.Logf("rootDir=%v", cgroupRoot)
		return cgroupRoot
	})

	err = MemoryOffloadingWithAbsolutePath(context.TODO(), fakeCgroupDir, 100, machine.NewCPUSet(0))
	assert.NoError(t, err)

	// The requested byte count must have been written to memory.reclaim.
	written, err := ioutil.ReadFile(filepath.Join(fakeCgroupDir, "memory.reclaim"))
	assert.NoError(t, err)
	assert.Equal(t, fmt.Sprintf("%v\n", 100), string(written))
}
Loading