Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ws-daemon] Introduce IO limiting #9271

Merged
merged 1 commit into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions components/ws-daemon/pkg/cgroup/cgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package cgroup

import (
"context"

"github.com/gitpod-io/gitpod/common-go/cgroups"
"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/xerrors"
)

// NewPluginHost builds a PluginHost for the given cgroup base path and plugins.
//
// The cgroup version is detected once, at construction time, by asking
// cgroups.IsUnifiedCgroupSetup: a unified setup maps to Version2, anything
// else to Version1. An error is returned if that detection fails.
//
// The activation counter is created here but not registered anywhere; the
// returned host exposes it through its Describe and Collect methods.
func NewPluginHost(cgroupBasePath string, plugins ...Plugin) (*PluginHost, error) {
	isUnified, err := cgroups.IsUnifiedCgroupSetup()
	if err != nil {
		return nil, xerrors.Errorf("could not determine cgroup setup: %w", err)
	}

	// Default to v1 and upgrade only when the unified hierarchy is in use.
	detected := Version1
	if isUnified {
		detected = Version2
	}

	activations := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "cgroup_plugin_activation_total",
			Help: "counts the total activation of cgroup plugins",
		},
		[]string{"plugin", "success"},
	)

	host := &PluginHost{
		CGroupBasePath:           cgroupBasePath,
		CGroupVersion:            detected,
		Plugins:                  plugins,
		pluginActivationTotalVec: activations,
	}
	return host, nil
}

// PluginHost applies a set of cgroup plugins to every workspace it is
// notified about, and reports how many plugin activations succeeded or failed.
type PluginHost struct {
// CGroupBasePath is handed to every plugin's Apply call as its basePath.
CGroupBasePath string
// CGroupVersion is the cgroup version of this host; only plugins whose
// Type() equals it are applied.
CGroupVersion Version
// Plugins are the candidate plugins. Those that also implement
// prometheus.Collector have their metrics forwarded by Describe/Collect.
Plugins []Plugin

// pluginActivationTotalVec counts finished plugin activations, labelled by
// plugin name and by whether the activation succeeded ("true"/"false").
pluginActivationTotalVec *prometheus.CounterVec
}

// Compile-time checks that *PluginHost satisfies the interfaces it is used as.
var (
	_ dispatch.Listener    = &PluginHost{}
	_ prometheus.Collector = &PluginHost{}
)

// Describe implements prometheus.Collector. It sends the descriptors of the
// host's own activation counter first, followed by those of every plugin that
// is itself a prometheus.Collector. Plugins that are not collectors are skipped.
func (host *PluginHost) Describe(c chan<- *prometheus.Desc) {
	host.pluginActivationTotalVec.Describe(c)

	for _, plugin := range host.Plugins {
		if collector, ok := plugin.(prometheus.Collector); ok {
			collector.Describe(c)
		}
	}
}

// Collect implements prometheus.Collector. It emits the host's own activation
// counter first, followed by the metrics of every plugin that is itself a
// prometheus.Collector. Plugins that are not collectors are skipped.
func (host *PluginHost) Collect(c chan<- prometheus.Metric) {
	host.pluginActivationTotalVec.Collect(c)

	for _, plugin := range host.Plugins {
		if collector, ok := plugin.(prometheus.Collector); ok {
			collector.Collect(c)
		}
	}
}

// WorkspaceAdded implements dispatch.Listener. It resolves the cgroup path of
// the workspace's container and then starts, in a separate goroutine each,
// every plugin whose Type matches the host's cgroup version.
//
// It returns an error only if no dispatch is attached to ctx or the container's
// cgroup path cannot be determined. Plugin failures are never returned: they
// are logged and recorded in the activation counter instead.
//
// The plugin goroutines run with ctx, so their lifetime is bound to it.
func (host *PluginHost) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) (err error) {
	disp := dispatch.GetFromContext(ctx)
	if disp == nil {
		return xerrors.Errorf("no dispatch available")
	}

	cgroupPath, err := disp.Runtime.ContainerCGroupPath(context.Background(), ws.ContainerID)
	if err != nil {
		return xerrors.Errorf("cannot start governer: %w", err)
	}

	for _, plg := range host.Plugins {
		if plg.Type() != host.CGroupVersion {
			continue
		}

		// plg is passed as an argument so each goroutine works on its own copy.
		go func(plg Plugin) {
			err := plg.Apply(ctx, host.CGroupBasePath, cgroupPath)

			// A plugin that stopped because the context ended did not fail.
			// Use Is rather than == so that wrapped context errors (plugins
			// commonly wrap with %w) are recognised as well.
			if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
				return
			}

			success := "true"
			if err != nil {
				log.WithError(err).WithFields(ws.OWI()).WithField("plugin", plg.Name()).Error("cgroup plugin failure")
				success = "false"
			}
			host.pluginActivationTotalVec.WithLabelValues(plg.Name(), success).Inc()
		}(plg)
	}

	return nil
}

// Plugin is a cgroup customization that a PluginHost applies to the cgroup of
// each workspace container.
type Plugin interface {
// Name identifies the plugin. PluginHost uses it as the "plugin" metric
// label and as the "plugin" log field.
Name() string
// Type is the cgroup version the plugin targets. PluginHost skips plugins
// whose Type differs from its own CGroupVersion.
Type() Version
// Apply performs the plugin's work for one container. PluginHost calls it
// in its own goroutine with the host's CGroupBasePath as basePath and the
// container's cgroup path as cgroupPath. A context.Canceled or
// context.DeadlineExceeded result is ignored; any other non-nil error is
// logged and counted as a failed activation, and nil as a successful one.
Apply(ctx context.Context, basePath, cgroupPath string) error
}

// Version identifies a cgroup version. It is used to match plugins against
// the cgroup setup detected by NewPluginHost.
type Version int

const (
// Version1 is what NewPluginHost selects when the cgroup setup is not unified.
Version1 Version = iota
// Version2 is what NewPluginHost selects when the cgroup setup is unified.
Version2
)
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package daemon
package cgroup

import (
"bufio"
Expand All @@ -17,63 +17,38 @@ import (
"strings"
"time"

"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
"golang.org/x/xerrors"
)

type CacheReclaim string
type CacheReclaim struct{}

// WorkspaceAdded will customize the cgroups for every workspace that is started
func (c CacheReclaim) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) error {
disp := dispatch.GetFromContext(ctx)
if disp == nil {
return xerrors.Errorf("no dispatch available")
}
func (c *CacheReclaim) Name() string { return "cache-reclaim-v1" }
func (c *CacheReclaim) Type() Version { return Version1 }

cgroupPath, err := disp.Runtime.ContainerCGroupPath(context.Background(), ws.ContainerID)
if err != nil {
return xerrors.Errorf("cannot start governer: %w", err)
}
func (c *CacheReclaim) Apply(ctx context.Context, basePath, cgroupPath string) error {
memPath := filepath.Join(string(basePath), "memory", cgroupPath)

t := time.NewTicker(10 * time.Second)
defer t.Stop()

var lastReclaim time.Time
for {
select {
case <-ctx.Done():
return nil
case <-t.C:
}

memPath := filepath.Join(string(c), "memory", cgroupPath)

go func() {
owi := ws.OWI()
log.WithFields(ws.OWI()).Debug("starting page cache reclaim")

t := time.NewTicker(10 * time.Second)
defer t.Stop()

var lastReclaim time.Time
for {
select {
case <-ctx.Done():
log.WithFields(owi).Debug("shutting down page cache reclaim")
return
case <-t.C:
}

if !lastReclaim.IsZero() && time.Since(lastReclaim) < 30*time.Second {
continue
}

stats, err := reclaimPageCache(memPath)
if err != nil {
log.WithFields(owi).WithError(err).Warn("cannot reclaim page cache")
continue
}
e := log.WithFields(owi).WithField("reclaimed_bytes", stats.Reclaimed()).WithField("stats", stats)
if stats.DidReclaim {
e.Debug("reclaimed page cache")
} else {
e.Debug("did not reclaim page cache")
}
lastReclaim = time.Now()
if !lastReclaim.IsZero() && time.Since(lastReclaim) < 30*time.Second {
continue
}
}()

return nil
_, err := reclaimPageCache(memPath)
if err != nil {
continue
}
lastReclaim = time.Now()
}
}

type reclaimStats struct {
Expand Down Expand Up @@ -118,7 +93,9 @@ func readLimit(memCgroupPath string) (uint64, error) {
fn := filepath.Join(string(memCgroupPath), "memory.limit_in_bytes")
fc, err := os.ReadFile(fn)
if err != nil {
// TODO(toru): find out why the file does not exists
// We have a race between the dispatch noticing that a workspace is stopped
// and the container going away. Hence we might be running for a workspace
// container which no longer exists, i.e. its cgroup files no longer exist.
if errors.Is(err, fs.ErrNotExist) {
return 0, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package daemon
package cgroup

import (
"fmt"
Expand Down
58 changes: 58 additions & 0 deletions components/ws-daemon/pkg/cgroup/plugin_fuse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package cgroup

import (
"context"

"github.com/containerd/cgroups"
"github.com/opencontainers/runtime-spec/specs-go"
"golang.org/x/xerrors"
)

// Major and minor device numbers used for the /dev/fuse device rules.
// These are variables rather than constants because the cgroup v1 plugin
// takes their addresses when filling in specs.LinuxDeviceCgroup.
var (
fuseDeviceMajor int64 = 10
fuseDeviceMinor int64 = 229
)

// FuseDeviceEnablerV1 is a cgroup plugin that grants access to the FUSE
// device through the devices controller.
type FuseDeviceEnablerV1 struct{}

// Name returns the plugin's identifier.
func (c *FuseDeviceEnablerV1) Name() string {
	return "fuse-device-enabler-v1"
}
// Type reports that this plugin targets Version1.
func (c *FuseDeviceEnablerV1) Type() Version {
	return Version1
}

// Apply loads the cgroup at cgroupPath, using a hierarchy that contains only
// the devices subsystem rooted at basePath, and updates it with a rule that
// allows read, write and mknod ("rwm") access to the /dev/fuse character device.
//
// It returns an error if the cgroup cannot be loaded or the update fails.
// ctx is not used.
func (c *FuseDeviceEnablerV1) Apply(ctx context.Context, basePath, cgroupPath string) error {
	hierarchy := customV1(basePath)
	control, err := cgroups.Load(hierarchy, cgroups.StaticPath(cgroupPath))
	if err != nil {
		return xerrors.Errorf("error loading cgroup at path: %s %w", cgroupPath, err)
	}

	// /dev/fuse
	fuseRule := specs.LinuxDeviceCgroup{
		Type:   "c",
		Minor:  &fuseDeviceMinor,
		Major:  &fuseDeviceMajor,
		Access: "rwm",
		Allow:  true,
	}
	resources := &specs.LinuxResources{
		Devices: []specs.LinuxDeviceCgroup{fuseRule},
	}

	err = control.Update(resources)
	if err != nil {
		return xerrors.Errorf("cgroup update failed: %w", err)
	}
	return nil
}

// customV1 returns a hierarchy function that yields a single subsystem: the
// devices subsystem created for basePath. The subsystem is created anew each
// time the returned function is called, and the function never returns an error.
func customV1(basePath string) func() ([]cgroups.Subsystem, error) {
	hierarchy := func() ([]cgroups.Subsystem, error) {
		devicesOnly := []cgroups.Subsystem{cgroups.NewDevices(basePath)}
		return devicesOnly, nil
	}
	return hierarchy
}
67 changes: 67 additions & 0 deletions components/ws-daemon/pkg/cgroup/plugin_fuse_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package cgroup

import (
"context"
"path/filepath"

"github.com/opencontainers/runc/libcontainer/cgroups/ebpf"
"github.com/opencontainers/runc/libcontainer/cgroups/ebpf/devicefilter"
"github.com/opencontainers/runc/libcontainer/devices"
"github.com/opencontainers/runc/libcontainer/specconv"
"golang.org/x/sys/unix"
"golang.org/x/xerrors"
)

// FuseDeviceEnablerV2 is a cgroup plugin that grants access to the FUSE
// device by attaching an eBPF device filter to the cgroup.
type FuseDeviceEnablerV2 struct{}

// Name returns the plugin's identifier.
func (c *FuseDeviceEnablerV2) Name() string {
	return "fuse-device-enabler-v2"
}
// Type reports that this plugin targets Version2.
func (c *FuseDeviceEnablerV2) Type() Version {
	return Version2
}

// Apply attaches an eBPF device filter, generated from composeDeviceRules, to
// the cgroup directory at filepath.Join(basePath, cgroupPath).
//
// The directory is opened only to obtain a file descriptor for the attach
// call, and is closed again before Apply returns. An error is returned if the
// directory cannot be opened, the filter program cannot be generated, or the
// program cannot be attached. ctx is not used.
func (c *FuseDeviceEnablerV2) Apply(ctx context.Context, basePath, cgroupPath string) error {
	fullCgroupPath := filepath.Join(basePath, cgroupPath)
	cgroupFD, err := unix.Open(fullCgroupPath, unix.O_DIRECTORY|unix.O_RDONLY, 0o600)
	if err != nil {
		// Wrap the underlying error so the errno (e.g. ENOENT vs. EACCES) is
		// not lost and callers can inspect it.
		return xerrors.Errorf("cannot get directory fd for %s: %w", fullCgroupPath, err)
	}
	defer unix.Close(cgroupFD)

	insts, license, err := devicefilter.DeviceFilter(composeDeviceRules())
	if err != nil {
		return xerrors.Errorf("failed to generate device filter: %w", err)
	}

	// The first return value of the attach call is not needed here.
	_, err = ebpf.LoadAttachCgroupDeviceFilter(insts, license, cgroupFD)
	if err != nil {
		return xerrors.Errorf("failed to attach cgroup device filter: %w", err)
	}

	return nil
}

// composeDeviceRules builds the rule list used for the device filter, in this
// order: a rule denying "rwm" on all devices, a rule allowing "rwm" on the
// FUSE character device, and then the rule of every entry in
// specconv.AllowedDevices.
func composeDeviceRules() []*devices.Rule {
	rules := make([]*devices.Rule, 0, len(specconv.AllowedDevices)+2)

	rules = append(rules,
		// Deny everything first.
		&devices.Rule{
			Type:        'a',
			Permissions: "rwm",
			Allow:       false,
		},
		// Then allow the FUSE device.
		&devices.Rule{
			Type:        'c',
			Major:       fuseDeviceMajor,
			Minor:       fuseDeviceMinor,
			Permissions: "rwm",
			Allow:       true,
		},
	)

	for _, allowed := range specconv.AllowedDevices {
		rules = append(rules, &allowed.Rule)
	}

	return rules
}
Loading