From 2fa64f4d8c3d593229571834473dbf222c0d4b8f Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Fri, 8 Nov 2024 15:04:38 +0800 Subject: [PATCH] chore(qrm): refactor cpu/memory read-only state and add read-write state --- .../qrm-plugins/cpu/dynamicpolicy/policy.go | 22 +------- .../cpu/dynamicpolicy/policy_test.go | 9 ---- .../cpu/dynamicpolicy/state/state.go | 51 +++++++++++++++++++ .../cpu/dynamicpolicy/state/state_test.go | 19 +++++++ .../qrm-plugins/cpu/nativepolicy/policy.go | 22 +------- .../cpu/nativepolicy/policy_test.go | 9 ---- .../memory/dynamicpolicy/policy.go | 22 +------- .../memory/dynamicpolicy/policy_test.go | 10 ---- .../memory/dynamicpolicy/state/state.go | 51 +++++++++++++++++++ .../memory/dynamicpolicy/state/state_test.go | 43 ++++++++++++++++ 10 files changed, 170 insertions(+), 88 deletions(-) create mode 100644 pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go index 4458f4712..171047239 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go @@ -71,23 +71,6 @@ const ( healthCheckTolerationTimes = 3 ) -var ( - readonlyStateLock sync.RWMutex - readonlyState state.ReadonlyState -) - -// GetReadonlyState returns state.ReadonlyState to provides a way -// to obtain the running states of the plugin -func GetReadonlyState() (state.ReadonlyState, error) { - readonlyStateLock.RLock() - defer readonlyStateLock.RUnlock() - - if readonlyState == nil { - return nil, fmt.Errorf("readonlyState isn't set") - } - return readonlyState, nil -} - // DynamicPolicy is the policy that's used by default; // it will consider the dynamic running information to calculate // and adjust resource requirements and configurations @@ -150,9 +133,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr) } - readonlyStateLock.Lock() - readonlyState = stateImpl - readonlyStateLock.Unlock() + state.SetReadonlyState(stateImpl) + state.SetReadWriteState(stateImpl) wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{ Key: util.QRMPluginPolicyTagName, diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index 42c599a5f..51eafa946 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -4614,15 +4614,6 @@ func entriesMatch(entries1, entries2 state.PodEntries) (bool, error) { return true, nil } -func TestGetReadonlyState(t *testing.T) { - t.Parallel() - - as := require.New(t) - readonlyState, err := GetReadonlyState() - as.NotNil(err) - as.Nil(readonlyState) -} - func TestClearResidualState(t *testing.T) { t.Parallel() diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go index 22befed02..00c68b258 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go @@ -19,6 +19,7 @@ package state import ( "encoding/json" "fmt" + "sync" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -562,3 +563,53 @@ type ReadonlyState interface { } type GenerateMachineStateFromPodEntriesFunc func(topology *machine.CPUTopology, podEntries PodEntries) (NUMANodeMap, error) + +var ( + readonlyStateLock sync.RWMutex + readonlyState ReadonlyState +) + +// GetReadonlyState retrieves the readonlyState in a thread-safe manner. +// Returns an error if readonlyState is not set. +func GetReadonlyState() (ReadonlyState, error) { + readonlyStateLock.RLock() + defer readonlyStateLock.RUnlock() + + if readonlyState == nil { + return nil, fmt.Errorf("readonlyState isn't set") + } + return readonlyState, nil +} + +// SetReadonlyState updates the readonlyState in a thread-safe manner. +func SetReadonlyState(state ReadonlyState) { + readonlyStateLock.Lock() + defer readonlyStateLock.Unlock() + + readonlyState = state +} + +var ( + readWriteStateLock sync.RWMutex + readWriteState State +) + +// GetReadWriteState retrieves the readWriteState in a thread-safe manner. +// Returns an error if readWriteState is not set. +func GetReadWriteState() (State, error) { + readWriteStateLock.RLock() + defer readWriteStateLock.RUnlock() + + if readWriteState == nil { + return nil, fmt.Errorf("readWriteState isn't set") + } + return readWriteState, nil +} + +// SetReadWriteState updates the readWriteState in a thread-safe manner. +func SetReadWriteState(state State) { + readWriteStateLock.Lock() + defer readWriteStateLock.Unlock() + + readWriteState = state +} diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go index 711e38efa..526cd9041 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go @@ -3233,3 +3233,22 @@ func TestGetAvailableCPUQuantity(t *testing.T) { cpuQuantity := int(math.Ceil(nodeState.GetAvailableCPUQuantity(machine.NewCPUSet()))) require.Equal(t, 15, cpuQuantity) } + +func TestGetReadonlyState(t *testing.T) { + t.Parallel() + + as := require.New(t) + state, err := GetReadonlyState() + as.NotNil(err) + as.Nil(state) +} + +func TestGetWriteOnlyState(t *testing.T) { + t.Parallel() + + as := require.New(t) + state, err := GetReadWriteState() + if state == nil { + as.NotNil(err) + } +} diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go index 823d28af5..1c858a805 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go @@ -55,23 +55,6 @@ const ( maxResidualTime = 5 * time.Minute ) -var ( - readonlyStateLock sync.RWMutex - readonlyState state.ReadonlyState -) - -// GetReadonlyState returns state.ReadonlyState to provides a way -// to obtain the running states of the plugin -func GetReadonlyState() (state.ReadonlyState, error) { - readonlyStateLock.RLock() - defer readonlyStateLock.RUnlock() - - if readonlyState == nil { - return nil, fmt.Errorf("readonlyState isn't setted") - } - return readonlyState, nil -} - // NativePolicy is a policy compatible with Kubernetes native semantics and is used in topology-aware scheduling scenarios. type NativePolicy struct { sync.RWMutex @@ -115,9 +98,8 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration, return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr) } - readonlyStateLock.Lock() - readonlyState = stateImpl - readonlyStateLock.Unlock() + state.SetReadonlyState(stateImpl) + state.SetReadWriteState(stateImpl) wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{ Key: util.QRMPluginPolicyTagName, diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go index 5271b1f00..318f8dbe3 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go @@ -390,15 +390,6 @@ func TestAllocateForPod(t *testing.T) { _ = os.RemoveAll(tmpDir) } -func TestGetReadonlyState(t *testing.T) { - t.Parallel() - - as := require.New(t) - readonlyState, err := GetReadonlyState() - as.NotNil(err) - as.Nil(readonlyState) -} - func TestClearResidualState(t *testing.T) { t.Parallel() diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index f91e33883..802916eae 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -90,23 +90,6 @@ const ( movePagesWorkLimit = 2 ) -var ( - readonlyStateLock sync.RWMutex - readonlyState state.ReadonlyState -) - -// GetReadonlyState returns state.ReadonlyState to provides a way -// to obtain the running states of the plugin -func GetReadonlyState() (state.ReadonlyState, error) { - readonlyStateLock.RLock() - defer readonlyStateLock.RUnlock() - - if readonlyState == nil { - return nil, fmt.Errorf("readonlyState isn't setted") - } - return readonlyState, nil -} - type DynamicPolicy struct { sync.RWMutex @@ -193,9 +176,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration general.Infof("empty ExtraControlKnobConfigFile, initialize empty extraControlKnobConfigs") } - readonlyStateLock.Lock() - readonlyState = stateImpl - readonlyStateLock.Unlock() + state.SetReadonlyState(stateImpl) + state.SetReadWriteState(stateImpl) wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{ Key: util.QRMPluginPolicyTagName, diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index 77f1ef8f0..7cd3485ac 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -1953,16 +1953,6 @@ func TestGetResourcesAllocation(t *testing.T) { }, resp5.PodResources[req.PodUid].ContainerResources[testName].ResourceAllocation[string(v1.ResourceMemory)]) } -func TestGetReadonlyState(t *testing.T) { - t.Parallel() - - as := require.New(t) - readonlyState, err := GetReadonlyState() - if readonlyState == nil { - as.NotNil(err) - } -} - func TestGenerateResourcesMachineStateFromPodEntries(t *testing.T) { t.Parallel() diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go index 5914f9c5c..359732551 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go @@ -19,6 +19,7 @@ package state import ( "encoding/json" "fmt" + "sync" info "github.com/google/cadvisor/info/v1" v1 "k8s.io/api/core/v1" @@ -384,3 +385,53 @@ type State interface { writer ReadonlyState } + +var ( + readonlyStateLock sync.RWMutex + readonlyState ReadonlyState +) + +// GetReadonlyState retrieves the readonlyState in a thread-safe manner. +// Returns an error if readonlyState is not set. +func GetReadonlyState() (ReadonlyState, error) { + readonlyStateLock.RLock() + defer readonlyStateLock.RUnlock() + + if readonlyState == nil { + return nil, fmt.Errorf("readonlyState isn't setted") + } + return readonlyState, nil +} + +// SetReadonlyState updates the readonlyState in a thread-safe manner. +func SetReadonlyState(state ReadonlyState) { + readonlyStateLock.Lock() + defer readonlyStateLock.Unlock() + + readonlyState = state +} + +var ( + readWriteStateLock sync.RWMutex + readWriteState State +) + +// GetReadWriteState retrieves the readWriteState in a thread-safe manner. +// Returns an error if readWriteState is not set. +func GetReadWriteState() (State, error) { + readWriteStateLock.RLock() + defer readWriteStateLock.RUnlock() + + if readWriteState == nil { + return nil, fmt.Errorf("readWriteState isn't set") + } + return readWriteState, nil +} + +// SetReadWriteState updates the readWriteState in a thread-safe manner. +func SetReadWriteState(state State) { + readWriteStateLock.Lock() + defer readWriteStateLock.Unlock() + + readWriteState = state +} diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go new file mode 100644 index 000000000..a3e8c91d5 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go @@ -0,0 +1,43 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetReadonlyState(t *testing.T) { + t.Parallel() + + as := require.New(t) + state, err := GetReadonlyState() + if state == nil { + as.NotNil(err) + } +} + +func TestGetWriteOnlyState(t *testing.T) { + t.Parallel() + + as := require.New(t) + state, err := GetReadWriteState() + if state == nil { + as.NotNil(err) + } +}