Merge #39167
39167: roachtest: add tpccoverload to exercise a node-liveness failure scenario r=nvanbenschoten a=ajwerner

This roachtest combines the tpcc dataset with a contrived query to highlight
a scenario in which node liveness is currently starved.

This test is skipped as it currently fails on master. Note that after separating
the TCP connection used for node liveness from the rest of the traffic (#39172),
this test passes reliably.

Release note: None

Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
craig[bot] and ajwerner committed Aug 2, 2019
2 parents 59d8234 + cf37e12 commit 66be8df
Showing 3 changed files with 147 additions and 14 deletions.
122 changes: 122 additions & 0 deletions pkg/cmd/roachtest/overload_tpcc_olap.go
@@ -0,0 +1,122 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// tpccOlapQuery is a contrived query that seems to do serious damage to a
// cluster. The query itself is a hash join with a selective filter and a
// limited sort.
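// With W warehouses the TPC-C stock table holds W*100,000 rows and item holds
// 100,000 rows, so at the 100-warehouse spec registered below each execution
// joins and filters millions of rows before the LIMIT applies.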
const tpccOlapQuery = `SELECT
    i_id, s_w_id, s_quantity, i_price
FROM
    stock JOIN item ON s_i_id = i_id
WHERE
    s_quantity < 100 AND i_price > 90
ORDER BY
    i_price DESC, s_quantity ASC
LIMIT
    100;`

type tpccOLAPSpec struct {
	Nodes       int
	CPUs        int
	Warehouses  int
	Concurrency int
}

func (s tpccOLAPSpec) run(ctx context.Context, t *test, c *cluster) {
	crdbNodes, workloadNode := setupTPCC(ctx, t, c, s.Warehouses, false /* zfs */, nil /* versions */)
	const queryFileName = "queries.sql"
	// querybench expects the entire query to be on a single line.
	queryLine := `"` + strings.Replace(tpccOlapQuery, "\n", " ", -1) + `"`
	c.Run(ctx, workloadNode, "echo", queryLine, "> "+queryFileName)
	t.Status("waiting")
	m := newMonitor(ctx, c, crdbNodes)
	rampDuration := time.Minute
	duration := 2 * time.Minute
	m.Go(func(ctx context.Context) error {
		t.WorkerStatus("running querybench")
		cmd := fmt.Sprintf(
			"./workload run querybench --db tpcc"+
				" --tolerate-errors=t"+
				" --concurrency=%d"+
				" --query-file %s"+
				" --histograms="+perfArtifactsDir+"/stats.json "+
				" --ramp=%s --duration=%s {pgurl:1-%d}",
			s.Concurrency, queryFileName, rampDuration, duration, c.spec.NodeCount-1)
		c.Run(ctx, workloadNode, cmd)
		return nil
	})
	m.Wait()
	verifyNodeLiveness(ctx, c, t, duration)
}

// verifyNodeLiveness checks that node liveness did not fail more than
// maxFailures times across all of the nodes.
func verifyNodeLiveness(ctx context.Context, c *cluster, t *test, runDuration time.Duration) {
	const maxFailures = 10
	adminURLs := c.ExternalAdminUIAddr(ctx, c.Node(1))
	now := timeutil.Now()
	response := getMetrics(t, adminURLs[0], now.Add(-runDuration), now, []tsQuery{
		{
			name:      "cr.node.liveness.heartbeatfailures",
			queryType: total,
		},
	})
	if len(response.Results[0].Datapoints) <= 1 {
		t.Fatalf("not enough datapoints in timeseries query response: %+v", response)
	}
	datapoints := response.Results[0].Datapoints
	finalCount := datapoints[len(datapoints)-1].Value
	initialCount := datapoints[0].Value
	if failures := int(finalCount - initialCount); failures > maxFailures {
		t.Fatalf("Node liveness failed %d times, expected no more than %d",
			failures, maxFailures)
	} else {
		t.logger().Printf("Node liveness failed %d times which is fewer than %d",
			failures, maxFailures)
	}
}
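// Note: cr.node.liveness.heartbeatfailures is a cumulative counter, so the
// difference between the last and first datapoints in the queried window is
// the number of heartbeats that failed while querybench ran; maxFailures is
// therefore a budget for the entire run rather than a per-interval threshold.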

func registerTPCCOverloadSpec(r *testRegistry, s tpccOLAPSpec) {
	name := fmt.Sprintf("overload/tpcc_olap/nodes=%d/cpu=%d/w=%d/c=%d",
		s.Nodes, s.CPUs, s.Warehouses, s.Concurrency)
	r.Add(testSpec{
		Name:       name,
		Cluster:    makeClusterSpec(s.Nodes+1, cpu(s.CPUs)),
		Run:        s.run,
		MinVersion: "v19.2.0",
		Timeout:    20 * time.Minute,
		Skip:       "fails due to node liveness starvation",
	})
}

func registerOverload(r *testRegistry) {
	specs := []tpccOLAPSpec{
		{
			CPUs:        16,
			Concurrency: 256,
			Nodes:       3,
			Warehouses:  100,
		},
	}
	for _, s := range specs {
		registerTPCCOverloadSpec(r, s)
	}
}
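For illustration only, not part of the commit: a minimal, self-contained sketch of the test name and the approximate querybench command produced for the one spec registered above. It assumes perfArtifactsDir resolves to "perf" and that --concurrency is driven by the Concurrency field, as the struct suggests.

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		nodes, cpus, warehouses, concurrency = 3, 16, 100, 256
		perfArtifactsDir                     = "perf" // assumed value of the roachtest constant
		queryFileName                        = "queries.sql"
	)
	rampDuration, duration := time.Minute, 2*time.Minute

	// Test name, per the fmt.Sprintf pattern in registerTPCCOverloadSpec.
	fmt.Println(fmt.Sprintf("overload/tpcc_olap/nodes=%d/cpu=%d/w=%d/c=%d",
		nodes, cpus, warehouses, concurrency))
	// -> overload/tpcc_olap/nodes=3/cpu=16/w=100/c=256

	// Approximate command built in tpccOLAPSpec.run; the final argument is
	// c.spec.NodeCount-1, which equals Nodes for this Nodes+1 cluster.
	fmt.Println(fmt.Sprintf(
		"./workload run querybench --db tpcc"+
			" --tolerate-errors=t"+
			" --concurrency=%d"+
			" --query-file %s"+
			" --histograms="+perfArtifactsDir+"/stats.json "+
			" --ramp=%s --duration=%s {pgurl:1-%d}",
		concurrency, queryFileName, rampDuration, duration, nodes))
	// -> ./workload run querybench --db tpcc --tolerate-errors=t
	//    --concurrency=256 --query-file queries.sql
	//    --histograms=perf/stats.json  --ramp=1m0s --duration=2m0s {pgurl:1-3}
}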
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/registry.go
@@ -74,6 +74,7 @@ func registerTests(r *testRegistry) {
 	registerVersion(r)
 	registerYCSB(r)
 	registerTPCHBench(r)
+	registerOverload(r)
 }
 
 func registerBenchmarks(r *testRegistry) {
38 changes: 24 additions & 14 deletions pkg/cmd/roachtest/tpcc.go
@@ -93,25 +93,24 @@ func tpccFixturesCmd(t *test, cloud string, warehouses int, extraArgs string) st
 		command, warehouses, extraArgs)
 }
 
-func runTPCC(ctx context.Context, t *test, c *cluster, opts tpccOptions) {
-	crdbNodes := c.Range(1, c.spec.NodeCount-1)
-	workloadNode := c.Node(c.spec.NodeCount)
-	rampDuration := 5 * time.Minute
+func setupTPCC(
+	ctx context.Context, t *test, c *cluster, warehouses int, zfs bool, versions []string,
+) (crdbNodes, workloadNode nodeListOption) {
+	crdbNodes = c.Range(1, c.spec.NodeCount-1)
+	workloadNode = c.Node(c.spec.NodeCount)
 	if c.isLocal() {
-		opts.Warehouses = 1
-		opts.Duration = 1 * time.Minute
-		rampDuration = 30 * time.Second
+		warehouses = 1
 	}
 
-	if n := len(opts.Versions); n == 0 {
-		opts.Versions = make([]string, c.spec.NodeCount-1)
+	if n := len(versions); n == 0 {
+		versions = make([]string, c.spec.NodeCount-1)
 	} else if n != c.spec.NodeCount-1 {
-		t.Fatalf("must specify Versions for all %d nodes: %v", c.spec.NodeCount-1, opts.Versions)
+		t.Fatalf("must specify Versions for all %d nodes: %v", c.spec.NodeCount-1, versions)
 	}
 
 	{
 		var regularNodes []option
-		for i, v := range opts.Versions {
+		for i, v := range versions {
 			if v == "" {
 				regularNodes = append(regularNodes, c.Node(i+1))
 			} else {
@@ -139,7 +138,7 @@ func runTPCC(ctx context.Context, t *test, c *cluster, opts tpccOptions) {
 	func() {
 		db := c.Conn(ctx, 1)
 		defer db.Close()
-		if opts.ZFS {
+		if zfs {
 			if err := c.RunE(ctx, c.Node(1), "test -d /mnt/data1/.zfs/snapshot/pristine"); err != nil {
 				// Use ZFS so the initial store dumps can be instantly rolled back to their
 				// pristine state. Useful for iterating quickly on the test, especially when
@@ -149,7 +148,7 @@ func runTPCC(ctx context.Context, t *test, c *cluster, opts tpccOptions) {
t.Status("loading dataset")
c.Start(ctx, t, crdbNodes, startArgsDontEncrypt)

c.Run(ctx, workloadNode, tpccFixturesCmd(t, cloud, opts.Warehouses, ""))
c.Run(ctx, workloadNode, tpccFixturesCmd(t, cloud, warehouses, ""))
c.Stop(ctx, crdbNodes)

c.Run(ctx, crdbNodes, "test -e /sbin/zfs && sudo zfs snapshot data1@pristine")
@@ -159,9 +158,20 @@ func runTPCC(ctx context.Context, t *test, c *cluster, opts tpccOptions) {
 			c.Start(ctx, t, crdbNodes, startArgsDontEncrypt)
 		} else {
 			c.Start(ctx, t, crdbNodes, startArgsDontEncrypt)
-			c.Run(ctx, workloadNode, tpccFixturesCmd(t, cloud, opts.Warehouses, ""))
+			c.Run(ctx, workloadNode, tpccFixturesCmd(t, cloud, warehouses, ""))
 		}
 	}()
+	return crdbNodes, workloadNode
+}
+
+func runTPCC(ctx context.Context, t *test, c *cluster, opts tpccOptions) {
+	rampDuration := 5 * time.Minute
+	if c.isLocal() {
+		opts.Warehouses = 1
+		opts.Duration = time.Minute
+		rampDuration = 30 * time.Second
+	}
+	crdbNodes, workloadNode := setupTPCC(ctx, t, c, opts.Warehouses, opts.ZFS, opts.Versions)
 	t.Status("waiting")
 	m := newMonitor(ctx, c, crdbNodes)
 	m.Go(func(ctx context.Context) error {