Skip to content

Commit

Permalink
[ws-daemon] Introduce IO limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
csweichel authored and roboquat committed Apr 12, 2022
1 parent 94e147b commit 7853926
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 221 deletions.
120 changes: 120 additions & 0 deletions components/ws-daemon/pkg/cgroup/cgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package cgroup

import (
"context"

"github.com/gitpod-io/gitpod/common-go/cgroups"
"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/xerrors"
)

// NewPluginHost builds a PluginHost for the given cgroup base path and plugins.
//
// The cgroup version is detected once, at construction time: a unified cgroup
// setup yields Version2, anything else Version1. An error is returned when the
// setup cannot be determined.
func NewPluginHost(cgroupBasePath string, plugins ...Plugin) (*PluginHost, error) {
	unified, err := cgroups.IsUnifiedCgroupSetup()
	if err != nil {
		return nil, xerrors.Errorf("could not determine cgroup setup: %w", err)
	}

	// Default to the legacy hierarchy and switch over only for unified setups.
	version := Version1
	if unified {
		version = Version2
	}

	// Counts plugin activations, partitioned by plugin name and outcome.
	activations := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "cgroup_plugin_activation_total",
			Help: "counts the total activation of cgroup plugins",
		},
		[]string{"plugin", "success"},
	)

	host := &PluginHost{
		CGroupBasePath:           cgroupBasePath,
		CGroupVersion:            version,
		Plugins:                  plugins,
		pluginActivationTotalVec: activations,
	}
	return host, nil
}

// PluginHost applies a set of cgroup plugins to every workspace that is added.
// It also acts as a prometheus.Collector, exposing its own activation counter
// and the metrics of any plugin that implements prometheus.Collector.
type PluginHost struct {
// CGroupBasePath is handed to every plugin as the basePath argument of Apply.
CGroupBasePath string
// CGroupVersion is the cgroup version of this host; only plugins whose Type
// equals this value are applied.
CGroupVersion Version
// Plugins is the full list of registered plugins, regardless of their version.
Plugins []Plugin

// pluginActivationTotalVec counts finished plugin runs, labelled with the
// plugin name and whether the run succeeded ("true"/"false").
pluginActivationTotalVec *prometheus.CounterVec
}

// Compile-time proof that *PluginHost satisfies the interfaces it is used as.
var (
	_ dispatch.Listener    = (*PluginHost)(nil)
	_ prometheus.Collector = (*PluginHost)(nil)
)

// Describe implements prometheus.Collector. It forwards the descriptors of the
// host's activation counter first, followed by those of every plugin that is
// itself a prometheus.Collector.
func (host *PluginHost) Describe(c chan<- *prometheus.Desc) {
	host.pluginActivationTotalVec.Describe(c)

	for _, plugin := range host.Plugins {
		// Plugins without metrics simply do not implement the interface.
		if collector, ok := plugin.(prometheus.Collector); ok {
			collector.Describe(c)
		}
	}
}

// Collect implements prometheus.Collector. It emits the host's activation
// counter first, followed by the metrics of every plugin that is itself a
// prometheus.Collector.
func (host *PluginHost) Collect(c chan<- prometheus.Metric) {
	host.pluginActivationTotalVec.Collect(c)

	for _, plugin := range host.Plugins {
		// Plugins without metrics simply do not implement the interface.
		if collector, ok := plugin.(prometheus.Collector); ok {
			collector.Collect(c)
		}
	}
}

// WorkspaceAdded is the dispatch.Listener hook called for every new workspace.
// It resolves the cgroup path of the workspace container and then starts, each
// in its own goroutine, every plugin whose Type matches the host's cgroup
// version.
//
// An error is returned when ctx carries no dispatch or when the cgroup path of
// the container cannot be resolved. Plugin failures are never returned: they
// are logged and recorded in the activation counter with success="false". A
// plugin that ends because of context cancellation or a deadline is counted as
// neither a success nor a failure.
func (host *PluginHost) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) (err error) {
	disp := dispatch.GetFromContext(ctx)
	if disp == nil {
		return xerrors.Errorf("no dispatch available")
	}

	// NOTE(review): the lookup runs with context.Background() rather than ctx, so
	// it is not bound to the workspace context - confirm this is intentional.
	cgroupPath, err := disp.Runtime.ContainerCGroupPath(context.Background(), ws.ContainerID)
	if err != nil {
		return xerrors.Errorf("cannot get cgroup path of container: %w", err)
	}

	for _, plg := range host.Plugins {
		if plg.Type() != host.CGroupVersion {
			continue
		}

		// The plugin is passed as an argument so every goroutine works on its own copy.
		go func(plg Plugin) {
			err := plg.Apply(ctx, host.CGroupBasePath, cgroupPath)
			// Plugins may wrap the context error, hence Is instead of ==.
			if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
				return
			}
			if err != nil {
				log.WithError(err).WithFields(ws.OWI()).WithField("plugin", plg.Name()).Error("cgroup plugin failure")
				host.pluginActivationTotalVec.WithLabelValues(plg.Name(), "false").Inc()
				return
			}
			host.pluginActivationTotalVec.WithLabelValues(plg.Name(), "true").Inc()
		}(plg)
	}

	return nil
}

// Plugin is a unit of cgroup customization that a PluginHost applies to the
// cgroup of each added workspace container.
type Plugin interface {
// Name returns the identifier of the plugin. The host uses it as the "plugin"
// metric label and log field.
Name() string
// Type reports the cgroup version the plugin is written for. The host skips
// plugins whose Type differs from its own cgroup version.
Type() Version
// Apply runs the plugin for one container. basePath is the host's cgroup base
// path and cgroupPath the cgroup path of the workspace container. The host
// calls Apply in a separate goroutine and treats context.Canceled and
// context.DeadlineExceeded as a regular shutdown rather than a failure.
Apply(ctx context.Context, basePath, cgroupPath string) error
}

// Version identifies the cgroup version a plugin targets or a host runs.
type Version int

const (
// Version1 is what NewPluginHost selects when the host does not use a
// unified cgroup setup.
Version1 Version = iota
// Version2 is what NewPluginHost selects on a unified cgroup setup.
Version2
)
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package daemon
package cgroup

import (
"bufio"
Expand All @@ -17,63 +17,38 @@ import (
"strings"
"time"

"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
"golang.org/x/xerrors"
)

type CacheReclaim string
type CacheReclaim struct{}

// WorkspaceAdded will customize the cgroups for every workspace that is started
func (c CacheReclaim) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) error {
disp := dispatch.GetFromContext(ctx)
if disp == nil {
return xerrors.Errorf("no dispatch available")
}
func (c *CacheReclaim) Name() string { return "cache-reclaim-v1" }
func (c *CacheReclaim) Type() Version { return Version1 }

cgroupPath, err := disp.Runtime.ContainerCGroupPath(context.Background(), ws.ContainerID)
if err != nil {
return xerrors.Errorf("cannot start governer: %w", err)
}
func (c *CacheReclaim) Apply(ctx context.Context, basePath, cgroupPath string) error {
memPath := filepath.Join(string(basePath), "memory", cgroupPath)

t := time.NewTicker(10 * time.Second)
defer t.Stop()

var lastReclaim time.Time
for {
select {
case <-ctx.Done():
return nil
case <-t.C:
}

memPath := filepath.Join(string(c), "memory", cgroupPath)

go func() {
owi := ws.OWI()
log.WithFields(ws.OWI()).Debug("starting page cache reclaim")

t := time.NewTicker(10 * time.Second)
defer t.Stop()

var lastReclaim time.Time
for {
select {
case <-ctx.Done():
log.WithFields(owi).Debug("shutting down page cache reclaim")
return
case <-t.C:
}

if !lastReclaim.IsZero() && time.Since(lastReclaim) < 30*time.Second {
continue
}

stats, err := reclaimPageCache(memPath)
if err != nil {
log.WithFields(owi).WithError(err).Warn("cannot reclaim page cache")
continue
}
e := log.WithFields(owi).WithField("reclaimed_bytes", stats.Reclaimed()).WithField("stats", stats)
if stats.DidReclaim {
e.Debug("reclaimed page cache")
} else {
e.Debug("did not reclaim page cache")
}
lastReclaim = time.Now()
if !lastReclaim.IsZero() && time.Since(lastReclaim) < 30*time.Second {
continue
}
}()

return nil
_, err := reclaimPageCache(memPath)
if err != nil {
continue
}
lastReclaim = time.Now()
}
}

type reclaimStats struct {
Expand Down Expand Up @@ -118,7 +93,9 @@ func readLimit(memCgroupPath string) (uint64, error) {
fn := filepath.Join(string(memCgroupPath), "memory.limit_in_bytes")
fc, err := os.ReadFile(fn)
if err != nil {
// TODO(toru): find out why the file does not exists
// We have a race between the dispatch noticing that a workspace is stopped
// and the container going away. Hence we might still be running for a workspace
// container which no longer exists, i.e. whose cgroup files no longer exist.
if errors.Is(err, fs.ErrNotExist) {
return 0, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package daemon
package cgroup

import (
"fmt"
Expand Down
58 changes: 58 additions & 0 deletions components/ws-daemon/pkg/cgroup/plugin_fuse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package cgroup

import (
"context"

"github.com/containerd/cgroups"
"github.com/opencontainers/runtime-spec/specs-go"
"golang.org/x/xerrors"
)

// fuseDeviceMajor is the major device number used for the /dev/fuse rule.
// It is a variable, not a constant, because its address is taken when the
// cgroup v1 device rule is built.
var fuseDeviceMajor int64 = 10

// fuseDeviceMinor is the minor device number used for the /dev/fuse rule.
// It is a variable for the same reason as fuseDeviceMajor.
var fuseDeviceMinor int64 = 229

// FuseDeviceEnablerV1 is the cgroup v1 plugin that allows access to /dev/fuse
// in the devices cgroup of a workspace container.
type FuseDeviceEnablerV1 struct{}

// Name returns the identifier of this plugin.
func (c *FuseDeviceEnablerV1) Name() string {
	const pluginName = "fuse-device-enabler-v1"
	return pluginName
}
// Type reports that this plugin is meant for cgroup v1 hosts.
func (c *FuseDeviceEnablerV1) Type() Version {
	return Version1
}

// Apply loads the devices cgroup found at cgroupPath below basePath and updates
// it with a rule that allows read, write and mknod access to /dev/fuse.
//
// ctx is not used by this plugin. An error is returned when the cgroup cannot
// be loaded or the update is rejected.
func (c *FuseDeviceEnablerV1) Apply(ctx context.Context, basePath, cgroupPath string) error {
	hierarchy := customV1(basePath)
	control, err := cgroups.Load(hierarchy, cgroups.StaticPath(cgroupPath))
	if err != nil {
		return xerrors.Errorf("error loading cgroup at path: %s %w", cgroupPath, err)
	}

	// /dev/fuse
	allowFuse := specs.LinuxDeviceCgroup{
		Allow:  true,
		Type:   "c",
		Major:  &fuseDeviceMajor,
		Minor:  &fuseDeviceMinor,
		Access: "rwm",
	}
	resources := &specs.LinuxResources{
		Devices: []specs.LinuxDeviceCgroup{allowFuse},
	}

	err = control.Update(resources)
	if err != nil {
		return xerrors.Errorf("cgroup update failed: %w", err)
	}
	return nil
}

// customV1 returns a cgroup hierarchy function that exposes nothing but the
// devices subsystem rooted at basePath.
func customV1(basePath string) func() ([]cgroups.Subsystem, error) {
	devicesOnly := func() ([]cgroups.Subsystem, error) {
		subsystems := []cgroups.Subsystem{cgroups.NewDevices(basePath)}
		return subsystems, nil
	}
	return devicesOnly
}
67 changes: 67 additions & 0 deletions components/ws-daemon/pkg/cgroup/plugin_fuse_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package cgroup

import (
"context"
"path/filepath"

"github.com/opencontainers/runc/libcontainer/cgroups/ebpf"
"github.com/opencontainers/runc/libcontainer/cgroups/ebpf/devicefilter"
"github.com/opencontainers/runc/libcontainer/devices"
"github.com/opencontainers/runc/libcontainer/specconv"
"golang.org/x/sys/unix"
"golang.org/x/xerrors"
)

// FuseDeviceEnablerV2 is the cgroup v2 plugin that attaches a device filter
// allowing /dev/fuse to the cgroup of a workspace container.
type FuseDeviceEnablerV2 struct{}

// Name returns the identifier of this plugin.
func (c *FuseDeviceEnablerV2) Name() string {
	const pluginName = "fuse-device-enabler-v2"
	return pluginName
}
// Type reports that this plugin is meant for cgroup v2 hosts.
func (c *FuseDeviceEnablerV2) Type() Version {
	return Version2
}

// Apply attaches a device filter built from composeDeviceRules to the cgroup
// directory at basePath/cgroupPath.
//
// ctx is not used by this plugin. An error is returned when the cgroup
// directory cannot be opened, the filter program cannot be generated, or the
// filter cannot be attached; the underlying cause is wrapped in every case.
func (c *FuseDeviceEnablerV2) Apply(ctx context.Context, basePath, cgroupPath string) error {
	fullCgroupPath := filepath.Join(basePath, cgroupPath)

	// The filter is attached through a file descriptor of the cgroup directory.
	cgroupFD, err := unix.Open(fullCgroupPath, unix.O_DIRECTORY|unix.O_RDONLY, 0o600)
	if err != nil {
		// Wrap the cause so callers can see (and match on) the actual errno.
		return xerrors.Errorf("cannot get directory fd for %s: %w", fullCgroupPath, err)
	}
	defer unix.Close(cgroupFD)

	insts, license, err := devicefilter.DeviceFilter(composeDeviceRules())
	if err != nil {
		return xerrors.Errorf("failed to generate device filter: %w", err)
	}

	// NOTE(review): the first return value is discarded; presumably it is a
	// handle to detach the filter again - confirm it is meant to stay attached.
	_, err = ebpf.LoadAttachCgroupDeviceFilter(insts, license, cgroupFD)
	if err != nil {
		return xerrors.Errorf("failed to attach cgroup device filter: %w", err)
	}

	return nil
}

// composeDeviceRules returns the device rules used for the cgroup v2 device
// filter, in order: a rule denying all devices, a rule allowing /dev/fuse, and
// the rule of every entry in specconv.AllowedDevices.
func composeDeviceRules() []*devices.Rule {
	rules := []*devices.Rule{
		// Deny everything first ...
		{
			Type:        'a',
			Permissions: "rwm",
			Allow:       false,
		},
		// ... then allow /dev/fuse explicitly.
		{
			Type:        'c',
			Major:       fuseDeviceMajor,
			Minor:       fuseDeviceMinor,
			Permissions: "rwm",
			Allow:       true,
		},
	}

	for _, allowed := range specconv.AllowedDevices {
		rules = append(rules, &allowed.Rule)
	}

	return rules
}
Loading

0 comments on commit 7853926

Please sign in to comment.