refactor(qrm): refactor the state module for enhanced extensibility
xu282934741 committed Sep 11, 2024
1 parent fcb8f49 commit e24d704
Showing 13 changed files with 98 additions and 89 deletions.
@@ -53,7 +53,7 @@ func makeState(topo *machine.CPUTopology) (qrmstate.State, error) {
if err != nil {
return nil, fmt.Errorf("make tmp dir for checkpoint failed with error: %v", err)
}
- return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false)
+ return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false, qrmstate.GenerateMachineStateFromPodEntries)
}

func TestNewCPUPressureEviction(t *testing.T) {
@@ -104,7 +104,7 @@ func makeState(topo *machine.CPUTopology) (qrmstate.State, error) {
if err != nil {
return nil, fmt.Errorf("make tmp dir for checkpoint failed with error: %v", err)
}
- return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false)
+ return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false, qrmstate.GenerateMachineStateFromPodEntries)
}

func TestNewCPUPressureLoadEviction(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
@@ -144,7 +144,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
}

stateImpl, stateErr := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, cpuPluginStateFileName,
- cpuconsts.CPUResourcePluginPolicyNameDynamic, agentCtx.CPUTopology, conf.SkipCPUStateCorruption)
+ cpuconsts.CPUResourcePluginPolicyNameDynamic, agentCtx.CPUTopology, conf.SkipCPUStateCorruption, state.GenerateMachineStateFromPodEntries)
if stateErr != nil {
return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr)
}
4 changes: 2 additions & 2 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
@@ -27,7 +27,6 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
@@ -53,6 +52,7 @@ import (
cgroupcm "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
cgroupcmutils "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/stretchr/testify/require"
)

const (
@@ -82,7 +82,7 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, state
}

func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, stateFileDirectory string) (*DynamicPolicy, error) {
- stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuPluginStateFileName, cpuconsts.CPUResourcePluginPolicyNameDynamic, topology, false)
+ stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuPluginStateFileName, cpuconsts.CPUResourcePluginPolicyNameDynamic, topology, false, state.GenerateMachineStateFromPodEntries)
if err != nil {
return nil, err
}
2 changes: 2 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
@@ -749,3 +749,5 @@ type State interface {
type ReadonlyState interface {
reader
}

+ type GenerateMachineStateFromPodEntriesFunc func(topology *machine.CPUTopology, podEntries PodEntries) (NUMANodeMap, error)
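
The exported func type above is the heart of the refactor: machine-state generation becomes a dependency injected into the checkpoint state instead of a hard-coded package call. A minimal sketch, assuming a hypothetical out-of-tree policy (the package, helper name, and checkpoint file name are illustrative; only the state-package APIs shown in this commit are real):

package mypolicy

import (
	cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
	"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
	"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

// newMyPolicyState shows the new injection point: any function matching
// GenerateMachineStateFromPodEntriesFunc can be supplied, so a policy can
// control how machine state is rebuilt from checkpointed pod entries
// without modifying the state package.
func newMyPolicyState(stateDir string, topo *machine.CPUTopology) (state.State, error) {
	generate := func(topology *machine.CPUTopology, podEntries state.PodEntries) (state.NUMANodeMap, error) {
		// Delegate to the shared policy-aware helper added in util.go below;
		// a custom policy could build the NUMANodeMap differently here.
		return state.GenerateMachineStateFromPodEntriesByPolicy(topology, podEntries, cpuconsts.CPUResourcePluginPolicyNameDynamic)
	}
	// "cpu_plugin_state" stands in for the plugin's real checkpoint file name.
	return state.NewCheckpointState(stateDir, "cpu_plugin_state",
		cpuconsts.CPUResourcePluginPolicyNameDynamic, topo, false, generate)
}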
22 changes: 12 additions & 10 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_checkpoint.go
@@ -40,35 +40,37 @@ type stateCheckpoint struct {
checkpointName string
// when we add new properties to checkpoint,
// it will cause checkpoint corruption, and we should skip it
- skipStateCorruption bool
+ skipStateCorruption                bool
+ GenerateMachineStateFromPodEntries GenerateMachineStateFromPodEntriesFunc
}

var _ State = &stateCheckpoint{}

func NewCheckpointState(stateDir, checkpointName, policyName string,
- topology *machine.CPUTopology, skipStateCorruption bool,
+ topology *machine.CPUTopology, skipStateCorruption bool, generateMachineStateFunc GenerateMachineStateFromPodEntriesFunc,
) (State, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}

sc := &stateCheckpoint{
- cache: NewCPUPluginState(topology),
- policyName: policyName,
- checkpointManager: checkpointManager,
- checkpointName: checkpointName,
- skipStateCorruption: skipStateCorruption,
+ cache:                              NewCPUPluginState(topology),
+ policyName:                         policyName,
+ checkpointManager:                  checkpointManager,
+ checkpointName:                     checkpointName,
+ skipStateCorruption:                skipStateCorruption,
+ GenerateMachineStateFromPodEntries: generateMachineStateFunc,
}

- if err := sc.restoreState(topology); err != nil {
+ if err := sc.RestoreState(topology); err != nil {
return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete "+
"the cpu plugin checkpoint file %q before restarting Kubelet", err, path.Join(stateDir, checkpointName))
}
return sc, nil
}

- func (sc *stateCheckpoint) restoreState(topology *machine.CPUTopology) error {
+ func (sc *stateCheckpoint) RestoreState(topology *machine.CPUTopology) error {
sc.Lock()
defer sc.Unlock()
var err error
@@ -94,7 +96,7 @@ func (sc *stateCheckpoint) restoreState(topology *machine.CPUTopology) error {
return fmt.Errorf("[cpu_plugin] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
}

- generatedMachineState, err := GenerateMachineStateFromPodEntries(topology, checkpoint.PodEntries, sc.policyName)
+ generatedMachineState, err := sc.GenerateMachineStateFromPodEntries(topology, checkpoint.PodEntries)
if err != nil {
return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
}
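Because the generator now hangs off the struct, the renamed RestoreState can rebuild machine state for any policy without a policy-name switch. A condensed sketch of the restore path as changed above (locking, checkpoint retrieval, and the corruption-skip branch elided; the final cache update is implied by surrounding code not shown in this hunk):

// Inside (sc *stateCheckpoint) RestoreState, after the checkpoint has been
// loaded and checkpoint.PolicyName validated against sc.policyName:
generatedMachineState, err := sc.GenerateMachineStateFromPodEntries(topology, checkpoint.PodEntries)
if err != nil {
	return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
}
// The regenerated NUMANodeMap then feeds sc.cache; previously this step
// called the package-level function with sc.policyName as a third argument.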
10 changes: 5 additions & 5 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go
@@ -1449,15 +1449,15 @@ func TestNewCheckpointState(t *testing.T) {
require.NoError(t, cpm.CreateCheckpoint(cpuPluginStateFileName, checkpoint), "could not create testing checkpoint")
}

- restoredState, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, false)
+ restoredState, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, false, GenerateMachineStateFromPodEntries)
if strings.TrimSpace(tc.expectedError) != "" {
require.Error(t, err)
require.Contains(t, err.Error(), "could not restore state from checkpoint:")
require.Contains(t, err.Error(), tc.expectedError)

// test skip corruption
if strings.Contains(err.Error(), "checkpoint is corrupted") {
- _, err = NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, true)
+ _, err = NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, true, GenerateMachineStateFromPodEntries)
require.Nil(t, err)
}
} else {
@@ -1945,15 +1945,15 @@ func TestClearState(t *testing.T) {
}
defer os.RemoveAll(testingDir)

- state1, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false)
+ state1, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries)
as.Nil(err)

state1.ClearState()

state1.SetMachineState(tc.machineState)
state1.SetPodEntries(tc.podEntries)

- state2, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false)
+ state2, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries)
as.Nil(err)
assertStateEqual(t, state2, state1)
})
@@ -2438,7 +2438,7 @@ func TestCheckpointStateHelpers(t *testing.T) {
t.Run(tc.description, func(t *testing.T) {
t.Parallel()

- state, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false)
+ state, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries)
as.Nil(err)

state.ClearState()
130 changes: 67 additions & 63 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util.go
@@ -25,7 +25,6 @@ import (
"k8s.io/klog/v2"

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
@@ -187,68 +186,9 @@ func GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries PodEntries, n
return CPUPreciseCeil(reqFloat64)
}

- // GenerateMachineStateFromPodEntries returns NUMANodeMap for given resource based on
- // machine info and reserved resources along with existed pod entries
- func GenerateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntries PodEntries, policyName string) (NUMANodeMap, error) {
- if topology == nil {
- return nil, fmt.Errorf("GenerateMachineStateFromPodEntries got nil topology")
- }
-
- machineState := make(NUMANodeMap)
- for _, numaNode := range topology.CPUDetails.NUMANodes().ToSliceInt64() {
- numaNodeState := &NUMANodeState{}
- numaNodeAllCPUs := topology.CPUDetails.CPUsInNUMANodes(int(numaNode)).Clone()
- allocatedCPUsInNumaNode := machine.NewCPUSet()
-
- for podUID, containerEntries := range podEntries {
- if containerEntries.IsPoolEntry() {
- continue
- }
- for containerName, allocationInfo := range containerEntries {
- if allocationInfo == nil {
- general.Warningf("nil allocationInfo in podEntries")
- continue
- }
-
- // the container hasn't cpuset assignment in the current NUMA node
- if allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)].Size() == 0 &&
- allocationInfo.TopologyAwareAssignments[int(numaNode)].Size() == 0 {
- continue
- }
-
- switch policyName {
- case consts.CPUResourcePluginPolicyNameDynamic:
- // only modify allocated and default properties in NUMA node state if the policy is dynamic and the entry indicates numa_binding.
- // shared_cores with numa_binding also contributes to numaNodeState.AllocatedCPUSet,
- // it's convenient that we can skip NUMA with AllocatedCPUSet > 0 when allocating CPUs for dedicated_cores with numa_binding.
- if CheckNUMABinding(allocationInfo) {
- allocatedCPUsInNumaNode = allocatedCPUsInNumaNode.Union(allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)])
- }
- case consts.CPUResourcePluginPolicyNameNative:
- // only modify allocated and default properties in NUMA node state if the policy is native and the QoS class is Guaranteed
- if CheckDedicatedPool(allocationInfo) {
- allocatedCPUsInNumaNode = allocatedCPUsInNumaNode.Union(allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)])
- }
- }
-
- topologyAwareAssignments, _ := machine.GetNumaAwareAssignments(topology, allocationInfo.AllocationResult.Intersection(numaNodeAllCPUs))
- originalTopologyAwareAssignments, _ := machine.GetNumaAwareAssignments(topology, allocationInfo.OriginalAllocationResult.Intersection(numaNodeAllCPUs))
-
- numaNodeAllocationInfo := allocationInfo.Clone()
- numaNodeAllocationInfo.AllocationResult = allocationInfo.AllocationResult.Intersection(numaNodeAllCPUs)
- numaNodeAllocationInfo.OriginalAllocationResult = allocationInfo.OriginalAllocationResult.Intersection(numaNodeAllCPUs)
- numaNodeAllocationInfo.TopologyAwareAssignments = topologyAwareAssignments
- numaNodeAllocationInfo.OriginalTopologyAwareAssignments = originalTopologyAwareAssignments
-
- numaNodeState.SetAllocationInfo(podUID, containerName, numaNodeAllocationInfo)
- }
- }
-
- numaNodeState.AllocatedCPUSet = allocatedCPUsInNumaNode.Clone()
- numaNodeState.DefaultCPUSet = numaNodeAllCPUs.Difference(numaNodeState.AllocatedCPUSet)
- machineState[int(numaNode)] = numaNodeState
- }
- return machineState, nil
+ // GenerateMachineStateFromPodEntries for dynamic policy
+ func GenerateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntries PodEntries) (NUMANodeMap, error) {
+ return GenerateMachineStateFromPodEntriesByPolicy(topology, podEntries, cpuconsts.CPUResourcePluginPolicyNameDynamic)
}

func IsIsolationPool(poolName string) bool {
@@ -490,3 +430,67 @@ func checkCPUSetMap(map1, map2 map[int]machine.CPUSet) bool {
func CPUPreciseCeil(request float64) int {
return int(math.Ceil(float64(int(request*1000)) / 1000))
}

+ // GenerateMachineStateFromPodEntriesByPolicy returns NUMANodeMap for given resource based on
+ // machine info and reserved resources along with existed pod entries and policy name
+ func GenerateMachineStateFromPodEntriesByPolicy(topology *machine.CPUTopology, podEntries PodEntries, policyName string) (NUMANodeMap, error) {
+ if topology == nil {
+ return nil, fmt.Errorf("GenerateMachineStateFromPodEntriesByPolicy got nil topology")
+ }
+
+ machineState := make(NUMANodeMap)
+ for _, numaNode := range topology.CPUDetails.NUMANodes().ToSliceInt64() {
+ numaNodeState := &NUMANodeState{}
+ numaNodeAllCPUs := topology.CPUDetails.CPUsInNUMANodes(int(numaNode)).Clone()
+ allocatedCPUsInNumaNode := machine.NewCPUSet()
+
+ for podUID, containerEntries := range podEntries {
+ if containerEntries.IsPoolEntry() {
+ continue
+ }
+ for containerName, allocationInfo := range containerEntries {
+ if allocationInfo == nil {
+ general.Warningf("nil allocationInfo in podEntries")
+ continue
+ }
+
+ // the container hasn't cpuset assignment in the current NUMA node
+ if allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)].Size() == 0 &&
+ allocationInfo.TopologyAwareAssignments[int(numaNode)].Size() == 0 {
+ continue
+ }
+
+ switch policyName {
+ case cpuconsts.CPUResourcePluginPolicyNameDynamic:
+ // only modify allocated and default properties in NUMA node state if the policy is dynamic and the entry indicates numa_binding.
+ // shared_cores with numa_binding also contributes to numaNodeState.AllocatedCPUSet,
+ // it's convenient that we can skip NUMA with AllocatedCPUSet > 0 when allocating CPUs for dedicated_cores with numa_binding.
+ if CheckNUMABinding(allocationInfo) {
+ allocatedCPUsInNumaNode = allocatedCPUsInNumaNode.Union(allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)])
+ }
+ case cpuconsts.CPUResourcePluginPolicyNameNative:
+ // only modify allocated and default properties in NUMA node state if the policy is native and the QoS class is Guaranteed
+ if CheckDedicatedPool(allocationInfo) {
+ allocatedCPUsInNumaNode = allocatedCPUsInNumaNode.Union(allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)])
+ }
+ }
+
+ topologyAwareAssignments, _ := machine.GetNumaAwareAssignments(topology, allocationInfo.AllocationResult.Intersection(numaNodeAllCPUs))
+ originalTopologyAwareAssignments, _ := machine.GetNumaAwareAssignments(topology, allocationInfo.OriginalAllocationResult.Intersection(numaNodeAllCPUs))
+
+ numaNodeAllocationInfo := allocationInfo.Clone()
+ numaNodeAllocationInfo.AllocationResult = allocationInfo.AllocationResult.Intersection(numaNodeAllCPUs)
+ numaNodeAllocationInfo.OriginalAllocationResult = allocationInfo.OriginalAllocationResult.Intersection(numaNodeAllCPUs)
+ numaNodeAllocationInfo.TopologyAwareAssignments = topologyAwareAssignments
+ numaNodeAllocationInfo.OriginalTopologyAwareAssignments = originalTopologyAwareAssignments
+
+ numaNodeState.SetAllocationInfo(podUID, containerName, numaNodeAllocationInfo)
+ }
+ }
+
+ numaNodeState.AllocatedCPUSet = allocatedCPUsInNumaNode.Clone()
+ numaNodeState.DefaultCPUSet = numaNodeAllCPUs.Difference(numaNodeState.AllocatedCPUSet)
+ machineState[int(numaNode)] = numaNodeState
+ }
+ return machineState, nil
+ }
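
The function body above is the old GenerateMachineStateFromPodEntries moved verbatim; only its name and the now-explicit policyName parameter changed, so call sites pick a generator rather than pass a policy string. A sketch of the three resulting call shapes (the wrapper function and placeholder arguments are illustrative, not part of this commit):

import (
	cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
	"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
	nativepolicyutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/nativepolicy/util"
	"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

func generatorCallShapes(topo *machine.CPUTopology, entries state.PodEntries) error {
	// Dynamic policy: the two-argument wrapper added above in util.go.
	if _, err := state.GenerateMachineStateFromPodEntries(topo, entries); err != nil {
		return err
	}
	// Native policy: the wrapper in pkg/agent/qrm-plugins/cpu/nativepolicy/util
	// (the last file in this diff).
	if _, err := nativepolicyutil.GenerateMachineStateFromPodEntries(topo, entries); err != nil {
		return err
	}
	// The policy-aware helper, with the policy chosen explicitly:
	_, err := state.GenerateMachineStateFromPodEntriesByPolicy(topo, entries, cpuconsts.CPUResourcePluginPolicyNameNative)
	return err
}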
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util_test.go
@@ -502,7 +502,7 @@ func TestGenerateCPUMachineStateByPodEntries(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "checkpoint-TestGenerateCPUMachineStateByPodEntries")
as.Nil(err)

- machineState, err := GenerateMachineStateFromPodEntries(tc.cpuTopology, tc.podEntries, cpuconsts.CPUResourcePluginPolicyNameDynamic)
+ machineState, err := GenerateMachineStateFromPodEntries(tc.cpuTopology, tc.podEntries)
as.Nil(err)

as.Equalf(tc.expectedMachineState, machineState, "failed in test case: %s", tc.description)
3 changes: 1 addition & 2 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/util.go
@@ -23,7 +23,6 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
@@ -38,7 +37,7 @@ func getProportionalSize(oldPoolSize, oldTotalSize, newTotalSize int, ceil bool)
}

func generateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntries state.PodEntries) (state.NUMANodeMap, error) {
- return state.GenerateMachineStateFromPodEntries(topology, podEntries, cpuconsts.CPUResourcePluginPolicyNameDynamic)
+ return state.GenerateMachineStateFromPodEntries(topology, podEntries)
}

// updateAllocationInfoByReq updates allocationInfo by latest req when admitting active pod,
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go
@@ -109,7 +109,7 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
general.Infof("new native policy")

stateImpl, stateErr := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, cpuPluginStateFileName,
- cpuconsts.CPUResourcePluginPolicyNameNative, agentCtx.CPUTopology, conf.SkipCPUStateCorruption)
+ cpuconsts.CPUResourcePluginPolicyNameNative, agentCtx.CPUTopology, conf.SkipCPUStateCorruption, nativepolicyutil.GenerateMachineStateFromPodEntries)
if stateErr != nil {
return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr)
}
3 changes: 2 additions & 1 deletion pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go
@@ -30,6 +30,7 @@ import (

cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
nativepolicyutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/nativepolicy/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/metrics"
@@ -42,7 +43,7 @@ const (

func getTestNativePolicy(topology *machine.CPUTopology, stateFileDirectory string) (*NativePolicy, error) {
stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuPluginStateFileName,
- cpuconsts.CPUResourcePluginPolicyNameNative, topology, false)
+ cpuconsts.CPUResourcePluginPolicyNameNative, topology, false, nativepolicyutil.GenerateMachineStateFromPodEntries)
if err != nil {
return nil, err
}
3 changes: 2 additions & 1 deletion pkg/agent/qrm-plugins/cpu/nativepolicy/util/util.go
@@ -22,6 +22,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

+ // GenerateMachineStateFromPodEntries for native policy
func GenerateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntries state.PodEntries) (state.NUMANodeMap, error) {
- return state.GenerateMachineStateFromPodEntries(topology, podEntries, cpuconsts.CPUResourcePluginPolicyNameNative)
+ return state.GenerateMachineStateFromPodEntriesByPolicy(topology, podEntries, cpuconsts.CPUResourcePluginPolicyNameNative)
}
