Skip to content

Commit

Permalink
isolation support non-exclusive
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Dec 14, 2023
1 parent bacb868 commit 49d1ef3
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,15 @@ type CPUIsolationOptions struct {
IsolationLockInThreshold int
IsolationLockOutPeriodSecs int

IsolationDisabled bool
IsolationDisabledPools []string
// IsolationDisabled is used to disable all isolation.
// IsolationDisabledPools indicates the pools where pods will not be isolated.
// IsolationForceEnablePools indicates the pools where pods must be isolated, even if the pool
// is listed in IsolationDisabledPools.
// IsolationNonExclusivePools indicates the pools where pods will not be exclusively isolated.
IsolationDisabled bool
IsolationDisabledPools []string
IsolationForceEnablePools []string
IsolationNonExclusivePools []string
}

// NewCPUIsolationOptions creates a new Options with a default config
Expand All @@ -67,8 +74,10 @@ func NewCPUIsolationOptions() *CPUIsolationOptions {
IsolationLockInThreshold: 3,
IsolationLockOutPeriodSecs: 120,

IsolationDisabled: true,
IsolationDisabledPools: []string{},
IsolationDisabled: true,
IsolationDisabledPools: []string{},
IsolationForceEnablePools: []string{},
IsolationNonExclusivePools: []string{},
}
}

Expand Down Expand Up @@ -101,7 +110,11 @@ func (o *CPUIsolationOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.IsolationDisabled, "isolation-disable", o.IsolationDisabled,
"if set as true, disable the isolation logic")
fs.StringArrayVar(&o.IsolationDisabledPools, "isolation-disable-pools", o.IsolationDisabledPools,
"if set as true, disable the isolation logic for the given pool")
"disable the isolation logic for the given pool")
fs.StringArrayVar(&o.IsolationForceEnablePools, "isolation-force-enable-pools", o.IsolationForceEnablePools,
"isolation force enable for get given pool")
fs.StringArrayVar(&o.IsolationNonExclusivePools, "isolation-non-exclusive-pools", o.IsolationNonExclusivePools,
"isolation is non-exclusive for get given pool")
}

// ApplyTo fills up config with options
Expand Down Expand Up @@ -142,6 +155,8 @@ func (o *CPUIsolationOptions) ApplyTo(c *cpu.CPUIsolationConfiguration) error {

c.IsolationDisabled = o.IsolationDisabled
c.IsolationDisabledPools = sets.NewString(o.IsolationDisabledPools...)
c.IsolationForceEnablePools = sets.NewString(o.IsolationForceEnablePools...)
c.IsolationNonExclusivePools = sets.NewString(o.IsolationNonExclusivePools...)

return nil
}
2 changes: 1 addition & 1 deletion pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewQoSAwarePlugin(pluginName string, conf *config.Configuration, extraConf
return nil, err
}

qrmServer, err := server.NewQRMServer(resourceAdvisor, conf, metaCache, emitter)
qrmServer, err := server.NewQRMServer(resourceAdvisor, conf, metaCache, metaServer, emitter)
if err != nil {
return nil, err
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (cra *cpuResourceAdvisor) assignContainersToRegions() error {
if ci.OwnerPoolName == state.PoolNameDedicated {
// dedicated pool should not exist in metaCache.poolEntries
return true
} else if ci.Isolated {
} else if ci.Isolated || cra.conf.IsolationForceEnablePools.Has(ci.OriginOwnerPoolName) {
// isolated pool should not exist in metaCache.poolEntries
return true
} else {
Expand Down Expand Up @@ -412,7 +412,7 @@ func (cra *cpuResourceAdvisor) assignToRegions(ci *types.ContainerInfo) ([]regio
}

// assign isolated container
if ci.Isolated {
if ci.Isolated || cra.conf.IsolationForceEnablePools.Has(ci.OriginOwnerPoolName) {
// if there already exists an isolation region for this pod, just reuse it
regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeIsolation)
if err != nil {
Expand All @@ -421,7 +421,16 @@ func (cra *cpuResourceAdvisor) assignToRegions(ci *types.ContainerInfo) ([]regio
return regions, nil
}

r := region.NewQoSRegionIsolation(ci, cra.conf, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
regionName := ""
if cra.conf.IsolationNonExclusivePools.Has(ci.OriginOwnerPoolName) {
// if this container's origin pool already has a region (shared, non-exclusive isolation), just reuse it
regions = cra.getPoolRegions(ci.OriginOwnerPoolName)
if len(regions) > 0 {
return regions, nil
}
}

r := region.NewQoSRegionIsolation(ci, regionName, cra.conf, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
return []region.QoSRegion{r}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (l *LoadIsolator) GetIsolatedPods() []string {

existed.Insert(containerMeta(ci))
if l.checkContainerIsolated(ci, isolationResources) {
general.Infof("add container %s from pod %s/%s to isolation", ci.ContainerName, ci.PodNamespace, ci.PodName)
uidSets.Insert(ci.PodUID)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ func (r *QoSRegionBase) GetProvision() (types.ControlKnob, error) {
r.provisionPolicyNameInUse = internal.name

if r.provisionPolicyNameInUse != oldProvisionPolicyNameInUse {
klog.Infof("[qosaware-cpu] region: %s provision policy switch from %s to %s",
r.Name, oldProvisionPolicyNameInUse, r.provisionPolicyNameInUse)
klog.Infof("[qosaware-cpu] region: %v provision policy switch from %v to %v",
r.Name(), oldProvisionPolicyNameInUse, r.provisionPolicyNameInUse)
if r.enableBorweinModel {
r.borweinController.ResetIndicatorOffsets()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,23 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

var isolationRegionDefaultOwnerPoolName = "isolation-default"
const isolationRegionDefaultOwnerPoolName = "isolation-default"

type QoSRegionIsolation struct {
*QoSRegionBase
}

// NewQoSRegionIsolation returns a region instance for isolated pods
func NewQoSRegionIsolation(ci *types.ContainerInfo, conf *config.Configuration, extraConf interface{},
func NewQoSRegionIsolation(ci *types.ContainerInfo, customRegionName string, conf *config.Configuration, extraConf interface{},
metaReader metacache.MetaReader, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) QoSRegion {

regionName := getRegionNameFromMetaCache(ci, cpuadvisor.FakedNUMAID, metaReader)
if regionName == "" {
regionName = string(types.QoSRegionTypeIsolation) + types.RegionNameSeparator + ci.PodName + types.RegionNameSeparator + string(uuid.NewUUID())
if customRegionName != "" {
regionName = customRegionName
} else {
regionName = string(types.QoSRegionTypeIsolation) + types.RegionNameSeparator + ci.PodName + types.RegionNameSeparator + string(uuid.NewUUID())
}
}

r := &QoSRegionIsolation{
Expand Down
13 changes: 10 additions & 3 deletions pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)
Expand Down Expand Up @@ -63,23 +65,28 @@ type baseServer struct {
resourceRequestName string
resourceLimitName string

metaCache metacache.MetaCache
emitter metrics.MetricEmitter
qosConf *generic.QoSConfiguration

metaCache metacache.MetaCache
metaServer *metaserver.MetaServer
emitter metrics.MetricEmitter

grpcServer *grpc.Server
resourceServer subQRMServer
}

func newBaseServer(name string, conf *config.Configuration, recvCh interface{}, sendCh chan types.TriggerInfo,
metaCache metacache.MetaCache, emitter metrics.MetricEmitter, resourceServer subQRMServer) *baseServer {
metaCache metacache.MetaCache, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter, resourceServer subQRMServer) *baseServer {
return &baseServer{
name: name,
period: conf.QoSAwarePluginConfiguration.SyncPeriod,
qosConf: conf.QoSConfiguration,
recvCh: recvCh,
sendCh: sendCh,
lwCalledChan: make(chan struct{}),
stopCh: make(chan struct{}),
metaCache: metaCache,
metaServer: metaServer,
emitter: emitter,
resourceServer: resourceServer,
}
Expand Down
46 changes: 35 additions & 11 deletions pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
Expand All @@ -49,9 +51,9 @@ type cpuServer struct {
}

func NewCPUServer(recvCh chan types.InternalCPUCalculationResult, sendCh chan types.TriggerInfo, conf *config.Configuration,
metaCache metacache.MetaCache, emitter metrics.MetricEmitter) (*cpuServer, error) {
metaCache metacache.MetaCache, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) (*cpuServer, error) {
cs := &cpuServer{}
cs.baseServer = newBaseServer(cpuServerName, conf, recvCh, sendCh, metaCache, emitter, cs)
cs.baseServer = newBaseServer(cpuServerName, conf, recvCh, sendCh, metaCache, metaServer, emitter, cs)
cs.advisorSocketPath = conf.CPUAdvisorSocketAbsPath
cs.pluginSocketPath = conf.CPUPluginSocketAbsPath
cs.resourceRequestName = "CPURequest"
Expand Down Expand Up @@ -106,7 +108,7 @@ func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvi
// Assemble pod entries
f := func(podUID string, containerName string, ci *types.ContainerInfo) bool {
if err := cs.assemblePodEntries(calculationEntriesMap, blockID2Blocks, podUID, ci); err != nil {
klog.Errorf("[qosaware-server-cpu] assemblePodEntries err: %v", err)
klog.Errorf("[qosaware-server-cpu] assemblePodEntries for pod %s/%s uid %s err: %v", ci.PodNamespace, ci.PodName, ci.PodUID, err)
}
return true
}
Expand All @@ -125,8 +127,9 @@ func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvi
}

func (cs *cpuServer) getCheckpoint() {
ctx := context.Background()
// get checkpoint
resp, err := cs.cpuPluginClient.GetCheckpoint(context.Background(), &cpuadvisor.GetCheckpointRequest{})
resp, err := cs.cpuPluginClient.GetCheckpoint(ctx, &cpuadvisor.GetCheckpointRequest{})
if err != nil {
klog.Errorf("[qosaware-server-cpu] get checkpoint failed: %v", err)
_ = cs.emitter.StoreInt64(cs.genMetricsName(metricServerLWGetCheckpointFailed), int64(cs.period.Seconds()), metrics.MetricTypeNameCount)
Expand Down Expand Up @@ -156,8 +159,14 @@ func (cs *cpuServer) getCheckpoint() {
for entryName, entry := range resp.Entries {
if _, ok := entry.Entries[cpuadvisor.FakedContainerName]; !ok {
podUID := entryName
pod, err := cs.metaServer.GetPod(ctx, podUID)
if err != nil {
klog.Errorf("[qosaware-server-cpu] get pod info with error: %v", err)
continue
}

for containerName, info := range entry.Entries {
if err := cs.updateContainerInfo(podUID, containerName, info); err != nil {
if err := cs.updateContainerInfo(podUID, containerName, pod, info); err != nil {
klog.Errorf("[qosaware-server-cpu] update container info with error: %v", err)
}
}
Expand Down Expand Up @@ -227,7 +236,7 @@ func (cs *cpuServer) updatePoolInfo(poolName string, info *cpuadvisor.Allocation
return cs.metaCache.SetPoolInfo(poolName, pi)
}

func (cs *cpuServer) updateContainerInfo(podUID string, containerName string, info *cpuadvisor.AllocationInfo) error {
func (cs *cpuServer) updateContainerInfo(podUID string, containerName string, pod *v1.Pod, info *cpuadvisor.AllocationInfo) error {
ci, ok := cs.metaCache.GetContainerInfo(podUID, containerName)
if !ok {
return fmt.Errorf("container %v/%v not exist", podUID, containerName)
Expand All @@ -236,12 +245,27 @@ func (cs *cpuServer) updateContainerInfo(podUID string, containerName string, in
ci.RampUp = info.RampUp
ci.TopologyAwareAssignments = machine.TransformCPUAssignmentFormat(info.TopologyAwareAssignments)
ci.OriginalTopologyAwareAssignments = machine.TransformCPUAssignmentFormat(info.OriginalTopologyAwareAssignments)
ci.OwnerPoolName = info.OwnerPoolName

// only reset pool-name according to QRM during starting process
if len(ci.OriginOwnerPoolName) == 0 {
ci.OriginOwnerPoolName = info.OwnerPoolName
// get qos level name according to the qos conf
qosLevel, err := cs.qosConf.GetQoSLevelForPod(pod)
if err != nil {
return fmt.Errorf("container %v/%v get qos level failed", podUID, containerName)
}
if ci.QoSLevel != qosLevel {
general.Infof("qos level has change from %s to %s", ci.QoSLevel, qosLevel)
ci.QoSLevel = qosLevel
}

// get origin owner pool name according to the qos conf
originOwnerPoolName, err := cs.qosConf.GetSpecifiedPoolNameForPod(pod)
if err != nil {
return fmt.Errorf("container %v/%v get origin owner pool name failed", podUID, containerName)
}
if ci.OriginOwnerPoolName != originOwnerPoolName {
general.Infof("OriginOwnerPoolName has change from %s to %s", ci.OriginOwnerPoolName, originOwnerPoolName)
ci.OriginOwnerPoolName = originOwnerPoolName
}
ci.OwnerPoolName = info.OwnerPoolName

// fill in topology aware assignment for containers with owner pool
if ci.QoSLevel != consts.PodAnnotationQoSLevelDedicatedCores {
Expand Down Expand Up @@ -291,7 +315,7 @@ func (cs *cpuServer) assemblePodEntries(calculationEntriesMap map[string]*cpuadv
}
}
// if isolation is locking out, pass original owner pool instead of owner pool
if !ci.Isolated && qrmstate.IsIsolationPool(ci.OwnerPoolName) {
if !ci.Isolated && ci.OwnerPoolName != ci.OriginOwnerPoolName {
calculationInfo.OwnerPoolName = ci.OriginOwnerPoolName
}

Expand Down
Loading

0 comments on commit 49d1ef3

Please sign in to comment.