Skip to content

Commit

Permalink
[ws-daemon] Refactor enforcing of CPU limits
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf committed Jan 25, 2022
1 parent 43198ce commit 50e9f1a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 45 deletions.
44 changes: 15 additions & 29 deletions components/ws-daemon/pkg/resources/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,24 +190,11 @@ func (gov *Controller) controlCPU() {

var bdgtSpent int64
if prev > 0 {
// s and prev are total CPU time consumption at t-10sec and t.
// s and prev are expressed in nano-jiffies (= 1000*1000*10 milliseconds of CPU time)
// sample and prev are total CPU time consumption at t-10sec and t.
// diff is the total CPU time consumption during the sampling interval.
// diff is expressed in nano-jiffies per sampling interval (e.g. if the sampling interval is 10 sec, this is nano-jiffies/10 sec)
diff := sample - prev

// load is the average CPU load during the sampling interval.
// load is expressed in jiffies per samplingPeriod, i.e. the jiffies consumed this period.
// nano-jiffies convert to load as follows:
// 1 CPU has 100 jiffie/sec, i.e. 1 jiffie/sec equals 1 CPU load (if userHz is 100 - see above)
// 1000 nano-jiffies/sec are 1 micro-jiffie/sec
// 1000 micro-jiffies/sec are 1 milli-jiffie/sec
// 10 milli-jiffies/sec are 1 jiffie/sec (because 100 jiffie/sec CPU capactity * 10 milli-jiffie/sec make 1000 milliseconds)
load := diff / (1000 * 1000 * 10)

// load is the jiffies we've spent this sampling period. Add it to the expenditure sampling buffer
// and compute the budget we have left.
gov.cpuExpenditures.Value = load
gov.cpuExpenditures.Value = diff / int64(time.Millisecond)
gov.cpuExpenditures = gov.cpuExpenditures.Next()
gov.cpuExpenditures.Do(func(s interface{}) {
si, ok := s.(int64)
Expand All @@ -218,8 +205,8 @@ func (gov *Controller) controlCPU() {
})
}

// newLimit is expressed in jiffies/sec
var newLimit int64

gov.mu.RLock()
if gov.cpuLimiterOverride != nil {
newLimit = gov.cpuLimiterOverride.Limit(bdgtSpent)
Expand All @@ -243,15 +230,15 @@ func (gov *Controller) controlCPU() {
}

// SetFixedCPULimit overrides the CPU current limiter with a fixed CPU limiter
func (gov *Controller) SetFixedCPULimit(jiffiesPerSec int64) {
func (gov *Controller) SetFixedCPULimit(newLimit int64) {
gov.mu.Lock()
defer gov.mu.Unlock()

gov.cpuLimiterOverride = nil

if jiffiesPerSec > 0 {
gov.cpuLimiterOverride = FixedLimiter(jiffiesPerSec)
gov.log.WithField("limit", jiffiesPerSec).Debug("set fixed CPU limit for workspace")
if newLimit > 0 {
gov.cpuLimiterOverride = FixedLimiter(newLimit)
gov.log.WithField("limit", newLimit).Debug("set fixed CPU limit for workspace")
}
}

Expand Down Expand Up @@ -328,16 +315,14 @@ func determineProcessType(p processTypeIndicator) ProcessType {
return ProcessDefault
}

// enforceCPULimit sets a new CPU spending limit expressed in jiffies/sec
// enforceCPULimit sets a new CPU spending limit expressed in CPU cores
func (gov *Controller) enforceCPULimit(limit int64) (didChange bool, err error) {
currentQuota, period, err := gov.cfsController.GetQuota()
currentQuota, periodInMs, err := gov.cfsController.GetQuota()
if err != nil {
return
}

periodToMilliseconds := (time.Duration(period) * time.Microsecond).Milliseconds()
newQuota := limit * (10 /* milli-jiffie per jiffie */) * periodToMilliseconds

newQuota := limit * (periodInMs / 100)
if newQuota == currentQuota {
return false, nil
}
Expand All @@ -346,6 +331,7 @@ func (gov *Controller) enforceCPULimit(limit int64) (didChange bool, err error)
if err != nil {
return
}

gov.log.WithField("currentQuota", currentQuota).WithField("limit", limit).WithField("quota", newQuota).Info("set new CPU limit")

return true, nil
Expand All @@ -360,7 +346,7 @@ func (gov *Controller) Stop() {

// cfsController interacts with the completely fair scheduler of the linux kernel
type cfsController interface {
GetUsage() (totalJiffies int64, err error)
GetUsage() (nanoSeconds int64, err error)
GetQuota() (quota, period int64, err error)
SetQuota(quota int64) error
}
Expand All @@ -369,19 +355,19 @@ type cfsController interface {
type cgroupCFSController string

// GetUsage returns the cpuacct.usage value of the cgroup
func (basePath cgroupCFSController) GetUsage() (totalJiffies int64, err error) {
func (basePath cgroupCFSController) GetUsage() (nanoSeconds int64, err error) {
fn := filepath.Join(string(basePath), "cpuacct.usage")
fc, err := os.ReadFile(fn)
if err != nil {
return 0, xerrors.Errorf("cannot sample cpuacct.usage: %w", err)
}

totalJiffies, err = strconv.ParseInt(strings.TrimSpace(string(fc)), 10, 64)
nanoSeconds, err = strconv.ParseInt(strings.TrimSpace(string(fc)), 10, 64)
if err != nil {
return 0, xerrors.Errorf("cannot sample cpuacct.usage: %w", err)
}

return totalJiffies, nil
return nanoSeconds, nil
}

// GetQuota returns the current quota and period setting of the cgroup's CFS
Expand Down
18 changes: 9 additions & 9 deletions components/ws-daemon/pkg/resources/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,43 +54,43 @@ func TestControlCPU(t *testing.T) {
Name: "bucket regular use",
Opts: []ControllerOpt{WithCPULimiter(bktLimiter())},
Consumer: splitpointConsumer(fixedConsumer(500), randomConsumer(20, 100), 5*time.Minute),
Validator: aucValidator(49985),
Validator: aucValidator(53385),
},
{
Name: "bucket max abuse",
Opts: []ControllerOpt{WithCPULimiter(bktLimiter())},
Consumer: fixedConsumer(600),
Validator: aucValidator(192200),
Validator: aucValidator(324000),
},
{
Name: "bucket below radar abuse",
Opts: []ControllerOpt{WithCPULimiter(bktLimiter())},
Consumer: fixedConsumer(400),
Validator: aucValidator(191000),
Validator: aucValidator(216000),
},
{
Name: "clamped bucket regular use",
Opts: []ControllerOpt{WithCPULimiter(clampingBktLimiter())},
Consumer: splitpointConsumer(fixedConsumer(500), randomConsumer(20, 100), 5*time.Minute),
Validator: aucValidator(51200),
Validator: aucValidator(54600),
},
{
Name: "clamped bucket max abuse",
Opts: []ControllerOpt{WithCPULimiter(clampingBktLimiter())},
Consumer: fixedConsumer(600),
Validator: aucValidator(125200),
Validator: aucValidator(324000),
},
{
Name: "clamped bucket below radar abuse",
Opts: []ControllerOpt{WithCPULimiter(clampingBktLimiter())},
Consumer: fixedConsumer(400),
Validator: aucValidator(125000),
Validator: aucValidator(216000),
},
{
Name: "clamped bucket smart abuser",
Opts: []ControllerOpt{WithCPULimiter(clampingBktLimiter())},
Consumer: periodicConsumer(splitpointConsumer(fixedConsumer(500), fixedConsumer(100), 5*time.Minute), 10*time.Minute),
Validator: aucValidator(150000),
Validator: aucValidator(174000),
},
}

Expand Down Expand Up @@ -187,7 +187,7 @@ func (c *testCFSController) Sample(dt time.Duration) (s *sample) {
quota, period, _ := c.GetQuota()

periodToMilliseconds := (time.Duration(period) * time.Microsecond).Milliseconds()
quotaInMilliseconds := quota / ((10 /* milli-jiffie per jiffie */) * periodToMilliseconds)
quotaInMilliseconds := quota / periodToMilliseconds

spending := req
if spending > quotaInMilliseconds {
Expand Down Expand Up @@ -328,7 +328,7 @@ func aucValidator(limit int64) validator {
integ += s.GrantedReq
}
if integ > limit {
t.Errorf("unexpected high total CPU use %d micro-jiffies, expected %d micro-jiffies", integ, limit)
t.Errorf("unexpected high total CPU use %d, expected %d", integ, limit)
}
}
}
10 changes: 3 additions & 7 deletions components/ws-daemon/pkg/resources/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,11 @@ func (d *DispatchListener) WorkspaceAdded(ctx context.Context, ws *dispatch.Work

var cpuLimiter ResourceLimiter
if fixedLimit, ok := ws.Pod.Annotations[wsk8s.CPULimitAnnotation]; ok && fixedLimit != "" {
var scaledLimit int64
limit, err := resource.ParseQuantity(fixedLimit)
scaledLimit, err := resource.ParseQuantity(fixedLimit)
if err != nil {
log.WithError(err).WithField("limitReq", fixedLimit).Warn("workspace requested a fixed CPU limit, but we cannot parse the value")
}
// we need to scale from milli jiffie to jiffie - see governer code for details
scaledLimit = limit.MilliValue() / 10
cpuLimiter = FixedLimiter(scaledLimit)
cpuLimiter = FixedLimiter(scaledLimit.MilliValue())
} else if len(d.Config.CPUBuckets) > 0 {
cpuLimiter = &ClampingBucketLimiter{Buckets: d.Config.CPUBuckets}
} else {
Expand Down Expand Up @@ -134,8 +131,7 @@ func (d *DispatchListener) WorkspaceUpdated(ctx context.Context, ws *dispatch.Wo
if err != nil {
return xerrors.Errorf("cannot enforce fixed CPU limit: %w", err)
}
// we need to scale from milli jiffie to jiffie - see governer code for details
scaledLimit = limit.MilliValue() / 10
scaledLimit = limit.MilliValue()
}

gov.SetFixedCPULimit(scaledLimit)
Expand Down

0 comments on commit 50e9f1a

Please sign in to comment.