Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ bin/kind: curl-installed bin
deps: bin bin/jq bin/golangci-lint bin/gofumpt bin/kind

test: go-installed docker-installed bin/kind
# ./hack/run_tests.sh
echo "Skip go tests!!!"
./hack/run_tests.sh
echo "Run go tests!"
$(MAKE) clean/test

lint: bin/golangci-lint
Expand Down
3 changes: 2 additions & 1 deletion hack/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ source "$(pwd)/hack/utils.sh"

check_all_deps
check_go
pull_image

run_tests=""

Expand Down Expand Up @@ -80,7 +81,7 @@ function run_tests_in_dir() {

echo "Run tests in $full_pkg_path"
cd "$full_pkg_path"
if ! echo "test -v -p 1 $run_tests" | xargs go; then
if ! echo "test -timeout 30m -v -p 1 $run_tests" | xargs go; then
all_failed_tests="$(echo -e "${all_failed_tests}\nTests in ${p} failed")"
fi
done <<< "$packages"
Expand Down
6 changes: 5 additions & 1 deletion hack/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,8 @@ function check_kind() {

function check_all_deps() {
check_docker && check_kind
}
}

function pull_image() {
docker pull lscr.io/linuxserver/openssh-server:10.0_p1-r9-ls209
}
3 changes: 2 additions & 1 deletion pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ package provider
import (
"context"

"github.com/hashicorp/go-multierror"

connection "github.com/deckhouse/lib-connection/pkg"
"github.com/deckhouse/lib-connection/pkg/settings"
sshconfig "github.com/deckhouse/lib-connection/pkg/ssh/config"
"github.com/hashicorp/go-multierror"
)

type DefaultProvider struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,12 @@ func (p *DefaultSSHProvider) newSession(parent *session.Session, privateKeys []s
}

func (p *DefaultSSHProvider) useGoSSH(shouldLog bool) bool {
logDebug := func(format string, v ...any) {
logDebug := func(format string) {
if !shouldLog {
return
}

p.debug(format, v...)
p.debug(format)
}

if p.options.ForceGoSSH {
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/ssh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func TestSSHProviderClient(t *testing.T) {
assertSwitchClient(t, params, defaultClient)
}

getProvider := func(sett settings.Settings, config *sshconfig.ConnectionConfig, opts ...SSHClientOption) *DefaultSSHProvider {
getProvider := func(sett settings.Settings, config *sshconfig.ConnectionConfig) *DefaultSSHProvider {
provider := NewDefaultSSHProvider(sett, config)
provider.goSSHStopWait = 3 * time.Second
return provider
Expand Down Expand Up @@ -994,7 +994,7 @@ func assertClientStopped(t *testing.T, client connection.SSHClient, shouldStop b

func assertSwitchClient(t *testing.T, params assertSwitchClientParams, defaultClient connection.SSHClient) connection.SSHClient {
switchClientSession := defaultSession(params.host, params.port)
var privateKeys []session.AgentPrivateKey
privateKeys := make([]session.AgentPrivateKey, 0, len(params.additionalPrivateKeys))
for _, key := range params.additionalPrivateKeys {
privateKeys = append(privateKeys, session.AgentPrivateKey{
Key: key.Key,
Expand Down
13 changes: 5 additions & 8 deletions pkg/ssh/clissh/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ func (a *Agent) Start() error {

logger := a.sshSettings.Logger()

logger.DebugLn("agent: start ssh-agent")
logger.DebugF("agent: start ssh-agent")
err := a.agent.Start()
if err != nil {
return fmt.Errorf("Start ssh-agent: %v", err)
}

logger.DebugLn("agent: run ssh-add for keys")
logger.DebugF("agent: run ssh-add for keys")
err = a.AddKeys(a.agentSettings.PrivateKeys)
if err != nil {
return fmt.Errorf("Agent error: %v", err)
Expand All @@ -128,7 +128,7 @@ func (a *Agent) AddKeys(keys []session.AgentPrivateKey) error {
logger := a.sshSettings.Logger()

if a.sshSettings.IsDebug() {
logger.DebugLn("list added keys")
logger.DebugF("list added keys")
listCmd := cmd.NewSSHAdd(a.sshSettings, a.agentSettings).ListCmd()

output, err := listCmd.CombinedOutput()
Expand Down Expand Up @@ -163,11 +163,8 @@ func (a *Agent) addKeys(authSock string, keys []session.AgentPrivateKey) error {

agentClient := agent.NewClient(conn)

for i, key := range keys {
privateKey, _, err := utils.ParseSSHPrivateKey(
[]byte(key.Key),
fmt.Sprintf("index %d", i),
utils.NewDefaultPassphraseOnlyConsumer(key.Passphrase))
for _, key := range keys {
privateKey, _, err := utils.ParseSSHPrivateKeyFile(key.Key, key.Passphrase, a.sshSettings.Logger())
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/ssh/clissh/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (s *Client) Session() *session.Session {
return s.SessionSettings
}

func (s *Client) Settings() settings.Settings {
return s.settings
}

func (s *Client) PrivateKeys() []session.AgentPrivateKey {
return s.privateKeys
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ssh/clissh/cmd/scp.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ func (s *SCP) SCP(ctx context.Context) *SCP {
dstPath,
}...)

scpArgs := append(sshPathArgs, args...)
scpArgs := sshPathArgs
scpArgs = append(scpArgs, args...)
s.scpCmd = exec.CommandContext(ctx, "scp", scpArgs...)
s.scpCmd.Env = env
// scpCmd.Stdout = os.Stdout
Expand Down
2 changes: 1 addition & 1 deletion pkg/ssh/clissh/cmd/ssh-add.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *SSHAdd) AddKeys(keys []string) error {
}

if s.settings.IsDebug() {
logger.DebugLn("list added keys")
logger.DebugF("List added keys")
env := []string{
s.AgentSettings.AuthSockEnv(),
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ssh/clissh/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,20 @@ func (c *Command) Sudo(ctx context.Context) {
}
if !passSent {
// send pass through stdin
logger.DebugLn("Send become pass to cmd")
logger.DebugF("Send become pass to cmd")
_, _ = c.Executor.Stdin.Write([]byte(becomePass + "\n"))
passSent = true
} else {
// Second prompt is error!
logger.ErrorLn("Bad sudo password")
logger.ErrorF("Bad sudo password")
// sending wrong password again will raise an error in process.Run()
_, _ = c.Executor.Stdin.Write([]byte(becomePass + "\n"))
// os.Exit(1)
}
return "reset"
}
if pattern == "SUDO-SUCCESS" {
logger.DebugLn("Got SUCCESS")
logger.DebugF("Got SUCCESS")
if c.onCommandStart != nil {
c.onCommandStart()
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/ssh/clissh/kube-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewKubeProxy(sett settings.Settings, sess *session.Session) *KubeProxy {
}
}

func (k *KubeProxy) Start(useLocalPort int) (port string, err error) {
func (k *KubeProxy) Start(useLocalPort int) (string, error) {
startID := rand.Int()

logger := k.settings.Logger()
Expand Down Expand Up @@ -114,13 +114,12 @@ func (k *KubeProxy) StopAll() {
}

func (k *KubeProxy) Stop(startID int) {
logger := k.settings.Logger()

if k == nil {
logger.DebugF("[%d] Stop kube-proxy: kube proxy object is nil. Skip.\n", startID)
return
}

logger := k.settings.Logger()

if k.stop {
logger.DebugF("[%d] Stop kube-proxy: kube proxy already stopped. Skip.\n", startID)
return
Expand Down Expand Up @@ -244,7 +243,7 @@ func (k *KubeProxy) upTunnel(
useLocalPort int,
tunnelErrorCh chan error,
startID int,
) (tun *Tunnel, localPort int, err error) {
) (*Tunnel, int, error) {
logger := k.settings.Logger()

logger.DebugF(
Expand All @@ -255,7 +254,7 @@ func (k *KubeProxy) upTunnel(
)

rewriteLocalPort := false
localPort = useLocalPort
localPort := useLocalPort

if useLocalPort < 1 {
logger.DebugF(
Expand All @@ -271,6 +270,7 @@ func (k *KubeProxy) upTunnel(
maxRetries := 5
retries := 0
var lastError error
var tun *Tunnel
for {
logger.DebugF("[%d] Start %d iteration for up tunnel\n", startID, retries)

Expand Down Expand Up @@ -327,13 +327,13 @@ func (k *KubeProxy) upTunnel(
func (k *KubeProxy) runKubeProxy(
waitCh chan error,
startID int,
) (proxy *Command, port string, err error) {
) (*Command, string, error) {
logger := k.settings.Logger()

logger.DebugF("[%d] Begin starting proxy\n", startID)
proxy = k.proxyCMD(startID)
proxy := k.proxyCMD(startID)

port = ""
port := ""
portReady := make(chan struct{}, 1)
portRe := regexp.MustCompile(`Starting to serve on .*?:(\d+)`)

Expand All @@ -358,7 +358,7 @@ func (k *KubeProxy) runKubeProxy(
})

logger.DebugF("[%d] Start proxy command\n", startID)
err = proxy.Start()
err := proxy.Start()
if err != nil {
logger.DebugF("[%d] Start proxy command error: %v\n", startID, err)
return nil, "", fmt.Errorf("start kubectl proxy: %w", err)
Expand Down
19 changes: 10 additions & 9 deletions pkg/ssh/clissh/process/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (e *Executor) StderrBytes() []byte {
return nil
}

func (e *Executor) SetupStreamHandlers() (err error) {
func (e *Executor) SetupStreamHandlers() error {
// stderr goes to console (commented because ssh writes only "Connection closed" messages to stderr)
// e.Cmd.Stderr = os.Stderr
// connect console's stdin
Expand All @@ -239,9 +239,10 @@ func (e *Executor) SetupStreamHandlers() (err error) {
// setup stdout stream handlers
if e.Live && e.StdoutBuffer == nil && e.StdoutHandler == nil && len(e.Matchers) == 0 {
e.cmd.Stdout = os.Stdout
return
return nil
}

var err error
var stdoutReadPipe *os.File
var stdoutHandlerWritePipe *os.File
var stdoutHandlerReadPipe *os.File
Expand Down Expand Up @@ -328,8 +329,8 @@ func (e *Executor) SetupStreamHandlers() (err error) {
return
}

logger.DebugLn("Start reading from stderr pipe")
defer logger.DebugLn("Stop reading from stderr pipe")
logger.DebugF("Start reading from stderr pipe")
defer logger.DebugF("Stop reading from stderr pipe")

buf := make([]byte, 16)
for {
Expand Down Expand Up @@ -366,7 +367,7 @@ func (e *Executor) SetupStreamHandlers() (err error) {
func (e *Executor) readFromStreams(stdoutReadPipe io.Reader, stdoutHandlerWritePipe io.Writer) {
logger := e.settings.Logger()

defer logger.DebugLn("stop readFromStreams")
defer logger.DebugF("stop readFromStreams")

if stdoutReadPipe == nil || reflect.ValueOf(stdoutReadPipe).IsNil() {
return
Expand All @@ -375,7 +376,7 @@ func (e *Executor) readFromStreams(stdoutReadPipe io.Reader, stdoutHandlerWriteP
logger.DebugF("Start read from streams for command: ", e.cmd.String())

buf := make([]byte, 16)
matchersDone := false
var matchersDone bool
if len(e.Matchers) == 0 {
matchersDone = true
}
Expand Down Expand Up @@ -429,7 +430,7 @@ func (e *Executor) readFromStreams(stdoutReadPipe io.Reader, stdoutHandlerWriteP
}

if err == io.EOF {
logger.DebugLn("readFromStreams: EOF")
logger.DebugF("readFromStreams: EOF")
break
}
}
Expand Down Expand Up @@ -536,8 +537,8 @@ func (e *Executor) ProcessWait() {
func (e *Executor) closePipes() {
logger := e.settings.Logger()

logger.DebugLn("Starting close piped")
defer logger.DebugLn("Stop close piped")
logger.DebugF("Starting close piped")
defer logger.DebugF("Stop close piped")

e.pipesMutex.Lock()
defer e.pipesMutex.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ssh/clissh/process/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *Session) Stop() {
s.Stop()
}(stopable)
}
//log.DebugF("Wait while %d processes stops\n", count)
// log.DebugF("Wait while %d processes stops\n", count)
wg.Wait()
}

Expand Down
9 changes: 4 additions & 5 deletions pkg/ssh/clissh/reverse-tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (
"sync"
"time"

"github.com/deckhouse/lib-dhctl/pkg/retry"

connection "github.com/deckhouse/lib-connection/pkg"
"github.com/deckhouse/lib-connection/pkg/settings"
"github.com/deckhouse/lib-connection/pkg/ssh/clissh/cmd"
"github.com/deckhouse/lib-connection/pkg/ssh/session"
"github.com/deckhouse/lib-dhctl/pkg/retry"
)

type tunnelWaitResult struct {
Expand Down Expand Up @@ -146,7 +147,6 @@ func (t *ReverseTunnel) StartHealthMonitor(ctx context.Context, checker connecti
logger := t.settings.Logger()

checkReverseTunnel := func(id int) bool {

logger.DebugF("[%d] Start Check reverse tunnel\n", id)

err := retry.NewSilentLoop("Check reverse tunnel", 2, 2*time.Second).RunContext(ctx, func() error {
Expand All @@ -169,7 +169,7 @@ func (t *ReverseTunnel) StartHealthMonitor(ctx context.Context, checker connecti
}

go func() {
logger.DebugLn("Start health monitor")
logger.DebugF("Start health monitor")
// we need chan for restarting because between restarting we can get stop signal
restartCh := make(chan int, 1024)
id := -1
Expand All @@ -180,14 +180,13 @@ func (t *ReverseTunnel) StartHealthMonitor(ctx context.Context, checker connecti
logger.DebugF("[%d] Signal was sent. Chan len: %d\n", id, len(restartCh))
}
for {

if !checkReverseTunnel(id) {
go restart(id)
}

select {
case <-t.stopCh:
logger.DebugLn("Stop health monitor")
logger.DebugF("Stop health monitor")
return
case oldId := <-restartCh:
restartsCount++
Expand Down
Loading