Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sysadvisor): fix create duplicated regions for distinct container… #37

Merged
merged 1 commit into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/agent/sysadvisor/metacache/metacache.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,22 @@ func (mc *MetaCacheImp) DeleteContainer(podUID string, containerName string) err
// RangeAndUpdateContainer applies a function to every podUID, containerName, containerInfo set.
// Not recommended to use if RangeContainer satisfies the requirement.
func (mc *MetaCacheImp) RangeAndUpdateContainer(f func(podUID string, containerName string, containerInfo *types.ContainerInfo) bool) {
mc.mutex.Lock()
defer mc.mutex.Unlock()
mc.mutex.RLock()
podEntries := mc.podEntries.Clone()
mc.mutex.RUnlock()

for podUID, podInfo := range mc.podEntries {
// func f may need to hold metaCache.mutex, so make this for-loop section lock-free
for podUID, podInfo := range podEntries {
for containerName, containerInfo := range podInfo {
if !f(podUID, containerName, containerInfo) {
break
}
}
}

mc.mutex.Lock()
mc.podEntries = podEntries
mc.mutex.Unlock()
}

// RemovePod deletes a PodInfo keyed by pod uid. Repeatedly remove will be ignored.
Expand Down
97 changes: 56 additions & 41 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -78,9 +79,7 @@ type cpuResourceAdvisor struct {
startTime time.Time
systemNumas machine.CPUSet

regionMap map[string]region.QoSRegion // map[regionName]region
containerRegionMap map[string]map[string][]region.QoSRegion // map[podUID][containerName]regions
poolRegionMap map[string][]region.QoSRegion // map[poolName]regions
regionMap map[string]region.QoSRegion // map[regionName]region

nonBindingNumas machine.CPUSet // numas without numa binding pods
mutex sync.RWMutex
Expand All @@ -100,9 +99,7 @@ func NewCPUResourceAdvisor(conf *config.Configuration, extraConf interface{}, me
startTime: time.Now(),
systemNumas: metaServer.CPUDetails.NUMANodes(),

regionMap: make(map[string]region.QoSRegion),
containerRegionMap: make(map[string]map[string][]region.QoSRegion),
poolRegionMap: make(map[string][]region.QoSRegion),
regionMap: make(map[string]region.QoSRegion),

nonBindingNumas: machine.NewCPUSet(),

Expand Down Expand Up @@ -289,11 +286,18 @@ func (cra *cpuResourceAdvisor) assignContainersToRegions() error {
}

cra.setContainerRegions(ci, regions)
cra.setPoolRegions(ci.OwnerPoolName, regions)
// dedicated pool is not existed in metaCache.poolEntries
if ci.OwnerPoolName == state.PoolNameDedicated {
return true
}
if err := cra.setPoolRegions(ci.OwnerPoolName, regions); err != nil {
errList = append(errList, err)
return true
}

return true
}
cra.metaCache.RangeContainer(f)
cra.metaCache.RangeAndUpdateContainer(f)

cra.gc()
cra.updateNonBindingNumas()
Expand All @@ -306,7 +310,11 @@ func (cra *cpuResourceAdvisor) assignContainersToRegions() error {
func (cra *cpuResourceAdvisor) assignToRegions(ci *types.ContainerInfo) ([]region.QoSRegion, error) {
if ci.QoSLevel == consts.PodAnnotationQoSLevelSharedCores {
// Assign shared cores container. Focus on pool.
if regions, ok := cra.getPoolRegions(ci.OwnerPoolName); ok {
regions, err := cra.getPoolRegions(ci.OwnerPoolName)
if err != nil {
return nil, err
}
if len(regions) > 0 {
return regions, nil
}

Expand All @@ -317,12 +325,14 @@ func (cra *cpuResourceAdvisor) assignToRegions(ci *types.ContainerInfo) ([]regio

} else if ci.IsNumaBinding() {
// Assign dedicated cores numa exclusive containers. Focus on container.
if regions, ok := cra.getContainerRegions(ci); ok {
regions, err := cra.getContainerRegions(ci)
if err != nil {
return nil, err
}
if len(regions) > 0 {
return regions, nil
}

var regions []region.QoSRegion

// Create regions by numa node
for numaID := range ci.TopologyAwareAssignments {
name := string(types.QoSRegionTypeDedicatedNumaExclusive) + regionNameSeparator + string(uuid.NewUUID())
Expand Down Expand Up @@ -434,47 +444,52 @@ func (cra *cpuResourceAdvisor) gc() {
klog.Infof("[qosaware-cpu] delete region %v", regionName)
}
}
}

// Delete non exist pods in container region map
for podUID := range cra.containerRegionMap {
if _, ok := cra.metaCache.GetContainerEntries(podUID); !ok {
delete(cra.containerRegionMap, podUID)
// getContainerRegions looks up every region name recorded on the container
// in cra.regionMap and returns the resolved regions. It returns an error if
// any recorded name has no matching entry in the region map. A container
// with no region names yields a nil slice and a nil error.
func (cra *cpuResourceAdvisor) getContainerRegions(ci *types.ContainerInfo) ([]region.QoSRegion, error) {
	// Zero-value declaration is idiomatic for a nil slice; append works on nil.
	var regions []region.QoSRegion
	for regionName := range ci.RegionNames {
		// Named "r" to avoid shadowing the imported "region" package.
		r, ok := cra.regionMap[regionName]
		if !ok {
			return nil, fmt.Errorf("failed to find region %v", regionName)
		}
		regions = append(regions, r)
	}
	return regions, nil
}

// Delete non exist pools in pool region map
for poolName := range cra.poolRegionMap {
if _, ok := cra.metaCache.GetPoolInfo(poolName); !ok {
delete(cra.poolRegionMap, poolName)
}
func (cra *cpuResourceAdvisor) setContainerRegions(ci *types.ContainerInfo, regions []region.QoSRegion) {
Copy link
Collaborator

@sun-yuliang sun-yuliang Apr 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setContainerRegions will not change container info in metacache because it is called by metacache.RangeContainer, which is a reader function. Use RangeAndUpdateContainer instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

ci.RegionNames = sets.NewString()
for _, region := range regions {
ci.RegionNames.Insert(region.Name())
}
}

func (cra *cpuResourceAdvisor) getContainerRegions(ci *types.ContainerInfo) ([]region.QoSRegion, bool) {
if v, ok := cra.containerRegionMap[ci.PodUID]; ok {
if regions, exist := v[ci.ContainerName]; exist {
return regions, true
func (cra *cpuResourceAdvisor) getPoolRegions(poolName string) ([]region.QoSRegion, error) {
pool, ok := cra.metaCache.GetPoolInfo(poolName)
if !ok || pool == nil {
return nil, nil
}
var regions []region.QoSRegion = nil
for regionName := range pool.RegionNames {
region, ok := cra.regionMap[regionName]
if !ok {
return nil, fmt.Errorf("failed to find region %v", regionName)
}
regions = append(regions, region)
}
return nil, false
return regions, nil
}

func (cra *cpuResourceAdvisor) setContainerRegions(ci *types.ContainerInfo, regions []region.QoSRegion) {
v, ok := cra.containerRegionMap[ci.PodUID]
func (cra *cpuResourceAdvisor) setPoolRegions(poolName string, regions []region.QoSRegion) error {
pool, ok := cra.metaCache.GetPoolInfo(poolName)
if !ok {
cra.containerRegionMap[ci.PodUID] = make(map[string][]region.QoSRegion)
v = cra.containerRegionMap[ci.PodUID]
return fmt.Errorf("failed to find pool %v", poolName)
}
v[ci.ContainerName] = regions
}

func (cra *cpuResourceAdvisor) getPoolRegions(poolName string) ([]region.QoSRegion, bool) {
if regions, ok := cra.poolRegionMap[poolName]; ok {
return regions, true
pool.RegionNames = sets.NewString()
for _, region := range regions {
pool.RegionNames.Insert(region.Name())
}
return nil, false
}

func (cra *cpuResourceAdvisor) setPoolRegions(poolName string, regions []region.QoSRegion) {
cra.poolRegionMap[poolName] = regions
cra.metaCache.SetPoolInfo(poolName, pool)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options"
Expand Down Expand Up @@ -122,6 +123,7 @@ func TestCPUServerAddContainer(t *testing.T) {
Annotations: map[string]string{"key": "label"},
QoSLevel: consts.PodAnnotationQoSLevelSharedCores,
CPURequest: 1,
RegionNames: sets.NewString(),
},
},
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/sysadvisor/types/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (ci *ContainerInfo) Clone() *ContainerInfo {
OwnerPoolName: ci.OwnerPoolName,
TopologyAwareAssignments: ci.TopologyAwareAssignments.Clone(),
OriginalTopologyAwareAssignments: ci.OriginalTopologyAwareAssignments.Clone(),
RegionNames: sets.NewString(ci.RegionNames.List()...),
}
return clone
}
Expand All @@ -79,6 +80,7 @@ func (pi *PoolInfo) Clone() *PoolInfo {
PoolName: pi.PoolName,
TopologyAwareAssignments: pi.TopologyAwareAssignments.Clone(),
OriginalTopologyAwareAssignments: pi.OriginalTopologyAwareAssignments.Clone(),
RegionNames: sets.NewString(pi.RegionNames.List()...),
}
return clone
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/sysadvisor/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,15 @@ type ContainerInfo struct {
OwnerPoolName string
TopologyAwareAssignments TopologyAwareAssignment
OriginalTopologyAwareAssignments TopologyAwareAssignment
RegionNames sets.String
}

// PoolInfo contains pool information for sysadvisor plugins
type PoolInfo struct {
PoolName string
TopologyAwareAssignments TopologyAwareAssignment
OriginalTopologyAwareAssignments TopologyAwareAssignment
RegionNames sets.String
}

// RegionInfo contains region information generated by sysadvisor resource advisor
Expand Down