Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support native ssh client #615

Merged
merged 18 commits into from
Jul 27, 2020
Merged
3 changes: 3 additions & 0 deletions .github/workflows/integrate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ jobs:
- "test_scale_core"
- "test_scale_tools"
- "test_upgrade"
- "test_cmd_with_native_ssh"
- "test_scale_core_with_native_ssh"
- "test_scale_tools_with_native_ssh"
# - "test_dm_cmd"
env:
working-directory: ${{ github.workspace }}/go/src/github.com/${{ github.repository }}
Expand Down
3 changes: 3 additions & 0 deletions components/cluster/command/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, op
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
Mkdir(opt.user, inst.GetHost(), filepath.Join(task.CheckToolsPathDir, "bin")).
CopyComponent(
Expand Down Expand Up @@ -269,6 +270,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, op
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
Rmdir(inst.GetHost(), task.CheckToolsPathDir).
BuildAsStep(fmt.Sprintf(" - Cleanup check files on %s:%d", inst.GetHost(), inst.GetSSHPort()))
Expand Down Expand Up @@ -308,6 +310,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, op
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
)
resLines, err := handleCheckResults(ctx, host, opt, tf)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions components/cluster/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func deploy(clusterName, clusterVersion, topoFile string, opt deployOptions) err
sshConnProps.IdentityFile,
sshConnProps.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
EnvInit(inst.GetHost(), globalOptions.User).
Mkdir(globalOptions.User, inst.GetHost(), dirs...).
Expand All @@ -277,7 +278,7 @@ func deploy(clusterName, clusterVersion, topoFile string, opt deployOptions) err
// Deploy component
// prepare deployment server
t := task.NewBuilder().
UserSSH(inst.GetHost(), inst.GetSSHPort(), globalOptions.User, gOpt.SSHTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), globalOptions.User, gOpt.SSHTimeout, gOpt.NativeSSH).
Mkdir(globalOptions.User, inst.GetHost(),
deployDir, logDir,
filepath.Join(deployDir, "bin"),
Expand Down Expand Up @@ -416,7 +417,7 @@ func buildMonitoredDeployTask(
logDir := clusterutil.Abs(globalOptions.User, monitoredOptions.LogDir)
// Deploy component
t := task.NewBuilder().
UserSSH(host, info.ssh, globalOptions.User, gOpt.SSHTimeout).
UserSSH(host, info.ssh, globalOptions.User, gOpt.SSHTimeout, gOpt.NativeSSH).
Mkdir(globalOptions.User, host,
deployDir, dataDir, logDir,
filepath.Join(deployDir, "bin"),
Expand Down
4 changes: 2 additions & 2 deletions components/cluster/command/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func destroyTombstoneIfNeed(clusterName string, metadata *spec.ClusterMeta, opt
return perrs.AddStack(err)
}

err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout)
err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout, gOpt.NativeSSH)
if err != nil {
return perrs.AddStack(err)
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func displayClusterTopology(clusterName string, opt *operator.Options) error {
return perrs.AddStack(err)
}

err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout)
err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout, gOpt.NativeSSH)
if err != nil {
return perrs.AddStack(err)
}
Expand Down
4 changes: 2 additions & 2 deletions components/cluster/command/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newImportCmd() *cobra.Command {
}

// parse config and import nodes
if err = ansible.ParseAndImportInventory(ansibleDir, ansibleCfgFile, clsMeta, inv, gOpt.SSHTimeout); err != nil {
if err = ansible.ParseAndImportInventory(ansibleDir, ansibleCfgFile, clsMeta, inv, gOpt.SSHTimeout, gOpt.NativeSSH); err != nil {
return err
}

Expand All @@ -124,7 +124,7 @@ func newImportCmd() *cobra.Command {
}

// copy config files form deployment servers
if err = ansible.ImportConfig(clsName, clsMeta, gOpt.SSHTimeout); err != nil {
if err = ansible.ImportConfig(clsName, clsMeta, gOpt.SSHTimeout, gOpt.NativeSSH); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions components/cluster/command/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func buildReloadTask(
logDir := clusterutil.Abs(metadata.User, inst.LogDir())

// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder().UserSSH(inst.GetHost(), inst.GetSSHPort(), metadata.User, gOpt.SSHTimeout)
tb := task.NewBuilder().UserSSH(inst.GetHost(), inst.GetSSHPort(), metadata.User, gOpt.SSHTimeout, gOpt.NativeSSH)
if inst.IsImported() {
switch compName := inst.ComponentName(); compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertManager:
Expand Down Expand Up @@ -198,7 +198,7 @@ func refreshMonitoredConfigTask(
logDir := clusterutil.Abs(globalOptions.User, monitoredOptions.LogDir)
// Generate configs
t := task.NewBuilder().
UserSSH(host, info.ssh, globalOptions.User, gOpt.SSHTimeout).
UserSSH(host, info.ssh, globalOptions.User, gOpt.SSHTimeout, gOpt.NativeSSH).
MonitoredConfig(
clusterName,
comp,
Expand Down
11 changes: 11 additions & 0 deletions components/cluster/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func init() {
flags.ShowBacktrace = len(os.Getenv("TIUP_BACKTRACE")) > 0
cobra.EnableCommandSorting = false

nativeEnvVar := strings.ToLower(os.Getenv(localdata.EnvNameNativeSSHClient))
if nativeEnvVar == "true" || nativeEnvVar == "1" || nativeEnvVar == "enable" {
gOpt.NativeSSH = true
}

rootCmd = &cobra.Command{
Use: cliutil.OsArgs0(),
Short: "Deploy a TiDB cluster for production",
Expand Down Expand Up @@ -106,6 +111,11 @@ func init() {

teleCommand = getParentNames(cmd)

if gOpt.NativeSSH {
zap.L().Info("Native ssh client will be used",
zap.String(localdata.EnvNameNativeSSHClient, os.Getenv(localdata.EnvNameNativeSSHClient)))
}

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -123,6 +133,7 @@ func init() {
// start/stop operations is 90s, the default value of this argument is better be longer than that
rootCmd.PersistentFlags().Int64Var(&gOpt.OptTimeout, "wait-timeout", 120, "Timeout in seconds to wait for an operation to complete, ignored for operations that don't fit.")
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "Use the native SSH client installed on local system instead of the build-in one.")

rootCmd.AddCommand(
newCheckCmd(),
Expand Down
3 changes: 2 additions & 1 deletion components/cluster/command/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func buildScaleOutTask(
sshConnProps.IdentityFile,
sshConnProps.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
EnvInit(instance.GetHost(), metadata.User).
Mkdir(globalOptions.User, instance.GetHost(), dirs...).
Expand All @@ -246,7 +247,7 @@ func buildScaleOutTask(

// Deploy component
tb := task.NewBuilder().
UserSSH(inst.GetHost(), inst.GetSSHPort(), metadata.User, gOpt.SSHTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), metadata.User, gOpt.SSHTimeout, gOpt.NativeSSH).
Mkdir(metadata.User, inst.GetHost(),
deployDir, logDir,
filepath.Join(deployDir, "bin"),
Expand Down
3 changes: 2 additions & 1 deletion components/dm/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func deploy(clusterName, clusterVersion, topoFile string, opt deployOptions) err
sshConnProps.IdentityFile,
sshConnProps.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.NativeSSH,
).
EnvInit(inst.GetHost(), globalOptions.User).
Mkdir(globalOptions.User, inst.GetHost(), dirs...).
Expand All @@ -232,7 +233,7 @@ func deploy(clusterName, clusterVersion, topoFile string, opt deployOptions) err
logDir := clusterutil.Abs(globalOptions.User, inst.LogDir())
// Deploy component
t := task.NewBuilder().
UserSSH(inst.GetHost(), inst.GetSSHPort(), globalOptions.User, gOpt.SSHTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), globalOptions.User, gOpt.SSHTimeout, gOpt.NativeSSH).
Mkdir(globalOptions.User, inst.GetHost(),
deployDir, logDir,
filepath.Join(deployDir, "bin"),
Expand Down
31 changes: 2 additions & 29 deletions components/playground/instance/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,9 @@ import (
"github.com/pingcap/tiup/pkg/environment"
tiupexec "github.com/pingcap/tiup/pkg/exec"
"github.com/pingcap/tiup/pkg/repository/v0manifest"
"github.com/pingcap/tiup/pkg/utils"
)

// ErrorWaitTimeout is used to represent timeout of a command
// Example:
// _ = syscall.Kill(cmd.Process.Pid, syscall.SIGKILL)
// if err := WaitContext(context.WithTimeout(context.Background(), 3), cmd); err == ErrorWaitTimeout {
// // Do something
// }
var ErrorWaitTimeout = errors.New("wait command timeout")

// Process represent process to be run by playground
type Process interface {
Start() error
Expand All @@ -46,7 +39,7 @@ func (p *process) Start() error {

// Wait implements Instance interface.
func (p *process) Wait(ctx context.Context) error {
return WaitContext(ctx, p.cmd)
return utils.WaitContext(ctx, p.cmd)
}

// Pid implements Instance interface.
Expand Down Expand Up @@ -97,23 +90,3 @@ func NewComponentProcess(ctx context.Context, dir, binPath, component string, ve

return &process{cmd: cmd}, nil
}

// WaitContext wrap cmd.Wait with context
func WaitContext(ctx context.Context, cmd *exec.Cmd) error {
// We use cmd.Process.Wait instead of cmd.Wait because cmd.Wait is not reenterable
c := make(chan error, 1)
go func() {
if cmd == nil || cmd.Process == nil {
c <- nil
} else {
_, err := cmd.Process.Wait()
c <- err
}
}()
select {
case <-ctx.Done():
return ErrorWaitTimeout
case err := <-c:
return err
}
}
6 changes: 3 additions & 3 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tiup/components/playground/instance"
"github.com/pingcap/tiup/components/playground/utils"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/clusterutil"
"github.com/pingcap/tiup/pkg/environment"
"github.com/pingcap/tiup/pkg/localdata"
"github.com/pingcap/tiup/pkg/repository/v0manifest"
"github.com/pingcap/tiup/pkg/utils"
"golang.org/x/mod/semver"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -925,7 +925,7 @@ func (p *Playground) terminate(sig syscall.Signal, extraCmds ...*exec.Cmd) {
}
ctx, cancel := context.WithTimeout(context.Background(), killDeadline)
defer cancel()
if err := inst.Wait(ctx); err == instance.ErrorWaitTimeout {
if err := inst.Wait(ctx); err == utils.ErrorWaitTimeout {
_ = syscall.Kill(inst.Pid(), syscall.SIGKILL)
}
return nil
Expand All @@ -936,7 +936,7 @@ func (p *Playground) terminate(sig syscall.Signal, extraCmds ...*exec.Cmd) {
}
ctx, cancel := context.WithTimeout(context.Background(), killDeadline)
defer cancel()
if err := instance.WaitContext(ctx, cmd); err == instance.ErrorWaitTimeout {
if err := utils.WaitContext(ctx, cmd); err == utils.ErrorWaitTimeout {
_ = syscall.Kill(cmd.Process.Pid, syscall.SIGKILL)
}
}
Expand Down
41 changes: 0 additions & 41 deletions components/playground/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@
package utils

import (
"bufio"
"fmt"
"os"
"time"

"github.com/pingcap/errors"
)

// RetryOption is options for Retry()
Expand Down Expand Up @@ -82,40 +78,3 @@ func Retry(doFunc func() error, opts ...RetryOption) error {

return fmt.Errorf("operation exceeds the max retry attempts of %d", cfg.Attempts)
}

// TailN try get the latest n line of the file.
func TailN(fname string, n int) (lines []string, err error) {
file, err := os.Open(fname)
if err != nil {
return nil, errors.AddStack(err)
}
defer file.Close()

estimateLineSize := 1024

stat, err := os.Stat(fname)
if err != nil {
return nil, errors.AddStack(err)
}

start := int(stat.Size()) - n*estimateLineSize
if start < 0 {
start = 0
}

_, err = file.Seek(int64(start), 0 /*means relative to the origin of the file*/)
if err != nil {
return nil, errors.AddStack(err)
}

scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}

if len(lines) > n {
lines = lines[len(lines)-n:]
}

return
}
6 changes: 3 additions & 3 deletions pkg/cluster/ansible/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// ImportConfig copies config files from cluster which deployed through tidb-ansible
func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout int64) error {
func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout int64, nativeClient bool) error {
// there may be already cluster dir, skip create
//if err := os.MkdirAll(meta.ClusterPath(name), 0755); err != nil {
// return err
Expand All @@ -42,7 +42,7 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout int64) erro
SSHKeySet(
spec.ClusterPath(name, "ssh", "id_rsa"),
spec.ClusterPath(name, "ssh", "id_rsa.pub")).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout, nativeClient).
CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+".toml"),
spec.ClusterPath(name,
spec.AnsibleImportedConfigPath,
Expand All @@ -59,7 +59,7 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout int64) erro
SSHKeySet(
spec.ClusterPath(name, "ssh", "id_rsa"),
spec.ClusterPath(name, "ssh", "id_rsa.pub")).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout, nativeClient).
CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+".toml"),
spec.ClusterPath(name,
spec.AnsibleImportedConfigPath,
Expand Down
10 changes: 5 additions & 5 deletions pkg/cluster/ansible/dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
)

// parseDirs sets values of directories of component
func parseDirs(user string, ins spec.InstanceSpec, sshTimeout int64) (spec.InstanceSpec, error) {
func parseDirs(user string, ins spec.InstanceSpec, sshTimeout int64, nativeClient bool) (spec.InstanceSpec, error) {
hostName, sshPort := ins.SSH()

e := executor.NewSSHExecutor(executor.SSHConfig{
Expand All @@ -40,7 +40,7 @@ func parseDirs(user string, ins spec.InstanceSpec, sshTimeout int64) (spec.Insta
User: user,
KeyFile: SSHKeyPath(), // ansible generated keyfile
Timeout: time.Second * time.Duration(sshTimeout),
}, false) // not using global sudo
}, false /* not using global sudo */, nativeClient)
log.Debugf("Detecting deploy paths on %s...", hostName)

stdout, err := readStartScript(e, ins.Role(), hostName, ins.GetMainPort())
Expand Down Expand Up @@ -242,7 +242,7 @@ func parseDirs(user string, ins spec.InstanceSpec, sshTimeout int64) (spec.Insta
return ins, nil
}

func parseTiflashConfig(e *executor.SSHExecutor, spec *spec.TiFlashSpec, fname string) error {
func parseTiflashConfig(e executor.Executor, spec *spec.TiFlashSpec, fname string) error {
data, err := readFile(e, fname)
if err != nil {
return errors.AddStack(err)
Expand Down Expand Up @@ -275,7 +275,7 @@ func parseTiflashConfigFromFileData(spec *spec.TiFlashSpec, data []byte) error {
return nil
}

func readFile(e *executor.SSHExecutor, fname string) (data []byte, err error) {
func readFile(e executor.Executor, fname string) (data []byte, err error) {
cmd := fmt.Sprintf("cat %s", fname)
stdout, stderr, err := e.Execute(cmd, false)
if err != nil {
Expand All @@ -285,7 +285,7 @@ func readFile(e *executor.SSHExecutor, fname string) (data []byte, err error) {
return stdout, nil
}

func readStartScript(e *executor.SSHExecutor, component, host string, port int) (string, error) {
func readStartScript(e executor.Executor, component, host string, port int) (string, error) {
serviceFile := fmt.Sprintf("%s/%s-%d.service",
systemdUnitPath,
component,
Expand Down
Loading