diff --git a/.github/workflows/release-tiup.yaml b/.github/workflows/release-tiup.yaml index a0821e1431..047cca3056 100644 --- a/.github/workflows/release-tiup.yaml +++ b/.github/workflows/release-tiup.yaml @@ -26,6 +26,8 @@ jobs: release: runs-on: ubuntu-latest timeout-minutes: 30 + outputs: + REL_VER: ${{ steps.build_tiup.outputs.REL_VER }} strategy: fail-fast: true matrix: @@ -195,3 +197,40 @@ jobs: omitNameDuringUpdate: true prerelease: ${{ github.event.release.prerelease }} token: ${{ secrets.GITHUB_TOKEN }} + + brew-upgrade: + runs-on: ubuntu-latest + timeout-minutes: 5 + needs: release + steps: + - name: Check out brew code + uses: actions/checkout@v3 + continue-on-error: true + if: github.event_name == 'release' + with: + repository: pingcap/homebrew-brew + persist-credentials: false + ref: master + path: ${{ github.workspace }}/homebrew-brew + fetch-depth: 0 + + - name: Update and Check tiup version + id: update_version + working-directory: ${{ github.workspace }}/homebrew-brew + continue-on-error: true + if: github.event_name == 'release' + run: | + sed -i 's/version.*/version "${{ needs.release.outputs.REL_VER }}"/g' Formula/tiup.rb + sed -i 's/tag:.*/tag: "${{ needs.release.outputs.REL_VER }}"/g' Formula/tiup.rb + cat Formula/tiup.rb + + - name: Push new homebrew + uses: actions-js/push@master + continue-on-error: true + if: github.event_name == 'release' + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + directory: ${{ github.workspace }}/homebrew-brew + message: "tiup: ${{ needs.release.outputs.REL_VER }}" + branch: master + repository: pingcap/homebrew-brew \ No newline at end of file diff --git a/cmd/help.go b/cmd/help.go deleted file mode 100644 index da6bf175d7..0000000000 --- a/cmd/help.go +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "fmt" - "os" - "os/exec" - "path/filepath" - "strings" - - "github.com/pingcap/tiup/pkg/environment" - "github.com/pingcap/tiup/pkg/localdata" - "github.com/pingcap/tiup/pkg/utils" - "github.com/spf13/cobra" -) - -func newHelpCmd() *cobra.Command { - return &cobra.Command{ - Use: "help [command]", - Short: "Help about any command or component", - Long: `Help provides help for any command or component in the application. -Simply type tiup help | for full details.`, - Run: func(cmd *cobra.Command, args []string) { - env := environment.GlobalEnv() - cmd, n, e := cmd.Root().Find(args) - if (cmd == rootCmd || e != nil) && len(n) > 0 { - externalHelp(env, n[0], n[1:]...) 
- } else { - cmd.InitDefaultHelpFlag() // make possible 'help' flag to be shown - cmd.HelpFunc()(cmd, args) - } - }, - } -} - -func externalHelp(env *environment.Environment, spec string, args ...string) { - profile := env.Profile() - component, version := environment.ParseCompVersion(spec) - selectVer, err := env.SelectInstalledVersion(component, version) - if err != nil { - fmt.Println(err) - return - } - binaryPath, err := env.BinaryPath(component, selectVer) - if err != nil { - fmt.Println(err) - return - } - - installPath, err := profile.ComponentInstalledPath(component, selectVer) - if err != nil { - fmt.Println(err) - return - } - - sd := profile.Path(filepath.Join(localdata.StorageParentDir, strings.Split(spec, ":")[0])) - envs := []string{ - fmt.Sprintf("%s=%s", localdata.EnvNameHome, profile.Root()), - fmt.Sprintf("%s=%s", localdata.EnvNameComponentInstallDir, installPath), - fmt.Sprintf("%s=%s", localdata.EnvNameComponentDataDir, sd), - } - envs = append(envs, os.Environ()...) - - comp := exec.Command(binaryPath, utils.RebuildArgs(args)...) 
- comp.Env = envs - comp.Stdout = os.Stdout - comp.Stderr = os.Stderr - if err := comp.Start(); err != nil { - fmt.Printf("Cannot fetch help message from %s failed: %v\n", binaryPath, err) - return - } - if err := comp.Wait(); err != nil { - fmt.Printf("Cannot fetch help message from %s failed: %v\n", binaryPath, err) - } -} - -// nolint unused -func rebuildArgs(args []string) []string { - helpFlag := "--help" - argList := []string{} - for _, arg := range args { - if arg == "-h" || arg == "--help" { - helpFlag = arg - } else { - argList = append(argList, arg) - } - } - argList = append(argList, helpFlag) - return argList -} - -func usageTemplate(profile *localdata.Profile) string { - installComps := ` -Components Manifest: - use "tiup list" to fetch the latest components manifest -` - - return `Usage:{{if .Runnable}} - {{.UseLine}}{{end}}{{if gt (len .Aliases) 0}} - -Aliases: - {{.NameAndAliases}}{{end}}{{if .HasExample}} - -Examples: -{{.Example}}{{end}}{{if .HasAvailableSubCommands}} - -Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand (eq .Name "help"))}} - {{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}} -{{if not .HasParent}}` + installComps + `{{end}} -Flags: -{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}} - -Global Flags: -{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasHelpSubCommands}} - -Additional help topics:{{range .Commands}}{{if .IsAdditionalHelpTopicCommand}} - {{rpad .CommandPath .CommandPathPadding}} {{.Short}}{{end}}{{end}}{{end}}{{if not .HasParent}} - -Component instances with the same "tag" will share a data directory ($TIUP_HOME/data/$tag): - $ tiup --tag mycluster playground - -Examples: - $ tiup playground # Quick start - $ tiup playground nightly # Start a playground with the latest nightly version - $ tiup install [:version] # Install a component of specific version - $ tiup update --all # Update all installed 
components to the latest version - $ tiup update --nightly # Update all installed components to the nightly version - $ tiup update --self # Update the "tiup" to the latest version - $ tiup list # Fetch the latest supported components list - $ tiup status # Display all running/terminated instances - $ tiup clean # Clean the data of running/terminated instance (Kill process if it's running) - $ tiup clean --all # Clean the data of all running/terminated instances{{end}}{{if .HasAvailableSubCommands}} - -Use "{{.CommandPath}} [command] --help" for more information about a command.{{end}} -` -} diff --git a/cmd/history.go b/cmd/history.go new file mode 100644 index 0000000000..8adc2b18c9 --- /dev/null +++ b/cmd/history.go @@ -0,0 +1,107 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cmd + +import ( + "encoding/json" + "fmt" + "strconv" + + "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/environment" + "github.com/pingcap/tiup/pkg/tui" + "github.com/spf13/cobra" +) + +// newHistoryCmd history +func newHistoryCmd() *cobra.Command { + rows := 100 + var displayMode string + var all bool + cmd := &cobra.Command{ + Use: "history ", + Short: "Display the historical execution record of TiUP, displays 100 lines by default", + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) > 0 { + r, err := strconv.Atoi(args[0]) + if err == nil { + rows = r + } else { + return fmt.Errorf("%s: numeric argument required", args[0]) + } + } + + env := environment.GlobalEnv() + rows, err := env.GetHistory(rows, all) + if err != nil { + return err + } + + if displayMode == "json" { + for _, r := range rows { + rBytes, err := json.Marshal(r) + if err != nil { + continue + } + fmt.Println(string(rBytes)) + } + return nil + } + var table [][]string + table = append(table, []string{"Date", "Command", "Code"}) + + for _, r := range rows { + table = append(table, []string{ + r.Date.Format("2006-01-02T15:04:05"), + r.Command, + strconv.Itoa(r.Code), + }) + } + tui.PrintTable(table, true) + fmt.Printf("history log save path: %s\n", env.LocalPath(environment.HistoryDir)) + return nil + }, + } + cmd.Flags().StringVar(&displayMode, "format", "default", "The format of output, available values are [default, json]") + cmd.Flags().BoolVar(&all, "all", false, "Display all execution history") + cmd.AddCommand(newHistoryCleanupCmd()) + return cmd +} + +func newHistoryCleanupCmd() *cobra.Command { + var retainDays int + var all bool + var skipConfirm bool + cmd := &cobra.Command{ + Use: "cleanup", + Short: "delete all execution history", + RunE: func(cmd *cobra.Command, args []string) error { + if retainDays < 0 { + return errors.Errorf("retain-days cannot be less than 0") + } + + if all { + retainDays = 0 + } + + env := environment.GlobalEnv() + 
return env.DeleteHistory(retainDays, skipConfirm) + }, + } + + cmd.Flags().IntVar(&retainDays, "retain-days", 60, "Number of days to keep history for deletion") + cmd.Flags().BoolVar(&all, "all", false, "Delete all history") + cmd.Flags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'") + return cmd +} diff --git a/cmd/mirror.go b/cmd/mirror.go index 171ca15295..4c9ffe17aa 100644 --- a/cmd/mirror.go +++ b/cmd/mirror.go @@ -444,6 +444,7 @@ func newTransferOwnerCmd() *cobra.Command { // the `mirror rotate` sub command func newMirrorRotateCmd() *cobra.Command { addr := "0.0.0.0:8080" + keyDir := "" cmd := &cobra.Command{ Use: "rotate", @@ -451,6 +452,16 @@ func newMirrorRotateCmd() *cobra.Command { Long: "Rotate root.json make it possible to modify root.json", RunE: func(cmd *cobra.Command, args []string) error { teleCommand = cmd.CommandPath() + + e, err := environment.InitEnv(repoOpts, repository.MirrorOptions{KeyDir: keyDir}) + if err != nil { + if errors.Is(perrs.Cause(err), v1manifest.ErrLoadManifest) { + log.Warnf("Please check for root manifest file, you may download one from the repository mirror, or try `tiup mirror set` to force reset it.") + } + return err + } + environment.SetGlobalEnv(e) + root, err := editLatestRootManifest() if err != nil { return err @@ -465,6 +476,7 @@ func newMirrorRotateCmd() *cobra.Command { }, } cmd.Flags().StringVarP(&addr, "addr", "", addr, "listen address:port when starting the temp server for rotating") + cmd.Flags().StringVarP(&keyDir, "key-dir", "", keyDir, "specify the directory where stores the private keys") return cmd } diff --git a/cmd/root.go b/cmd/root.go index c8d3596072..e9422ced8a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -49,10 +49,8 @@ var ( // arguments var ( - binary string - binPath string - tag string - printVersion bool // not using cobra.Command.Version to make it possible to show component versions + binPath string + tag string ) func init() { @@ -67,20 +65,28 @@ 
TiDB platform components to the local system. You can run a specific version of "tiup [:version]". If no version number is specified, the latest version installed locally will be used. If the specified component does not have any version installed locally, the latest stable version will be downloaded from the repository.`, + Example: ` $ tiup playground # Quick start + $ tiup playground nightly # Start a playground with the latest nightly version + $ tiup install [:version] # Install a component of specific version + $ tiup update --all # Update all installed components to the latest version + $ tiup update --nightly # Update all installed components to the nightly version + $ tiup update --self # Update the "tiup" to the latest version + $ tiup list # Fetch the latest supported components list + $ tiup status # Display all running/terminated instances + $ tiup clean # Clean the data of running/terminated instance (Kill process if it's running) + $ tiup clean --all # Clean the data of all running/terminated instances`, SilenceErrors: true, - FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true}, + DisableFlagParsing: true, Args: func(cmd *cobra.Command, args []string) error { // Support `tiup ` return nil }, PersistentPreRunE: func(cmd *cobra.Command, args []string) error { teleCommand = cmd.CommandPath() - if printVersion && len(args) == 0 { - return nil - } switch cmd.Name() { case "init", + "rotate", "set": if cmd.HasParent() && cmd.Parent().Name() == "mirror" { // skip environment init @@ -88,7 +94,7 @@ the latest stable version will be downloaded from the repository.`, } fallthrough default: - e, err := environment.InitEnv(repoOpts) + e, err := environment.InitEnv(repoOpts, repository.MirrorOptions{}) if err != nil { if errors.Is(perrs.Cause(err), v1manifest.ErrLoadManifest) { log.Warnf("Please check for root manifest file, you may download one from the repository mirror, or try `tiup mirror set` to force reset it.") @@ -100,13 +106,30 @@ the latest 
stable version will be downloaded from the repository.`, return nil }, RunE: func(cmd *cobra.Command, args []string) error { - if printVersion && len(args) == 0 { - fmt.Println(version.NewTiUPVersion().String()) - return nil + if len(args) == 0 { + return cmd.Help() } env := environment.GlobalEnv() - if binary != "" { - component, ver := environment.ParseCompVersion(binary) + + // TBD: change this flag to subcommand + + // We assume the first unknown parameter is the component name and following + // parameters will be transparent passed because registered flags and subcommands + // will be parsed correctly. + // e.g: tiup --tag mytag --rm playground --db 3 --pd 3 --kv 4 + // => run "playground" with parameters "--db 3 --pd 3 --kv 4" + // tiup --tag mytag --binpath /xxx/tikv-server tikv + switch args[0] { + case "--help", "-h": + return cmd.Help() + case "--version", "-v": + fmt.Println(version.NewTiUPVersion().String()) + return nil + case "--binary": + if len(args) < 2 { + return fmt.Errorf("flag needs an argument: %s", args[0]) + } + component, ver := environment.ParseCompVersion(args[1]) selectedVer, err := env.SelectInstalledVersion(component, ver) if err != nil { return err @@ -117,36 +140,31 @@ the latest stable version will be downloaded from the repository.`, } fmt.Println(binaryPath) return nil - } - if len(args) > 0 { - // We assume the first unknown parameter is the component name and following - // parameters will be transparent passed because registered flags and subcommands - // will be parsed correctly. 
- // e.g: tiup --tag mytag --rm playground --db 3 --pd 3 --kv 4 - // => run "playground" with parameters "--db 3 --pd 3 --kv 4" - // tiup --tag mytag --binpath /xxx/tikv-server tikv - var transparentParams []string - componentSpec := args[0] - for i, arg := range os.Args { - if arg == componentSpec { - transparentParams = os.Args[i+1:] - break - } + case "--binpath": + if len(args) < 2 { + return fmt.Errorf("flag needs an argument: %s", args[0]) } - if len(transparentParams) > 0 && transparentParams[0] == "--" { - transparentParams = transparentParams[1:] + binPath = args[1] + args = args[2:] + case "--tag", "-T": + if len(args) < 2 { + return fmt.Errorf("flag needs an argument: %s", args[0]) } - - teleCommand = fmt.Sprintf("%s %s", cmd.CommandPath(), componentSpec) - return tiupexec.RunComponent(env, tag, componentSpec, binPath, transparentParams) + tag = args[1] + args = args[2:] } - return cmd.Help() - }, - PersistentPostRunE: func(cmd *cobra.Command, args []string) error { - if env := environment.GlobalEnv(); env != nil { - return env.Close() + if len(args) < 1 { + return cmd.Help() } - return nil + + componentSpec := args[0] + args = args[1:] + if len(args) > 0 && args[0] == "--" { + args = args[1:] + } + + teleCommand = fmt.Sprintf("%s %s", cmd.CommandPath(), componentSpec) + return tiupexec.RunComponent(env, tag, componentSpec, binPath, args) }, SilenceUsage: true, // implement auto completion for tiup components @@ -181,12 +199,12 @@ the latest stable version will be downloaded from the repository.`, }, } - rootCmd.PersistentFlags().BoolVarP(&repoOpts.SkipVersionCheck, "skip-version-check", "", false, "Skip the strict version check, by default a version must be a valid SemVer string") - rootCmd.Flags().StringVar(&binary, "binary", "", "Print binary path of a specific version of a component `[:version]`\n"+ + // useless, exist to generate help information + rootCmd.Flags().String("binary", "", "Print binary path of a specific version of a component 
`[:version]`\n"+ "and the latest version installed will be selected if no version specified") - rootCmd.Flags().StringVarP(&tag, "tag", "T", "", "[Deprecated] Specify a tag for component instance") - rootCmd.Flags().StringVar(&binPath, "binpath", "", "Specify the binary path of component instance") - rootCmd.Flags().BoolVarP(&printVersion, "version", "v", false, "Print the version of tiup") + rootCmd.Flags().StringP("tag", "T", "", "[Deprecated] Specify a tag for component instance") + rootCmd.Flags().String("binpath", "", "Specify the binary path of component instance") + rootCmd.Flags().BoolP("version", "v", false, "Print the version of tiup") rootCmd.AddCommand( newInstallCmd(), @@ -198,25 +216,8 @@ the latest stable version will be downloaded from the repository.`, newMirrorCmd(), newTelemetryCmd(), newEnvCmd(), + newHistoryCmd(), ) - - originHelpFunc := rootCmd.HelpFunc() - rootCmd.SetHelpFunc(func(cmd *cobra.Command, args []string) { - cmd, _, _ = cmd.Root().Find(args) - if len(args) < 2 || cmd != rootCmd { - originHelpFunc(cmd, args) - return - } - - env, _ := environment.InitEnv(repoOpts) - environment.SetGlobalEnv(env) - _ = cmd.RunE(cmd, args) - }) - - rootCmd.SetHelpCommand(newHelpCmd()) - // If env is inited before, localdata.InitProfile() will return a valid profile - // or it will return an invalid one but still print usage - rootCmd.SetUsageTemplate(usageTemplate(localdata.InitProfile())) } // Execute parses the command line arguments and calls proper functions @@ -248,6 +249,12 @@ func Execute() { // us a dedicated package for that reportEnabled = false } else { + // record TiUP execution history + err := environment.HistoryRecord(env, os.Args, start, code) + if err != nil { + log.Warnf("Record TiUP execution history log failed: %v", err) + } + teleMeta, _, err := telemetry.GetMeta(env) if err == nil { reportEnabled = teleMeta.Status == telemetry.EnableStatus diff --git a/components/cluster/command/display.go b/components/cluster/command/display.go 
index 8e43adc3cc..a37c0aaf3f 100644 --- a/components/cluster/command/display.go +++ b/components/cluster/command/display.go @@ -16,6 +16,7 @@ package command import ( "errors" "fmt" + "time" perrs "github.com/pingcap/errors" "github.com/pingcap/tiup/pkg/cluster/spec" @@ -29,6 +30,7 @@ func newDisplayCmd() *cobra.Command { showDashboardOnly bool showVersionOnly bool showTiKVLabels bool + statusTimeout uint64 ) cmd := &cobra.Command{ Use: "display ", @@ -38,6 +40,7 @@ func newDisplayCmd() *cobra.Command { return cmd.Help() } + gOpt.APITimeout = statusTimeout clusterName = args[0] clusterReport.ID = scrubClusterName(clusterName) teleCommand = append(teleCommand, scrubClusterName(clusterName)) @@ -67,7 +70,7 @@ func newDisplayCmd() *cobra.Command { if err != nil { return err } - return cm.DisplayDashboardInfo(clusterName, tlsCfg) + return cm.DisplayDashboardInfo(clusterName, time.Second*time.Duration(gOpt.APITimeout), tlsCfg) } if showTiKVLabels { return cm.DisplayTiKVLabels(clusterName, gOpt) @@ -82,6 +85,7 @@ func newDisplayCmd() *cobra.Command { cmd.Flags().BoolVar(&showDashboardOnly, "dashboard", false, "Only display TiDB Dashboard information") cmd.Flags().BoolVar(&showVersionOnly, "version", false, "Only display TiDB cluster version") cmd.Flags().BoolVar(&showTiKVLabels, "labels", false, "Only display labels of specified TiKV role or nodes") + cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 10, "Timeout in seconds when getting node status") return cmd } diff --git a/components/cluster/command/root.go b/components/cluster/command/root.go index ca976c7e92..6bad82f4fc 100644 --- a/components/cluster/command/root.go +++ b/components/cluster/command/root.go @@ -120,7 +120,7 @@ func init() { env, err = tiupmeta.InitEnv(repository.Options{ GOOS: "linux", GOARCH: "amd64", - }) + }, repository.MirrorOptions{}) if err != nil { return err } diff --git a/components/dm/command/display.go b/components/dm/command/display.go index 670e6e4f43..e18ac67458 100644 --- 
a/components/dm/command/display.go +++ b/components/dm/command/display.go @@ -27,6 +27,7 @@ func newDisplayCmd() *cobra.Command { var ( clusterName string showVersionOnly bool + statusTimeout uint64 ) cmd := &cobra.Command{ Use: "display ", @@ -36,6 +37,7 @@ func newDisplayCmd() *cobra.Command { return cmd.Help() } + gOpt.APITimeout = statusTimeout clusterName = args[0] if showVersionOnly { @@ -55,6 +57,7 @@ func newDisplayCmd() *cobra.Command { cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only display specified nodes") cmd.Flags().BoolVar(&showVersionOnly, "version", false, "Only display DM cluster version") cmd.Flags().BoolVar(&gOpt.ShowUptime, "uptime", false, "Display DM with uptime") + cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 10, "Timeout in seconds when getting node status") return cmd } diff --git a/components/dm/command/root.go b/components/dm/command/root.go index 4246c5088f..b6bf79ac9c 100644 --- a/components/dm/command/root.go +++ b/components/dm/command/root.go @@ -91,7 +91,7 @@ please backup your data before process.`, env, err = tiupmeta.InitEnv(repository.Options{ GOOS: "linux", GOARCH: "amd64", - }) + }, repository.MirrorOptions{}) if err != nil { return err } diff --git a/components/dm/spec/logic.go b/components/dm/spec/logic.go index 0566d3003f..41f62c9aca 100644 --- a/components/dm/spec/logic.go +++ b/components/dm/spec/logic.go @@ -19,6 +19,7 @@ import ( "fmt" "path/filepath" "strings" + "sync" "time" "github.com/pingcap/tiup/pkg/cluster/ctxt" @@ -89,8 +90,8 @@ func (c *DMMasterComponent) Instances() []Instance { s.DataDir, }, StatusFn: s.Status, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return spec.UptimeByHost(s.Host, s.Port, tlsCfg) + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return spec.UptimeByHost(s.Host, s.Port, timeout, tlsCfg) }, }, topo: c.Topology, @@ -267,8 +268,8 @@ func (c *DMWorkerComponent) Instances() []Instance { 
s.DataDir, }, StatusFn: s.Status, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return spec.UptimeByHost(s.Host, s.Port, tlsCfg) + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return spec.UptimeByHost(s.Host, s.Port, timeout, tlsCfg) }, }, topo: c.Topology, @@ -440,12 +441,28 @@ func (topo *Specification) IterComponent(fn func(comp Component)) { } // IterInstance iterates all instances in component starting order -func (topo *Specification) IterInstance(fn func(instance Instance)) { +func (topo *Specification) IterInstance(fn func(instance Instance), concurrency ...int) { + maxWorkers := 1 + wg := sync.WaitGroup{} + if len(concurrency) > 0 && concurrency[0] > 1 { + maxWorkers = concurrency[0] + } + workerPool := make(chan struct{}, maxWorkers) + for _, comp := range topo.ComponentsByStartOrder() { for _, inst := range comp.Instances() { - fn(inst) + wg.Add(1) + workerPool <- struct{}{} + go func(inst Instance) { + defer func() { + <-workerPool + wg.Done() + }() + fn(inst) + }(inst) } } + wg.Wait() } // IterHost iterates one instance for each host diff --git a/components/dm/spec/topology_dm.go b/components/dm/spec/topology_dm.go index 8959f7bac0..a400688a0f 100644 --- a/components/dm/spec/topology_dm.go +++ b/components/dm/spec/topology_dm.go @@ -141,9 +141,13 @@ type MasterSpec struct { } // Status queries current status of the instance -func (s *MasterSpec) Status(_ context.Context, tlsCfg *tls.Config, _ ...string) string { +func (s *MasterSpec) Status(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + if timeout < time.Second { + timeout = statusQueryTimeout + } + addr := fmt.Sprintf("%s:%d", s.Host, s.Port) - dc := api.NewDMMasterClient([]string{addr}, statusQueryTimeout, tlsCfg) + dc := api.NewDMMasterClient([]string{addr}, timeout, tlsCfg) isFound, isActive, isLeader, err := dc.GetMaster(s.Name) if err != nil { return "Down" @@ -207,11 +211,15 @@ 
type WorkerSpec struct { } // Status queries current status of the instance -func (s *WorkerSpec) Status(_ context.Context, tlsCfg *tls.Config, masterList ...string) string { +func (s *WorkerSpec) Status(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, masterList ...string) string { if len(masterList) < 1 { return "N/A" } - dc := api.NewDMMasterClient(masterList, statusQueryTimeout, tlsCfg) + + if timeout < time.Second { + timeout = statusQueryTimeout + } + dc := api.NewDMMasterClient(masterList, timeout, tlsCfg) stage, err := dc.GetWorker(s.Name) if err != nil { return "Down" diff --git a/components/playground/main.go b/components/playground/main.go index a2c159733b..bdade14803 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -218,7 +218,7 @@ Examples: return err } - env, err := environment.InitEnv(repository.Options{}) + env, err := environment.InitEnv(repository.Options{}, repository.MirrorOptions{}) if err != nil { return err } diff --git a/components/playground/playground.go b/components/playground/playground.go index 715aa8e1ca..7227c3eceb 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -137,7 +137,7 @@ func (p *Playground) binlogClient() (*api.BinlogClient, error) { addrs = append(addrs, inst.Addr()) } - return api.NewBinlogClient(addrs, nil) + return api.NewBinlogClient(addrs, 5*time.Second, nil) } func (p *Playground) pdClient() *api.PDClient { diff --git a/embed/examples/cluster/topology.example.yaml b/embed/examples/cluster/topology.example.yaml index 0edac37932..c81f4824fe 100644 --- a/embed/examples/cluster/topology.example.yaml +++ b/embed/examples/cluster/topology.example.yaml @@ -327,3 +327,5 @@ alertmanager_servers: # data_dir: "/tidb-data/alertmanager-9093" # # Alertmanager log file storage directory. # log_dir: "/tidb-deploy/alertmanager-9093/log" + # # Alertmanager config file storage directory. 
+ # config_file: "/tidb-deploy/alertmanager-9093/bin/alertmanager/alertmanager.yml" diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index 6dec42b071..65a4b8b4cf 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -298,6 +298,19 @@ scrape_configs: labels: group: 'tiflash' {{- end}} + relabel_configs: + - source_labels: [__address__] + target_label: __param_target + - source_labels: [__param_target] + target_label: instance + - target_label: __address__ + replacement: {{.BlackboxAddr}} + - job_name: "monitor_port_probe" + scrape_interval: 30s + metrics_path: /probe + params: + module: [tcp_connect] + static_configs: {{- if .PushgatewayAddr}} - targets: - '{{.PushgatewayAddr}}' diff --git a/embed/templates/scripts/run_dm-master.sh.tpl b/embed/templates/scripts/run_dm-master.sh.tpl index 246e2097ad..39fde5e418 100644 --- a/embed/templates/scripts/run_dm-master.sh.tpl +++ b/embed/templates/scripts/run_dm-master.sh.tpl @@ -25,7 +25,7 @@ exec bin/dm-master/dm-master \ --v1-sources-path="{{.V1SourcePath}}" \ {{- end}} --name="{{.Name}}" \ - --master-addr="{{.IP}}:{{.Port}}" \ + --master-addr="0.0.0.0:{{.Port}}" \ --advertise-addr="{{.IP}}:{{.Port}}" \ --peer-urls="{{.Scheme}}://{{.IP}}:{{.PeerPort}}" \ --advertise-peer-urls="{{.Scheme}}://{{.IP}}:{{.PeerPort}}" \ diff --git a/embed/templates/scripts/run_dm-master_scale.sh.tpl b/embed/templates/scripts/run_dm-master_scale.sh.tpl index 90572b0241..13a31977aa 100644 --- a/embed/templates/scripts/run_dm-master_scale.sh.tpl +++ b/embed/templates/scripts/run_dm-master_scale.sh.tpl @@ -22,7 +22,7 @@ exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/dm-master/d exec bin/dm-master/dm-master \ {{- end}} --name="{{.Name}}" \ - --master-addr="{{.IP}}:{{.Port}}" \ + --master-addr="0.0.0.0:{{.Port}}" \ --advertise-addr="{{.IP}}:{{.Port}}" \ --peer-urls="{{.Scheme}}://{{.IP}}:{{.PeerPort}}" \ 
--advertise-peer-urls="{{.Scheme}}://{{.IP}}:{{.PeerPort}}" \ diff --git a/embed/templates/scripts/run_dm-worker.sh.tpl b/embed/templates/scripts/run_dm-worker.sh.tpl index df11e96e20..f1074b4967 100644 --- a/embed/templates/scripts/run_dm-worker.sh.tpl +++ b/embed/templates/scripts/run_dm-worker.sh.tpl @@ -23,7 +23,7 @@ exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/dm-worker/d exec bin/dm-worker/dm-worker \ {{- end}} --name="{{.Name}}" \ - --worker-addr="{{.IP}}:{{.Port}}" \ + --worker-addr="0.0.0.0:{{.Port}}" \ --advertise-addr="{{.IP}}:{{.Port}}" \ --log-file="{{.LogDir}}/dm-worker.log" \ --join="{{template "MasterList" .Endpoints}}" \ diff --git a/go.mod b/go.mod index 4a78de0974..3653b1c439 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20220125073028-58f2ac94aa38 github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee // indirect github.com/pingcap/tidb-insight/collector v0.0.0-20220111101533-227008e9835b + github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.32.1 github.com/prometheus/prom2json v1.3.0 diff --git a/pkg/cluster/api/binlog.go b/pkg/cluster/api/binlog.go index a16b85f03c..918c246f29 100644 --- a/pkg/cluster/api/binlog.go +++ b/pkg/cluster/api/binlog.go @@ -35,10 +35,14 @@ type BinlogClient struct { } // NewBinlogClient create a BinlogClient. 
-func NewBinlogClient(pdEndpoints []string, tlsConfig *tls.Config) (*BinlogClient, error) { +func NewBinlogClient(pdEndpoints []string, timeout time.Duration, tlsConfig *tls.Config) (*BinlogClient, error) { + if timeout < time.Second { + timeout = time.Second * 5 + } + etcdClient, err := clientv3.New(clientv3.Config{ Endpoints: pdEndpoints, - DialTimeout: time.Second * 5, + DialTimeout: timeout, TLS: tlsConfig, }) if err != nil { @@ -47,7 +51,7 @@ func NewBinlogClient(pdEndpoints []string, tlsConfig *tls.Config) (*BinlogClient return &BinlogClient{ tls: tlsConfig, - httpClient: utils.NewHTTPClient(5*time.Second, tlsConfig).Client(), + httpClient: utils.NewHTTPClient(timeout, tlsConfig).Client(), etcdClient: etcdClient, }, nil } @@ -165,6 +169,9 @@ func (c *BinlogClient) UpdatePumpState(ctx context.Context, addr string, state s func (c *BinlogClient) updateStatus(ctx context.Context, ty string, nodeID string, state string) error { key := fmt.Sprintf("/tidb-binlog/v1/%s/%s", ty, nodeID) + // set timeout, otherwise it will keep retrying + ctx, f := context.WithTimeout(ctx, c.httpClient.Timeout) + defer f() resp, err := c.etcdClient.KV.Get(ctx, key) if err != nil { return errors.AddStack(err) @@ -202,6 +209,9 @@ func (c *BinlogClient) updateStatus(ctx context.Context, ty string, nodeID strin func (c *BinlogClient) nodeStatus(ctx context.Context, ty string) (status []*NodeStatus, err error) { key := fmt.Sprintf("/tidb-binlog/v1/%s", ty) + // set timeout, otherwise it will keep retrying + ctx, f := context.WithTimeout(ctx, c.httpClient.Timeout) + defer f() resp, err := c.etcdClient.KV.Get(ctx, key, clientv3.WithPrefix()) if err != nil { return nil, errors.AddStack(err) diff --git a/pkg/cluster/manager/destroy.go b/pkg/cluster/manager/destroy.go index c89d01d952..1d53084322 100644 --- a/pkg/cluster/manager/destroy.go +++ b/pkg/cluster/manager/destroy.go @@ -159,7 +159,8 @@ func (m *Manager) DestroyTombstone( Func("FindTomestoneNodes", func(ctx context.Context) (err error) 
{ if !skipConfirm { err = tui.PromptForConfirmOrAbortError( - color.HiYellowString(fmt.Sprintf("Will destroy these nodes: %v\nDo you confirm this action? [y/N]:", nodes)), + fmt.Sprintf("%s\nDo you confirm this action? [y/N]:", + color.HiYellowString("Will destroy these nodes: %v", nodes)), ) if err != nil { return err diff --git a/pkg/cluster/manager/display.go b/pkg/cluster/manager/display.go index 2ddb9c0ce4..e8b72d3b48 100644 --- a/pkg/cluster/manager/display.go +++ b/pkg/cluster/manager/display.go @@ -28,6 +28,7 @@ import ( "github.com/fatih/color" perrs "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/checkpoint" "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/cluster/clusterutil" "github.com/pingcap/tiup/pkg/cluster/ctxt" @@ -109,6 +110,8 @@ func (m *Manager) Display(name string, opt operator.Options) error { topo := metadata.GetTopology() base := metadata.GetBaseMeta() cyan := color.New(color.FgCyan, color.Bold) + + statusTimeout := time.Duration(opt.APITimeout) * time.Second // display cluster meta var j *JSONOutput if m.logger.GetDisplayMode() == logprinter.DisplayModeJSON { @@ -201,7 +204,7 @@ func (m *Manager) Display(name string, opt operator.Options) error { ) if t, ok := topo.(*spec.Specification); ok { var err error - dashboardAddr, err = t.GetDashboardAddress(ctx, tlsCfg, masterActive...) + dashboardAddr, err = t.GetDashboardAddress(ctx, tlsCfg, statusTimeout, masterActive...) 
if err == nil && !set.NewStringSet("", "auto", "none").Exist(dashboardAddr) { scheme := "http" if tlsCfg != nil { @@ -288,6 +291,7 @@ func (m *Manager) DisplayTiKVLabels(name string, opt operator.Options) error { metadata, _ := m.meta(name) topo := metadata.GetTopology() base := metadata.GetBaseMeta() + statusTimeout := time.Duration(opt.APITimeout) * time.Second // display cluster meta cyan := color.New(color.FgCyan, color.Bold) @@ -360,13 +364,13 @@ func (m *Manager) DisplayTiKVLabels(name string, opt operator.Options) error { } topo.IterInstance(func(ins spec.Instance) { if ins.ComponentName() == spec.ComponentPD { - status := ins.Status(ctx, tlsCfg, masterList...) + status := ins.Status(ctx, statusTimeout, tlsCfg, masterList...) if strings.HasPrefix(status, "Up") || strings.HasPrefix(status, "Healthy") { instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort()) masterActive = append(masterActive, instAddr) } } - }) + }, opt.Concurrency) var ( labelInfoArr []api.LabelInfo @@ -455,6 +459,8 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI topo := metadata.GetTopology() base := metadata.GetBaseMeta() + statusTimeout := time.Duration(opt.APITimeout) * time.Second + err = SetSSHKeySet(ctx, m.specManager.Path(name, "ssh", "id_rsa"), m.specManager.Path(name, "ssh", "id_rsa.pub")) if err != nil { return nil, err @@ -480,17 +486,18 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI if ins.ComponentName() != spec.ComponentPD && ins.ComponentName() != spec.ComponentDMMaster { return } - status := ins.Status(ctx, tlsCfg, masterList...) + + status := ins.Status(ctx, statusTimeout, tlsCfg, masterList...) 
if strings.HasPrefix(status, "Up") || strings.HasPrefix(status, "Healthy") { instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort()) masterActive = append(masterActive, instAddr) } masterStatus[ins.ID()] = status - }) + }, opt.Concurrency) var dashboardAddr string if t, ok := topo.(*spec.Specification); ok { - dashboardAddr, _ = t.GetDashboardAddress(ctx, tlsCfg, masterActive...) + dashboardAddr, _ = t.GetDashboardAddress(ctx, tlsCfg, statusTimeout, masterActive...) } clusterInstInfos := []InstInfo{} @@ -523,19 +530,20 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI case spec.ComponentDMMaster: status = masterStatus[ins.ID()] default: - status = ins.Status(ctx, tlsCfg, masterActive...) + status = ins.Status(ctx, statusTimeout, tlsCfg, masterActive...) } since := "-" if opt.ShowUptime { - since = formatInstanceSince(ins.Uptime(ctx, tlsCfg)) + since = formatInstanceSince(ins.Uptime(ctx, statusTimeout, tlsCfg)) } // Query the service status and uptime if status == "-" || (opt.ShowUptime && since == "-") { e, found := ctxt.GetInner(ctx).GetExecutor(ins.GetHost()) if found { - active, _ := operator.GetServiceStatus(ctx, e, ins.ServiceName()) + nctx := checkpoint.NewContext(ctx) + active, _ := operator.GetServiceStatus(nctx, e, ins.ServiceName()) if status == "-" { if parts := strings.Split(strings.TrimSpace(active), " "); len(parts) > 2 { if parts[1] == "active" { @@ -569,7 +577,7 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI Port: ins.GetPort(), Since: since, }) - }) + }, opt.Concurrency) // Sort by role,host,ports sort.Slice(clusterInstInfos, func(i, j int) bool { @@ -605,7 +613,7 @@ func formatInstanceStatus(status string) string { return color.GreenString(status) case startsWith("down", "err", "inactive"): // down, down|ui return color.RedString(status) - case startsWith("tombstone", "disconnected", "n/a"), strings.Contains(status, "offline"): + case startsWith("tombstone", 
"disconnected", "n/a"), strings.Contains(strings.ToLower(status), "offline"): return color.YellowString(status) default: return status @@ -715,7 +723,7 @@ func SetClusterSSH(ctx context.Context, topo spec.Topology, deployUser string, s } // DisplayDashboardInfo prints the dashboard address of cluster -func (m *Manager) DisplayDashboardInfo(clusterName string, tlsCfg *tls.Config) error { +func (m *Manager) DisplayDashboardInfo(clusterName string, timeout time.Duration, tlsCfg *tls.Config) error { metadata, err := spec.ClusterMetadata(clusterName) if err != nil && !errors.Is(perrs.Cause(err), meta.ErrValidate) && !errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) { @@ -728,7 +736,7 @@ func (m *Manager) DisplayDashboardInfo(clusterName string, tlsCfg *tls.Config) e } ctx := context.WithValue(context.Background(), logprinter.ContextKeyLogger, m.logger) - pdAPI := api.NewPDClient(ctx, pdEndpoints, 2*time.Second, tlsCfg) + pdAPI := api.NewPDClient(ctx, pdEndpoints, timeout, tlsCfg) dashboardAddr, err := pdAPI.GetDashboardAddress() if err != nil { return fmt.Errorf("failed to retrieve TiDB Dashboard instance from PD: %s", err) diff --git a/pkg/cluster/manager/scale_in.go b/pkg/cluster/manager/scale_in.go index bb770a750d..1923243ac5 100644 --- a/pkg/cluster/manager/scale_in.go +++ b/pkg/cluster/manager/scale_in.go @@ -17,6 +17,7 @@ import ( "context" "crypto/tls" "errors" + "fmt" "strings" "github.com/fatih/color" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tiup/pkg/cluster/spec" "github.com/pingcap/tiup/pkg/cluster/task" "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/set" "github.com/pingcap/tiup/pkg/tui" ) @@ -51,6 +53,21 @@ func (m *Manager) ScaleIn( force bool = gOpt.Force nodes []string = gOpt.Nodes ) + + metadata, err := m.meta(name) + if err != nil && + !errors.Is(perrs.Cause(err), meta.ErrValidate) && + !errors.Is(perrs.Cause(err), spec.ErrMultipleTiSparkMaster) && + !errors.Is(perrs.Cause(err), spec.ErrMultipleTisparkWorker) && + 
!errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) { + // ignore conflict check error, node may be deployed by former version + // that lack of some certain conflict checks + return err + } + + topo := metadata.GetTopology() + base := metadata.GetBaseMeta() + if !skipConfirm { if force { m.logger.Warnf(color.HiRedString(tui.ASCIIArtWarning)) @@ -74,23 +91,13 @@ func (m *Manager) ScaleIn( return err } - m.logger.Infof("Scale-in nodes...") - } + if err := checkAsyncComps(topo, nodes); err != nil { + return err + } - metadata, err := m.meta(name) - if err != nil && - !errors.Is(perrs.Cause(err), meta.ErrValidate) && - !errors.Is(perrs.Cause(err), spec.ErrMultipleTiSparkMaster) && - !errors.Is(perrs.Cause(err), spec.ErrMultipleTisparkWorker) && - !errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) { - // ignore conflict check error, node may be deployed by former version - // that lack of some certain conflict checks - return err + m.logger.Infof("Scale-in nodes...") } - topo := metadata.GetTopology() - base := metadata.GetBaseMeta() - // Regenerate configuration gOpt.IgnoreConfigCheck = true regenConfigTasks, hasImported := buildInitConfigTasks(m, name, topo, base, gOpt, nodes) @@ -137,3 +144,25 @@ func (m *Manager) ScaleIn( return nil } + +// checkAsyncComps asks the user to confirm scale-in of nodes whose components go offline asynchronously (Pump, TiKV, TiFlash and Drainer). +func checkAsyncComps(topo spec.Topology, nodes []string) error { + var asyncOfflineComps = set.NewStringSet(spec.ComponentPump, spec.ComponentTiKV, spec.ComponentTiFlash, spec.ComponentDrainer) + deletedNodes := set.NewStringSet(nodes...) + delAsyncOfflineComps := set.NewStringSet() + topo.IterInstance(func(instance spec.Instance) { + if deletedNodes.Exist(instance.ID()) { + if asyncOfflineComps.Exist(instance.ComponentName()) { + delAsyncOfflineComps.Insert(instance.ComponentName()) + } + } + }) + + if len(delAsyncOfflineComps.Slice()) > 0 { + return tui.PromptForConfirmOrAbortError(fmt.Sprintf( + "%s\nDo you want to continue? 
[y/N]:", color.YellowString( + "The component `%s` will become tombstone, maybe exists in several minutes or hours, after that you can use the prune command to clean it", + delAsyncOfflineComps.Slice()))) + } + return nil +} diff --git a/pkg/cluster/operation/check.go b/pkg/cluster/operation/check.go index 08e6a0c3df..6da90e1493 100644 --- a/pkg/cluster/operation/check.go +++ b/pkg/cluster/operation/check.go @@ -24,6 +24,7 @@ import ( "github.com/AstroProfundis/sysinfo" "github.com/pingcap/tidb-insight/collector/insight" + "github.com/pingcap/tiup/pkg/checkpoint" "github.com/pingcap/tiup/pkg/cluster/ctxt" "github.com/pingcap/tiup/pkg/cluster/module" "github.com/pingcap/tiup/pkg/cluster/spec" @@ -845,7 +846,8 @@ func CheckJRE(ctx context.Context, e ctxt.Executor, host string, topo *spec.Spec } // check if java cli is available - stdout, stderr, err := e.Execute(ctx, "java -version", false) + // the checkpoint part of context can't be shared between goroutines + stdout, stderr, err := e.Execute(checkpoint.NewContext(ctx), "java -version", false) if err != nil { results = append(results, &CheckResult{ Name: CheckNameCommand, diff --git a/pkg/cluster/operation/destroy.go b/pkg/cluster/operation/destroy.go index c07c3a3b1e..44e59515f3 100644 --- a/pkg/cluster/operation/destroy.go +++ b/pkg/cluster/operation/destroy.go @@ -491,7 +491,7 @@ func DestroyClusterTombstone( defer tcpProxy.Close(closeC) pdEndpoints = tcpProxy.GetEndpoints() } - binlogClient, err := api.NewBinlogClient(pdEndpoints, tlsCfg) + binlogClient, err := api.NewBinlogClient(pdEndpoints, 5*time.Second, tlsCfg) if err != nil { return nil, err } @@ -600,6 +600,7 @@ func DestroyClusterTombstone( if !tombstone { pumpServers = append(pumpServers, s) + continue } nodes = append(nodes, id) @@ -629,6 +630,7 @@ func DestroyClusterTombstone( if !tombstone { drainerServers = append(drainerServers, s) + continue } nodes = append(nodes, id) diff --git a/pkg/cluster/operation/scale_in.go 
b/pkg/cluster/operation/scale_in.go index 2999e98da2..94fc3be0d7 100644 --- a/pkg/cluster/operation/scale_in.go +++ b/pkg/cluster/operation/scale_in.go @@ -180,7 +180,7 @@ func ScaleInCluster( defer tcpProxy.Close(closeC) pdEndpoints = tcpProxy.GetEndpoints() } - binlogClient, err := api.NewBinlogClient(pdEndpoints, tlsCfg) + binlogClient, err := api.NewBinlogClient(pdEndpoints, 5*time.Second, tlsCfg) if err != nil { return err } diff --git a/pkg/cluster/spec/alertmanager.go b/pkg/cluster/spec/alertmanager.go index a220715494..276a9997bf 100644 --- a/pkg/cluster/spec/alertmanager.go +++ b/pkg/cluster/spec/alertmanager.go @@ -108,11 +108,11 @@ func (c *AlertManagerComponent) Instances() []Instance { s.DeployDir, s.DataDir, }, - StatusFn: func(_ context.Context, _ *tls.Config, _ ...string) string { - return statusByHost(s.Host, s.WebPort, "/-/ready", nil) + StatusFn: func(_ context.Context, timeout time.Duration, _ *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.WebPort, "/-/ready", timeout, nil) }, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return UptimeByHost(s.Host, s.WebPort, tlsCfg) + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.WebPort, timeout, tlsCfg) }, }, topo: c.Topology, diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index 7a53b11921..a4a52fd70d 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -104,11 +104,11 @@ func (c *CDCComponent) Instances() []Instance { Dirs: []string{ s.DeployDir, }, - StatusFn: func(_ context.Context, tlsCfg *tls.Config, _ ...string) string { - return statusByHost(s.Host, s.Port, "/status", tlsCfg) + StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg) }, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return UptimeByHost(s.Host, 
s.Port, tlsCfg) + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.Port, timeout, tlsCfg) }, }, c.Topology} if s.DataDir != "" { diff --git a/pkg/cluster/spec/drainer.go b/pkg/cluster/spec/drainer.go index 80676d46c4..66efadb4bc 100644 --- a/pkg/cluster/spec/drainer.go +++ b/pkg/cluster/spec/drainer.go @@ -22,6 +22,7 @@ import ( "strconv" "time" + "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/cluster/ctxt" "github.com/pingcap/tiup/pkg/cluster/template/scripts" "github.com/pingcap/tiup/pkg/meta" @@ -47,6 +48,31 @@ type DrainerSpec struct { OS string `yaml:"os,omitempty"` } +// Status queries current status of the instance +func (s *DrainerSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + state := statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg) + + if s.Offline { + binlogClient, err := api.NewBinlogClient(pdList, timeout, tlsCfg) + if err != nil { + return state + } + id := s.Host + ":" + strconv.Itoa(s.Port) + tombstone, _ := binlogClient.IsDrainerTombstone(ctx, id) + + if tombstone { + state = "Tombstone" + } else { + state = "Pending Offline" + } + } + return state +} + // Role returns the component role of the instance func (s *DrainerSpec) Role() string { return ComponentDrainer @@ -104,11 +130,9 @@ func (c *DrainerComponent) Instances() []Instance { s.DeployDir, s.DataDir, }, - StatusFn: func(_ context.Context, tlsCfg *tls.Config, _ ...string) string { - return statusByHost(s.Host, s.Port, "/status", tlsCfg) - }, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return UptimeByHost(s.Host, s.Port, tlsCfg) + StatusFn: s.Status, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.Port, timeout, tlsCfg) }, }, c.Topology}) } diff --git 
a/pkg/cluster/spec/grafana.go b/pkg/cluster/spec/grafana.go index 42af998589..0aa56b4a0e 100644 --- a/pkg/cluster/spec/grafana.go +++ b/pkg/cluster/spec/grafana.go @@ -114,11 +114,11 @@ func (c *GrafanaComponent) Instances() []Instance { Dirs: []string{ s.DeployDir, }, - StatusFn: func(_ context.Context, _ *tls.Config, _ ...string) string { - return statusByHost(s.Host, s.Port, "", nil) + StatusFn: func(_ context.Context, timeout time.Duration, _ *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.Port, "", timeout, nil) }, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return UptimeByHost(s.Host, s.Port, tlsCfg) + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.Port, timeout, tlsCfg) }, }, topo: c.Topology, diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index d69f7884b1..1956e003e3 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -94,8 +94,8 @@ type Instance interface { DeployDir() string UsedPorts() []int UsedDirs() []string - Status(ctx context.Context, tlsCfg *tls.Config, pdList ...string) string - Uptime(ctx context.Context, tlsCfg *tls.Config) time.Duration + Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string + Uptime(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration DataDir() string LogDir() string OS() string // only linux supported now @@ -139,8 +139,8 @@ type BaseInstance struct { Ports []int Dirs []string - StatusFn func(ctx context.Context, tlsCfg *tls.Config, pdHosts ...string) string - UptimeFn func(ctx context.Context, tlsCfg *tls.Config) time.Duration + StatusFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdHosts ...string) string + UptimeFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration } // Ready implements Instance interface @@ -464,11 
+464,11 @@ func (i *BaseInstance) UsedDirs() []string { } // Status implements Instance interface -func (i *BaseInstance) Status(ctx context.Context, tlsCfg *tls.Config, pdList ...string) string { - return i.StatusFn(ctx, tlsCfg, pdList...) +func (i *BaseInstance) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { + return i.StatusFn(ctx, timeout, tlsCfg, pdList...) } // Uptime implements Instance interface -func (i *BaseInstance) Uptime(ctx context.Context, tlsCfg *tls.Config) time.Duration { - return i.UptimeFn(ctx, tlsCfg) +func (i *BaseInstance) Uptime(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return i.UptimeFn(ctx, timeout, tlsCfg) } diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index 9587cf4a8d..ccf21a949a 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -127,11 +127,11 @@ func (c *MonitorComponent) Instances() []Instance { s.DeployDir, s.DataDir, }, - StatusFn: func(_ context.Context, _ *tls.Config, _ ...string) string { - return statusByHost(s.Host, s.Port, "/-/ready", nil) + StatusFn: func(_ context.Context, timeout time.Duration, _ *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.Port, "/-/ready", timeout, nil) }, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return UptimeByHost(s.Host, s.Port, tlsCfg) + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.Port, timeout, tlsCfg) }, }, c.Topology} if s.NgPort > 0 { diff --git a/pkg/cluster/spec/pd.go b/pkg/cluster/spec/pd.go index fd47a02769..e6ac6451a1 100644 --- a/pkg/cluster/spec/pd.go +++ b/pkg/cluster/spec/pd.go @@ -54,9 +54,13 @@ type PDSpec struct { } // Status queries current status of the instance -func (s *PDSpec) Status(ctx context.Context, tlsCfg *tls.Config, _ ...string) string { +func (s *PDSpec) Status(ctx context.Context, 
timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + if timeout < time.Second { + timeout = statusQueryTimeout + } + addr := fmt.Sprintf("%s:%d", s.Host, s.ClientPort) - pc := api.NewPDClient(ctx, []string{addr}, statusQueryTimeout, tlsCfg) + pc := api.NewPDClient(ctx, []string{addr}, timeout, tlsCfg) // check health err := pc.CheckHealth() @@ -138,8 +142,8 @@ func (c *PDComponent) Instances() []Instance { s.DataDir, }, StatusFn: s.Status, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return UptimeByHost(s.Host, s.ClientPort, tlsCfg) + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.ClientPort, timeout, tlsCfg) }, }, topo: c.Topology, diff --git a/pkg/cluster/spec/pump.go b/pkg/cluster/spec/pump.go index ddea53e97c..0a5e210a58 100644 --- a/pkg/cluster/spec/pump.go +++ b/pkg/cluster/spec/pump.go @@ -19,8 +19,10 @@ import ( "fmt" "os" "path/filepath" + "strconv" "time" + "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/cluster/ctxt" "github.com/pingcap/tiup/pkg/cluster/template/scripts" "github.com/pingcap/tiup/pkg/meta" @@ -45,6 +47,31 @@ type PumpSpec struct { OS string `yaml:"os,omitempty"` } +// Status queries current status of the instance +func (s *PumpSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + state := statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg) + + if s.Offline { + binlogClient, err := api.NewBinlogClient(pdList, timeout, tlsCfg) + if err != nil { + return state + } + id := s.Host + ":" + strconv.Itoa(s.Port) + tombstone, _ := binlogClient.IsPumpTombstone(ctx, id) + + if tombstone { + state = "Tombstone" + } else { + state = "Pending Offline" + } + } + return state +} + // Role returns the component role of the instance func (s *PumpSpec) Role() string { return ComponentPump @@ 
-102,11 +129,9 @@ func (c *PumpComponent) Instances() []Instance { s.DeployDir, s.DataDir, }, - StatusFn: func(_ context.Context, tlsCfg *tls.Config, _ ...string) string { - return statusByHost(s.Host, s.Port, "/status", tlsCfg) - }, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return UptimeByHost(s.Host, s.Port, tlsCfg) + StatusFn: s.Status, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.Port, timeout, tlsCfg) }, }, c.Topology}) } diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index 72c7f3fdc4..e13ddcd024 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -20,6 +20,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "time" "github.com/creasty/defaults" @@ -158,7 +159,7 @@ type Topology interface { ComponentsByStartOrder() []Component ComponentsByStopOrder() []Component ComponentsByUpdateOrder() []Component - IterInstance(fn func(instance Instance)) + IterInstance(fn func(instance Instance), concurrency ...int) GetMonitoredOptions() *MonitoredOptions // count how many time a path is used by instances in cluster CountDir(host string, dir string) int @@ -427,8 +428,12 @@ func (s *Specification) AdjustByVersion(clusterVersion string) { } // GetDashboardAddress returns the cluster's dashboard addr -func (s *Specification) GetDashboardAddress(ctx context.Context, tlsCfg *tls.Config, pdList ...string) (string, error) { - pc := api.NewPDClient(ctx, pdList, statusQueryTimeout, tlsCfg) +func (s *Specification) GetDashboardAddress(ctx context.Context, tlsCfg *tls.Config, timeout time.Duration, pdList ...string) (string, error) { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + pc := api.NewPDClient(ctx, pdList, timeout, tlsCfg) dashboardAddr, err := pc.GetDashboardAddress() if err != nil { return "", err @@ -727,12 +732,28 @@ func (s *Specification) IterComponent(fn func(comp Component)) { } // 
IterInstance iterates all instances in component starting order -func (s *Specification) IterInstance(fn func(instance Instance)) { +func (s *Specification) IterInstance(fn func(instance Instance), concurrency ...int) { + maxWorkers := 1 + wg := sync.WaitGroup{} + if len(concurrency) > 0 && concurrency[0] > 1 { + maxWorkers = concurrency[0] + } + workerPool := make(chan struct{}, maxWorkers) + for _, comp := range s.ComponentsByStartOrder() { for _, inst := range comp.Instances() { - fn(inst) + wg.Add(1) + workerPool <- struct{}{} + go func(inst Instance) { + defer func() { + <-workerPool + wg.Done() + }() + fn(inst) + }(inst) } } + wg.Wait() } // IterHost iterates one instance for each host diff --git a/pkg/cluster/spec/spec_manager_test.go b/pkg/cluster/spec/spec_manager_test.go index 254fc39afd..76926878e2 100644 --- a/pkg/cluster/spec/spec_manager_test.go +++ b/pkg/cluster/spec/spec_manager_test.go @@ -101,7 +101,7 @@ func (t *TestTopology) ComponentsByUpdateOrder() []Component { return nil } -func (t *TestTopology) IterInstance(fn func(instance Instance)) { +func (t *TestTopology) IterInstance(fn func(instance Instance), concurrency ...int) { } func (t *TestTopology) GetMonitoredOptions() *MonitoredOptions { diff --git a/pkg/cluster/spec/tidb.go b/pkg/cluster/spec/tidb.go index 046511b242..6a9bd0cd48 100644 --- a/pkg/cluster/spec/tidb.go +++ b/pkg/cluster/spec/tidb.go @@ -105,11 +105,11 @@ func (c *TiDBComponent) Instances() []Instance { Dirs: []string{ s.DeployDir, }, - StatusFn: func(_ context.Context, tlsCfg *tls.Config, _ ...string) string { - return statusByHost(s.Host, s.StatusPort, "/status", tlsCfg) + StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.StatusPort, "/status", timeout, tlsCfg) }, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return UptimeByHost(s.Host, s.StatusPort, tlsCfg) + UptimeFn: func(_ context.Context, timeout time.Duration, 
tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.StatusPort, timeout, tlsCfg) }, }, c.Topology}) } diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go index 3fb7b33ac1..2e1f954a1c 100644 --- a/pkg/cluster/spec/tiflash.go +++ b/pkg/cluster/spec/tiflash.go @@ -66,7 +66,7 @@ type TiFlashSpec struct { } // Status queries current status of the instance -func (s *TiFlashSpec) Status(ctx context.Context, tlsCfg *tls.Config, pdList ...string) string { +func (s *TiFlashSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { storeAddr := fmt.Sprintf("%s:%d", s.Host, s.FlashServicePort) state := checkStoreStatus(ctx, storeAddr, tlsCfg, pdList...) if s.Offline && strings.ToLower(state) == "offline" { @@ -234,7 +234,7 @@ func (c *TiFlashComponent) Instances() []Instance { s.DataDir, }, StatusFn: s.Status, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return 0 }, }, c.Topology}) diff --git a/pkg/cluster/spec/tikv.go b/pkg/cluster/spec/tikv.go index b4af65e067..7cc1478625 100644 --- a/pkg/cluster/spec/tikv.go +++ b/pkg/cluster/spec/tikv.go @@ -84,7 +84,7 @@ func checkStoreStatus(ctx context.Context, storeAddr string, tlsCfg *tls.Config, } // Status queries current status of the instance -func (s *TiKVSpec) Status(ctx context.Context, tlsCfg *tls.Config, pdList ...string) string { +func (s *TiKVSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { storeAddr := addr(s) state := checkStoreStatus(ctx, storeAddr, tlsCfg, pdList...) 
if s.Offline && strings.ToLower(state) == "offline" { @@ -183,8 +183,8 @@ func (c *TiKVComponent) Instances() []Instance { s.DataDir, }, StatusFn: s.Status, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { - return UptimeByHost(s.Host, s.StatusPort, tlsCfg) + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.StatusPort, timeout, tlsCfg) }, }, c.Topology}) } diff --git a/pkg/cluster/spec/tispark.go b/pkg/cluster/spec/tispark.go index bc9199d77d..fb68e0b450 100644 --- a/pkg/cluster/spec/tispark.go +++ b/pkg/cluster/spec/tispark.go @@ -151,10 +151,10 @@ func (c *TiSparkMasterComponent) Instances() []Instance { Dirs: []string{ s.DeployDir, }, - StatusFn: func(_ context.Context, tlsCfg *tls.Config, _ ...string) string { - return statusByHost(s.Host, s.WebPort, "", tlsCfg) + StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.WebPort, "", timeout, tlsCfg) }, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return 0 }, }, @@ -332,10 +332,10 @@ func (c *TiSparkWorkerComponent) Instances() []Instance { Dirs: []string{ s.DeployDir, }, - StatusFn: func(_ context.Context, tlsCfg *tls.Config, _ ...string) string { - return statusByHost(s.Host, s.WebPort, "", tlsCfg) + StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.WebPort, "", timeout, tlsCfg) }, - UptimeFn: func(_ context.Context, tlsCfg *tls.Config) time.Duration { + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return 0 }, }, diff --git a/pkg/cluster/spec/util.go b/pkg/cluster/spec/util.go index 0e1a523ccb..49a790e927 100644 --- a/pkg/cluster/spec/util.go +++ b/pkg/cluster/spec/util.go @@ -124,8 +124,12 @@ 
func LoadClientCert(dir string) (*tls.Config, error) { } // statusByHost queries current status of the instance by http status api. -func statusByHost(host string, port int, path string, tlsCfg *tls.Config) string { - client := utils.NewHTTPClient(statusQueryTimeout, tlsCfg) +func statusByHost(host string, port int, path string, timeout time.Duration, tlsCfg *tls.Config) string { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + client := utils.NewHTTPClient(timeout, tlsCfg) scheme := "http" if tlsCfg != nil { @@ -145,14 +149,18 @@ func statusByHost(host string, port int, path string, tlsCfg *tls.Config) string } // UptimeByHost queries current uptime of the instance by http Prometheus metric api. -func UptimeByHost(host string, port int, tlsCfg *tls.Config) time.Duration { +func UptimeByHost(host string, port int, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + if timeout < time.Second { + timeout = statusQueryTimeout + } + scheme := "http" if tlsCfg != nil { scheme = "https" } url := fmt.Sprintf("%s://%s:%d/metrics", scheme, host, port) - client := utils.NewHTTPClient(statusQueryTimeout, tlsCfg) + client := utils.NewHTTPClient(timeout, tlsCfg) body, err := client.Get(context.TODO(), url) if err != nil || body == nil { diff --git a/pkg/environment/env.go b/pkg/environment/env.go index a2d35f69bf..4bb55dd368 100644 --- a/pkg/environment/env.go +++ b/pkg/environment/env.go @@ -81,7 +81,7 @@ type Environment struct { } // InitEnv creates a new Environment object configured using env vars and defaults. 
-func InitEnv(options repository.Options) (*Environment, error) { +func InitEnv(options repository.Options, mOpt repository.MirrorOptions) (*Environment, error) { if env := GlobalEnv(); env != nil { return env, nil } @@ -92,7 +92,7 @@ func InitEnv(options repository.Options) (*Environment, error) { // Initialize the repository // Replace the mirror if some sub-commands use different mirror address mirrorAddr := Mirror() - mirror := repository.NewMirror(mirrorAddr, repository.MirrorOptions{}) + mirror := repository.NewMirror(mirrorAddr, mOpt) if err := mirror.Open(); err != nil { return nil, err } diff --git a/pkg/environment/history.go b/pkg/environment/history.go new file mode 100644 index 0000000000..a71e303fb7 --- /dev/null +++ b/pkg/environment/history.go @@ -0,0 +1,244 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package environment + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/fatih/color" + "github.com/pingcap/tiup/pkg/tui" + "github.com/pingcap/tiup/pkg/utils" + "github.com/pkg/errors" +) + +const ( + // HistoryDir history save path + HistoryDir = "history" + historyPrefix = "tiup-history-" + historySize int64 = 1024 * 64 // history file default size is 64k +) + +// commandRow type of command history row +type historyRow struct { + Date time.Time `json:"time"` + Command string `json:"command"` + Code int `json:"exit_code"` +} + +// historyItem record history row file item +type historyItem struct { + path string + info fs.FileInfo + index int +} + +// HistoryRecord record tiup exec cmd +func HistoryRecord(env *Environment, command []string, date time.Time, code int) error { + if env == nil { + return nil + } + + historyPath := env.LocalPath(HistoryDir) + if utils.IsNotExist(historyPath) { + err := os.MkdirAll(historyPath, 0755) + if err != nil { + return err + } + } + + h := &historyRow{ + Command: strings.Join(command, " "), + Date: date, + Code: code, + } + + return h.save(historyPath) +} + +// save save commandRow to file +func (r *historyRow) save(dir string) error { + rBytes, err := json.Marshal(r) + if err != nil { + return err + } + + historyFile := getLatestHistoryFile(dir) + + f, err := os.OpenFile(historyFile.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return err + } + defer f.Close() + _, err = f.Write(append(rBytes, []byte("\n")...)) + return err +} + +// GetHistory get tiup history +func (env *Environment) GetHistory(count int, all bool) ([]*historyRow, error) { + fList, err := getHistoryFileList(env.LocalPath(HistoryDir)) + if err != nil { + return nil, err + } + rows := []*historyRow{} + for _, f := range fList { + rs, err := f.getHistory() + if err != nil { + return rows, err + } + if (len(rows)+len(rs)) > count && !all { + i 
:= len(rows) + len(rs) - count + rows = append(rs[i:], rows...) + break + } + + rows = append(rs, rows...) + } + return rows, nil +} + +// DeleteHistory delete history file +func (env *Environment) DeleteHistory(retainDays int, skipConfirm bool) error { + if retainDays < 0 { + return errors.Errorf("retainDays cannot be less than 0") + } + + // history file before `DelBeforeTime` will be deleted + oneDayDuration, _ := time.ParseDuration("-24h") + delBeforeTime := time.Now().Add(oneDayDuration * time.Duration(retainDays)) + + if !skipConfirm { + fmt.Printf("History logs before %s will be %s!\n", + color.HiYellowString(delBeforeTime.Format("2006-01-02T15:04:05")), + color.HiYellowString("deleted"), + ) + if err := tui.PromptForConfirmOrAbortError("Do you want to continue? [y/N]:"); err != nil { + return err + } + } + + fList, err := getHistoryFileList(env.LocalPath(HistoryDir)) + if err != nil { + return err + } + + if len(fList) == 0 { + return nil + } + + for _, f := range fList { + if f.info.ModTime().Before(delBeforeTime) { + err := os.Remove(f.path) + if err != nil { + return err + } + continue + } + } + return nil +} + +// getHistory get tiup history execution row +func (i *historyItem) getHistory() ([]*historyRow, error) { + rows := []*historyRow{} + + fi, err := os.Open(i.path) + if err != nil { + return rows, err + } + defer fi.Close() + + br := bufio.NewReader(fi) + for { + a, _, c := br.ReadLine() + if c == io.EOF { + break + } + r := &historyRow{} + // ignore + err := json.Unmarshal(a, r) + if err != nil { + continue + } + rows = append(rows, r) + } + + return rows, nil +} + +// getHistoryFileList get the history file list +func getHistoryFileList(dir string) ([]historyItem, error) { + fileInfos, err := os.ReadDir(dir) + if err != nil { + return nil, err + } + + hfileList := []historyItem{} + for _, fi := range fileInfos { + if fi.IsDir() { + continue + } + + // another suffix + // ex: tiup-history-0.bak + i, err := 
strconv.Atoi((strings.TrimPrefix(fi.Name(), historyPrefix))) + if err != nil { + continue + } + + fInfo, _ := fi.Info() + hfileList = append(hfileList, historyItem{ + path: filepath.Join(dir, fi.Name()), + index: i, + info: fInfo, + }) + } + + sort.Slice(hfileList, func(i, j int) bool { + return hfileList[i].index > hfileList[j].index + }) + + return hfileList, nil +} + +// getLatestHistoryFile get the latest history file, use index 0 if it doesn't exist +func getLatestHistoryFile(dir string) (item historyItem) { + fileList, err := getHistoryFileList(dir) + // start from 0 + if len(fileList) == 0 || err != nil { + item.index = 0 + item.path = filepath.Join(dir, fmt.Sprintf("%s%s", historyPrefix, strconv.Itoa(item.index))) + return + } + + latestItem := fileList[0] + + if latestItem.info.Size() >= historySize { + item.index = latestItem.index + 1 + item.path = filepath.Join(dir, fmt.Sprintf("%s%s", historyPrefix, strconv.Itoa(item.index))) + } else { + item = latestItem + } + + return +} diff --git a/pkg/repository/model/model.go b/pkg/repository/model/model.go index 5024cdc74c..6352adc302 100644 --- a/pkg/repository/model/model.go +++ b/pkg/repository/model/model.go @@ -134,10 +134,22 @@ func (m *model) Rotate(manifest *v1manifest.Manifest) error { return err } + // write new 'root.json' file with verion prefix manifestFilename := fmt.Sprintf("%d.root.json", root.Version) if err := m.txn.WriteManifest(manifestFilename, manifest); err != nil { return err } + /* not yet update the 'root.json' without version prefix, as we don't + * have a '1.root.json', so the 'root.json' is playing the role of initial + * '1.root.json', clients are updating to the latest 'n.root.json' no + * matter older ones are expired or not + * maybe we could update the 'root.json' some day when we have many many + * versions of root.json available and the updating process from old clients + * are causing performance issues + */ + // if err := m.txn.WriteManifest("root.json", manifest); err != 
nil { + // return err + // } fi, err := m.txn.Stat(manifestFilename) if err != nil { diff --git a/pkg/repository/repository.go b/pkg/repository/repository.go index 8924ace8ef..0dd38fb3e1 100644 --- a/pkg/repository/repository.go +++ b/pkg/repository/repository.go @@ -22,7 +22,6 @@ type Repository struct { // Options represents options for a repository type Options struct { - SkipVersionCheck bool GOOS string GOARCH string DisableDecompress bool