Skip to content

Commit

Permalink
Make testserver multi-node more robust.
Browse files Browse the repository at this point in the history
- Fix race condition in pollListeningURLFile.
- Add Opts for the node init timeout and the listening URL poll timeout.
  - InitTimeoutOpt, PollListenURLTimeoutOpt
- Parallelize restart test
  • Loading branch information
RichardJCai committed Sep 21, 2022
1 parent f1b2e37 commit a98fbfc
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 26 deletions.
80 changes: 60 additions & 20 deletions testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ const (
// By default, we allocate 20% of available memory to the test server.
const defaultStoreMemSize = 0.2

// Default timeouts, expressed in seconds. NewTestServer copies them into
// initTimeoutSeconds and pollListenURLTimeoutSeconds before applying options.
const (
	defaultInitTimeout          = 60
	defaultPollListenURLTimeout = 60
)

const testserverMessagePrefix = "cockroach-go testserver"
const tenantserverMessagePrefix = "cockroach-go tenantserver"

Expand Down Expand Up @@ -212,19 +215,21 @@ type TestConfig struct {
}

// testServerArgs collects the settings for a test server. NewTestServer
// creates one with defaults and then lets each TestServerOpt mutate it.
type testServerArgs struct {
secure bool
rootPW string // if nonempty, set as pw for root
storeOnDisk bool // to save database in disk
storeMemSize float64 // the proportion of available memory allocated to test server
httpPorts []int
listenAddrPorts []int
testConfig TestConfig
nonStableDB bool
customVersion string // custom cockroach version to use
cockroachBinary string // path to cockroach executable file
upgradeCockroachBinary string // path to cockroach binary for upgrade
numNodes int
externalIODir string
secure bool
rootPW string // if nonempty, set as pw for root
storeOnDisk bool // to save database in disk
storeMemSize float64 // the proportion of available memory allocated to test server
httpPorts []int
listenAddrPorts []int
testConfig TestConfig
nonStableDB bool
customVersion string // custom cockroach version to use
cockroachBinary string // path to cockroach executable file
upgradeCockroachBinary string // path to cockroach binary for upgrade
numNodes int
externalIODir string // set by ExternalIODirOpt
initTimeoutSeconds int // WaitForInitFinishForNode pings at most 10x this many times
pollListenURLTimeoutSeconds int // pollListeningURLFile polls at most 10x this many times, 100ms apart
}

// CockroachBinaryPathOpt is a TestServer option that can be passed to
Expand Down Expand Up @@ -341,14 +346,25 @@ func ThreeNodeOpt() TestServerOpt {
}
}

// ExternalIODirOpt returns a TestServer option for NewTestServer that records
// ioDir as the external IO directory to be used for the cluster.
func ExternalIODirOpt(ioDir string) TestServerOpt {
	setDir := func(a *testServerArgs) {
		a.externalIODir = ioDir
	}
	return setDir
}

// InitTimeoutOpt is a TestServer option that can be passed to NewTestServer to
// override the init timeout. The value is stored as a number of seconds;
// NewTestServer uses defaultInitTimeout when this option is not supplied.
// WaitForInitFinishForNode attempts ten pings per configured second.
func InitTimeoutOpt(timeout int) TestServerOpt {
	return func(a *testServerArgs) { a.initTimeoutSeconds = timeout }
}

// PollListenURLTimeoutOpt is a TestServer option that can be passed to
// NewTestServer to override how long a node's listening URL file is polled.
// The value is stored as a number of seconds; NewTestServer uses
// defaultPollListenURLTimeout when this option is not supplied.
// pollListeningURLFile makes ten attempts per configured second, sleeping
// 100ms between attempts.
func PollListenURLTimeoutOpt(timeout int) TestServerOpt {
	return func(a *testServerArgs) { a.pollListenURLTimeoutSeconds = timeout }
}

const (
logsDirName = "logs"
certsDirName = "certs"
Expand All @@ -365,6 +381,8 @@ var errStoppedInMiddle = errors.New("download stopped in middle")
func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
serverArgs := &testServerArgs{numNodes: 1}
serverArgs.storeMemSize = defaultStoreMemSize
serverArgs.initTimeoutSeconds = defaultInitTimeout
serverArgs.pollListenURLTimeoutSeconds = defaultPollListenURLTimeout
for _, applyOptToArgs := range opts {
applyOptToArgs(serverArgs)
}
Expand Down Expand Up @@ -620,6 +638,9 @@ func (ts *testServerImpl) PGURLForNode(nodeNum int) *url.URL {
}

// setPGURLForNode stores u as the PG URL of node nodeNum and then closes that
// node's `set` channel. It panics when u renders to an empty string, so an
// empty connection string is never published.
func (ts *testServerImpl) setPGURLForNode(nodeNum int, u *url.URL) {
	if connStr := u.String(); len(connStr) == 0 {
		panic("empty conn str")
	}
	// Assign the URL before closing the channel.
	ts.pgURL[nodeNum].u = u
	close(ts.pgURL[nodeNum].set)
}
Expand All @@ -632,7 +653,7 @@ func (ts *testServerImpl) WaitForInitFinishForNode(nodeNum int) error {
defer func() {
_ = db.Close()
}()
for i := 0; i < 100; i++ {
for i := 0; i < ts.serverArgs.initTimeoutSeconds*10; i++ {
if err = db.Ping(); err == nil {
return nil
}
Expand All @@ -649,22 +670,23 @@ func (ts *testServerImpl) WaitForInit() error {

func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error {
var data []byte
for {
for i := 0; i < ts.serverArgs.pollListenURLTimeoutSeconds*10; i++ {
ts.mu.RLock()
state := ts.nodes[nodeNum].state
ts.mu.RUnlock()
if state != stateRunning {
return fmt.Errorf("server stopped or crashed before listening URL file was available")
}

var err error
data, err = ioutil.ReadFile(ts.nodes[nodeNum].listeningURLFile)
if err == nil {
if len(data) == 0 {
time.Sleep(100 * time.Millisecond)
continue
} else if err == nil {
break
} else if !os.IsNotExist(err) {
return fmt.Errorf("unexpected error while reading listening URL file: %w", err)
}
time.Sleep(100 * time.Millisecond)
}

u, err := url.Parse(string(bytes.TrimSpace(data)))
Expand Down Expand Up @@ -765,15 +787,33 @@ func (ts *testServerImpl) Stop() {
}

ts.serverState = stateStopped
for _, node := range ts.nodes {
for i, node := range ts.nodes {
cmd := node.startCmd
if cmd.Process != nil {
_ = cmd.Process.Kill()
}

if node.state != stateFailed {
node.state = stateStopped
}

if node.state != stateStopped {
ts.serverState = stateFailed
}

// RUnlock such that StopNode can Lock and Unlock.
ts.mu.RUnlock()
err := ts.StopNode(i)
if err != nil {
log.Printf("error stopping node %d: %s", i, err)
}
ts.mu.RLock()

nodeDir := fmt.Sprintf("%s%d", ts.baseDir, i)
if err := os.RemoveAll(nodeDir); err != nil {
log.Printf("error deleting tmp directory %s for node: %s", nodeDir, err)
}

}

// Only cleanup on intentional stops.
Expand Down
93 changes: 88 additions & 5 deletions testserver/testserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
"log"
"net"
http "net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -401,20 +402,102 @@ func TestFlockOnDownloadedCRDB(t *testing.T) {
}
}

func TestRestartNode(t *testing.T) {
// getFreePort returns a TCP port that was unused at the time of the call.
//
// It listens on port 0 so the operating system picks the port, reads the
// assigned port from the listener's address, and closes the listener before
// returning. The port is not reserved afterwards, so another process may claim
// it before the caller binds to it.
//
// On any error the returned port is 0.
func getFreePort() (int, error) {
	addr, err := net.ResolveTCPAddr("tcp", ":0")
	if err != nil {
		return 0, err
	}

	l, err := net.ListenTCP("tcp", addr)
	if err != nil {
		return 0, err
	}
	// ListenTCP always yields a *net.TCPAddr, so the assertion cannot fail.
	port := l.Addr().(*net.TCPAddr).Port

	if err := l.Close(); err != nil {
		return 0, err
	}
	return port, nil
}

// TestRestartNodeParallel runs testRestartNode concurrently against several
// three-node clusters that all use the same cockroach v22.1.6 binary.
//
// The release tarball for the current OS (darwin or linux) is obtained through
// downloadBinaryTest into ./temp_binaries and unpacked with tar. The directory
// is removed again when the test returns. Each goroutine reserves three ports
// through getFreePort that no other goroutine in this test has been handed.
func TestRestartNodeParallel(t *testing.T) {
	require.NoError(t, os.Mkdir("./temp_binaries", 0755))
	defer func() {
		require.NoError(t, os.RemoveAll("./temp_binaries"))
	}()

	var fileName string
	switch runtime.GOOS {
	case "darwin":
		fileName = fmt.Sprintf("cockroach-%s.darwin-10.9-amd64", "v22.1.6")
	case "linux":
		fileName = fmt.Sprintf("cockroach-%s.linux-amd64", "v22.1.6")
	default:
		t.Fatalf("unsupported os for test: %s", runtime.GOOS)
	}

	tarball := fmt.Sprintf("./temp_binaries/%s.tgz", fileName)
	require.NoError(t, downloadBinaryTest(
		tarball,
		fmt.Sprintf("https://binaries.cockroachdb.com/%s.tgz", fileName)))

	tarCmd := exec.Command("tar", "-zxvf", tarball, "-C", "./temp_binaries")
	require.NoError(t, tarCmd.Run())

	const (
		parallelExecs   = 10
		portsPerCluster = 3
	)
	var (
		mu        sync.Mutex
		usedPorts = make(map[int]struct{})
		wg        sync.WaitGroup
	)

	// reservePort keeps asking for a free port until it gets one that no
	// other goroutine of this test has already claimed.
	reservePort := func() (int, error) {
		for {
			port, err := getFreePort()
			if err != nil {
				return 0, err
			}
			mu.Lock()
			_, taken := usedPorts[port]
			if !taken {
				usedPorts[port] = struct{}{}
			}
			mu.Unlock()
			if !taken {
				return port, nil
			}
		}
	}

	wg.Add(parallelExecs)
	for i := 0; i < parallelExecs; i++ {
		go func() {
			// Deferred so that wg.Wait below is released even if
			// testRestartNode exits this goroutine via FailNow.
			defer wg.Done()

			ports := make([]int, portsPerCluster)
			for j := range ports {
				port, err := reservePort()
				if err != nil {
					// FailNow (used by require) must only be called from
					// the test goroutine, so report and bail out instead.
					t.Errorf("finding a free port: %v", err)
					return
				}
				ports[j] = port
			}
			testRestartNode(t, ports, "temp_binaries/"+fileName+"/cockroach")
		}()
	}
	wg.Wait()
}

func testRestartNode(t *testing.T, ports []int, binaryPath string) {
const pollListenURLTimeout = 150
ts, err := testserver.NewTestServer(
testserver.ThreeNodeOpt(),
testserver.StoreOnDiskOpt(),
testserver.AddListenAddrPortOpt(26257),
testserver.AddListenAddrPortOpt(26258),
testserver.AddListenAddrPortOpt(26259))
testserver.AddListenAddrPortOpt(ports[0]),
testserver.AddListenAddrPortOpt(ports[1]),
testserver.AddListenAddrPortOpt(ports[2]),
testserver.CockroachBinaryPathOpt(binaryPath),
testserver.PollListenURLTimeoutOpt(pollListenURLTimeout))
require.NoError(t, err)
defer ts.Stop()
for i := 0; i < 3; i++ {
require.NoError(t, ts.WaitForInitFinishForNode(i))
}

log.Printf("Stopping Node 2")
log.Printf("Stopping Node 0")
require.NoError(t, ts.StopNode(0))
for i := 1; i < 3; i++ {
url := ts.PGURLForNode(i)
Expand Down
1 change: 0 additions & 1 deletion testserver/testservernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func (ts *testServerImpl) StopNode(nodeNum int) error {
ts.mu.Lock()
ts.nodes[nodeNum].state = stateStopped
ts.mu.Unlock()
ts.pgURL[nodeNum].u = nil
cmd := ts.nodes[nodeNum].startCmd

// Kill the process.
Expand Down

0 comments on commit a98fbfc

Please sign in to comment.