Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set the maximum number of ssh connection to a ssh remote target to 15 #2357

Merged
merged 1 commit into from
Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions core/os/device/remotessh/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,20 @@ import (
"github.com/google/gapid/core/os/device"
"github.com/google/gapid/core/os/shell"
"github.com/google/gapid/core/text"
"golang.org/x/crypto/ssh"
)

// remoteProcess is the interface to a running process, as started by a Target.
type remoteProcess struct {
session *ssh.Session
wg sync.WaitGroup
session *pooledSession
}

func (r *remoteProcess) Kill() error {
return r.session.Signal(ssh.SIGSEGV)
return r.session.kill()
}

func (r *remoteProcess) Wait(ctx context.Context) error {
ret := r.session.Wait()
ret := r.session.wait()
r.wg.Wait()
return ret
}
Expand All @@ -57,17 +56,17 @@ type sshShellTarget struct{ b *binding }

// Start starts the given command in the remote shell.
func (t sshShellTarget) Start(cmd shell.Cmd) (shell.Process, error) {
session, err := t.b.connection.NewSession()
pooled, err := t.b.newPooledSession()
if err != nil {
return nil, err
}
p := &remoteProcess{
session: session,
session: pooled,
wg: sync.WaitGroup{},
}

if cmd.Stdin != nil {
stdin, err := session.StdinPipe()
stdin, err := pooled.session.StdinPipe()
if err != nil {
return nil, err
}
Expand All @@ -78,7 +77,7 @@ func (t sshShellTarget) Start(cmd shell.Cmd) (shell.Process, error) {
}

if cmd.Stdout != nil {
stdout, err := session.StdoutPipe()
stdout, err := pooled.session.StdoutPipe()
if err != nil {
return nil, err
}
Expand All @@ -90,7 +89,7 @@ func (t sshShellTarget) Start(cmd shell.Cmd) (shell.Process, error) {
}

if cmd.Stderr != nil {
stderr, err := session.StderrPipe()
stderr, err := pooled.session.StderrPipe()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,7 +120,7 @@ func (t sshShellTarget) Start(cmd shell.Cmd) (shell.Process, error) {
}

val := prefix + cmd.Name + " " + strings.Join(cmd.Args, " ")
if err := session.Start(val); err != nil {
if err := pooled.session.Start(val); err != nil {
return nil, err
}

Expand Down
74 changes: 62 additions & 12 deletions core/os/device/remotessh/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type Device interface {
WriteFile(ctx context.Context, contents io.Reader, mode os.FileMode, destPath string) error
}

// MaxNumberOfSSHConnections defines the max number of ssh connections to each
// ssh remote device that can be used to run commands concurrently.
const MaxNumberOfSSHConnections = 15

// binding represents an attached SSH client.
type binding struct {
bind.Simple
Expand All @@ -59,6 +63,63 @@ type binding struct {
// We duplicate OS here because we need to use it
// before we get the rest of the information
os device.OSKind

// pool to limit the maximum number of connections
ch chan int
}

type pooledSession struct {
ch chan int
session *ssh.Session
}

func (p *pooledSession) kill() error {
select {
case <-p.ch:
default:
}
<-p.ch
return p.session.Signal(ssh.SIGSEGV)
}

func (p *pooledSession) wait() error {
ret := p.session.Wait()
select {
case <-p.ch:
default:
}
return ret
}

func newBinding(conn *ssh.Client, conf *Configuration, env *shell.Env) *binding {
b := &binding{
connection: conn,
configuration: conf,
env: env,
ch: make(chan int, MaxNumberOfSSHConnections),
Simple: bind.Simple{
To: &device.Instance{
Serial: "",
Configuration: &device.Configuration{},
},
LastStatus: bind.Status_Online,
},
}
return b
}

func (b *binding) newPooledSession() (*pooledSession, error) {
b.ch <- int(0)
session, err := b.connection.NewSession()
if err != nil {
<-b.ch
err = fmt.Errorf("New SSH Session Error: %v, Current maximum number of ssh connections GAPID can issue to each remote device is: %v", err, MaxNumberOfSSHConnections)
return nil, err
}
return &pooledSession{
ch: b.ch,
session: session,
}, nil
}

var _ Device = &binding{}
Expand Down Expand Up @@ -145,18 +206,7 @@ func GetConnectedDevice(ctx context.Context, c Configuration) (Device, error) {
env.Add(e)
}

b := &binding{
connection: connection,
configuration: &c,
env: env,
Simple: bind.Simple{
To: &device.Instance{
Serial: "",
Configuration: &device.Configuration{},
},
LastStatus: bind.Status_Online,
},
}
b := newBinding(connection, &c, env)

kind := device.UnknownOS

Expand Down