Merge pull request #22795 from petermattis/pmattis/roachtest-existing-cluster

roachtest: add -c/--cluster for running a test on an existing cluster
petermattis authored Feb 18, 2018
2 parents f77ecee + c448e2f commit f9f3d43
Showing 6 changed files with 114 additions and 50 deletions.
108 changes: 76 additions & 32 deletions pkg/cmd/roachtest/cluster.go
@@ -30,6 +30,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -39,12 +40,13 @@ import (
)

var (
local bool
artifacts string
cockroach string
workload string
clusterID string
username = os.Getenv("ROACHPROD_USER")
local bool
artifacts string
cockroach string
workload string
clusterName string
clusterID string
username = os.Getenv("ROACHPROD_USER")
)

func ifLocal(trueVal, falseVal string) string {
@@ -101,17 +103,19 @@ func initBinaries() {
}
}

var clusters = map[string]struct{}{}
var clusters = map[*cluster]struct{}{}
var clustersMu syncutil.Mutex
var clustersOnce sync.Once
var interrupted int32

func registerCluster(clusterName string) {
func registerCluster(c *cluster) {
clustersOnce.Do(func() {
go func() {
// Shut down test clusters when interrupted (for example CTRL+C).
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
<-sig
atomic.StoreInt32(&interrupted, 1)

// Fire off a goroutine to destroy all of the clusters.
done := make(chan struct{})
@@ -121,13 +125,11 @@ func registerCluster(clusterName string) {
var wg sync.WaitGroup
clustersMu.Lock()
wg.Add(len(clusters))
for name := range clusters {
go func(name string) {
cmd := exec.Command("roachprod", "destroy", name)
if err := cmd.Run(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}(name)
for c := range clusters {
go func(c *cluster) {
defer wg.Done()
c.destroy(context.Background())
}(c)
}
clustersMu.Unlock()

@@ -144,7 +146,7 @@ func registerCluster(clusterName string) {
})

clustersMu.Lock()
clusters[clusterName] = struct{}{}
clusters[c] = struct{}{}
clustersMu.Unlock()
}
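
The interrupt handling in this hunk follows a common Go shape: the first CTRL+C sets a package-level flag via sync/atomic, then every registered cluster is destroyed in parallel under a WaitGroup before the process exits. Below is a minimal, self-contained sketch of that shape only; watchForInterrupt, the string-keyed cluster list, the destroy callback, and the "demo-cluster" name are illustrative stand-ins, not the roachtest code.

```go
package main

import (
    "fmt"
    "os"
    "os/signal"
    "sync"
    "sync/atomic"
    "time"
)

// interrupted plays the role of roachtest's package-level flag: cluster
// operations check it with atomic.LoadInt32 and bail out after CTRL+C.
var interrupted int32

// watchForInterrupt installs a SIGINT handler that marks the process as
// interrupted and tears down every known cluster concurrently.
func watchForInterrupt(clusters []string, destroy func(name string)) {
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt)
    go func() {
        <-sig
        atomic.StoreInt32(&interrupted, 1)

        var wg sync.WaitGroup
        wg.Add(len(clusters))
        for _, name := range clusters {
            go func(name string) {
                defer wg.Done()
                destroy(name) // roachtest shells out to `roachprod destroy` here
            }(name)
        }
        wg.Wait()
        os.Exit(1)
    }()
}

func main() {
    watchForInterrupt([]string{"demo-cluster"}, func(name string) {
        fmt.Println("destroying", name)
    })
    // Poll the flag the way later hunks do before each roachprod call.
    for atomic.LoadInt32(&interrupted) == 0 {
        time.Sleep(100 * time.Millisecond) // press CTRL+C to trigger the handler
    }
}
```

The same flag is what the Put, Start, Stop, Wipe, and RunL hunks later in this file consult before shelling out to roachprod.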

@@ -160,6 +162,9 @@ func execCmd(ctx context.Context, l *logger, args ...string) error {
}

func makeClusterName(t testI) string {
if clusterName != "" {
return clusterName
}
if local {
return "local"
}
@@ -273,6 +278,10 @@ type cluster struct {
// duration. The default lifetime of 12h is too long for some tests and will be
// too short for others.
func newCluster(ctx context.Context, t testI, nodes int, args ...interface{}) *cluster {
if atomic.LoadInt32(&interrupted) == 1 {
t.Fatal("interrupted")
}

l, err := rootLogger(t.Name())
if err != nil {
t.Fatal(err)
@@ -285,21 +294,30 @@ func newCluster(ctx context.Context, t testI, nodes int, args ...interface{}) *c
t: t,
l: l,
}
registerCluster(c.name)

if impl, ok := t.(*test); ok {
c.status = impl.Status
}
registerCluster(c)

sargs := []string{"roachprod", "create", c.name, "-n", fmt.Sprint(nodes)}
for _, arg := range args {
sargs = append(sargs, fmt.Sprint(arg))
}
if c.name != clusterName {
sargs := []string{"roachprod", "create", c.name, "-n", fmt.Sprint(nodes)}
for _, arg := range args {
sargs = append(sargs, fmt.Sprint(arg))
}

c.status("creating cluster")
if err := execCmd(ctx, l, sargs...); err != nil {
t.Fatal(err)
return nil
c.status("creating cluster")
if err := execCmd(ctx, l, sargs...); err != nil {
t.Fatal(err)
return nil
}
} else {
// NB: if the existing cluster is not as large as the desired cluster, the
// test will fail when trying to perform various operations such as putting
// binaries or starting the cockroach nodes.
c.status("wiping cluster")
if err := execCmd(ctx, c.l, "roachprod", "wipe", c.name); err != nil {
c.l.errorf("%s", err)
}
}
return c
}
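
The branch above is the heart of the feature: if the cluster's name differs from the --cluster value (including the default empty string), a fresh cluster is created; if it matches, the existing cluster is wiped and reused as-is. A condensed sketch of that decision, using a hypothetical prepareCluster helper and an injected command runner rather than the real newCluster/execCmd plumbing (the cluster names in main are made up):

```go
package main

import (
    "fmt"
    "strings"
)

// prepareCluster condenses newCluster's create-or-reuse branch: create a
// throwaway cluster unless the test was pointed at an existing one via
// -c/--cluster, in which case wipe it and reuse it unchanged.
func prepareCluster(name, existing string, nodes int, run func(args ...string) error) error {
    if name != existing {
        // No --cluster flag, or a different name: create a fresh cluster.
        return run("roachprod", "create", name, "-n", fmt.Sprint(nodes))
    }
    // Reusing an existing cluster: wipe it, never resize or destroy it.
    // If it has fewer nodes than the test needs, later steps will fail.
    return run("roachprod", "wipe", name)
}

func main() {
    echo := func(args ...string) error {
        fmt.Println(strings.Join(args, " "))
        return nil
    }
    _ = prepareCluster("teamcity-1234-kv", "", 4, echo)     // roachprod create ...
    _ = prepareCluster("peter-test", "peter-test", 4, echo) // roachprod wipe peter-test
}
```

The destroy hunk below mirrors this split: a test-owned cluster is destroyed, while a user-supplied one is only wiped.
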
@@ -334,26 +352,40 @@ func (c *cluster) Destroy(ctx context.Context) {
_ = execCmd(ctx, c.l, "roachprod", "get", c.name, "logs",
filepath.Join(artifacts, c.t.Name(), "logs"))
}
c.status("destroying cluster")
if err := execCmd(ctx, c.l, "roachprod", "destroy", c.name); err != nil {
c.l.errorf("%s", err)
c.destroy(ctx)
}

func (c *cluster) destroy(ctx context.Context) {
if c.name != clusterName {
c.status("destroying cluster")
if err := execCmd(ctx, c.l, "roachprod", "destroy", c.name); err != nil {
c.l.errorf("%s", err)
}
} else {
c.status("wiping cluster")
if err := execCmd(ctx, c.l, "roachprod", "wipe", c.name); err != nil {
c.l.errorf("%s", err)
}
}

clustersMu.Lock()
delete(clusters, c.name)
delete(clusters, c)
clustersMu.Unlock()

c.l.close()
}

// Put a local file to all of the machines in a cluster.
func (c *cluster) Put(ctx context.Context, src, dest string) {
func (c *cluster) Put(ctx context.Context, src, dest string, opts ...option) {
if c.t.Failed() {
// If the test has failed, don't try to limp along.
return
}
if atomic.LoadInt32(&interrupted) == 1 {
c.t.Fatal("interrupted")
}
c.status("uploading binary")
err := execCmd(ctx, c.l, "roachprod", "put", c.name, src, dest)
err := execCmd(ctx, c.l, "roachprod", "put", c.makeNodes(opts), src, dest)
if err != nil {
c.t.Fatal(err)
}
@@ -367,6 +399,9 @@ func (c *cluster) Start(ctx context.Context, opts ...option) {
// If the test has failed, don't try to limp along.
return
}
if atomic.LoadInt32(&interrupted) == 1 {
c.t.Fatal("interrupted")
}
c.status("starting cluster")
err := execCmd(ctx, c.l, "roachprod", "start", c.makeNodes(opts))
if err != nil {
@@ -381,6 +416,9 @@ func (c *cluster) Stop(ctx context.Context, opts ...option) {
// If the test has failed, don't try to limp along.
return
}
if atomic.LoadInt32(&interrupted) == 1 {
c.t.Fatal("interrupted")
}
c.status("stopping cluster")
err := execCmd(ctx, c.l, "roachprod", "stop", c.makeNodes(opts))
if err != nil {
@@ -395,6 +433,9 @@ func (c *cluster) Wipe(ctx context.Context, opts ...option) {
// If the test has failed, don't try to limp along.
return
}
if atomic.LoadInt32(&interrupted) == 1 {
c.t.Fatal("interrupted")
}
c.status("wiping cluster")
err := execCmd(ctx, c.l, "roachprod", "wipe", c.makeNodes(opts))
if err != nil {
@@ -418,6 +459,9 @@ func (c *cluster) RunL(ctx context.Context, l *logger, node int, args ...string)
// If the test has failed, don't try to limp along.
return
}
if atomic.LoadInt32(&interrupted) == 1 {
c.t.Fatal("interrupted")
}
err := execCmd(ctx, l,
append([]string{"roachprod", "ssh", fmt.Sprintf("%s:%d", c.name, node), "--"}, args...)...)
if err != nil {
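
Before the call sites below: Put and the other cluster operations now resolve their target nodes through c.makeNodes(opts), and tests pass helpers such as c.Range(1, nodes) and c.Node(nodes+1). Those helpers are not defined in this diff; the sketch below only illustrates the roachprod-style "name:first-last" selector they presumably expand to, which RunL's fmt.Sprintf("%s:%d", c.name, node) already uses for a single node. The option types, the makeNodes body, and the "kv-test" name here are hypothetical.

```go
package main

import "fmt"

// nodeRange is a hypothetical option value: an inclusive span of node IDs.
type nodeRange struct{ first, last int }

// rangeOpt and nodeOpt stand in for the c.Range and c.Node helpers used
// by the tests in this diff.
func rangeOpt(first, last int) nodeRange { return nodeRange{first, last} }
func nodeOpt(n int) nodeRange            { return nodeRange{n, n} }

// makeNodes renders a roachprod-style selector such as "kv-test:1-4" or
// "kv-test:5"; with no options it addresses the whole cluster by name.
func makeNodes(cluster string, opts ...nodeRange) string {
    if len(opts) == 0 {
        return cluster
    }
    r := opts[0]
    if r.first == r.last {
        return fmt.Sprintf("%s:%d", cluster, r.first)
    }
    return fmt.Sprintf("%s:%d-%d", cluster, r.first, r.last)
}

func main() {
    fmt.Println(makeNodes("kv-test", rangeOpt(1, 4))) // kv-test:1-4
    fmt.Println(makeNodes("kv-test", nodeOpt(5)))     // kv-test:5
    fmt.Println(makeNodes("kv-test"))                 // kv-test
}
```

This is why the test changes below now say which nodes receive the cockroach binary versus the workload binary, instead of pushing both binaries to every node.
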
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/import.go
@@ -26,8 +26,8 @@ func init() {
c := newCluster(ctx, t, nodes)
defer c.Destroy(ctx)

c.Put(ctx, cockroach, "./cockroach")
c.Put(ctx, workload, "./workload")
c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
c.Put(ctx, workload, "./workload", c.Node(1))
t.Status("starting csv servers")
for node := 1; node <= nodes; node++ {
c.Run(ctx, node, `./workload csv-server --port=8081 &> /dev/null < /dev/null &`)
31 changes: 19 additions & 12 deletions pkg/cmd/roachtest/kv.go
@@ -24,17 +24,17 @@ import (
func init() {
runKV := func(t *test, percent, nodes int) {
ctx := context.Background()
c := newCluster(ctx, t, nodes+1)
c := newCluster(ctx, t, nodes+1, "--local-ssd")
defer c.Destroy(ctx)

c.Put(ctx, cockroach, "./cockroach")
c.Put(ctx, workload, "./workload")
c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
c.Put(ctx, workload, "./workload", c.Node(nodes+1))
c.Start(ctx, c.Range(1, nodes))

t.Status("running workload")
m := newMonitor(ctx, c, c.Range(1, nodes))
m.Go(func(ctx context.Context) error {
concurrency := ifLocal("", " --concurrency=384")
concurrency := ifLocal("", " --concurrency="+fmt.Sprint(nodes*64))
duration := " --duration=" + ifLocal("10s", "10m")
cmd := fmt.Sprintf(
"./workload run kv --init --read-percent=%d --splits=1000"+
@@ -62,17 +62,17 @@ func init() {
func init() {
runSplits := func(t *test, nodes int) {
ctx := context.Background()
c := newCluster(ctx, t, nodes+1)
c := newCluster(ctx, t, nodes+1, "--local-ssd")
defer c.Destroy(ctx)

c.Put(ctx, cockroach, "./cockroach")
c.Put(ctx, workload, "./workload")
c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
c.Put(ctx, workload, "./workload", c.Node(nodes+1))
c.Start(ctx, c.Range(1, nodes))

t.Status("running workload")
m := newMonitor(ctx, c, c.Range(1, nodes))
m.Go(func(ctx context.Context) error {
concurrency := ifLocal("", " --concurrency=384")
concurrency := ifLocal("", " --concurrency="+fmt.Sprint(nodes*64))
splits := " --splits=" + ifLocal("2000", "500000")
cmd := fmt.Sprintf(
"./workload run kv --init --max-ops=1"+
@@ -96,7 +96,7 @@ func init() {
c := newCluster(ctx, t, nodes+1, "--local-ssd", "--machine-type", "n1-highcpu-8")
defer c.Destroy(ctx)

if !local {
if !c.isLocal() {
var wg sync.WaitGroup
wg.Add(nodes)
for i := 1; i <= 6; i++ {
@@ -111,8 +111,8 @@ func init() {
wg.Wait()
}

c.Put(ctx, cockroach, "./cockroach")
c.Put(ctx, workload, "./workload")
c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
c.Put(ctx, workload, "./workload", c.Node(nodes+1))

const maxPerNodeConcurrency = 64
for i := nodes; i <= nodes*maxPerNodeConcurrency; i += nodes {
@@ -126,7 +126,14 @@ func init() {
"--splits=1000 --duration=1m "+fmt.Sprintf("--concurrency=%d", i)+
" {pgurl:1-%d}",
percent, nodes)
c.Run(ctx, nodes+1, cmd)

l, err := c.l.childLogger(fmt.Sprint(i))
if err != nil {
t.Fatal(err)
}
defer l.close()

c.RunL(ctx, l, nodes+1, cmd)
return nil
})
m.Wait()
7 changes: 7 additions & 0 deletions pkg/cmd/roachtest/main.go
@@ -16,6 +16,7 @@
package main

import (
"fmt"
"os"
"strings"

@@ -40,6 +41,10 @@ func main() {
` + strings.Join(allTests(), "\n\t") + `
`,
RunE: func(_ *cobra.Command, args []string) error {
if clusterName != "" && local {
return fmt.Errorf("cannot specify both an existing cluster (%s) and --local", clusterName)
}

initBinaries()
os.Exit(tests.Run(args))
return nil
@@ -56,6 +61,8 @@ func main() {
&parallelism, "parallelism", "p", parallelism, "number of tests to run in parallel")
runCmd.Flags().StringVar(
&artifacts, "artifacts", "artifacts", "path to artifacts directory")
runCmd.Flags().StringVarP(
&clusterName, "cluster", "c", "", "name of an existing cluster to use for running tests")
runCmd.Flags().StringVar(
&clusterID, "cluster-id", "", "an identifier to use in the test cluster's name")
runCmd.Flags().StringVar(
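
The flag wiring above is ordinary cobra: StringVarP registers both --cluster and the -c shorthand on runCmd, and the RunE body rejects combining an existing cluster with --local before any test starts. Here is a standalone sketch of that wiring; the root/run command skeleton and the --local registration are reconstructed for the example, and only the --cluster flag and the validation mirror the diff.

```go
package main

import (
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

var (
    local       bool
    clusterName string
)

func main() {
    rootCmd := &cobra.Command{Use: "roachtest"}

    runCmd := &cobra.Command{
        Use:   "run [tests]",
        Short: "run one or more tests",
        RunE: func(_ *cobra.Command, args []string) error {
            // Mirrors the check added in the diff: an existing cluster and
            // --local are mutually exclusive.
            if clusterName != "" && local {
                return fmt.Errorf("cannot specify both an existing cluster (%s) and --local", clusterName)
            }
            fmt.Println("would run", args, "on cluster", clusterName)
            return nil
        },
    }
    runCmd.Flags().BoolVar(&local, "local", false, "run tests locally")
    runCmd.Flags().StringVarP(
        &clusterName, "cluster", "c", "", "name of an existing cluster to use for running tests")
    rootCmd.AddCommand(runCmd)

    // Example invocation (test filter and cluster name are hypothetical):
    //   roachtest run kv --cluster=peter-test
    if err := rootCmd.Execute(); err != nil {
        os.Exit(1)
    }
}
```

test.go, the next file in the diff, complements this: --cluster=local is treated as --local so the local test configurations apply, and parallelism drops to 1 whenever an existing cluster is in use, since concurrent tests cannot share one cluster.
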
10 changes: 8 additions & 2 deletions pkg/cmd/roachtest/test.go
@@ -94,8 +94,14 @@ func (r *registry) Run(filter []string) int {
wg := &sync.WaitGroup{}
wg.Add(len(tests))

// We can't run local tests in parallel as there is only 1 "local" cluster.
if local {
// If we're running against an existing "local" cluster, force the local flag
// to true in order to get the "local" test configurations.
if clusterName == "local" {
local = true
}
// We can't run tests in parallel on local clusters or on an existing
// cluster.
if local || clusterName != "" {
parallelism = 1
}
// Limit the parallelism to the number of tests. The primary effect this has
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tpcc.go
@@ -26,8 +26,8 @@ func init() {
c := newCluster(ctx, t, nodes+1)
defer c.Destroy(ctx)

c.Put(ctx, cockroach, "./cockroach")
c.Put(ctx, workload, "./workload")
c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
c.Put(ctx, workload, "./workload", c.Node(nodes+1))
c.Start(ctx, c.Range(1, nodes))

t.Status("running workload")