Skip to content

Commit

Permalink
chore(qrm): refactor cpu/memory read-only state and add read-write state
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Nov 27, 2024
1 parent 32a3253 commit 2fa64f4
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 88 deletions.
22 changes: 2 additions & 20 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,6 @@ const (
healthCheckTolerationTimes = 3
)

var (
readonlyStateLock sync.RWMutex
readonlyState state.ReadonlyState
)

// GetReadonlyState returns state.ReadonlyState to provides a way
// to obtain the running states of the plugin
func GetReadonlyState() (state.ReadonlyState, error) {
readonlyStateLock.RLock()
defer readonlyStateLock.RUnlock()

if readonlyState == nil {
return nil, fmt.Errorf("readonlyState isn't set")
}
return readonlyState, nil
}

// DynamicPolicy is the policy that's used by default;
// it will consider the dynamic running information to calculate
// and adjust resource requirements and configurations
Expand Down Expand Up @@ -150,9 +133,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr)
}

readonlyStateLock.Lock()
readonlyState = stateImpl
readonlyStateLock.Unlock()
state.SetReadonlyState(stateImpl)
state.SetReadWriteState(stateImpl)

wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{
Key: util.QRMPluginPolicyTagName,
Expand Down
9 changes: 0 additions & 9 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4614,15 +4614,6 @@ func entriesMatch(entries1, entries2 state.PodEntries) (bool, error) {
return true, nil
}

func TestGetReadonlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
readonlyState, err := GetReadonlyState()
as.NotNil(err)
as.Nil(readonlyState)
}

func TestClearResidualState(t *testing.T) {
t.Parallel()

Expand Down
51 changes: 51 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package state
import (
"encoding/json"
"fmt"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -562,3 +563,53 @@ type ReadonlyState interface {
}

type GenerateMachineStateFromPodEntriesFunc func(topology *machine.CPUTopology, podEntries PodEntries) (NUMANodeMap, error)

var (
readonlyStateLock sync.RWMutex
readonlyState ReadonlyState
)

// GetReadonlyState retrieves the readonlyState in a thread-safe manner.
// Returns an error if readonlyState is not set.
func GetReadonlyState() (ReadonlyState, error) {
readonlyStateLock.RLock()
defer readonlyStateLock.RUnlock()

if readonlyState == nil {
return nil, fmt.Errorf("readonlyState isn't set")
}
return readonlyState, nil
}

// SetReadonlyState updates the readonlyState in a thread-safe manner.
func SetReadonlyState(state ReadonlyState) {
readonlyStateLock.Lock()
defer readonlyStateLock.Unlock()

readonlyState = state
}

var (
readWriteStateLock sync.RWMutex
readWriteState State
)

// GetReadWriteState retrieves the readWriteState in a thread-safe manner.
// Returns an error if readWriteState is not set.
func GetReadWriteState() (State, error) {
readWriteStateLock.RLock()
defer readWriteStateLock.RUnlock()

if readWriteState == nil {
return nil, fmt.Errorf("readWriteState isn't set")
}
return readWriteState, nil
}

// SetReadWriteState updates the readWriteState in a thread-safe manner.
func SetReadWriteState(state State) {
readWriteStateLock.Lock()
defer readWriteStateLock.Unlock()

readWriteState = state
}
19 changes: 19 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3233,3 +3233,22 @@ func TestGetAvailableCPUQuantity(t *testing.T) {
cpuQuantity := int(math.Ceil(nodeState.GetAvailableCPUQuantity(machine.NewCPUSet())))
require.Equal(t, 15, cpuQuantity)
}

func TestGetReadonlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
state, err := GetReadonlyState()
as.NotNil(err)
as.Nil(state)
}

func TestGetWriteOnlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
state, err := GetReadWriteState()
if state == nil {
as.NotNil(err)
}
}
22 changes: 2 additions & 20 deletions pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,6 @@ const (
maxResidualTime = 5 * time.Minute
)

var (
readonlyStateLock sync.RWMutex
readonlyState state.ReadonlyState
)

// GetReadonlyState returns state.ReadonlyState to provides a way
// to obtain the running states of the plugin
func GetReadonlyState() (state.ReadonlyState, error) {
readonlyStateLock.RLock()
defer readonlyStateLock.RUnlock()

if readonlyState == nil {
return nil, fmt.Errorf("readonlyState isn't setted")
}
return readonlyState, nil
}

// NativePolicy is a policy compatible with Kubernetes native semantics and is used in topology-aware scheduling scenarios.
type NativePolicy struct {
sync.RWMutex
Expand Down Expand Up @@ -115,9 +98,8 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr)
}

readonlyStateLock.Lock()
readonlyState = stateImpl
readonlyStateLock.Unlock()
state.SetReadonlyState(stateImpl)
state.SetReadWriteState(stateImpl)

wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{
Key: util.QRMPluginPolicyTagName,
Expand Down
9 changes: 0 additions & 9 deletions pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,15 +390,6 @@ func TestAllocateForPod(t *testing.T) {
_ = os.RemoveAll(tmpDir)
}

func TestGetReadonlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
readonlyState, err := GetReadonlyState()
as.NotNil(err)
as.Nil(readonlyState)
}

func TestClearResidualState(t *testing.T) {
t.Parallel()

Expand Down
22 changes: 2 additions & 20 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,6 @@ const (
movePagesWorkLimit = 2
)

var (
readonlyStateLock sync.RWMutex
readonlyState state.ReadonlyState
)

// GetReadonlyState returns state.ReadonlyState to provides a way
// to obtain the running states of the plugin
func GetReadonlyState() (state.ReadonlyState, error) {
readonlyStateLock.RLock()
defer readonlyStateLock.RUnlock()

if readonlyState == nil {
return nil, fmt.Errorf("readonlyState isn't setted")
}
return readonlyState, nil
}

type DynamicPolicy struct {
sync.RWMutex

Expand Down Expand Up @@ -193,9 +176,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
general.Infof("empty ExtraControlKnobConfigFile, initialize empty extraControlKnobConfigs")
}

readonlyStateLock.Lock()
readonlyState = stateImpl
readonlyStateLock.Unlock()
state.SetReadonlyState(stateImpl)
state.SetReadWriteState(stateImpl)

wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{
Key: util.QRMPluginPolicyTagName,
Expand Down
10 changes: 0 additions & 10 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1953,16 +1953,6 @@ func TestGetResourcesAllocation(t *testing.T) {
}, resp5.PodResources[req.PodUid].ContainerResources[testName].ResourceAllocation[string(v1.ResourceMemory)])
}

func TestGetReadonlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
readonlyState, err := GetReadonlyState()
if readonlyState == nil {
as.NotNil(err)
}
}

func TestGenerateResourcesMachineStateFromPodEntries(t *testing.T) {
t.Parallel()

Expand Down
51 changes: 51 additions & 0 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package state
import (
"encoding/json"
"fmt"
"sync"

info "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -384,3 +385,53 @@ type State interface {
writer
ReadonlyState
}

var (
readonlyStateLock sync.RWMutex
readonlyState ReadonlyState
)

// GetReadonlyState retrieves the readonlyState in a thread-safe manner.
// Returns an error if readonlyState is not set.
func GetReadonlyState() (ReadonlyState, error) {
readonlyStateLock.RLock()
defer readonlyStateLock.RUnlock()

if readonlyState == nil {
return nil, fmt.Errorf("readonlyState isn't setted")
}
return readonlyState, nil
}

// SetReadonlyState updates the readonlyState in a thread-safe manner.
func SetReadonlyState(state ReadonlyState) {
readonlyStateLock.Lock()
defer readonlyStateLock.Unlock()

readonlyState = state
}

var (
readWriteStateLock sync.RWMutex
readWriteState State
)

// GetReadWriteState retrieves the readWriteState in a thread-safe manner.
// Returns an error if readWriteState is not set.
func GetReadWriteState() (State, error) {
readWriteStateLock.RLock()
defer readWriteStateLock.RUnlock()

if readWriteState == nil {
return nil, fmt.Errorf("readWriteState isn't set")
}
return readWriteState, nil
}

// SetReadWriteState updates the readWriteState in a thread-safe manner.
func SetReadWriteState(state State) {
readWriteStateLock.Lock()
defer readWriteStateLock.Unlock()

readWriteState = state
}
43 changes: 43 additions & 0 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package state

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGetReadonlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
state, err := GetReadonlyState()
if state == nil {
as.NotNil(err)
}
}

func TestGetWriteOnlyState(t *testing.T) {
t.Parallel()

as := require.New(t)
state, err := GetReadWriteState()
if state == nil {
as.NotNil(err)
}
}

0 comments on commit 2fa64f4

Please sign in to comment.