Skip to content

Commit

Permalink
Support native ssh client (#615)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucklove authored Jul 27, 2020
1 parent 4ce9c65 commit f1142b1
Show file tree
Hide file tree
Showing 34 changed files with 420 additions and 177 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/integrate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ jobs:
- "test_scale_core"
- "test_scale_tools"
- "test_upgrade"
- "test_cmd_with_native_ssh"
- "test_scale_core_with_native_ssh"
- "test_scale_tools_with_native_ssh"
# - "test_dm_cmd"
env:
working-directory: ${{ github.workspace }}/go/src/github.com/${{ github.repository }}
Expand Down
3 changes: 3 additions & 0 deletions components/cluster/command/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, op
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
Mkdir(opt.user, inst.GetHost(), filepath.Join(task.CheckToolsPathDir, "bin")).
CopyComponent(
Expand Down Expand Up @@ -269,6 +270,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, op
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
Rmdir(inst.GetHost(), task.CheckToolsPathDir).
BuildAsStep(fmt.Sprintf(" - Cleanup check files on %s:%d", inst.GetHost(), inst.GetSSHPort()))
Expand Down Expand Up @@ -308,6 +310,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, op
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
)
resLines, err := handleCheckResults(ctx, host, opt, tf)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions components/cluster/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func deploy(clusterName, clusterVersion, topoFile string, opt deployOptions) err
sshConnProps.IdentityFile,
sshConnProps.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
EnvInit(inst.GetHost(), globalOptions.User).
Mkdir(globalOptions.User, inst.GetHost(), dirs...).
Expand All @@ -277,7 +278,7 @@ func deploy(clusterName, clusterVersion, topoFile string, opt deployOptions) err
// Deploy component
// prepare deployment server
t := task.NewBuilder().
UserSSH(inst.GetHost(), inst.GetSSHPort(), globalOptions.User, gOpt.SSHTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), globalOptions.User, gOpt.SSHTimeout, gOpt.NativeSSH).
Mkdir(globalOptions.User, inst.GetHost(),
deployDir, logDir,
filepath.Join(deployDir, "bin"),
Expand Down Expand Up @@ -416,7 +417,7 @@ func buildMonitoredDeployTask(
logDir := clusterutil.Abs(globalOptions.User, monitoredOptions.LogDir)
// Deploy component
t := task.NewBuilder().
UserSSH(host, info.ssh, globalOptions.User, gOpt.SSHTimeout).
UserSSH(host, info.ssh, globalOptions.User, gOpt.SSHTimeout, gOpt.NativeSSH).
Mkdir(globalOptions.User, host,
deployDir, dataDir, logDir,
filepath.Join(deployDir, "bin"),
Expand Down
4 changes: 2 additions & 2 deletions components/cluster/command/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func destroyTombstoneIfNeed(clusterName string, metadata *spec.ClusterMeta, opt
return perrs.AddStack(err)
}

err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout)
err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout, gOpt.NativeSSH)
if err != nil {
return perrs.AddStack(err)
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func displayClusterTopology(clusterName string, opt *operator.Options) error {
return perrs.AddStack(err)
}

err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout)
err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout, gOpt.NativeSSH)
if err != nil {
return perrs.AddStack(err)
}
Expand Down
4 changes: 2 additions & 2 deletions components/cluster/command/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newImportCmd() *cobra.Command {
}

// parse config and import nodes
if err = ansible.ParseAndImportInventory(ansibleDir, ansibleCfgFile, clsMeta, inv, gOpt.SSHTimeout); err != nil {
if err = ansible.ParseAndImportInventory(ansibleDir, ansibleCfgFile, clsMeta, inv, gOpt.SSHTimeout, gOpt.NativeSSH); err != nil {
return err
}

Expand All @@ -124,7 +124,7 @@ func newImportCmd() *cobra.Command {
}

// copy config files form deployment servers
if err = ansible.ImportConfig(clsName, clsMeta, gOpt.SSHTimeout); err != nil {
if err = ansible.ImportConfig(clsName, clsMeta, gOpt.SSHTimeout, gOpt.NativeSSH); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions components/cluster/command/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func buildReloadTask(
logDir := clusterutil.Abs(metadata.User, inst.LogDir())

// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder().UserSSH(inst.GetHost(), inst.GetSSHPort(), metadata.User, gOpt.SSHTimeout)
tb := task.NewBuilder().UserSSH(inst.GetHost(), inst.GetSSHPort(), metadata.User, gOpt.SSHTimeout, gOpt.NativeSSH)
if inst.IsImported() {
switch compName := inst.ComponentName(); compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertManager:
Expand Down Expand Up @@ -198,7 +198,7 @@ func refreshMonitoredConfigTask(
logDir := clusterutil.Abs(globalOptions.User, monitoredOptions.LogDir)
// Generate configs
t := task.NewBuilder().
UserSSH(host, info.ssh, globalOptions.User, gOpt.SSHTimeout).
UserSSH(host, info.ssh, globalOptions.User, gOpt.SSHTimeout, gOpt.NativeSSH).
MonitoredConfig(
clusterName,
comp,
Expand Down
11 changes: 11 additions & 0 deletions components/cluster/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func init() {
flags.ShowBacktrace = len(os.Getenv("TIUP_BACKTRACE")) > 0
cobra.EnableCommandSorting = false

nativeEnvVar := strings.ToLower(os.Getenv(localdata.EnvNameNativeSSHClient))
if nativeEnvVar == "true" || nativeEnvVar == "1" || nativeEnvVar == "enable" {
gOpt.NativeSSH = true
}

rootCmd = &cobra.Command{
Use: cliutil.OsArgs0(),
Short: "Deploy a TiDB cluster for production",
Expand Down Expand Up @@ -106,6 +111,11 @@ func init() {

teleCommand = getParentNames(cmd)

if gOpt.NativeSSH {
zap.L().Info("Native ssh client will be used",
zap.String(localdata.EnvNameNativeSSHClient, os.Getenv(localdata.EnvNameNativeSSHClient)))
}

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -123,6 +133,7 @@ func init() {
// start/stop operations is 90s, the default value of this argument is better be longer than that
rootCmd.PersistentFlags().Int64Var(&gOpt.OptTimeout, "wait-timeout", 120, "Timeout in seconds to wait for an operation to complete, ignored for operations that don't fit.")
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "Use the native SSH client installed on local system instead of the build-in one.")

rootCmd.AddCommand(
newCheckCmd(),
Expand Down
3 changes: 2 additions & 1 deletion components/cluster/command/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func buildScaleOutTask(
sshConnProps.IdentityFile,
sshConnProps.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
EnvInit(instance.GetHost(), metadata.User).
Mkdir(globalOptions.User, instance.GetHost(), dirs...).
Expand All @@ -246,7 +247,7 @@ func buildScaleOutTask(

// Deploy component
tb := task.NewBuilder().
UserSSH(inst.GetHost(), inst.GetSSHPort(), metadata.User, gOpt.SSHTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), metadata.User, gOpt.SSHTimeout, gOpt.NativeSSH).
Mkdir(metadata.User, inst.GetHost(),
deployDir, logDir,
filepath.Join(deployDir, "bin"),
Expand Down
3 changes: 2 additions & 1 deletion components/dm/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func deploy(clusterName, clusterVersion, topoFile string, opt deployOptions) err
sshConnProps.IdentityFile,
sshConnProps.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
EnvInit(inst.GetHost(), globalOptions.User).
Mkdir(globalOptions.User, inst.GetHost(), dirs...).
Expand All @@ -232,7 +233,7 @@ func deploy(clusterName, clusterVersion, topoFile string, opt deployOptions) err
logDir := clusterutil.Abs(globalOptions.User, inst.LogDir())
// Deploy component
t := task.NewBuilder().
UserSSH(inst.GetHost(), inst.GetSSHPort(), globalOptions.User, gOpt.SSHTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), globalOptions.User, gOpt.SSHTimeout, gOpt.NativeSSH).
Mkdir(globalOptions.User, inst.GetHost(),
deployDir, logDir,
filepath.Join(deployDir, "bin"),
Expand Down
31 changes: 2 additions & 29 deletions components/playground/instance/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,9 @@ import (
"github.com/pingcap/tiup/pkg/environment"
tiupexec "github.com/pingcap/tiup/pkg/exec"
"github.com/pingcap/tiup/pkg/repository/v0manifest"
"github.com/pingcap/tiup/pkg/utils"
)

// ErrorWaitTimeout is used to represent timeout of a command
// Example:
// _ = syscall.Kill(cmd.Process.Pid, syscall.SIGKILL)
// if err := WaitContext(context.WithTimeout(context.Background(), 3), cmd); err == ErrorWaitTimeout {
// // Do something
// }
var ErrorWaitTimeout = errors.New("wait command timeout")

// Process represent process to be run by playground
type Process interface {
Start() error
Expand All @@ -46,7 +39,7 @@ func (p *process) Start() error {

// Wait implements Instance interface.
func (p *process) Wait(ctx context.Context) error {
return WaitContext(ctx, p.cmd)
return utils.WaitContext(ctx, p.cmd)
}

// Pid implements Instance interface.
Expand Down Expand Up @@ -97,23 +90,3 @@ func NewComponentProcess(ctx context.Context, dir, binPath, component string, ve

return &process{cmd: cmd}, nil
}

// WaitContext wrap cmd.Wait with context
func WaitContext(ctx context.Context, cmd *exec.Cmd) error {
// We use cmd.Process.Wait instead of cmd.Wait because cmd.Wait is not reenterable
c := make(chan error, 1)
go func() {
if cmd == nil || cmd.Process == nil {
c <- nil
} else {
_, err := cmd.Process.Wait()
c <- err
}
}()
select {
case <-ctx.Done():
return ErrorWaitTimeout
case err := <-c:
return err
}
}
6 changes: 3 additions & 3 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tiup/components/playground/instance"
"github.com/pingcap/tiup/components/playground/utils"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/clusterutil"
"github.com/pingcap/tiup/pkg/environment"
"github.com/pingcap/tiup/pkg/localdata"
"github.com/pingcap/tiup/pkg/repository/v0manifest"
"github.com/pingcap/tiup/pkg/utils"
"golang.org/x/mod/semver"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -925,7 +925,7 @@ func (p *Playground) terminate(sig syscall.Signal, extraCmds ...*exec.Cmd) {
}
ctx, cancel := context.WithTimeout(context.Background(), killDeadline)
defer cancel()
if err := inst.Wait(ctx); err == instance.ErrorWaitTimeout {
if err := inst.Wait(ctx); err == utils.ErrorWaitTimeout {
_ = syscall.Kill(inst.Pid(), syscall.SIGKILL)
}
return nil
Expand All @@ -936,7 +936,7 @@ func (p *Playground) terminate(sig syscall.Signal, extraCmds ...*exec.Cmd) {
}
ctx, cancel := context.WithTimeout(context.Background(), killDeadline)
defer cancel()
if err := instance.WaitContext(ctx, cmd); err == instance.ErrorWaitTimeout {
if err := utils.WaitContext(ctx, cmd); err == utils.ErrorWaitTimeout {
_ = syscall.Kill(cmd.Process.Pid, syscall.SIGKILL)
}
}
Expand Down
41 changes: 0 additions & 41 deletions components/playground/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@
package utils

import (
"bufio"
"fmt"
"os"
"time"

"github.com/pingcap/errors"
)

// RetryOption is options for Retry()
Expand Down Expand Up @@ -82,40 +78,3 @@ func Retry(doFunc func() error, opts ...RetryOption) error {

return fmt.Errorf("operation exceeds the max retry attempts of %d", cfg.Attempts)
}

// TailN try get the latest n line of the file.
func TailN(fname string, n int) (lines []string, err error) {
file, err := os.Open(fname)
if err != nil {
return nil, errors.AddStack(err)
}
defer file.Close()

estimateLineSize := 1024

stat, err := os.Stat(fname)
if err != nil {
return nil, errors.AddStack(err)
}

start := int(stat.Size()) - n*estimateLineSize
if start < 0 {
start = 0
}

_, err = file.Seek(int64(start), 0 /*means relative to the origin of the file*/)
if err != nil {
return nil, errors.AddStack(err)
}

scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}

if len(lines) > n {
lines = lines[len(lines)-n:]
}

return
}
6 changes: 3 additions & 3 deletions pkg/cluster/ansible/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// ImportConfig copies config files from cluster which deployed through tidb-ansible
func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout int64) error {
func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout int64, nativeClient bool) error {
// there may be already cluster dir, skip create
//if err := os.MkdirAll(meta.ClusterPath(name), 0755); err != nil {
// return err
Expand All @@ -42,7 +42,7 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout int64) erro
SSHKeySet(
spec.ClusterPath(name, "ssh", "id_rsa"),
spec.ClusterPath(name, "ssh", "id_rsa.pub")).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout, nativeClient).
CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+".toml"),
spec.ClusterPath(name,
spec.AnsibleImportedConfigPath,
Expand All @@ -59,7 +59,7 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout int64) erro
SSHKeySet(
spec.ClusterPath(name, "ssh", "id_rsa"),
spec.ClusterPath(name, "ssh", "id_rsa.pub")).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout, nativeClient).
CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+".toml"),
spec.ClusterPath(name,
spec.AnsibleImportedConfigPath,
Expand Down
10 changes: 5 additions & 5 deletions pkg/cluster/ansible/dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
)

// parseDirs sets values of directories of component
func parseDirs(user string, ins spec.InstanceSpec, sshTimeout int64) (spec.InstanceSpec, error) {
func parseDirs(user string, ins spec.InstanceSpec, sshTimeout int64, nativeClient bool) (spec.InstanceSpec, error) {
hostName, sshPort := ins.SSH()

e := executor.NewSSHExecutor(executor.SSHConfig{
Expand All @@ -40,7 +40,7 @@ func parseDirs(user string, ins spec.InstanceSpec, sshTimeout int64) (spec.Insta
User: user,
KeyFile: SSHKeyPath(), // ansible generated keyfile
Timeout: time.Second * time.Duration(sshTimeout),
}, false) // not using global sudo
}, false /* not using global sudo */, nativeClient)
log.Debugf("Detecting deploy paths on %s...", hostName)

stdout, err := readStartScript(e, ins.Role(), hostName, ins.GetMainPort())
Expand Down Expand Up @@ -242,7 +242,7 @@ func parseDirs(user string, ins spec.InstanceSpec, sshTimeout int64) (spec.Insta
return ins, nil
}

func parseTiflashConfig(e *executor.SSHExecutor, spec *spec.TiFlashSpec, fname string) error {
func parseTiflashConfig(e executor.Executor, spec *spec.TiFlashSpec, fname string) error {
data, err := readFile(e, fname)
if err != nil {
return errors.AddStack(err)
Expand Down Expand Up @@ -275,7 +275,7 @@ func parseTiflashConfigFromFileData(spec *spec.TiFlashSpec, data []byte) error {
return nil
}

func readFile(e *executor.SSHExecutor, fname string) (data []byte, err error) {
func readFile(e executor.Executor, fname string) (data []byte, err error) {
cmd := fmt.Sprintf("cat %s", fname)
stdout, stderr, err := e.Execute(cmd, false)
if err != nil {
Expand All @@ -285,7 +285,7 @@ func readFile(e *executor.SSHExecutor, fname string) (data []byte, err error) {
return stdout, nil
}

func readStartScript(e *executor.SSHExecutor, component, host string, port int) (string, error) {
func readStartScript(e executor.Executor, component, host string, port int) (string, error) {
serviceFile := fmt.Sprintf("%s/%s-%d.service",
systemdUnitPath,
component,
Expand Down
Loading

0 comments on commit f1142b1

Please sign in to comment.