Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Playground: support setting custom timeout for waiting instances up #968

Merged
merged 10 commits into from
Dec 8, 2020
1 change: 1 addition & 0 deletions components/playground/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
BinPath string
Num int
Host string
UpTimeout int
}

type instance struct {
Expand Down
68 changes: 34 additions & 34 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/components/playground/instance"
"github.com/pingcap/tiup/pkg/cliutil/progress"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/environment"
"github.com/pingcap/tiup/pkg/localdata"
Expand Down Expand Up @@ -79,7 +78,8 @@ func installIfMissing(profile *localdata.Profile, component, version string) err
func execute() error {
opt := &bootOptions{
tidb: instance.Config{
Num: 1,
Num: 1,
UpTimeout: 60,
},
tikv: instance.Config{
Num: 1,
Expand All @@ -88,7 +88,8 @@ func execute() error {
Num: 1,
},
tiflash: instance.Config{
Num: 1,
Num: 1,
UpTimeout: 120,
},
host: "127.0.0.1",
monitor: true,
Expand Down Expand Up @@ -203,6 +204,9 @@ Examples:
rootCmd.Flags().IntVarP(&opt.pump.Num, "pump", "", opt.pump.Num, "Pump instance number")
rootCmd.Flags().IntVarP(&opt.drainer.Num, "drainer", "", opt.drainer.Num, "Drainer instance number")

rootCmd.Flags().IntVarP(&opt.tidb.UpTimeout, "db.timeout", "", opt.tidb.UpTimeout, "TiDB max wait time in seconds for starting, 0 means no limit")
rootCmd.Flags().IntVarP(&opt.tiflash.UpTimeout, "tiflash.timeout", "", opt.tiflash.UpTimeout, "TiFlash max wait time in seconds for starting, 0 means no limit")

rootCmd.Flags().StringVarP(&opt.host, "host", "", opt.host, "Playground cluster host")
rootCmd.Flags().StringVarP(&opt.tidb.Host, "db.host", "", opt.tidb.Host, "Playground TiDB host. If not provided, TiDB will still use `host` flag as its host")
rootCmd.Flags().StringVarP(&opt.pd.Host, "pd.host", "", opt.pd.Host, "Playground PD host. If not provided, PD will still use `host` flag as its host")
Expand Down Expand Up @@ -246,47 +250,43 @@ func tryConnect(dsn string) error {
return nil
}

// checkDB check if the addr is connectable by getting a connection from sql.DB.
func checkDB(dbAddr string) bool {
// checkDB check if the addr is connectable by getting a connection from sql.DB. timeout <=0 means no timeout
func checkDB(dbAddr string, timeout int) bool {
dsn := fmt.Sprintf("root:@tcp(%s)/", dbAddr)
for i := 0; i < 60; i++ {
if err := tryConnect(dsn); err != nil {
time.Sleep(time.Second)
} else {
if i != 0 {
fmt.Println()
if timeout > 0 {
for i := 0; i < timeout; i++ {
if tryConnect(dsn) == nil {
return true
}
time.Sleep(time.Second)
}
return false
}
for {
if err := tryConnect(dsn); err == nil {
return true
}
time.Sleep(time.Second)
}
return false
}

func checkStoreStatus(pdClient *api.PDClient, typ, storeAddr string) error {
prefix := color.YellowString("Waiting for %s %s ready ", typ, storeAddr)
bar := progress.NewSingleBar(prefix)
unbyte marked this conversation as resolved.
Show resolved Hide resolved
bar.StartRenderLoop()
defer bar.StopRenderLoop()

for i := 0; i < 180; i++ {
up, err := pdClient.IsUp(storeAddr)
if err != nil || !up {
// checkStoreStatus uses pd client to check whether a store is up. timeout <= 0 means no timeout
func checkStoreStatus(pdClient *api.PDClient, storeAddr string, timeout int) bool {
if timeout > 0 {
for i := 0; i < timeout; i++ {
if up, err := pdClient.IsUp(storeAddr); err == nil && up {
return true
}
time.Sleep(time.Second)
} else {
bar.UpdateDisplay(&progress.DisplayProps{
Prefix: prefix,
Mode: progress.ModeDone,
})
return nil
}
return false
}
for {
if up, err := pdClient.IsUp(storeAddr); err == nil && up {
return true
}
time.Sleep(time.Second)
}

bar.UpdateDisplay(&progress.DisplayProps{
Prefix: prefix,
Mode: progress.ModeError,
})

return errors.Errorf(fmt.Sprintf("store %s failed to up after timeout(180s)", storeAddr))
}

func hasDashboard(pdAddr string) bool {
Expand Down
122 changes: 71 additions & 51 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/pingcap/tiup/pkg/cliutil/progress"
"io"
"io/ioutil"
"net/http"
Expand All @@ -26,6 +27,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"text/tabwriter"
Expand All @@ -35,7 +37,6 @@ import (
"github.com/fatih/color"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/components/playground/instance"
"github.com/pingcap/tiup/pkg/cliutil/progress"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/environment"
pkgver "github.com/pingcap/tiup/pkg/repository/version"
Expand Down Expand Up @@ -407,7 +408,7 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e
case "drainer":
return p.sanitizeConfig(p.bootOptions.drainer, cfg)
default:
return fmt.Errorf("unknow %s in sanitizeConfig", cid)
return fmt.Errorf("unknown %s in sanitizeConfig", cid)
}
}

Expand Down Expand Up @@ -785,68 +786,87 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
p.booted = true

var succ []string
for _, db := range p.tidbs {
prefix := color.YellowString("Waiting for tidb %s ready ", db.Addr())
bar := progress.NewSingleBar(prefix)
bar.StartRenderLoop()
if s := checkDB(db.Addr()); s {
succ = append(succ, db.Addr())
bar.UpdateDisplay(&progress.DisplayProps{
Prefix: prefix,
Mode: progress.ModeDone,
})
} else {
bar.UpdateDisplay(&progress.DisplayProps{
Prefix: prefix,
Mode: progress.ModeError,
})
if len(p.tidbs) > 0 {
var wg sync.WaitGroup
var appendMutex sync.Mutex
bars := progress.NewMultiBar(color.YellowString("Waiting for tidb instances ready\n"))
for _, db := range p.tidbs {
wg.Add(1)
prefix := color.YellowString(db.Addr())
bar := bars.AddBar(prefix)
go func(dbInst *instance.TiDBInstance) {
defer wg.Done()
if s := checkDB(dbInst.Addr(), options.tidb.UpTimeout); s {
{
appendMutex.Lock()
succ = append(succ, dbInst.Addr())
appendMutex.Unlock()
}
bar.UpdateDisplay(&progress.DisplayProps{
Prefix: prefix,
Mode: progress.ModeDone,
})
} else {
bar.UpdateDisplay(&progress.DisplayProps{
Prefix: prefix,
Mode: progress.ModeError,
})
}
}(db)
}
bar.StopRenderLoop()
bars.StartRenderLoop()
wg.Wait()
bars.StopRenderLoop()
}

if len(succ) > 0 {
// start TiFlash after at least one TiDB is up.
startTiFlash := func() error {
var started []*instance.TiFlashInstance
for _, flash := range p.tiflashs {
if err := p.startInstance(ctx, flash); err != nil {
fmt.Println(color.RedString("TiFlash %s failed to start: %s", flash.Addr(), err))
} else {
started = append(started, flash)
}
}
p.tiflashs = started
unbyte marked this conversation as resolved.
Show resolved Hide resolved

if len(p.tiflashs) > 0 {
var endpoints []string
for _, pd := range p.pds {
endpoints = append(endpoints, pd.Addr())
}
pdClient := api.NewPDClient(endpoints, 10*time.Second, nil)

// make sure TiKV are all up
for _, kv := range p.tikvs {
if err := checkStoreStatus(pdClient, "tikv", kv.StoreAddr()); err != nil {
return err
}
}

for _, flash := range p.tiflashs {
if err := p.startInstance(ctx, flash); err != nil {
return err
}
}

// check if all TiFlash is up
var wg sync.WaitGroup
bars := progress.NewMultiBar(color.YellowString("Waiting for tiflash instances ready\n"))
for _, flash := range p.tiflashs {
cmd := flash.Cmd()
if cmd == nil {
return errors.Errorf("tiflash %s initialize command failed", flash.StoreAddr())
}
if state := cmd.ProcessState; state != nil && state.Exited() {
return errors.Errorf("tiflash process exited with code: %d", state.ExitCode())
}
if err := checkStoreStatus(pdClient, "tiflash", flash.StoreAddr()); err != nil {
return err
}
}

return nil
}
if len(p.tiflashs) > 0 {
err := startTiFlash()
if err != nil {
fmt.Println(color.RedString("TiFlash failed to start: %s", err))
wg.Add(1)
prefix := color.YellowString(flash.Addr())
bar := bars.AddBar(prefix)
go func(flashInst *instance.TiFlashInstance) {
defer wg.Done()
displayResult := &progress.DisplayProps{
Prefix: prefix,
}
if cmd := flashInst.Cmd(); cmd == nil {
displayResult.Mode = progress.ModeError
displayResult.Suffix = "initialize command failed"
} else if state := cmd.ProcessState; state != nil && state.Exited() {
displayResult.Mode = progress.ModeError
displayResult.Suffix = fmt.Sprintf("process exited with code: %d", state.ExitCode())
} else if s := checkStoreStatus(pdClient, flashInst.Addr(), options.tiflash.UpTimeout); !s {
displayResult.Mode = progress.ModeError
displayResult.Suffix = "failed to up after timeout"
} else {
displayResult.Mode = progress.ModeDone
}
bar.UpdateDisplay(displayResult)
}(flash)
}
bars.StartRenderLoop()
wg.Wait()
bars.StopRenderLoop()
}

fmt.Println(color.GreenString("CLUSTER START SUCCESSFULLY, Enjoy it ^-^"))
Expand Down