Skip to content

Commit

Permalink
roachtest: add test for admission control disk bandwidth
Browse files Browse the repository at this point in the history
This test runs a single node target cluster that has two workloads
running on it. The lower priority (qos=background) is very bandwidth
intensive, and without the AC bandwidth limiter would saturate the
provisioned bandwidth (controlled using cgroups).

This test shows how setting the cluster setting
`kvadmission.store.provisioned-bandwidth` limits the disk bandwidth
usage of lower priority work and shapes it at the value set in the
setting.

Fixes #121576.

Release note: None
  • Loading branch information
aadityasondhi committed May 8, 2024
1 parent 48a2035 commit 5c6d381
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"activerecord_blocklist.go",
"admission_control.go",
"admission_control_database_drop.go",
"admission_control_disk_bandwidth_overload.go",
"admission_control_elastic_backup.go",
"admission_control_elastic_cdc.go",
"admission_control_elastic_io.go",
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/admission_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ func registerAdmission(r registry.Registry) {
registerDatabaseDrop(r)
registerIntentResolutionOverload(r)
registerElasticIO(r)
registerDiskBandwidthOverload(r)
}
210 changes: 210 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/grafana"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// This test sets up 2 workloads – kv0 consisting of "normal" priority writes
// and kv0 consisting of "background" priority writes. The goal is to show that
// even with a demanding "background" workload that is able to push the used
// bandwidth much higher than the provisioned one, the AC bandwidth limiter
// paces the traffic at the set bandwidth limit.
func registerDiskBandwidthOverload(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "admission-control/disk-bandwidth-limiter",
Owner: registry.OwnerAdmissionControl,
Timeout: time.Hour,
Benchmark: true,
CompatibleClouds: registry.AllClouds,
Suites: registry.ManualOnly,
Cluster: r.MakeClusterSpec(2, spec.CPU(8)),
RequiresLicense: true,
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.Spec().NodeCount != 2 {
t.Fatalf("expected 2 nodes, found %d", c.Spec().NodeCount)
}
crdbNodes := c.Spec().NodeCount - 1
workloadNode := crdbNodes + 1

promCfg := &prometheus.Config{}
promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]).
WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
WithGrafanaDashboardJSON(grafana.SnapshotAdmissionControlGrafanaJSON)
err := c.StartGrafana(ctx, t.L(), promCfg)
require.NoError(t, err)

startOpts := option.NewStartOpts(option.NoBackupSchedule)
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=io_load_listener=2")
roachtestutil.SetDefaultAdminUIPort(c, &startOpts.RoachprodOpts)
settings := install.MakeClusterSettings()
c.Start(ctx, t.L(), startOpts, settings, c.Range(1, crdbNodes))

promClient, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), promCfg)
require.NoError(t, err)
statCollector := clusterstats.NewStatsCollector(ctx, promClient)

setAdmissionControl(ctx, t, c, true)

// TODO(aaditya): This function shares some of the logic with roachtestutil.DiskStaller. Consider merging the two.
setBandwidthLimit := func(nodes option.NodeListOption, rw string, bw int, max bool) error {
res, err := c.RunWithDetailsSingleNode(context.TODO(), t.L(), option.WithNodes(nodes[:1]), "lsblk | grep /mnt/data1 | awk '{print $2}'")
if err != nil {
t.Fatalf("error when determining block device: %s", err)
}
parts := strings.Split(strings.TrimSpace(res.Stdout), ":")
if len(parts) != 2 {
t.Fatalf("unexpected output from lsblk: %s", res.Stdout)
}
major, err := strconv.Atoi(parts[0])
if err != nil {
t.Fatalf("error when determining block device: %s", err)
}
minor, err := strconv.Atoi(parts[1])
if err != nil {
t.Fatalf("error when determining block device: %s", err)
}

cockroachIOController := filepath.Join("/sys/fs/cgroup/system.slice", roachtestutil.SystemInterfaceSystemdUnitName()+".service", "io.max")
bytesPerSecondStr := "max"
if !max {
bytesPerSecondStr = fmt.Sprintf("%d", bw)
}
return c.RunE(ctx, option.WithNodes(nodes), "sudo", "/bin/bash", "-c", fmt.Sprintf(
`'echo %d:%d %s=%s > %s'`,
major,
minor,
rw,
bytesPerSecondStr,
cockroachIOController,
))
}

if err := setBandwidthLimit(c.Range(1, crdbNodes), "wbps", 128<<20 /* 128MiB */, false); err != nil {
t.Fatal(err)
}

// TODO(aaditya): Extend this test to also limit reads once we have a
// mechanism to pace read traffic in AC.

db := c.Conn(ctx, t.L(), crdbNodes)
defer db.Close()

const bandwidthLimit = 75
if _, err := db.ExecContext(
// We intentionally set this to much lower than the provisioned value
// above to clearly show that the bandwidth limiter works.
ctx, fmt.Sprintf("SET CLUSTER SETTING kvadmission.store.provisioned_bandwidth = '%dMiB'", bandwidthLimit)); err != nil {
t.Fatalf("failed to set kvadmission.store.provisioned_bandwidth: %v", err)
}

duration := 30 * time.Minute
m := c.NewMonitor(ctx, c.Range(1, crdbNodes))
m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("starting foreground kv workload thread (<%s)", time.Minute))
dur := " --duration=" + duration.String()
url := fmt.Sprintf(" {pgurl:1-%d}", crdbNodes)
cmd := "./cockroach workload run kv --init --histograms=perf/stats.json --concurrency=2 " +
"--splits=1000 --read-percent=50 --min-block-bytes=4096 --max-block-bytes=4096 " +
"--txn-qos='regular' --tolerate-errors" + dur + url
c.Run(ctx, option.WithNodes(c.Node(workloadNode)), cmd)
return nil
})

m.Go(func(ctx context.Context) error {
time.Sleep(1 * time.Minute)
t.Status(fmt.Sprintf("starting background kv workload thread (<%s)", time.Minute))
dur := " --duration=" + duration.String()
url := fmt.Sprintf(" {pgurl:1-%d}", crdbNodes)
cmd := "./cockroach workload run kv --init --histograms=perf/stats.json --concurrency=1024 " +
"--splits=1000 --read-percent=0 --min-block-bytes=4096 --max-block-bytes=4096 " +
"--txn-qos='background' --tolerate-errors" + dur + url
c.Run(ctx, option.WithNodes(c.Node(workloadNode)), cmd)
return nil
})

m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("starting monitoring thread (<%s)", time.Minute))
writeBWMetric := divQuery("rate(sys_host_disk_write_bytes[1m])", 1<<20 /* 1MiB */)
getMetricVal := func(query string, label string) (float64, error) {
point, err := statCollector.CollectPoint(ctx, t.L(), timeutil.Now(), query)
if err != nil {
t.L().Errorf("could not query prom %s", err.Error())
return 0, err
}
val := point[label]
if len(val) != 1 {
err = errors.Errorf(
"unexpected number %d of points for metric %s", len(val), query)
t.L().Errorf("%s", err.Error())
return 0, err
}
for storeID, v := range val {
t.L().Printf("%s(store=%s): %f", query, storeID, v.Value)
return v.Value, nil
}
// Unreachable.
panic("unreachable")
}

// Allow a 5% room for error.
const bandwidthThreshold = bandwidthLimit * 1.05
const collectionIntervalSeconds = 10.0
// Loop for ~20 minutes.
const numIterations = int(20 / (collectionIntervalSeconds / 60))
numErrors := 0
numSuccesses := 0
for i := 0; i < numIterations; i++ {
time.Sleep(collectionIntervalSeconds * time.Second)
val, err := getMetricVal(writeBWMetric, "node")
if err != nil {
numErrors++
continue
}
if val > bandwidthThreshold {
t.Fatalf("write bandwidth %f over last exceeded threshold", val)
}
numSuccesses++
}
t.Status(fmt.Sprintf("done monitoring, errors: %d successes: %d", numErrors, numSuccesses))
if numErrors > numSuccesses {
t.Fatalf("too many errors retrieving metrics")
}
return nil
})

m.Wait()
},
})
}

0 comments on commit 5c6d381

Please sign in to comment.