From b6a3222708162887e877c27125b7d3839fbb1653 Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Tue, 16 Jan 2024 19:59:08 +0100 Subject: [PATCH] Run commands natively in service when interactive=false --- .../api/runme/runner/v2alpha1/config.proto | 1 + internal/command/command_native.go | 23 +++- internal/command/command_native_test.go | 2 +- internal/command/command_unix.go | 4 + internal/command/command_virtual.go | 4 +- internal/command/command_virtual_test.go | 5 +- internal/command/command_windows.go | 27 +++++ internal/runnerv2service/execution.go | 103 +++++++++++++----- internal/runnerv2service/service_execute.go | 10 +- internal/runnerv2service/service_test.go | 48 +++++++- 10 files changed, 183 insertions(+), 44 deletions(-) create mode 100644 internal/command/command_windows.go diff --git a/internal/api/runme/runner/v2alpha1/config.proto b/internal/api/runme/runner/v2alpha1/config.proto index bdea71f27..fba0f8953 100644 --- a/internal/api/runme/runner/v2alpha1/config.proto +++ b/internal/api/runme/runner/v2alpha1/config.proto @@ -40,6 +40,7 @@ message ProgramConfig { } // interactive, if true, uses a pseudo-tty to execute the program. + // Otherwise, the program is executed using in-memory buffers for I/O. bool interactive = 7; // TODO(adamb): understand motivation for this. In theory, source diff --git a/internal/command/command_native.go b/internal/command/command_native.go index ace384451..14850918a 100644 --- a/internal/command/command_native.go +++ b/internal/command/command_native.go @@ -10,8 +10,8 @@ import ( "go.uber.org/zap" ) -// signalToProcessGroup is used in tests to disable sending signals to a process group. -var signalToProcessGroup = true +// SignalToProcessGroup is used in tests to disable sending signals to a process group. +var SignalToProcessGroup = true type NativeCommand struct { cfg *Config @@ -33,6 +33,21 @@ func newNativeCommand(cfg *Config, opts *NativeCommandOptions) *NativeCommand { } } +func (c *NativeCommand) Running() bool { + return c.cmd != nil && c.cmd.ProcessState == nil +} + +func (c *NativeCommand) Pid() int { + if c.cmd == nil || c.cmd.Process == nil { + return 0 + } + return c.cmd.Process.Pid +} + +func (c *NativeCommand) SetWinsize(rows, cols, x, y uint16) error { + return errors.New("unsupported") +} + func (c *NativeCommand) Start(ctx context.Context) (err error) { argsNormalizer := &argsNormalizer{ session: c.opts.Session, @@ -54,7 +69,7 @@ func (c *NativeCommand) Start(ctx context.Context) (err error) { if f, ok := stdin.(*os.File); ok && f != nil { // Duplicate /dev/stdin. - newStdinFd, err := syscall.Dup(int(f.Fd())) + newStdinFd, err := dup(int(f.Fd())) if err != nil { return errors.Wrap(err, "failed to dup stdin") } @@ -102,7 +117,7 @@ func (c *NativeCommand) Start(ctx context.Context) (err error) { } func (c *NativeCommand) StopWithSignal(sig os.Signal) error { - if signalToProcessGroup { + if SignalToProcessGroup { // Try to terminate the whole process group. If it fails, fall back to stdlib methods. err := signalPgid(c.cmd.Process.Pid, sig) if err == nil { diff --git a/internal/command/command_native_test.go b/internal/command/command_native_test.go index cd5f308e3..e03a42306 100644 --- a/internal/command/command_native_test.go +++ b/internal/command/command_native_test.go @@ -14,7 +14,7 @@ import ( func init() { // Set to false to disable sending signals to process groups in tests. // This can be turned on if setSysProcAttrPgid() is called in Start(). - signalToProcessGroup = false + SignalToProcessGroup = false } func TestNativeCommand(t *testing.T) { diff --git a/internal/command/command_unix.go b/internal/command/command_unix.go index 89adf8e32..4da20d0b0 100644 --- a/internal/command/command_unix.go +++ b/internal/command/command_unix.go @@ -30,6 +30,10 @@ func disableEcho(fd uintptr) error { return errors.Wrap(err, "failed to set tty attr") } +func dup(fd int) (int, error) { + return syscall.Dup(fd) +} + // func setSysProcAttrPgid(cmd *exec.Cmd) { // if cmd.SysProcAttr == nil { // cmd.SysProcAttr = &syscall.SysProcAttr{} diff --git a/internal/command/command_virtual.go b/internal/command/command_virtual.go index d3ce1299e..b2f786048 100644 --- a/internal/command/command_virtual.go +++ b/internal/command/command_virtual.go @@ -94,11 +94,11 @@ func newVirtualCommand(cfg *Config, opts *VirtualCommandOptions) *VirtualCommand } } -func (c *VirtualCommand) IsRunning() bool { +func (c *VirtualCommand) Running() bool { return c.cmd != nil && c.cmd.ProcessState == nil } -func (c *VirtualCommand) PID() int { +func (c *VirtualCommand) Pid() int { if c.cmd == nil || c.cmd.Process == nil { return 0 } diff --git a/internal/command/command_virtual_test.go b/internal/command/command_virtual_test.go index 21c72eff5..4217e0ee6 100644 --- a/internal/command/command_virtual_test.go +++ b/internal/command/command_virtual_test.go @@ -61,8 +61,9 @@ func TestVirtualCommand(t *testing.T) { }, nil) require.NoError(t, err) require.NoError(t, cmd.Start(context.Background())) - require.True(t, cmd.IsRunning()) - require.Greater(t, cmd.PID(), 1) + + require.True(t, cmd.Running()) + require.Greater(t, cmd.Pid(), 1) require.NoError(t, cmd.Wait()) }) diff --git a/internal/command/command_windows.go b/internal/command/command_windows.go new file mode 100644 index 000000000..8dd6be163 --- /dev/null +++ b/internal/command/command_windows.go @@ -0,0 +1,27 @@ +//go:build windows + +package runner + +import ( + "os" + "os/exec" + + "github.com/pkg/errors" +) + +func setSysProcAttrCtty(cmd *exec.Cmd) {} + +func setSysProcAttrPgid(cmd *exec.Cmd) {} + +func dup(fd int) (int, error) { + return fd, nil +} + +func disableEcho(fd uintptr) error { + return errors.New("Error: Environment not supported! " + + "Runme currently doesn't support PowerShell. " + + "Please go to https://github.com/stateful/runme/issues/173 to follow progress on this " + + "and join our Discord server at https://discord.gg/runme if you have further questions!") +} + +func signalPgid(pid int, sig os.Signal) error { return errors.New("unsupported") } diff --git a/internal/runnerv2service/execution.go b/internal/runnerv2service/execution.go index ec912df76..29b53235c 100644 --- a/internal/runnerv2service/execution.go +++ b/internal/runnerv2service/execution.go @@ -29,38 +29,59 @@ const ( msgBufferSize = 2048 << 10 // 2 MiB ) +type commandIface interface { + Pid() int + Running() bool + SetWinsize(uint16, uint16, uint16, uint16) error + Start(context.Context) error + StopWithSignal(os.Signal) error + Wait() error +} + type execution struct { ID string - Cmd *command.VirtualCommand + Cmd commandIface stdin io.Reader stdinWriter io.WriteCloser stdout *rbuffer.RingBuffer + stderr *rbuffer.RingBuffer logger *zap.Logger } func newExecution(id string, cfg *command.Config, logger *zap.Logger) (*execution, error) { + stdin, stdinWriter := io.Pipe() + stdout := rbuffer.NewRingBuffer(ringBufferSize) + stderr := rbuffer.NewRingBuffer(ringBufferSize) + var ( - stdin io.Reader - stdinWriter io.WriteCloser + cmd commandIface + err error ) if cfg.Interactive { - stdin, stdinWriter = io.Pipe() + cmd, err = command.NewVirtual( + cfg, + &command.VirtualCommandOptions{ + Stdin: stdin, + Stdout: stdout, + Logger: logger, + }, + ) + } else { + cmd, err = command.NewNative( + cfg, + &command.NativeCommandOptions{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Logger: logger, + }, + ) } - stdout := rbuffer.NewRingBuffer(ringBufferSize) - - cmd, err := command.NewVirtual( - cfg, - &command.VirtualCommandOptions{ - Stdin: stdin, - Stdout: stdout, - Logger: logger, - }, - ) if err != nil { return nil, err } @@ -72,6 +93,7 @@ func newExecution(id string, cfg *command.Config, logger *zap.Logger) (*executio stdin: stdin, stdinWriter: stdinWriter, stdout: stdout, + stderr: stderr, logger: logger, } @@ -84,9 +106,13 @@ func (e *execution) Start(ctx context.Context) error { } func (e *execution) Wait(ctx context.Context, sender sender) (int, error) { - errc := make(chan error, 1) + errc := make(chan error, 2) + go func() { - errc <- readSendLoop(e.stdout, sender) + errc <- readSendLoop(e.stdout, sender, func(b []byte) *runnerv2alpha1.ExecuteResponse { return &runnerv2alpha1.ExecuteResponse{StdoutData: b} }) + }() + go func() { + errc <- readSendLoop(e.stderr, sender, func(b []byte) *runnerv2alpha1.ExecuteResponse { return &runnerv2alpha1.ExecuteResponse{StderrData: b} }) }() waitErr := e.Cmd.Wait() @@ -96,9 +122,17 @@ func (e *execution) Wait(ctx context.Context, sender sender) (int, error) { // If waitErr is not nil, only log the errors but return waitErr. if waitErr != nil { + handlerErrors := 0 + + readSendHandlerForWaitErr: select { case err := <-errc: + handlerErrors++ e.logger.Info("readSendLoop finished; ignoring any errors because there was a wait error", zap.Error(err)) + // Wait for both errors, or nils. + if handlerErrors < 2 { + goto readSendHandlerForWaitErr + } case <-ctx.Done(): e.logger.Info("context canceled while waiting for the readSendLoop finish; ignoring any errors because there was a wait error") } @@ -108,8 +142,14 @@ func (e *execution) Wait(ctx context.Context, sender sender) (int, error) { // If waitErr is nil, wait for the readSendLoop to finish, // or the context being canceled. select { - case err := <-errc: - return exitCode, err + case err1 := <-errc: + // Wait for both errors, or nils. + select { + case err2 := <-errc: + e.logger.Info("another error from readSendLoop; won't be returned", zap.Error(err2)) + case <-ctx.Done(): + } + return exitCode, err1 case <-ctx.Done(): return exitCode, ctx.Err() } @@ -124,23 +164,32 @@ func (e *execution) SetWinsize(size *runnerv2alpha1.Winsize) error { return e.Cmd.SetWinsize(uint16(size.Cols), uint16(size.Rows), uint16(size.X), uint16(size.Y)) } -func (e *execution) closeIO() { - var err error - - if e.stdinWriter != nil { - err = e.stdinWriter.Close() - e.logger.Debug("closed stdin writer", zap.Error(err)) +func (e *execution) PostInitialRequest() { + // Close stdin writer for native commands after handling the initial request. + // Native commands do not support sending data continously. + if _, ok := e.Cmd.(*command.NativeCommand); ok { + if err := e.stdinWriter.Close(); err != nil { + e.logger.Info("failed to close stdin writer", zap.Error(err)) + } } +} + +func (e *execution) closeIO() { + err := e.stdinWriter.Close() + e.logger.Info("closed stdin writer", zap.Error(err)) err = e.stdout.Close() - e.logger.Debug("closed stdout writer", zap.Error(err)) + e.logger.Info("closed stdout writer", zap.Error(err)) + + err = e.stderr.Close() + e.logger.Info("closed stderr writer", zap.Error(err)) } type sender interface { Send(*runnerv2alpha1.ExecuteResponse) error } -func readSendLoop(reader io.Reader, sender sender) error { +func readSendLoop(reader io.Reader, sender sender, fn func([]byte) *runnerv2alpha1.ExecuteResponse) error { limitedReader := io.LimitReader(reader, msgBufferSize) for { @@ -156,7 +205,7 @@ func readSendLoop(reader io.Reader, sender sender) error { continue } - err = sender.Send(&runnerv2alpha1.ExecuteResponse{StdoutData: buf[:n]}) + err = sender.Send(fn(buf[:n])) if err != nil { return errors.WithStack(err) } diff --git a/internal/runnerv2service/service_execute.go b/internal/runnerv2service/service_execute.go index b3f52b02c..c5aa2a42e 100644 --- a/internal/runnerv2service/service_execute.go +++ b/internal/runnerv2service/service_execute.go @@ -44,7 +44,7 @@ func (r *runnerService) Execute(srv runnerv2alpha1.RunnerService_ExecuteServer) } if err := srv.Send(&runnerv2alpha1.ExecuteResponse{ Pid: &runnerv2alpha1.ProcessPID{ - Pid: int64(exec.Cmd.PID()), + Pid: int64(exec.Cmd.Pid()), }, }); err != nil { return err @@ -57,8 +57,8 @@ func (r *runnerService) Execute(srv runnerv2alpha1.RunnerService_ExecuteServer) for { var err error - if l := len(req.InputData); l > 0 { - logger.Info("received input data", zap.Int("len", l)) + if req.InputData != nil { + logger.Info("received input data", zap.Int("len", len(req.InputData))) _, err = exec.Write(req.InputData) if err != nil { logger.Info("failed to write to stdin; ignoring", zap.Error(err)) @@ -87,6 +87,8 @@ func (r *runnerService) Execute(srv runnerv2alpha1.RunnerService_ExecuteServer) return } + exec.PostInitialRequest() + req, err = srv.Recv() logger.Info("received request", zap.Any("req", req), zap.Error(err)) switch { @@ -99,7 +101,7 @@ func (r *runnerService) Execute(srv runnerv2alpha1.RunnerService_ExecuteServer) } return case status.Convert(err).Code() == codes.Canceled || status.Convert(err).Code() == codes.DeadlineExceeded: - if !exec.Cmd.IsRunning() { + if !exec.Cmd.Running() { logger.Info("stream canceled after the process finished; ignoring") } else { logger.Info("stream canceled while the process is still running; program will be stopped if non-background") diff --git a/internal/runnerv2service/service_test.go b/internal/runnerv2service/service_test.go index 7b2bb6325..405133538 100644 --- a/internal/runnerv2service/service_test.go +++ b/internal/runnerv2service/service_test.go @@ -16,9 +16,16 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" + "github.com/stateful/runme/internal/command" runnerv2alpha1 "github.com/stateful/runme/internal/gen/proto/go/runme/runner/v2alpha1" ) +func init() { + // Set to false to disable sending signals to process groups in tests. + // This can be turned on if setSysProcAttrPgid() is called in Start(). + command.SignalToProcessGroup = false +} + func TestRunnerServiceServerExecute(t *testing.T) { lis, stop := testStartRunnerServiceServer(t) t.Cleanup(stop) @@ -37,12 +44,27 @@ func TestRunnerServiceServerExecute(t *testing.T) { Source: &runnerv2alpha1.ProgramConfig_Commands{ Commands: &runnerv2alpha1.ProgramConfig_CommandList{ Items: []string{ - "echo -n test", + "echo test", }, }, }, }, - expectedOutput: "test", + expectedOutput: "test\n", + }, + { + name: "BasicInteractive", + programConfig: &runnerv2alpha1.ProgramConfig{ + ProgramName: "bash", + Source: &runnerv2alpha1.ProgramConfig_Commands{ + Commands: &runnerv2alpha1.ProgramConfig_CommandList{ + Items: []string{ + "echo test", + }, + }, + }, + Interactive: true, + }, + expectedOutput: "test\r\n", }, { name: "BasicSleep", @@ -58,10 +80,27 @@ func TestRunnerServiceServerExecute(t *testing.T) { }, }, }, - expectedOutput: "1\r\n2\r\n", + expectedOutput: "1\n2\n", }, { name: "BasicInput", + programConfig: &runnerv2alpha1.ProgramConfig{ + ProgramName: "bash", + Source: &runnerv2alpha1.ProgramConfig_Commands{ + Commands: &runnerv2alpha1.ProgramConfig_CommandList{ + Items: []string{ + "read name", + "echo \"My name is $name\"", + }, + }, + }, + Interactive: false, + }, + inputData: []byte("Frank\n"), + expectedOutput: "My name is Frank\n", + }, + { + name: "BasicInputInteractive", programConfig: &runnerv2alpha1.ProgramConfig{ ProgramName: "bash", Source: &runnerv2alpha1.ProgramConfig_Commands{ @@ -84,6 +123,7 @@ func TestRunnerServiceServerExecute(t *testing.T) { Source: &runnerv2alpha1.ProgramConfig_Script{ Script: "print('test')", }, + Interactive: true, }, expectedOutput: "test\r\n", }, @@ -95,7 +135,7 @@ func TestRunnerServiceServerExecute(t *testing.T) { Script: "console.log('1');\nconsole.log('2');", }, }, - expectedOutput: "1\r\n2\r\n", + expectedOutput: "1\n2\n", }, }