roachtest: introduce admission-control/elastic-cdc #89656

Merged: 1 commit, Oct 17, 2022
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
"activerecord_blocklist.go",
"admission_control.go",
"admission_control_elastic_backup.go",
"admission_control_elastic_cdc.go",
"admission_control_multi_store_overload.go",
"admission_control_snapshot_overload.go",
"admission_control_tpcc_overload.go",
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/admission_control.go
@@ -29,6 +29,7 @@ func registerAdmission(r registry.Registry) {
	// over some latency threshold. Will be Useful to track over time.

	registerElasticControlForBackups(r)
	registerElasticControlForCDC(r)
	registerMultiStoreOverload(r)
	registerSnapshotOverload(r)
	registerTPCCOverload(r)
147 changes: 147 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go
@@ -0,0 +1,147 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
)

// This test sets up a 3-node CRDB cluster on 8vCPU machines running
// 1000-warehouse TPC-C, and kicks off a few changefeed backfills concurrently.
// We've observed latency spikes during backfills because of its CPU/scan-heavy
// nature -- it can elevate CPU scheduling latencies which in turn translates to
// an increase in foreground latency.
func registerElasticControlForCDC(r registry.Registry) {
	r.Add(registry.TestSpec{
		Name:  "admission-control/elastic-cdc",
		Owner: registry.OwnerAdmissionControl,
		// TODO(irfansharif): After two weeks of nightly baking time, reduce
		// this to a weekly cadence. This is a long-running test and serves only
		// as a coarse-grained benchmark.
		// Tags: []string{`weekly`},
		Cluster:         r.MakeClusterSpec(4, spec.CPU(8)),
		RequiresLicense: true,
		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
			if c.Spec().NodeCount < 4 {
				t.Fatalf("expected at least 4 nodes, found %d", c.Spec().NodeCount)
			}

			crdbNodes := c.Spec().NodeCount - 1
			workloadNode := crdbNodes + 1
			numWarehouses, workloadDuration, estimatedSetupTime := 1000, 60*time.Minute, 10*time.Minute
			if c.IsLocal() {
				numWarehouses, workloadDuration, estimatedSetupTime = 1, time.Minute, 2*time.Minute
			}

			promCfg := &prometheus.Config{}
			promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]).
				WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
				WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
				WithGrafanaDashboard("http://go.crdb.dev/p/changefeed-admission-control-grafana").
				WithScrapeConfigs(
					prometheus.MakeWorkloadScrapeConfig("workload", "/",
						makeWorkloadScrapeNodes(
							c.Node(workloadNode).InstallNodes()[0],
							[]workloadInstance{{nodes: c.Node(workloadNode)}},
						),
					),
				)

			if t.SkipInit() {
				t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, time.Minute))
			} else {
				t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, 10*time.Minute))
			}

			padDuration, err := time.ParseDuration(ifLocal(c, "5s", "5m"))
			if err != nil {
				t.Fatal(err)
			}
			stopFeedsDuration, err := time.ParseDuration(ifLocal(c, "5s", "1m"))
			if err != nil {
				t.Fatal(err)
			}

			runTPCC(ctx, t, c, tpccOptions{
				Warehouses:         numWarehouses,
				Duration:           workloadDuration,
				SetupType:          usingImport,
				EstimatedSetupTime: estimatedSetupTime,
				SkipPostRunCheck:   true,
				ExtraSetupArgs:     "--checks=false",
				PrometheusConfig:   promCfg,
				During: func(ctx context.Context) error {
					db := c.Conn(ctx, t.L(), crdbNodes)
					defer db.Close()

					t.Status(fmt.Sprintf("configuring cluster (<%s)", 30*time.Second))
					{
						setAdmissionControl(ctx, t, c, true)

						// Changefeeds depend on rangefeeds being enabled.
						if _, err := db.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
							return err
						}
					}

					stopFeeds(db) // stop stray feeds (from repeated runs against the same cluster for ex.)
					defer stopFeeds(db)

					m := c.NewMonitor(ctx, c.Range(1, crdbNodes))
					m.Go(func(ctx context.Context) error {
						const iters, changefeeds = 5, 10
						for i := 0; i < iters; i++ {
							if i == 0 {
								t.Status(fmt.Sprintf("setting performance baseline (<%s)", padDuration))
							}
							time.Sleep(padDuration) // each iteration lasts long enough to observe effects in metrics

							t.Status(fmt.Sprintf("during: round %d: stopping extant changefeeds (<%s)", i, stopFeedsDuration))
							stopFeeds(db)
							time.Sleep(stopFeedsDuration) // buffer for cancellations to take effect/show up in metrics

							t.Status(fmt.Sprintf("during: round %d: creating %d changefeeds (<%s)", i, changefeeds, time.Minute))
							for j := 0; j < changefeeds; j++ {
								stmtWithCursor := fmt.Sprintf(`
									CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer
									INTO 'null://' WITH cursor = '-%ds'
								`, int64(float64(i+1)*padDuration.Seconds())) // scanning as far back as possible (~ when the workload started)
								if _, err := db.ExecContext(ctx, stmtWithCursor); err != nil {
									return err
								}
							}

							// TODO(irfansharif): Add a version of this test
							// with initial_scan = 'only' to demonstrate the
							// need+efficacy of using elastic CPU control in
							// changefeed workers. That too has a severe effect
							// on scheduling latencies.
						}
						return nil
					})

					t.Status(fmt.Sprintf("waiting for workload to finish (<%s)", workloadDuration))
					m.Wait()

					return nil
				},
			})
		},
	})
}
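
For reference, the cursor arithmetic above grows the lookback with every round, so each new batch of changefeeds scans back roughly to when the workload began. Below is a minimal, standalone sketch of that calculation; it is not part of the PR, and the padDuration value and iteration count simply mirror the test's non-local defaults:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirror the test's cursor arithmetic: with a 5m pad between rounds,
	// round i starts changefeeds whose cursor reaches back (i+1)*padDuration,
	// i.e. approximately to the start of the workload.
	padDuration := 5 * time.Minute
	const iters = 5
	for i := 0; i < iters; i++ {
		offsetSecs := int64(float64(i+1) * padDuration.Seconds())
		stmt := fmt.Sprintf(
			"CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer INTO 'null://' WITH cursor = '-%ds'",
			offsetSecs,
		)
		fmt.Printf("round %d: %s\n", i, stmt)
	}
}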
3 changes: 3 additions & 0 deletions pkg/cmd/roachtest/tests/tpcc.go
@@ -211,6 +211,9 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
	var ep *tpccChaosEventProcessor
	var promCfg *prometheus.Config
	if !opts.DisablePrometheus {
		// TODO(irfansharif): Move this after the import step. The statistics
		// during import itself is uninteresting and pollutes actual workload
		// data.
		var cleanupFunc func()
		promCfg, cleanupFunc = setupPrometheusForRoachtest(ctx, t, c, opts.PrometheusConfig, workloadInstances)
		defer cleanupFunc()
40 changes: 20 additions & 20 deletions pkg/roachprod/prometheus/prometheus.go
@@ -202,7 +202,7 @@ func Init(
// NB: when upgrading here, make sure to target a version that picks up this PR:
// https://github.com/prometheus/node_exporter/pull/2311
// At time of writing, there hasn't been a release in over half a year.
if err := c.RepeatRun(ctx, l, os.Stdout, os.Stderr, cfg.NodeExporter,
if err := c.RepeatRun(ctx, l, l.Stdout, l.Stderr, cfg.NodeExporter,
"download node exporter",
`
(sudo systemctl stop node_exporter || true) &&
@@ -214,7 +214,7 @@ rm -rf node_exporter && mkdir -p node_exporter && curl -fsSL \
}

// Start node_exporter.
if err := c.Run(ctx, l, os.Stdout, os.Stderr, cfg.NodeExporter, "init node exporter",
if err := c.Run(ctx, l, l.Stdout, l.Stderr, cfg.NodeExporter, "init node exporter",
`cd node_exporter &&
sudo systemd-run --unit node_exporter --same-dir ./node_exporter`,
); err != nil {
@@ -226,8 +226,8 @@ sudo systemd-run --unit node_exporter --same-dir ./node_exporter`,
if err := c.RepeatRun(
ctx,
l,
os.Stdout,
os.Stderr,
l.Stdout,
l.Stderr,
cfg.PrometheusNode,
"reset prometheus",
"sudo systemctl stop prometheus || echo 'no prometheus is running'",
@@ -238,8 +238,8 @@ sudo systemd-run --unit node_exporter --same-dir ./node_exporter`,
if err := c.RepeatRun(
ctx,
l,
os.Stdout,
os.Stderr,
l.Stdout,
l.Stderr,
cfg.PrometheusNode,
"download prometheus",
`sudo rm -rf /tmp/prometheus && mkdir /tmp/prometheus && cd /tmp/prometheus &&
@@ -272,8 +272,8 @@ sudo systemd-run --unit node_exporter --same-dir ./node_exporter`,
if err := c.Run(
ctx,
l,
os.Stdout,
os.Stderr,
l.Stdout,
l.Stderr,
cfg.PrometheusNode,
"start-prometheus",
`cd /tmp/prometheus &&
@@ -286,8 +286,8 @@ sudo systemd-run --unit prometheus --same-dir \
if cfg.Grafana.Enabled {
// Install Grafana.
if err := c.RepeatRun(ctx, l,
os.Stdout,
os.Stderr, cfg.PrometheusNode, "install grafana",
l.Stdout,
l.Stderr, cfg.PrometheusNode, "install grafana",
`sudo apt-get install -qqy apt-transport-https &&
sudo apt-get install -qqy software-properties-common wget &&
wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add - &&
@@ -299,8 +299,8 @@ sudo apt-get update -qqy && sudo apt-get install -qqy grafana-enterprise && sudo

// Provision local prometheus instance as data source.
if err := c.RepeatRun(ctx, l,
os.Stdout,
os.Stderr, cfg.PrometheusNode, "permissions",
l.Stdout,
l.Stderr, cfg.PrometheusNode, "permissions",
`sudo chmod 777 /etc/grafana/provisioning/datasources /etc/grafana/provisioning/dashboards /var/lib/grafana/dashboards /etc/grafana/grafana.ini`,
); err != nil {
return nil, err
@@ -342,14 +342,14 @@ org_role = Admin

for idx, u := range cfg.Grafana.DashboardURLs {
cmd := fmt.Sprintf("curl -fsSL %s -o /var/lib/grafana/dashboards/%d.json", u, idx)
if err := c.Run(ctx, l, os.Stdout, os.Stderr, cfg.PrometheusNode, "download dashboard",
if err := c.Run(ctx, l, l.Stdout, l.Stderr, cfg.PrometheusNode, "download dashboard",
cmd); err != nil {
l.PrintfCtx(ctx, "failed to download dashboard from %s: %s", u, err)
}
}

// Start Grafana. Default port is 3000.
if err := c.Run(ctx, l, os.Stdout, os.Stderr, cfg.PrometheusNode, "start grafana",
if err := c.Run(ctx, l, l.Stdout, l.Stderr, cfg.PrometheusNode, "start grafana",
`sudo systemctl restart grafana-server`); err != nil {
return nil, err
}
@@ -371,8 +371,8 @@ func Snapshot(
if err := c.Run(
ctx,
l,
os.Stdout,
os.Stderr,
l.Stdout,
l.Stderr,
promNode,
"prometheus snapshot",
`sudo rm -rf /tmp/prometheus/data/snapshots/* && curl -XPOST http://localhost:9090/api/v1/admin/tsdb/snapshot &&
@@ -442,13 +442,13 @@ func Shutdown(
shutdownErr = errors.CombineErrors(shutdownErr, err)
}
}
if err := c.Run(ctx, l, os.Stdout, os.Stderr, nodes, "stop node exporter",
if err := c.Run(ctx, l, l.Stdout, l.Stderr, nodes, "stop node exporter",
`sudo systemctl stop node_exporter || echo 'Stopped node exporter'`); err != nil {
l.Printf("Failed to stop node exporter: %v", err)
shutdownErr = errors.CombineErrors(shutdownErr, err)
}

if err := c.Run(ctx, l, os.Stdout, os.Stderr, promNode, "stop grafana",
if err := c.Run(ctx, l, l.Stdout, l.Stderr, promNode, "stop grafana",
`sudo systemctl stop grafana-server || echo 'Stopped grafana'`); err != nil {
l.Printf("Failed to stop grafana server: %v", err)
shutdownErr = errors.CombineErrors(shutdownErr, err)
@@ -457,8 +457,8 @@
if err := c.RepeatRun(
ctx,
l,
os.Stdout,
os.Stderr,
l.Stdout,
l.Stderr,
promNode,
"stop prometheus",
"sudo systemctl stop prometheus || echo 'Stopped prometheus'",
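
Aside: every prometheus.go hunk above makes the same substitution, routing command output through the caller-supplied logger's writers instead of the process-wide os.Stdout/os.Stderr, so setup output from concurrent roachtests lands in each test's own log rather than interleaved on the console. A rough, self-contained sketch of the pattern follows; the Logger type and run helper here are illustrative stand-ins, not the actual roachprod API:

package main

import (
	"io"
	"os"
	"os/exec"
)

// Logger stands in for a per-test logger that owns its own writers
// (in a real harness these would be files under the test's artifacts dir).
type Logger struct {
	Stdout io.Writer
	Stderr io.Writer
}

// run wires a command's output to the logger's writers rather than to
// os.Stdout/os.Stderr, keeping output from concurrent tests separate.
func run(l *Logger, name string, args ...string) error {
	cmd := exec.Command(name, args...)
	cmd.Stdout = l.Stdout
	cmd.Stderr = l.Stderr
	return cmd.Run()
}

func main() {
	// For the sketch, point the logger at the process streams.
	l := &Logger{Stdout: os.Stdout, Stderr: os.Stderr}
	_ = run(l, "echo", "hello from a logged command")
}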