Skip to content

Commit

Permalink
feat: handle context cancelation during docker exec
Browse files Browse the repository at this point in the history
To allow interrupting docker exec (which could be long running)
we process the log output in a go routine and handle
context cancelation as well as command result.

In case of context cancelation a CTRL+C is written into the docker
container. This should be enough to terminate the running
command.

To make sure we do not get stuck during cleanup, we do
set the cleanup contexts with a timeout of 5 minutes

Co-authored-by: Björn Brauer <bjoern.brauer@new-work.se>
Co-authored-by: Philipp Hinrichsen <philipp.hinrichsen@new-work.se>

feat: handle SIGTERM signal and abort run

test: on context cancel, abort running command

This test makes sure that whenever the act Context was canceled, the
currently running docker exec is sent a 0x03 (ctrl+c).

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>

test: make sure the exec funcction handles command exit code

This test makes sure that the exec function does handle
docker command error results
  • Loading branch information
KnisterPeter authored and github-actions committed May 24, 2022
1 parent dd49b57 commit 4800ba4
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 14 deletions.
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"os/signal"
"syscall"

"github.com/nektos/act/cmd"
)
Expand All @@ -16,7 +17,7 @@ func main() {

// trap Ctrl+C and call cancel on the context
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
defer func() {
signal.Stop(c)
cancel()
Expand Down
55 changes: 43 additions & 12 deletions pkg/container/docker_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewContainer(input *NewContainerInput) Container {

// supportsContainerImagePlatform returns true if the underlying Docker server
// API version is 1.41 and beyond
func supportsContainerImagePlatform(ctx context.Context, cli *client.Client) bool {
func supportsContainerImagePlatform(ctx context.Context, cli client.APIClient) bool {
logger := common.Logger(ctx)
ver, err := cli.ServerVersion(ctx)
if err != nil {
Expand Down Expand Up @@ -209,12 +209,12 @@ func (cr *containerReference) ReplaceLogWriter(stdout io.Writer, stderr io.Write
}

type containerReference struct {
cli *client.Client
cli client.APIClient
id string
input *NewContainerInput
}

func GetDockerClient(ctx context.Context) (cli *client.Client, err error) {
func GetDockerClient(ctx context.Context) (cli client.APIClient, err error) {
// TODO: this should maybe need to be a global option, not hidden in here?
// though i'm not sure how that works out when there's another Executor :D
// I really would like something that works on OSX native for eg
Expand Down Expand Up @@ -243,7 +243,7 @@ func GetDockerClient(ctx context.Context) (cli *client.Client, err error) {
}

func GetHostInfo(ctx context.Context) (info types.Info, err error) {
var cli *client.Client
var cli client.APIClient
cli, err = GetDockerClient(ctx)
if err != nil {
return info, err
Expand Down Expand Up @@ -557,6 +557,30 @@ func (cr *containerReference) exec(cmd []string, env map[string]string, user, wo
}
defer resp.Close()

err = cr.waitForCommand(ctx, isTerminal, resp, idResp, user, workdir)
if err != nil {
return err
}

inspectResp, err := cr.cli.ContainerExecInspect(ctx, idResp.ID)
if err != nil {
return errors.WithStack(err)
}

if inspectResp.ExitCode == 0 {
return nil
}

return fmt.Errorf("exit with `FAILURE`: %v", inspectResp.ExitCode)
}
}

func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal bool, resp types.HijackedResponse, idResp types.IDResponse, user string, workdir string) error {
logger := common.Logger(ctx)

cmdResponse := make(chan error)

go func() {
var outWriter io.Writer
outWriter = cr.input.Stdout
if outWriter == nil {
Expand All @@ -567,25 +591,32 @@ func (cr *containerReference) exec(cmd []string, env map[string]string, user, wo
errWriter = os.Stderr
}

var err error
if !isTerminal || os.Getenv("NORAW") != "" {
_, err = stdcopy.StdCopy(outWriter, errWriter, resp.Reader)
} else {
_, err = io.Copy(outWriter, resp.Reader)
}
if err != nil {
logger.Error(err)
}
cmdResponse <- err
}()

inspectResp, err := cr.cli.ContainerExecInspect(ctx, idResp.ID)
select {
case <-ctx.Done():
// send ctrl + c
_, err := resp.Conn.Write([]byte{3})
if err != nil {
return errors.WithStack(err)
logger.Warnf("Failed to send CTRL+C: %+s", err)
}

if inspectResp.ExitCode == 0 {
return nil
// we return the context canceled error to prevent other steps
// from executing
return ctx.Err()
case err := <-cmdResponse:
if err != nil {
logger.Error(err)
}

return fmt.Errorf("exit with `FAILURE`: %v", inspectResp.ExitCode)
return nil
}
}

Expand Down
118 changes: 118 additions & 0 deletions pkg/container/docker_run_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package container

import (
"bufio"
"context"
"io"
"net"
"strings"
"testing"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestDocker(t *testing.T) {
Expand Down Expand Up @@ -45,3 +53,113 @@ func TestDocker(t *testing.T) {
"CONFLICT_VAR": "I_EXIST_IN_MULTIPLE_PLACES",
}, env)
}

type mockDockerClient struct {
client.APIClient
mock.Mock
}

func (m *mockDockerClient) ContainerExecCreate(ctx context.Context, id string, opts types.ExecConfig) (types.IDResponse, error) {
args := m.Called(ctx, id, opts)
return args.Get(0).(types.IDResponse), args.Error(1)
}

func (m *mockDockerClient) ContainerExecAttach(ctx context.Context, id string, opts types.ExecStartCheck) (types.HijackedResponse, error) {
args := m.Called(ctx, id, opts)
return args.Get(0).(types.HijackedResponse), args.Error(1)
}

func (m *mockDockerClient) ContainerExecInspect(ctx context.Context, execID string) (types.ContainerExecInspect, error) {
args := m.Called(ctx, execID)
return args.Get(0).(types.ContainerExecInspect), args.Error(1)
}

type endlessReader struct {
io.Reader
}

func (r endlessReader) Read(p []byte) (n int, err error) {
return 1, nil
}

type mockConn struct {
net.Conn
mock.Mock
}

func (m *mockConn) Write(b []byte) (n int, err error) {
args := m.Called(b)
return args.Int(0), args.Error(1)
}

func (m *mockConn) Close() (err error) {
return nil
}

func TestDockerExecAbort(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

conn := &mockConn{}
conn.On("Write", mock.AnythingOfType("[]uint8")).Return(1, nil)

client := &mockDockerClient{}
client.On("ContainerExecCreate", ctx, "123", mock.AnythingOfType("types.ExecConfig")).Return(types.IDResponse{ID: "id"}, nil)
client.On("ContainerExecAttach", ctx, "id", mock.AnythingOfType("types.ExecStartCheck")).Return(types.HijackedResponse{
Conn: conn,
Reader: bufio.NewReader(endlessReader{}),
}, nil)

cr := &containerReference{
id: "123",
cli: client,
input: &NewContainerInput{
Image: "image",
},
}

channel := make(chan error)

go func() {
channel <- cr.exec([]string{""}, map[string]string{}, "user", "workdir")(ctx)
}()

time.Sleep(500 * time.Millisecond)

cancel()

err := <-channel
assert.ErrorIs(t, err, context.Canceled)

conn.AssertExpectations(t)
client.AssertExpectations(t)
}

func TestDockerExecFailure(t *testing.T) {
ctx := context.Background()

conn := &mockConn{}

client := &mockDockerClient{}
client.On("ContainerExecCreate", ctx, "123", mock.AnythingOfType("types.ExecConfig")).Return(types.IDResponse{ID: "id"}, nil)
client.On("ContainerExecAttach", ctx, "id", mock.AnythingOfType("types.ExecStartCheck")).Return(types.HijackedResponse{
Conn: conn,
Reader: bufio.NewReader(strings.NewReader("output")),
}, nil)
client.On("ContainerExecInspect", ctx, "id").Return(types.ContainerExecInspect{
ExitCode: 1,
}, nil)

cr := &containerReference{
id: "123",
cli: client,
input: &NewContainerInput{
Image: "image",
},
}

err := cr.exec([]string{""}, map[string]string{}, "user", "workdir")(ctx)
assert.Error(t, err, "exit with `FAILURE`: 1")

conn.AssertExpectations(t)
client.AssertExpectations(t)
}
12 changes: 11 additions & 1 deletion pkg/runner/job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runner
import (
"context"
"fmt"
"time"

"github.com/nektos/act/pkg/common"
"github.com/nektos/act/pkg/model"
Expand Down Expand Up @@ -105,7 +106,16 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
pipeline = append(pipeline, steps...)

return common.NewPipelineExecutor(pipeline...).
Finally(postExecutor).
Finally(func(ctx context.Context) error {
var cancel context.CancelFunc
if ctx.Err() == context.Canceled {
// in case of an aborted run, we still should execute the
// post steps to allow cleanup.
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
}
return postExecutor(ctx)
}).
Finally(info.interpolateOutputs()).
Finally(info.closeContainer())
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"runtime"
"strings"
"time"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -172,7 +173,14 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
}

if runner.config.AutoRemove && isLastRunningContainer(s, r) {
var cancel context.CancelFunc
if ctx.Err() == context.Canceled {
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
}

logger.Infof("Cleaning up container for job %s", rc.JobName)

if err := rc.stopJobContainer()(ctx); err != nil {
logger.Errorf("Error while cleaning container: %v", err)
}
Expand Down

0 comments on commit 4800ba4

Please sign in to comment.