diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go index 6a1d83167..8adc4c057 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go @@ -736,13 +736,9 @@ func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration, } } - cpuSetStats, err := cgroupmgr.GetCPUSetWithAbsolutePath(absCGPath) + _, mems, err := cgroupmgr.GetEffectiveCPUSetWithAbsolutePath(absCGPath) if err != nil { - return fmt.Errorf("GetCPUSetWithAbsolutePath failed with error: %v", err) - } - mems, err := machine.Parse(cpuSetStats.Mems) - if err != nil { - return fmt.Errorf("parse cpuSetStats failed with error: %v", err) + return fmt.Errorf("GetEffectiveCPUSetWithAbsolutePath failed with error: %v", err) } // start a asynchronous work to execute memory offloading diff --git a/pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go b/pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go index 9fc0ea933..09bf90ba3 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go @@ -540,7 +540,11 @@ func (p *nodeMetricsReporterPlugin) getGroupUsage(pods []*v1.Pod, qosLevel strin resourceMetric.CPU = aggCPU } - for numaID, resourceUsages := range numaUsages { + for numaID := 0; numaID < p.metaServer.NumNUMANodes; numaID++ { + resourceUsages, ok := numaUsages[numaID] + if !ok { + continue + } resourceNUMAMetric := nodeapis.ResourceMetric{} cpuUsage := resourceUsages[v1.ResourceCPU] diff --git a/pkg/util/cgroup/common/types.go b/pkg/util/cgroup/common/types.go index 1eeaa4a34..8d87e4baa 100644 --- a/pkg/util/cgroup/common/types.go +++ b/pkg/util/cgroup/common/types.go @@ -184,8 +184,10 @@ type CPUStats struct { // CPUSetStats get cgroup cpuset data type CPUSetStats struct { - CPUs string - Mems string + CPUs string + EffectiveCPUs string + Mems string + EffectiveMems string } // MemoryMetrics get memory cgroup metrics diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go index 4622ea058..0fcea607e 100644 --- a/pkg/util/cgroup/manager/cgroup.go +++ b/pkg/util/cgroup/manager/cgroup.go @@ -21,11 +21,15 @@ import ( "fmt" "io/fs" "math" + "os" "os/exec" "path/filepath" "strconv" + "syscall" "time" + "golang.org/x/sys/unix" + "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/asyncworker" @@ -514,3 +518,50 @@ func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, return err } + +func IsCgroupPath(path string) bool { + var fstat syscall.Statfs_t + err := syscall.Statfs(path, &fstat) + if err != nil { + general.ErrorS(err, "failed to Statfs", "path", path) + return false + } + return fstat.Type == unix.CGROUP2_SUPER_MAGIC || fstat.Type == unix.CGROUP_SUPER_MAGIC +} + +func GetEffectiveCPUSetWithAbsolutePath(absCgroupPath string) (machine.CPUSet, machine.CPUSet, error) { + if !IsCgroupPath(absCgroupPath) { + return machine.CPUSet{}, machine.CPUSet{}, fmt.Errorf("path %s is not a cgroup", absCgroupPath) + } + + cpusetStat, err := GetCPUSetWithAbsolutePath(absCgroupPath) + if err != nil { + // if controller is disabled, we should walk the parent's dir. + if os.IsNotExist(err) { + return GetEffectiveCPUSetWithAbsolutePath(filepath.Dir(absCgroupPath)) + } + return machine.CPUSet{}, machine.CPUSet{}, err + } + // if the cpus or mems is empty, they will inherit the parent's mask. + cpus, err := machine.Parse(cpusetStat.EffectiveCPUs) + if err != nil { + return machine.CPUSet{}, machine.CPUSet{}, err + } + if cpus.IsEmpty() { + cpus, _, err = GetEffectiveCPUSetWithAbsolutePath(filepath.Dir(absCgroupPath)) + if err != nil { + return machine.CPUSet{}, machine.CPUSet{}, err + } + } + mems, err := machine.Parse(cpusetStat.EffectiveMems) + if err != nil { + return machine.CPUSet{}, machine.CPUSet{}, err + } + if mems.IsEmpty() { + _, mems, err = GetEffectiveCPUSetWithAbsolutePath(filepath.Dir(absCgroupPath)) + if err != nil { + return machine.CPUSet{}, machine.CPUSet{}, err + } + } + return cpus, mems, nil +} diff --git a/pkg/util/cgroup/manager/cgroup_test.go b/pkg/util/cgroup/manager/cgroup_test.go index 9992cd7fa..95b45484f 100644 --- a/pkg/util/cgroup/manager/cgroup_test.go +++ b/pkg/util/cgroup/manager/cgroup_test.go @@ -26,6 +26,8 @@ import ( "math" "os" "path/filepath" + "strings" + "sync" "testing" "bou.ke/monkey" @@ -38,25 +40,32 @@ import ( "github.com/kubewharf/katalyst-core/pkg/util/machine" ) -func TestManager(t *testing.T) { - t.Parallel() - - _ = GetManager() +var mu = sync.Mutex{} - testV1Manager(t) - testV2Manager(t) -} +func TestV1Manager(t *testing.T) { + cgroups.TestMode = true + t.Parallel() + mu.Lock() + defer mu.Unlock() -func testV1Manager(t *testing.T) { - _ = v1.NewManager() + defer monkey.UnpatchAll() + monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return false }) + monkey.Patch(GetManager, func() Manager { return v1.NewManager() }) testManager(t, "v1") testNetCls(t, "v1") testMemPressureV1(t) } -func testV2Manager(t *testing.T) { - _ = v2.NewManager() +func TestV2Manager(t *testing.T) { + cgroups.TestMode = true + t.Parallel() + mu.Lock() + defer mu.Unlock() + + defer monkey.UnpatchAll() + monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return true }) + monkey.Patch(GetManager, func() Manager { return v2.NewManager() }) testManager(t, "v2") testSwapMax(t) @@ -109,96 +118,60 @@ func testNetCls(t *testing.T, version string) { } func testSwapMax(t *testing.T) { - defer monkey.UnpatchAll() - monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return true }) - monkey.Patch(GetManager, func() Manager { return v2.NewManager() }) - monkey.Patch(cgroups.ReadFile, func(dir, file string) (string, error) { - f := filepath.Join(dir, file) - tmp, err := ioutil.ReadFile(f) - if err != nil { - return "", err - } - return string(tmp), nil - }) - monkey.Patch(cgroups.WriteFile, func(dir, file, data string) error { - f := filepath.Join(dir, file) - return ioutil.WriteFile(f, []byte(data), 0o700) - }) - rootDir := os.TempDir() dir := filepath.Join(rootDir, "tmp") err := os.Mkdir(dir, 0o700) assert.NoError(t, err) + defer os.RemoveAll(dir) tmpDir, err := ioutil.TempDir(dir, "fake-cgroup") assert.NoError(t, err) - defer os.RemoveAll(dir) monkey.Patch(common.GetCgroupRootPath, func(s string) string { t.Logf("rootDir=%v", rootDir) return rootDir }) - sawpFile := filepath.Join(tmpDir, "memory.swap.max") - err = ioutil.WriteFile(sawpFile, []byte{}, 0o700) + err = cgroups.WriteFile(tmpDir, "memory.swap.max", "") assert.NoError(t, err) - sawpFile2 := filepath.Join(dir, "memory.swap.max") - err = ioutil.WriteFile(sawpFile2, []byte{}, 0o700) + err = cgroups.WriteFile(dir, "memory.swap.max", "") assert.NoError(t, err) - maxFile := filepath.Join(tmpDir, "memory.max") - err = ioutil.WriteFile(maxFile, []byte("12800"), 0o700) + err = cgroups.WriteFile(tmpDir, "memory.max", "12800") assert.NoError(t, err) - curFile := filepath.Join(tmpDir, "memory.current") - err = ioutil.WriteFile(curFile, []byte("12600"), 0o700) + err = cgroups.WriteFile(tmpDir, "memory.current", "12600") assert.NoError(t, err) err = SetSwapMaxWithAbsolutePathRecursive(tmpDir) assert.NoError(t, err) - s, err := ioutil.ReadFile(sawpFile) + s, err := cgroups.ReadFile(tmpDir, "memory.swap.max") assert.NoError(t, err) - assert.Equal(t, fmt.Sprintf("%v", 200), string(s)) + assert.Equal(t, fmt.Sprintf("%v", 200), s) - s, err = ioutil.ReadFile(sawpFile2) + s, err = cgroups.ReadFile(dir, "memory.swap.max") assert.NoError(t, err) - assert.Equal(t, fmt.Sprintf("%v", math.MaxInt64), string(s)) + assert.Equal(t, fmt.Sprintf("%v", math.MaxInt64), s) err = DisableSwapMaxWithAbsolutePathRecursive(tmpDir) assert.NoError(t, err) - s, err = ioutil.ReadFile(sawpFile) + s, err = cgroups.ReadFile(tmpDir, "memory.swap.max") assert.NoError(t, err) - assert.Equal(t, fmt.Sprintf("%v", 0), string(s)) + assert.Equal(t, fmt.Sprintf("%v", 0), s) } func testMemPressure(t *testing.T) { - defer monkey.UnpatchAll() - monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return true }) - monkey.Patch(GetManager, func() Manager { return v2.NewManager() }) - monkey.Patch(cgroups.ReadFile, func(dir, file string) (string, error) { - f := filepath.Join(dir, file) - tmp, err := ioutil.ReadFile(f) - if err != nil { - return "", err - } - return string(tmp), nil - }) - monkey.Patch(cgroups.WriteFile, func(dir, file, data string) error { - f := filepath.Join(dir, file) - return ioutil.WriteFile(f, []byte(data), 0o700) - }) - rootDir := os.TempDir() dir := filepath.Join(rootDir, "tmp") err := os.Mkdir(dir, 0o700) assert.NoError(t, err) + defer os.RemoveAll(dir) tmpDir, err := ioutil.TempDir(dir, "fake-cgroup") assert.NoError(t, err) - defer os.RemoveAll(dir) monkey.Patch(common.GetCgroupRootPath, func(s string) string { t.Logf("rootDir=%v", rootDir) @@ -235,30 +208,14 @@ func testMemPressureV1(t *testing.T) { } func testMemoryOffloadingWithAbsolutePath(t *testing.T) { - defer monkey.UnpatchAll() - monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return true }) - monkey.Patch(GetManager, func() Manager { return v2.NewManager() }) - monkey.Patch(cgroups.ReadFile, func(dir, file string) (string, error) { - f := filepath.Join(dir, file) - tmp, err := ioutil.ReadFile(f) - if err != nil { - return "", err - } - return string(tmp), nil - }) - monkey.Patch(cgroups.WriteFile, func(dir, file, data string) error { - f := filepath.Join(dir, file) - return ioutil.WriteFile(f, []byte(data), 0o700) - }) - rootDir := os.TempDir() dir := filepath.Join(rootDir, "tmp") err := os.Mkdir(dir, 0o700) assert.NoError(t, err) + defer os.RemoveAll(dir) tmpDir, err := ioutil.TempDir(dir, "fake-cgroup") assert.NoError(t, err) - defer os.RemoveAll(dir) monkey.Patch(common.GetCgroupRootPath, func(s string) string { t.Logf("rootDir=%v", rootDir) @@ -274,3 +231,214 @@ func testMemoryOffloadingWithAbsolutePath(t *testing.T) { assert.NoError(t, err) assert.Equal(t, fmt.Sprintf("%v\n", 100), string(s)) } + +func TestGetEffectiveCPUSetWithAbsolutePathV1(t *testing.T) { + cgroups.TestMode = true + t.Parallel() + mu.Lock() + defer mu.Unlock() + + defer monkey.UnpatchAll() + monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return false }) + monkey.Patch(GetManager, func() Manager { return v1.NewManager() }) + + rootDir := os.TempDir() + dir := filepath.Join(rootDir, "tmp") + err := os.Mkdir(dir, 0o700) + assert.NoError(t, err) + defer os.RemoveAll(dir) + + tmpDir, err := ioutil.TempDir(dir, "fake-cgroup") + assert.NoError(t, err) + + monkey.Patch(IsCgroupPath, func(path string) bool { + return strings.Contains(path, tmpDir) + }) + + monkey.Patch(common.GetCgroupRootPath, func(s string) string { + t.Logf("rootDir=%v", rootDir) + return rootDir + }) + + // tmpDir is root cgroup + err = cgroups.WriteFile(tmpDir, "cpuset.cpus", "0-1") + assert.NoError(t, err) + + err = cgroups.WriteFile(tmpDir, "cpuset.effective_cpus", "0-1") + assert.NoError(t, err) + + err = cgroups.WriteFile(tmpDir, "cpuset.mems", "0-1") + assert.NoError(t, err) + + err = cgroups.WriteFile(tmpDir, "cpuset.effective_mems", "0-1") + assert.NoError(t, err) + + cpus, mems, err := GetEffectiveCPUSetWithAbsolutePath(tmpDir) + assert.NoError(t, err) + assert.Equal(t, "0-1", cpus.String()) + assert.Equal(t, "0-1", mems.String()) + + // cg1 is sub cgroup + cg1 := filepath.Join(tmpDir, "cg1") + err = os.Mkdir(cg1, 0o700) + assert.NoError(t, err) + + err = cgroups.WriteFile(cg1, "cpuset.cpus", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg1, "cpuset.effective_cpus", "0") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg1, "cpuset.mems", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg1, "cpuset.effective_mems", "") + assert.NoError(t, err) + + cpus, mems, err = GetEffectiveCPUSetWithAbsolutePath(cg1) + assert.NoError(t, err) + assert.Equal(t, "0", cpus.String()) + assert.Equal(t, "0-1", mems.String()) + + // cg2 is sub cgroup + cg2 := filepath.Join(tmpDir, "cg2") + err = os.Mkdir(cg2, 0o700) + assert.NoError(t, err) + + err = cgroups.WriteFile(cg2, "cpuset.cpus", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg2, "cpuset.effective_cpus", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg2, "cpuset.mems", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg2, "cpuset.effective_mems", "0") + assert.NoError(t, err) + + cpus, mems, err = GetEffectiveCPUSetWithAbsolutePath(cg2) + assert.NoError(t, err) + assert.Equal(t, "0-1", cpus.String()) + assert.Equal(t, "0", mems.String()) + + // not existed dir + _, _, err = GetEffectiveCPUSetWithAbsolutePath("xxx") + assert.Error(t, err) +} + +func TestGetEffectiveCPUSetWithAbsolutePathV2(t *testing.T) { + cgroups.TestMode = true + t.Parallel() + mu.Lock() + defer mu.Unlock() + + defer monkey.UnpatchAll() + monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return true }) + monkey.Patch(GetManager, func() Manager { return v2.NewManager() }) + + rootDir := os.TempDir() + dir := filepath.Join(rootDir, "tmp") + err := os.Mkdir(dir, 0o700) + assert.NoError(t, err) + defer os.RemoveAll(dir) + + tmpDir, err := ioutil.TempDir(dir, "fake-cgroup") + assert.NoError(t, err) + + monkey.Patch(IsCgroupPath, func(path string) bool { + return strings.Contains(path, tmpDir) + }) + + monkey.Patch(common.GetCgroupRootPath, func(s string) string { + t.Logf("rootDir=%v", rootDir) + return rootDir + }) + + // tmpDir is root cgroup + err = cgroups.WriteFile(tmpDir, "cpuset.cpus", "0-1") + assert.NoError(t, err) + + err = cgroups.WriteFile(tmpDir, "cpuset.cpus.effective", "0-1") + assert.NoError(t, err) + + err = cgroups.WriteFile(tmpDir, "cpuset.mems", "0-1") + assert.NoError(t, err) + + err = cgroups.WriteFile(tmpDir, "cpuset.mems.effective", "0-1") + assert.NoError(t, err) + + cpus, mems, err := GetEffectiveCPUSetWithAbsolutePath(tmpDir) + assert.NoError(t, err) + assert.Equal(t, "0-1", cpus.String()) + assert.Equal(t, "0-1", mems.String()) + + // cg1 is sub cgroup + cg1 := filepath.Join(tmpDir, "cg1") + err = os.Mkdir(cg1, 0o700) + assert.NoError(t, err) + + err = cgroups.WriteFile(cg1, "cpuset.cpus", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg1, "cpuset.cpus.effective", "0") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg1, "cpuset.mems", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg1, "cpuset.mems.effective", "") + assert.NoError(t, err) + + cpus, mems, err = GetEffectiveCPUSetWithAbsolutePath(cg1) + assert.NoError(t, err) + assert.Equal(t, "0", cpus.String()) + assert.Equal(t, "0-1", mems.String()) + + // cg2 is sub cgroup + cg2 := filepath.Join(tmpDir, "cg2") + err = os.Mkdir(cg2, 0o700) + assert.NoError(t, err) + + err = cgroups.WriteFile(cg2, "cpuset.cpus", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg2, "cpuset.cpus.effective", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg2, "cpuset.mems", "") + assert.NoError(t, err) + + err = cgroups.WriteFile(cg2, "cpuset.mems.effective", "0") + assert.NoError(t, err) + + cpus, mems, err = GetEffectiveCPUSetWithAbsolutePath(cg2) + assert.NoError(t, err) + assert.Equal(t, "0-1", cpus.String()) + assert.Equal(t, "0", mems.String()) + + // not existed dir + _, _, err = GetEffectiveCPUSetWithAbsolutePath("xxx") + assert.Error(t, err) + + // cg3's cpuset controller is disabled + cg3 := filepath.Join(tmpDir, "cg3") + err = os.Mkdir(cg3, 0o700) + assert.NoError(t, err) + cpus, mems, err = GetEffectiveCPUSetWithAbsolutePath(cg3) + assert.NoError(t, err) + assert.Equal(t, "0-1", cpus.String()) + assert.Equal(t, "0-1", mems.String()) +} + +func TestIsCgroup(t *testing.T) { + t.Parallel() + mu.Lock() + defer mu.Unlock() + + mounts, err := cgroups.GetCgroupMounts(true) + assert.NoError(t, err) + for _, mount := range mounts { + assert.True(t, IsCgroupPath(mount.Mountpoint)) + } +} diff --git a/pkg/util/cgroup/manager/v1/fs_linux.go b/pkg/util/cgroup/manager/v1/fs_linux.go index d5cc807ee..54848b387 100644 --- a/pkg/util/cgroup/manager/v1/fs_linux.go +++ b/pkg/util/cgroup/manager/v1/fs_linux.go @@ -308,12 +308,20 @@ func (m *manager) GetCPUSet(absCgroupPath string) (*common.CPUSetStats, error) { var err error cpusetStats.CPUs, err = fscommon.GetCgroupParamString(absCgroupPath, "cpuset.cpus") if err != nil { - return nil, fmt.Errorf("read cpuset.cpus failed with error: %v", err) + return nil, err + } + cpusetStats.EffectiveCPUs, err = fscommon.GetCgroupParamString(absCgroupPath, "cpuset.effective_cpus") + if err != nil { + return nil, err } cpusetStats.Mems, err = fscommon.GetCgroupParamString(absCgroupPath, "cpuset.mems") if err != nil { - return nil, fmt.Errorf("read cpuset.mems failed with error: %v", err) + return nil, err + } + cpusetStats.EffectiveMems, err = fscommon.GetCgroupParamString(absCgroupPath, "cpuset.effective_mems") + if err != nil { + return nil, err } return cpusetStats, nil diff --git a/pkg/util/cgroup/manager/v2/fs_linux.go b/pkg/util/cgroup/manager/v2/fs_linux.go index c6998cdb3..7337ee71f 100644 --- a/pkg/util/cgroup/manager/v2/fs_linux.go +++ b/pkg/util/cgroup/manager/v2/fs_linux.go @@ -432,12 +432,20 @@ func (m *manager) GetCPUSet(absCgroupPath string) (*common.CPUSetStats, error) { var err error cpusetStats.CPUs, err = fscommon.GetCgroupParamString(absCgroupPath, "cpuset.cpus") if err != nil { - return nil, fmt.Errorf("read cpuset.cpus failed with error: %v", err) + return nil, err + } + cpusetStats.EffectiveCPUs, err = fscommon.GetCgroupParamString(absCgroupPath, "cpuset.cpus.effective") + if err != nil { + return nil, err } cpusetStats.Mems, err = fscommon.GetCgroupParamString(absCgroupPath, "cpuset.mems") if err != nil { - return nil, fmt.Errorf("read cpuset.mems failed with error: %v", err) + return nil, err + } + cpusetStats.EffectiveMems, err = fscommon.GetCgroupParamString(absCgroupPath, "cpuset.mems.effective") + if err != nil { + return nil, err } return cpusetStats, nil