[ws-daemon] Integrate new CPU limiter
csweichel committed Feb 4, 2022
1 parent 8f531ab commit a217998
Showing 14 changed files with 347 additions and 1,048 deletions.
100 changes: 100 additions & 0 deletions components/ws-daemon/pkg/cpulimit/cfs.go
@@ -0,0 +1,100 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package cpulimit

import (
	"bufio"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"golang.org/x/xerrors"
)

// CgroupCFSController controls a cgroup's CFS settings
type CgroupCFSController string

// Usage returns the cpuacct.usage value of the cgroup
func (basePath CgroupCFSController) Usage() (usage CPUTime, err error) {
	fn := filepath.Join(string(basePath), "cpuacct.usage")
	fc, err := os.ReadFile(fn)
	if err != nil {
		return 0, xerrors.Errorf("cannot sample cpuacct.usage: %w", err)
	}

	cpuTimeInNS, err := strconv.ParseInt(strings.TrimSpace(string(fc)), 10, 64)
	if err != nil {
		return 0, xerrors.Errorf("cannot parse cpuacct.usage: %w", err)
	}

	return CPUTime(time.Duration(cpuTimeInNS) * time.Nanosecond), nil
}

// SetLimit sets a new CFS quota on the cgroup
func (basePath CgroupCFSController) SetLimit(limit Bandwidth) (changed bool, err error) {
	periodfn := filepath.Join(string(basePath), "cpu.cfs_period_us")
	periodfc, err := os.ReadFile(periodfn)
	if err != nil {
		err = xerrors.Errorf("cannot read CFS period: %w", err)
		return
	}
	p, err := strconv.ParseInt(strings.TrimSpace(string(periodfc)), 10, 64)
	if err != nil {
		err = xerrors.Errorf("cannot parse CFS period: %w", err)
		return
	}
	period := time.Duration(p) * time.Microsecond

	quotafn := filepath.Join(string(basePath), "cpu.cfs_quota_us")
	quotafc, err := os.ReadFile(quotafn)
	if err != nil {
		err = xerrors.Errorf("cannot read CFS quota: %w", err)
		return
	}
	q, err := strconv.ParseInt(strings.TrimSpace(string(quotafc)), 10, 64)
	if err != nil {
		err = xerrors.Errorf("cannot parse CFS quota: %w", err)
		return
	}
	quota := time.Duration(q) * time.Microsecond

	target := limit.Quota(period)
	if quota == target {
		return false, nil
	}

	err = os.WriteFile(quotafn, []byte(strconv.FormatInt(target.Microseconds(), 10)), 0644)
	if err != nil {
		return false, xerrors.Errorf("cannot set CFS quota: %w", err)
	}
	return true, nil
}

// NrThrottled returns the number of CFS periods the cgroup was throttled in
func (basePath CgroupCFSController) NrThrottled() (uint64, error) {
	f, err := os.Open(filepath.Join(string(basePath), "cpu.stat"))
	if err != nil {
		return 0, xerrors.Errorf("cannot read cpu.stat: %w", err)
	}
	defer f.Close()

	const prefixNrThrottled = "nr_throttled "

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		l := scanner.Text()
		if !strings.HasPrefix(l, prefixNrThrottled) {
			continue
		}

		r, err := strconv.ParseInt(strings.TrimSpace(strings.TrimPrefix(l, prefixNrThrottled)), 10, 64)
		if err != nil {
			return 0, xerrors.Errorf("cannot parse cpu.stat: %w", err)
		}
		return uint64(r), nil
	}
	if err := scanner.Err(); err != nil {
		return 0, xerrors.Errorf("cannot read cpu.stat: %w", err)
	}
	return 0, xerrors.Errorf("cpu.stat did not contain nr_throttled")
}
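Taken together, CgroupCFSController gives the limiter its three primitives: sample consumed CPU time, detect throttling, and rewrite the CFS quota. As a rough standalone sketch only (the cgroup path and the six-core figure are invented for illustration, not part of this commit):

package main

import (
	"fmt"
	"log"

	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/gitpod-io/gitpod/ws-daemon/pkg/cpulimit"
)

func main() {
	// Illustrative path only; ws-daemon derives the real path from the container runtime.
	cfs := cpulimit.CgroupCFSController("/sys/fs/cgroup/cpu/kubepods/pod-x/container-y")

	// Cumulative CPU time the cgroup has consumed so far.
	usage, err := cfs.Usage()
	if err != nil {
		log.Fatal(err)
	}

	// Grant the cgroup six cores' worth of CPU time per CFS period.
	changed, err := cfs.SetLimit(cpulimit.BandwidthFromQuantity(resource.MustParse("6")))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("usage=%v quotaChanged=%v\n", usage, changed)
}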
24 changes: 12 additions & 12 deletions components/ws-daemon/pkg/cpulimit/cpulimit.go
@@ -60,26 +60,26 @@ func (h *WorkspaceHistory) Throttled() bool {
 }
 
 type DistributorSource func(context.Context) ([]Workspace, error)
-type DistributorSink func(workspaceID string, limit Bandwidth)
+type DistributorSink func(id string, limit Bandwidth)
 
-func NewDistributor(source DistributorSource, sink DistributorSink, limiter ResourceLimiter, breakoutLimiter ResourceLimiter, totalBandwidth Bandwidth) *Distributor {
+func NewDistributor(source DistributorSource, sink DistributorSink, limiter ResourceLimiter, burstLimiter ResourceLimiter, totalBandwidth Bandwidth) *Distributor {
 	return &Distributor{
-		Source:          source,
-		Sink:            sink,
-		Limiter:         limiter,
-		BreakoutLimiter: breakoutLimiter,
-		TotalBandwidth:  totalBandwidth,
-		History:         make(map[string]*WorkspaceHistory),
+		Source:         source,
+		Sink:           sink,
+		Limiter:        limiter,
+		BurstLimiter:   burstLimiter,
+		TotalBandwidth: totalBandwidth,
+		History:        make(map[string]*WorkspaceHistory),
 	}
 }
 
 type Distributor struct {
 	Source DistributorSource
 	Sink   DistributorSink
 
-	History         map[string]*WorkspaceHistory
-	Limiter         ResourceLimiter
-	BreakoutLimiter ResourceLimiter
+	History      map[string]*WorkspaceHistory
+	Limiter      ResourceLimiter
+	BurstLimiter ResourceLimiter
 
 	// TotalBandwidth is the total CPU time available in nanoseconds per second
 	TotalBandwidth Bandwidth
@@ -185,7 +185,7 @@ func (d *Distributor) Tick(dt time.Duration) (DistributorDebug, error) {
 	// never spent any CPU time and assume the workspace will spend their
 	// entire bandwidth at once.
 	if totalBandwidth < d.TotalBandwidth && ws.Throttled() {
-		limit = d.BreakoutLimiter.Limit(ws.Usage())
+		limit = d.BurstLimiter.Limit(ws.Usage())
 
 		// We assume the workspace is going to use as much as their limit allows.
 		// This might not be true, because their process which consumed so much CPU
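This hunk is the core policy change: when a workspace has been throttled and the node still has spare bandwidth, the distributor escalates it from the regular Limiter to the renamed BurstLimiter. A hypothetical standalone sketch, using only the exported API visible in this diff (the stub source and sink, and all numbers, are invented for illustration):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/gitpod-io/gitpod/ws-daemon/pkg/cpulimit"
)

func main() {
	// Stub source: reports one workspace that was throttled in an earlier period,
	// which is what makes the distributor consider the burst limiter for it.
	source := func(context.Context) ([]cpulimit.Workspace, error) {
		return []cpulimit.Workspace{
			{ID: "ws1", NrThrottled: 3, Usage: cpulimit.CPUTime(2 * time.Second)},
		}, nil
	}
	// Stub sink: in ws-daemon this is DispatchListener.sink, which rewrites the cgroup quota.
	sink := func(id string, limit cpulimit.Bandwidth) {
		fmt.Printf("workspace %s -> limit %v\n", id, limit)
	}

	dist := cpulimit.NewDistributor(source, sink,
		cpulimit.FixedLimiter(cpulimit.BandwidthFromQuantity(resource.MustParse("6"))),  // steady-state limit
		cpulimit.FixedLimiter(cpulimit.BandwidthFromQuantity(resource.MustParse("12"))), // burst limit for throttled workspaces
		cpulimit.BandwidthFromQuantity(resource.MustParse("16")),                        // total node bandwidth
	)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	dist.Run(ctx, 10*time.Second) // re-evaluate limits every 10s
}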
192 changes: 192 additions & 0 deletions components/ws-daemon/pkg/cpulimit/dispatch.go
@@ -0,0 +1,192 @@
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package cpulimit

import (
	"context"
	"path/filepath"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/xerrors"
	"k8s.io/apimachinery/pkg/api/resource"

	wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
	"github.com/gitpod-io/gitpod/common-go/log"
	"github.com/gitpod-io/gitpod/common-go/util"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
)

// Config configures the containerd resource governor dispatch
type Config struct {
	Enabled        bool              `json:"enabled"`
	TotalBandwidth resource.Quantity `json:"totalBandwidth"`
	Limit          resource.Quantity `json:"limit"`
	BurstLimit     resource.Quantity `json:"burstLimit"`

	ControlPeriod  util.Duration `json:"controlPeriod"`
	CGroupBasePath string        `json:"cgroupBasePath"`
}

// NewDispatchListener creates a new resource governor dispatch listener
func NewDispatchListener(cfg *Config, prom prometheus.Registerer) *DispatchListener {
	d := &DispatchListener{
		Prometheus: prom,
		Config:     cfg,
		workspaces: make(map[string]*workspace),

		workspacesAddedCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "cpulimit_workspaces_added_total",
			Help: "Number of workspaces added to CPU control",
		}, []string{"qos"}),
		workspacesRemovedCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "cpulimit_workspaces_removed_total",
			Help: "Number of workspaces removed from CPU control",
		}, []string{"qos"}),
		workspacesThrottledCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "cpulimit_workspaces_throttled_total",
			Help: "Number of workspaces which ran with throttled CPU",
		}, []string{"qos"}),
	}

	if cfg.Enabled {
		dist := NewDistributor(d.source, d.sink,
			FixedLimiter(BandwidthFromQuantity(d.Config.Limit)),
			FixedLimiter(BandwidthFromQuantity(d.Config.BurstLimit)),
			BandwidthFromQuantity(d.Config.TotalBandwidth),
		)
		go dist.Run(context.Background(), time.Duration(d.Config.ControlPeriod))
	}

	prom.MustRegister(
		d.workspacesAddedCounterVec,
		d.workspacesRemovedCounterVec,
		d.workspacesThrottledCounterVec,
	)

	return d
}

// DispatchListener starts a new resource governor using the workspace dispatch
type DispatchListener struct {
	Prometheus prometheus.Registerer
	Config     *Config

	workspaces map[string]*workspace
	mu         sync.RWMutex

	workspacesAddedCounterVec     *prometheus.CounterVec
	workspacesRemovedCounterVec   *prometheus.CounterVec
	workspacesThrottledCounterVec *prometheus.CounterVec
}

type workspace struct {
	CFS       CgroupCFSController
	OWI       logrus.Fields
	HardLimit ResourceLimiter
}

func (d *DispatchListener) source(context.Context) ([]Workspace, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	res := make([]Workspace, 0, len(d.workspaces))
	for id, w := range d.workspaces {
		usage, err := w.CFS.Usage()
		if err != nil {
			log.WithFields(w.OWI).WithError(err).Warn("cannot read CPU usage")
			continue
		}
		throttled, err := w.CFS.NrThrottled()
		if err != nil {
			log.WithFields(w.OWI).WithError(err).Warn("cannot read times cgroup was throttled")
			// we don't continue here, because worst case the cgroup will get too low a
			// limit, but at least we'll keep maintaining the limit.
		}

		res = append(res, Workspace{
			ID:          id,
			NrThrottled: throttled,
			Usage:       usage,
		})
	}
	return res, nil
}

func (d *DispatchListener) sink(id string, limit Bandwidth) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	ws, ok := d.workspaces[id]
	if !ok {
		// this can happen if the workspace has gone away between distributor cycles
		return
	}

	changed, err := ws.CFS.SetLimit(limit)
	if err != nil {
		log.WithError(err).WithFields(ws.OWI).Warn("cannot set CPU limit")
	}
	if changed {
		log.WithFields(ws.OWI).WithField("limit", limit).Debug("applied new CPU limit")
	}
}

// WorkspaceAdded starts a new governor
func (d *DispatchListener) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	disp := dispatch.GetFromContext(ctx)
	if disp == nil {
		return xerrors.Errorf("no dispatch available")
	}

	cgroupPath, err := disp.Runtime.ContainerCGroupPath(context.Background(), ws.ContainerID)
	if err != nil {
		return xerrors.Errorf("cannot start governor: %w", err)
	}

	d.workspaces[ws.InstanceID] = &workspace{
		CFS: CgroupCFSController(filepath.Join(d.Config.CGroupBasePath, "cpu", cgroupPath)),
		OWI: ws.OWI(),
	}
	go func() {
		<-ctx.Done()

		d.mu.Lock()
		defer d.mu.Unlock()
		delete(d.workspaces, ws.InstanceID)
		d.workspacesRemovedCounterVec.WithLabelValues("none").Inc()
	}()

	d.workspacesAddedCounterVec.WithLabelValues("none").Inc()

	return nil
}

// WorkspaceUpdated gets called when a workspace is updated
func (d *DispatchListener) WorkspaceUpdated(ctx context.Context, ws *dispatch.Workspace) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	wsinfo, ok := d.workspaces[ws.InstanceID]
	if !ok {
		return xerrors.Errorf("received update for a workspace we haven't seen before: %s", ws.InstanceID)
	}

	newCPULimit := ws.Pod.Annotations[wsk8s.CPULimitAnnotation]
	if newCPULimit != "" {
		limit, err := resource.ParseQuantity(newCPULimit)
		if err != nil {
			return xerrors.Errorf("cannot enforce fixed CPU limit: %w", err)
		}
		wsinfo.HardLimit = FixedLimiter(BandwidthFromQuantity(limit))
	}

	return nil
}
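For reference, a configuration fragment matching the struct tags above might look like the following; all values are illustrative, and the exact serialization follows the JSON marshalling of resource.Quantity and util.Duration:

{
	"enabled": true,
	"totalBandwidth": "16",
	"limit": "6",
	"burstLimit": "12",
	"controlPeriod": "10s",
	"cgroupBasePath": "/sys/fs/cgroup"
}

Per-workspace overrides arrive separately: WorkspaceUpdated reads the wsk8s.CPULimitAnnotation pod annotation and, when present, installs a FixedLimiter as that workspace's hard limit.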
