Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine coding styles for cpu&memory plugin #54

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/katalyst-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}

// If leader election is enabled, runCommand via LeaderElector until done and exit.
// If leader election is enabled, runCommand via LeaderElector until done and exists.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
Expand Down
527 changes: 0 additions & 527 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/allocation_handlers.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

// cpuAccumulator is used as a helper function calculate cpu detailed
// cpuAccumulator is used as a helper function to calculate cpu detailed
// allocation results according to machine topology and cpu requirements;
// it uses a stable allocation strategy, meaning that every time we need
// a fixed number of cpu cores based a fixed cpu topology, we will always
Expand Down
179 changes: 156 additions & 23 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,46 @@ limitations under the License.
package cpuadvisor

import (
fmt "fmt"
"fmt"

"k8s.io/klog/v2"

"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

// FakedContainerID represents a placeholder since pool entry has no container-level
// FakedNumaID represents a placeholder since pools like shared/reclaimed will not contain a specific numa
// FakedContainerName represents a placeholder since pool entry has no container-level
// FakedNUMAID represents a placeholder since pools like shared/reclaimed will not contain a specific numa
const (
FakedContainerID = ""
FakedNumaID = -1
EmptyOwnerPoolName = ""
FakedContainerName = ""
FakedNUMAID = -1
)

// GetNumaCalculationResult returns numa-level calculation results
func (m *CalculationEntries) GetNumaCalculationResult(container string, numa int64) (*NumaCalculationResult, bool) {
info, ok := m.Entries[container]
if !ok || info == nil {
return nil, false
}
type BlockCPUSet map[string]machine.CPUSet

if numaInfo, ok := info.CalculationResultsByNumas[numa]; ok {
return numaInfo, true
func NewBlockCPUSet() BlockCPUSet {
return make(BlockCPUSet)
}

func (b BlockCPUSet) GetCPUSet() machine.CPUSet {
cpusets := machine.NewCPUSet()
for _, cpuset := range b {
cpusets = cpusets.Union(cpuset)
}
return nil, false
return cpusets
}

// GetBlocks parses ListAndWatchResponse and returns map[int]map[string]*Block
// whose first level key is NUMA id and second level key is block id
func (m *ListAndWatchResponse) GetBlocks() (map[int]map[string]*Block, error) {
if m == nil {
// GetBlocks parses ListAndWatchResponse and returns map[int]map[string]*Block,
// the map is keyed as numa id -> block id -> block
func (lwr *ListAndWatchResponse) GetBlocks() (map[int]map[string]*Block, error) {
if lwr == nil {
return nil, fmt.Errorf("got nil ListAndWatchResponse")
}

blocks := make(map[int]map[string]*Block)

visBlocksToNUMA := make(map[string]int)

for entryName, entry := range m.Entries {
for entryName, entry := range lwr.Entries {
for subEntryName, calculationInfo := range entry.Entries {
if calculationInfo == nil {
klog.Warningf("[ListAndWatchResponse.GetBlocks] got nil calculationInfo entry: %s, subEntry: %s",
Expand All @@ -71,7 +72,6 @@ func (m *ListAndWatchResponse) GetBlocks() (map[int]map[string]*Block, error) {
}

numaIdInt, err := general.CovertInt64ToInt(numaIdInt64)

if err != nil {
return nil, fmt.Errorf("parse nuam id: %d failed with error: %v entry: %s, subEntry: %s",
numaIdInt64, err, entryName, subEntryName)
Expand Down Expand Up @@ -105,6 +105,139 @@ func (m *ListAndWatchResponse) GetBlocks() (map[int]map[string]*Block, error) {
}
}
}

return blocks, nil
}

// GeEntryNUMABlocks returns Block lists according to the given [entry, subEntry] pair
func (lwr *ListAndWatchResponse) GeEntryNUMABlocks(entry, subEntry string, numa int64) ([]*Block, bool) {
if entry, ok := lwr.Entries[entry]; !ok {
return []*Block{}, false
} else if info, ok := entry.Entries[subEntry]; !ok {
return []*Block{}, false
} else if results, ok := info.CalculationResultsByNumas[numa]; !ok {
return []*Block{}, false
} else {
return results.Blocks, true
}

}

// GetCalculationInfo returns CalculationInfo according to the given [entry, subEntry]
func (lwr *ListAndWatchResponse) GetCalculationInfo(entry, subEntry string) (*CalculationInfo, bool) {
if entry, ok := lwr.Entries[entry]; !ok {
return nil, false
} else if info, ok := entry.Entries[subEntry]; !ok {
return nil, false
} else {
return info, true
}
}

// FilterCalculationInfo filter out CalculationInfo only for dedicated pod,
// and the returned map and formatted as pod -> container -> CalculationInfo
func (lwr *ListAndWatchResponse) FilterCalculationInfo(pool string) map[string]map[string]*CalculationInfo {
dedicatedCalculationInfos := make(map[string]map[string]*CalculationInfo)
for entryName, entry := range lwr.Entries {
for subEntryName, calculationInfo := range entry.Entries {
if calculationInfo != nil && calculationInfo.OwnerPoolName == pool {
if dedicatedCalculationInfos[entryName] == nil {
dedicatedCalculationInfos[entryName] = make(map[string]*CalculationInfo)
}

dedicatedCalculationInfos[entryName][subEntryName] = calculationInfo
}
}
}
return dedicatedCalculationInfos
}

// GetNUMACalculationResult returns numa-level calculation results
func (ce *CalculationEntries) GetNUMACalculationResult(container string, numa int64) (*NumaCalculationResult, bool) {
info, ok := ce.Entries[container]
if !ok || info == nil {
return nil, false
}

if numaInfo, ok := info.CalculationResultsByNumas[numa]; ok {
return numaInfo, true
}
return nil, false
}

// GetNUMAQuantities returns quantity in each numa for in this CalculationInfo
func (ci *CalculationInfo) GetNUMAQuantities() (map[int]int, error) {
if ci == nil {
return nil, fmt.Errorf("got nil ci")
}

numaQuantities := make(map[int]int)
for numaId, numaResult := range ci.CalculationResultsByNumas {
if numaResult == nil {
klog.Warningf("[GetTotalQuantity] got nil NUMA result")
continue
}

var quantityUInt64 uint64 = 0
for _, block := range numaResult.Blocks {
if block == nil {
klog.Warningf("[GetTotalQuantity] got nil block")
continue
}
quantityUInt64 += block.Result
}

quantityInt, err := general.CovertUInt64ToInt(quantityUInt64)
if err != nil {
return nil, fmt.Errorf("converting quantity: %d to int failed with error: %v",
quantityUInt64, err)
}

numaIdInt, err := general.CovertInt64ToInt(numaId)
if err != nil {
return nil, fmt.Errorf("converting quantity: %d to int failed with error: %v",
numaId, err)
}
numaQuantities[numaIdInt] = quantityInt
}
return numaQuantities, nil
}

// GetTotalQuantity returns total quantity for in this CalculationInfo
func (ci *CalculationInfo) GetTotalQuantity() (int, error) {
waynepeking348 marked this conversation as resolved.
Show resolved Hide resolved
numaQuantities, err := ci.GetNUMAQuantities()
if err != nil {
return 0, err
}

var totalQuantity = 0
for _, quantity := range numaQuantities {
totalQuantity += quantity
}
return totalQuantity, nil
}

// GetCPUSet returns cpuset for this container by union all blocks for it
func (ci *CalculationInfo) GetCPUSet(entry, subEntry string, b BlockCPUSet) (machine.CPUSet, error) {
cpusets := machine.NewCPUSet()
for _, results := range ci.CalculationResultsByNumas {
if results == nil {
general.Warningf("got nil numa result entry: %s, subEntry: %s", entry, subEntry)
continue
}

for _, block := range results.Blocks {
if block == nil {
general.Warningf("got nil block result entry: %s, subEntry: %s", entry, subEntry)
continue
}

if cset, found := b[block.BlockId]; !found {
return machine.CPUSet{}, fmt.Errorf("block %s not found, entry: %s, subEntry: %s", block.BlockId, entry, subEntry)
} else {
cpusets = cpusets.Union(cset)
}
}
}
return cpusets, nil

}
Loading