From d643559dd95bb8c3987ae3a4486d15b9861a4842 Mon Sep 17 00:00:00 2001 From: Helios Date: Thu, 3 Dec 2020 17:10:41 +0800 Subject: [PATCH 1/5] refactor bootCluster --- components/playground/main.go | 59 +++++++------- components/playground/playground.go | 117 ++++++++++++++++------------ 2 files changed, 94 insertions(+), 82 deletions(-) diff --git a/components/playground/main.go b/components/playground/main.go index b66d9e2292..a28be68f05 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -34,7 +34,6 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/tiup/components/playground/instance" - "github.com/pingcap/tiup/pkg/cliutil/progress" "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/environment" "github.com/pingcap/tiup/pkg/localdata" @@ -247,46 +246,44 @@ func tryConnect(dsn string) error { } // checkDB check if the addr is connectable by getting a connection from sql.DB. -func checkDB(dbAddr string) bool { +func checkDB(dbAddr string, timeout int) bool { dsn := fmt.Sprintf("root:@tcp(%s)/", dbAddr) - for i := 0; i < 60; i++ { - if err := tryConnect(dsn); err != nil { + if timeout > 0 { + for i := 0; i < timeout; i++ { + if tryConnect(dsn) == nil { + return true + } time.Sleep(time.Second) - } else { - if i != 0 { - fmt.Println() + } + return false + } else { + for { + if err := tryConnect(dsn); err == nil { + return true } - return true + time.Sleep(time.Second) } } - return false -} -func checkStoreStatus(pdClient *api.PDClient, typ, storeAddr string) error { - prefix := color.YellowString("Waiting for %s %s ready ", typ, storeAddr) - bar := progress.NewSingleBar(prefix) - bar.StartRenderLoop() - defer bar.StopRenderLoop() +} - for i := 0; i < 180; i++ { - up, err := pdClient.IsUp(storeAddr) - if err != nil || !up { +func checkStoreStatus(pdClient *api.PDClient, storeAddr string, timeout int) bool { + if timeout > 0 { + for i := 0; i < timeout; i++ { + if up, err := pdClient.IsUp(storeAddr); err == nil && up { + return true + } + time.Sleep(time.Second) + } + return false + } else { + for { + if up, err := pdClient.IsUp(storeAddr); err == nil && up { + return true + } time.Sleep(time.Second) - } else { - bar.UpdateDisplay(&progress.DisplayProps{ - Prefix: prefix, - Mode: progress.ModeDone, - }) - return nil } } - - bar.UpdateDisplay(&progress.DisplayProps{ - Prefix: prefix, - Mode: progress.ModeError, - }) - - return errors.Errorf(fmt.Sprintf("store %s failed to up after timeout(180s)", storeAddr)) } func hasDashboard(pdAddr string) bool { diff --git a/components/playground/playground.go b/components/playground/playground.go index e385f21fe6..3c05711a23 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/pingcap/tiup/pkg/cliutil/progress" "io" "io/ioutil" "net/http" @@ -26,6 +27,7 @@ import ( "runtime" "strconv" "strings" + "sync" "sync/atomic" "syscall" "text/tabwriter" @@ -35,7 +37,6 @@ import ( "github.com/fatih/color" "github.com/pingcap/errors" "github.com/pingcap/tiup/components/playground/instance" - "github.com/pingcap/tiup/pkg/cliutil/progress" "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/environment" pkgver "github.com/pingcap/tiup/pkg/repository/version" @@ -407,7 +408,7 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e case "drainer": return p.sanitizeConfig(p.bootOptions.drainer, cfg) default: - return fmt.Errorf("unknow %s in sanitizeConfig", cid) + return fmt.Errorf("unknown %s in sanitizeConfig", cid) } } @@ -785,68 +786,82 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme p.booted = true var succ []string - for _, db := range p.tidbs { - prefix := color.YellowString("Waiting for tidb %s ready ", db.Addr()) - bar := progress.NewSingleBar(prefix) - bar.StartRenderLoop() - if s := checkDB(db.Addr()); s { - succ = append(succ, db.Addr()) - bar.UpdateDisplay(&progress.DisplayProps{ - Prefix: prefix, - Mode: progress.ModeDone, - }) - } else { - bar.UpdateDisplay(&progress.DisplayProps{ - Prefix: prefix, - Mode: progress.ModeError, - }) + if len(p.tidbs) > 0 { + var wg sync.WaitGroup + bars := progress.NewMultiBar(color.YellowString("Waiting for tidb instances ready\n")) + for _, db := range p.tidbs { + wg.Add(1) + prefix := color.YellowString(db.Addr()) + bar := bars.AddBar(prefix) + go func(dbInst *instance.TiDBInstance) { + defer wg.Done() + if s := checkDB(dbInst.Addr(), 0); s { + succ = append(succ, dbInst.Addr()) + bar.UpdateDisplay(&progress.DisplayProps{ + Prefix: prefix, + Mode: progress.ModeDone, + }) + } else { + bar.UpdateDisplay(&progress.DisplayProps{ + Prefix: prefix, + Mode: progress.ModeError, + }) + } + }(db) } - bar.StopRenderLoop() + bars.StartRenderLoop() + wg.Wait() + bars.StopRenderLoop() } if len(succ) > 0 { // start TiFlash after at least one TiDB is up. - startTiFlash := func() error { + var started []*instance.TiFlashInstance + for _, flash := range p.tiflashs { + if err := p.startInstance(ctx, flash); err != nil { + fmt.Println(color.RedString("TiFlash %s failed to start: %s", flash.Addr(), err)) + } else { + started = append(started, flash) + } + } + p.tiflashs = started + + if len(p.tiflashs) > 0 { var endpoints []string for _, pd := range p.pds { endpoints = append(endpoints, pd.Addr()) } pdClient := api.NewPDClient(endpoints, 10*time.Second, nil) - // make sure TiKV are all up - for _, kv := range p.tikvs { - if err := checkStoreStatus(pdClient, "tikv", kv.StoreAddr()); err != nil { - return err - } - } - + var wg sync.WaitGroup + bars := progress.NewMultiBar(color.YellowString("Waiting for tiflash instances ready\n")) for _, flash := range p.tiflashs { - if err := p.startInstance(ctx, flash); err != nil { - return err - } - } - - // check if all TiFlash is up - for _, flash := range p.tiflashs { - cmd := flash.Cmd() - if cmd == nil { - return errors.Errorf("tiflash %s initialize command failed", flash.StoreAddr()) - } - if state := cmd.ProcessState; state != nil && state.Exited() { - return errors.Errorf("tiflash process exited with code: %d", state.ExitCode()) - } - if err := checkStoreStatus(pdClient, "tiflash", flash.StoreAddr()); err != nil { - return err - } - } - - return nil - } - if len(p.tiflashs) > 0 { - err := startTiFlash() - if err != nil { - fmt.Println(color.RedString("TiFlash failed to start: %s", err)) + wg.Add(1) + prefix := color.YellowString(flash.Addr()) + bar := bars.AddBar(prefix) + go func(flashInst *instance.TiFlashInstance) { + defer wg.Done() + displayResult := &progress.DisplayProps{ + Prefix: prefix, + } + if cmd := flashInst.Cmd(); cmd == nil { + displayResult.Mode = progress.ModeError + displayResult.Suffix = "initialize command failed" + } else if state := cmd.ProcessState; state != nil && state.Exited() { + displayResult.Mode = progress.ModeError + displayResult.Suffix = fmt.Sprintf("process exited with code: %d", state.ExitCode()) + } else if s := checkStoreStatus(pdClient, flashInst.Addr(), 0); s { + displayResult.Mode = progress.ModeDone + } else { + displayResult.Mode = progress.ModeError + displayResult.Suffix = "failed to up after timeout" + } + bar.UpdateDisplay(displayResult) + }(flash) } + bars.StartRenderLoop() + wg.Wait() + bars.StopRenderLoop() } fmt.Println(color.GreenString("CLUSTER START SUCCESSFULLY, Enjoy it ^-^")) From 6dd7c5fc34cdd563a6c18b073b3b80875e7712fe Mon Sep 17 00:00:00 2001 From: Helios Date: Thu, 3 Dec 2020 22:31:32 +0800 Subject: [PATCH 2/5] add flag to set timeout --- components/playground/instance/instance.go | 1 + components/playground/main.go | 12 +++++++++--- components/playground/playground.go | 4 ++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/components/playground/instance/instance.go b/components/playground/instance/instance.go index d1be56f1da..7467091193 100644 --- a/components/playground/instance/instance.go +++ b/components/playground/instance/instance.go @@ -26,6 +26,7 @@ type Config struct { BinPath string Num int Host string + UpTimeout int } type instance struct { diff --git a/components/playground/main.go b/components/playground/main.go index a28be68f05..8fa464007a 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -78,7 +78,8 @@ func installIfMissing(profile *localdata.Profile, component, version string) err func execute() error { opt := &bootOptions{ tidb: instance.Config{ - Num: 1, + Num: 1, + UpTimeout: 60, }, tikv: instance.Config{ Num: 1, @@ -87,7 +88,8 @@ func execute() error { Num: 1, }, tiflash: instance.Config{ - Num: 1, + Num: 1, + UpTimeout: 120, }, host: "127.0.0.1", monitor: true, @@ -202,6 +204,9 @@ Examples: rootCmd.Flags().IntVarP(&opt.pump.Num, "pump", "", opt.pump.Num, "Pump instance number") rootCmd.Flags().IntVarP(&opt.drainer.Num, "drainer", "", opt.drainer.Num, "Drainer instance number") + rootCmd.Flags().IntVarP(&opt.tidb.UpTimeout, "db.timeout", "", opt.tidb.UpTimeout, "TiDB max wait time in seconds for staring, 0 means no limit") + rootCmd.Flags().IntVarP(&opt.tiflash.UpTimeout, "tiflash.timeout", "", opt.tiflash.UpTimeout, "TiFlash max wait time in seconds for staring, 0 means no limit") + rootCmd.Flags().StringVarP(&opt.host, "host", "", opt.host, "Playground cluster host") rootCmd.Flags().StringVarP(&opt.tidb.Host, "db.host", "", opt.tidb.Host, "Playground TiDB host. If not provided, TiDB will still use `host` flag as its host") rootCmd.Flags().StringVarP(&opt.pd.Host, "pd.host", "", opt.pd.Host, "Playground PD host. If not provided, PD will still use `host` flag as its host") @@ -245,7 +250,7 @@ func tryConnect(dsn string) error { return nil } -// checkDB check if the addr is connectable by getting a connection from sql.DB. +// checkDB check if the addr is connectable by getting a connection from sql.DB. timeout <=0 means no timeout func checkDB(dbAddr string, timeout int) bool { dsn := fmt.Sprintf("root:@tcp(%s)/", dbAddr) if timeout > 0 { @@ -267,6 +272,7 @@ func checkDB(dbAddr string, timeout int) bool { } +// checkStoreStatus uses pd client to check whether a store is up. timeout <= 0 means no timeout func checkStoreStatus(pdClient *api.PDClient, storeAddr string, timeout int) bool { if timeout > 0 { for i := 0; i < timeout; i++ { diff --git a/components/playground/playground.go b/components/playground/playground.go index 3c05711a23..dbd180f0c0 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -795,7 +795,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme bar := bars.AddBar(prefix) go func(dbInst *instance.TiDBInstance) { defer wg.Done() - if s := checkDB(dbInst.Addr(), 0); s { + if s := checkDB(dbInst.Addr(), options.tidb.UpTimeout); s { succ = append(succ, dbInst.Addr()) bar.UpdateDisplay(&progress.DisplayProps{ Prefix: prefix, @@ -850,7 +850,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme } else if state := cmd.ProcessState; state != nil && state.Exited() { displayResult.Mode = progress.ModeError displayResult.Suffix = fmt.Sprintf("process exited with code: %d", state.ExitCode()) - } else if s := checkStoreStatus(pdClient, flashInst.Addr(), 0); s { + } else if s := checkStoreStatus(pdClient, flashInst.Addr(), options.tiflash.UpTimeout); s { displayResult.Mode = progress.ModeDone } else { displayResult.Mode = progress.ModeError From 1493f958d952675c4243b530dbd4510f7645ff39 Mon Sep 17 00:00:00 2001 From: Helios Date: Thu, 3 Dec 2020 22:54:11 +0800 Subject: [PATCH 3/5] lint --- components/playground/main.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/components/playground/main.go b/components/playground/main.go index 8fa464007a..d6928ad041 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -261,15 +261,13 @@ func checkDB(dbAddr string, timeout int) bool { time.Sleep(time.Second) } return false - } else { - for { - if err := tryConnect(dsn); err == nil { - return true - } - time.Sleep(time.Second) + } + for { + if err := tryConnect(dsn); err == nil { + return true } + time.Sleep(time.Second) } - } // checkStoreStatus uses pd client to check whether a store is up. timeout <= 0 means no timeout @@ -282,13 +280,12 @@ func checkStoreStatus(pdClient *api.PDClient, storeAddr string, timeout int) boo time.Sleep(time.Second) } return false - } else { - for { - if up, err := pdClient.IsUp(storeAddr); err == nil && up { - return true - } - time.Sleep(time.Second) + } + for { + if up, err := pdClient.IsUp(storeAddr); err == nil && up { + return true } + time.Sleep(time.Second) } } From 281438f4db306e40ab742c7561dfcc4f17f74630 Mon Sep 17 00:00:00 2001 From: Helios Date: Sat, 5 Dec 2020 00:13:42 +0800 Subject: [PATCH 4/5] typo --- components/playground/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/playground/main.go b/components/playground/main.go index d6928ad041..da0879e6ac 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -204,8 +204,8 @@ Examples: rootCmd.Flags().IntVarP(&opt.pump.Num, "pump", "", opt.pump.Num, "Pump instance number") rootCmd.Flags().IntVarP(&opt.drainer.Num, "drainer", "", opt.drainer.Num, "Drainer instance number") - rootCmd.Flags().IntVarP(&opt.tidb.UpTimeout, "db.timeout", "", opt.tidb.UpTimeout, "TiDB max wait time in seconds for staring, 0 means no limit") - rootCmd.Flags().IntVarP(&opt.tiflash.UpTimeout, "tiflash.timeout", "", opt.tiflash.UpTimeout, "TiFlash max wait time in seconds for staring, 0 means no limit") + rootCmd.Flags().IntVarP(&opt.tidb.UpTimeout, "db.timeout", "", opt.tidb.UpTimeout, "TiDB max wait time in seconds for starting, 0 means no limit") + rootCmd.Flags().IntVarP(&opt.tiflash.UpTimeout, "tiflash.timeout", "", opt.tiflash.UpTimeout, "TiFlash max wait time in seconds for starting, 0 means no limit") rootCmd.Flags().StringVarP(&opt.host, "host", "", opt.host, "Playground cluster host") rootCmd.Flags().StringVarP(&opt.tidb.Host, "db.host", "", opt.tidb.Host, "Playground TiDB host. If not provided, TiDB will still use `host` flag as its host") From bf37ae8f037d059975ed43f7720954ffdd7e6579 Mon Sep 17 00:00:00 2001 From: Helios Date: Tue, 8 Dec 2020 20:17:35 +0800 Subject: [PATCH 5/5] add mutex --- components/playground/playground.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/components/playground/playground.go b/components/playground/playground.go index dbd180f0c0..cd249989ed 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -788,6 +788,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme var succ []string if len(p.tidbs) > 0 { var wg sync.WaitGroup + var appendMutex sync.Mutex bars := progress.NewMultiBar(color.YellowString("Waiting for tidb instances ready\n")) for _, db := range p.tidbs { wg.Add(1) @@ -796,7 +797,11 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme go func(dbInst *instance.TiDBInstance) { defer wg.Done() if s := checkDB(dbInst.Addr(), options.tidb.UpTimeout); s { - succ = append(succ, dbInst.Addr()) + { + appendMutex.Lock() + succ = append(succ, dbInst.Addr()) + appendMutex.Unlock() + } bar.UpdateDisplay(&progress.DisplayProps{ Prefix: prefix, Mode: progress.ModeDone, @@ -850,11 +855,11 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme } else if state := cmd.ProcessState; state != nil && state.Exited() { displayResult.Mode = progress.ModeError displayResult.Suffix = fmt.Sprintf("process exited with code: %d", state.ExitCode()) - } else if s := checkStoreStatus(pdClient, flashInst.Addr(), options.tiflash.UpTimeout); s { - displayResult.Mode = progress.ModeDone - } else { + } else if s := checkStoreStatus(pdClient, flashInst.Addr(), options.tiflash.UpTimeout); !s { displayResult.Mode = progress.ModeError displayResult.Suffix = "failed to up after timeout" + } else { + displayResult.Mode = progress.ModeDone } bar.UpdateDisplay(displayResult) }(flash)