Skip to content

Commit

Permalink
Don't send TaskExit when blackout occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
kevpar committed Nov 15, 2024
1 parent 4b58427 commit 84c2a4d
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 42 deletions.
16 changes: 12 additions & 4 deletions internal/core/linuxvm/ctr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ type ctr struct {
io cmd.UpstreamIO
waitCh chan struct{}
waitErr error
waitCtx context.Context
}

var _ core.Ctr = (*ctr)(nil)

func newCtr(innerCtr cow.Container, io cmd.UpstreamIO) *ctr {
func newCtr(innerCtr cow.Container, io cmd.UpstreamIO, waitCtx context.Context) *ctr {
cmd := &cmd.Cmd{
Host: innerCtr,
}
Expand All @@ -32,6 +33,7 @@ func newCtr(innerCtr cow.Container, io cmd.UpstreamIO) *ctr {
innerCtr: innerCtr,
init: newProcess(cmd, io),
waitCh: make(chan struct{}),
waitCtx: waitCtx,
}
return ctr
}
Expand All @@ -58,11 +60,17 @@ func (c *ctr) Wait(ctx context.Context) error {

func (c *ctr) waitBackground() {
c.waitErr = func() error {
if err := c.init.Wait(context.Background()); err != nil {
if err := c.init.Wait(c.waitCtx); err == context.Canceled {
return ErrClose
} else if err != nil {
return err
}
<-c.innerCtr.WaitChannel()
return c.innerCtr.WaitError()
select {
case <-c.innerCtr.WaitChannel():
return c.innerCtr.WaitError()
case <-c.waitCtx.Done():
return ErrClose
}
}()
if c.io != nil {
c.io.Close(context.Background())
Expand Down
8 changes: 6 additions & 2 deletions internal/core/linuxvm/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (s *Sandbox) LMPrepare(ctx context.Context) (_ *statepkg.SandboxState, _ *c
}

func (s *Sandbox) LMTransfer(ctx context.Context, socket uintptr) (core.Migrated, error) {
s.waitCancel()
if err := s.vm.LMTransfer(ctx, socket, s.isLMSrc); err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,6 +202,7 @@ func newSandbox(ctx context.Context, vm *vm.VM, agentConfig *statepkg.GCState, n
}
}

waitCtx, waitCancel := context.WithCancel(context.Background())
return &Sandbox{
vm: vm,
gm: gm,
Expand All @@ -211,7 +213,9 @@ func newSandbox(ctx context.Context, vm *vm.VM, agentConfig *statepkg.GCState, n
allowMigration: true,
resources: make(map[string]resourceUseLayers),
},
waitCh: make(chan struct{}),
ifaces: ifaces,
waitCh: make(chan struct{}),
ifaces: ifaces,
waitCtx: waitCtx,
waitCancel: waitCancel,
}, nil
}
61 changes: 32 additions & 29 deletions internal/core/linuxvm/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ package linuxvm
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"slices"
"strconv"
"strings"
"sync"

"github.com/Microsoft/go-winio"
"github.com/Microsoft/go-winio/pkg/guid"
"github.com/Microsoft/hcsshim/internal/cmd"
"github.com/Microsoft/hcsshim/internal/core"
"github.com/Microsoft/hcsshim/internal/cow"
"github.com/Microsoft/hcsshim/internal/guestmanager"
hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2"
"github.com/Microsoft/hcsshim/internal/hns"
Expand All @@ -29,12 +30,17 @@ import (
"github.com/opencontainers/runtime-spec/specs-go"
)

var (
// ErrClose is the sentinel error a container's background wait reports
// when the sandbox wait context is canceled before the wait completes,
// rather than the container exiting on its own.
ErrClose = errors.New("sandbox was closed")
)

type Sandbox struct {
vm *vmpkg.VM
gm *guestmanager.LinuxManager
gt *guestThing
translator *translator
pauseCtr *ctr
ctrsLock sync.Mutex
ctrs map[string]*ctr
waitCh chan struct{}
waitErr error
Expand All @@ -43,6 +49,8 @@ type Sandbox struct {
isLMSrc bool
ifaces []*statepkg.GuestInterface
state *statepkg.SandboxState
waitCtx context.Context
waitCancel func()
}

type translator struct {
Expand Down Expand Up @@ -215,21 +223,26 @@ func NewSandbox(ctx context.Context, id string, l *layers.LCOWLayers2, spec *spe
allowMigration: allowMigration,
resources: make(map[string]resourceUseLayers),
}
pauseCtr, err := createCtr(ctx, ctrConfig, translator, gt)
waitCtx, waitCancel := context.WithCancel(context.Background())
pauseCtr, err := createCtr(ctx, ctrConfig, translator, gt, waitCtx)
if err != nil {
return nil, err
}

return &Sandbox{
vm: vm,
gm: gm,
gt: gt,
translator: translator,
pauseCtr: pauseCtr,
ctrs: make(map[string]*ctr),
vm: vm,
gm: gm,
gt: gt,
translator: translator,
pauseCtr: pauseCtr,
ctrs: map[string]*ctr{
id: pauseCtr,
},
allowMigration: allowMigration,
waitCh: make(chan struct{}),
ifaces: ifaces,
waitCtx: waitCtx,
waitCancel: waitCancel,
}, nil
}

Expand Down Expand Up @@ -275,7 +288,14 @@ func (s *Sandbox) Start(ctx context.Context) error {
}

func (s *Sandbox) CreateLinuxContainer(ctx context.Context, c *core.LinuxCtrConfig) (_ core.Ctr, err error) {
return createCtr(ctx, c, s.translator, s.gt)
ctr, err := createCtr(ctx, c, s.translator, s.gt, s.waitCtx)
if err != nil {
return nil, err
}
s.ctrsLock.Lock()
s.ctrs[c.ID] = ctr
s.ctrsLock.Unlock()
return ctr, nil
}

func (s *Sandbox) RestoreLinuxContainer(ctx context.Context, cid string, pid uint32, myIO cmd.UpstreamIO) (core.Ctr, error) {
Expand All @@ -302,30 +322,13 @@ func (s *Sandbox) RestoreLinuxContainer(ctx context.Context, cid string, pid uin
init: p,
io: myIO,
waitCh: make(chan struct{}),
waitCtx: s.waitCtx,
}
go p.waitBackground()
go c.waitBackground()
return c, nil
}

// restoreCtr wraps innerCtr in a ctr whose init process is described by
// processSpec. When io is non-nil its stdin, stdout and stderr streams are
// attached to the init command; io is also handed to newProcess unchanged.
// The returned ctr carries a fresh, open waitCh.
func restoreCtr(innerCtr cow.Container, processSpec *specs.Process, io cmd.UpstreamIO) *ctr {
	initCmd := &cmd.Cmd{Host: innerCtr, Spec: processSpec}

	// Stdio is only wired up when the caller supplied upstream IO.
	if io != nil {
		initCmd.Stdin, initCmd.Stdout, initCmd.Stderr = io.Stdin(), io.Stdout(), io.Stderr()
	}

	return &ctr{
		innerCtr: innerCtr,
		init:     newProcess(initCmd, io),
		waitCh:   make(chan struct{}),
	}
}

type cleanupSet []resources.ResourceCloser

func (cs cleanupSet) Release(ctx context.Context) (retErr error) {
Expand All @@ -337,7 +340,7 @@ func (cs cleanupSet) Release(ctx context.Context) (retErr error) {
return
}

func createCtr(ctx context.Context, c *core.LinuxCtrConfig, t *translator, gt *guestThing) (_ *ctr, err error) {
func createCtr(ctx context.Context, c *core.LinuxCtrConfig, t *translator, gt *guestThing, waitCtx context.Context) (_ *ctr, err error) {
gc, _, err := t.translate(ctx, c)
if err != nil {
return nil, err
Expand All @@ -346,7 +349,7 @@ func createCtr(ctx context.Context, c *core.LinuxCtrConfig, t *translator, gt *g
if err != nil {
return nil, err
}
return newCtr(ctr, c.IO), nil
return newCtr(ctr, c.IO, waitCtx), nil
}

func (t *translator) translate(ctx context.Context, c *core.LinuxCtrConfig) (_ *guestConfig, _ []resources.ResourceCloser, err error) {
Expand Down
17 changes: 11 additions & 6 deletions internal/taskserver/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ func (s *State) setExited(code uint32) {

type Sandbox struct {
*State
Sandbox core.Sandbox
m sync.Mutex
Tasks map[string]*Task
Sandbox core.Sandbox
m sync.Mutex
Tasks map[string]*Task
waitCtx context.Context
waitCancel func()
}

func (s *Sandbox) get(taskID, execID string) (core.GenericCompute, *State, error) {
Expand Down Expand Up @@ -178,10 +180,13 @@ func (s *service) newOCISandbox(ctx context.Context, shimOpts *runhcsopts.Option
return nil, err
}

waitCtx, waitCancel := context.WithCancel(context.Background())
return &Sandbox{
Sandbox: sandbox,
Tasks: make(map[string]*Task),
State: newTaskState(req),
Sandbox: sandbox,
Tasks: make(map[string]*Task),
State: newTaskState(req),
waitCtx: waitCtx,
waitCancel: waitCancel,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions internal/taskserver/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ func (s *service) DialChannel(ctx context.Context, req *lmproto.DialChannelReque
}

func (s *service) TransferSandbox(ctx context.Context, req *lmproto.TransferSandboxRequest, stream lmproto.Migration_TransferSandboxServer) error {
if s.sandbox != nil {
logrus.Info("aborting task waits")
s.sandbox.waitCancel()
}
logrus.Info("TransferSandbox called")
if s.migState.c == 0 {
return fmt.Errorf("must set up channel before transferring")
Expand Down
19 changes: 18 additions & 1 deletion internal/taskserver/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,24 @@ func (s *service) Start(ctx context.Context, req *task.StartRequest) (*task.Star
}
}
go func() {
c.Wait(context.Background())
waitCh := make(chan error)
go func() {
waitCh <- c.Wait(context.Background())
}()
select {
case err := <-waitCh:
logrus.WithFields(logrus.Fields{
"taskID": req.ID,
"execID": req.ExecID,
logrus.ErrorKey: err,
}).Error("failed waiting for task exit")
case <-s.sandbox.waitCtx.Done():
logrus.WithFields(logrus.Fields{
"taskID": req.ID,
"execID": req.ExecID,
}).Info("aborted task wait")
return
}
state.setExited(uint32(c.Status().ExitCode()))
if err := s.publisher.PublishEvent(ctx, runtime.TaskExitEventTopic, &events.TaskExit{
ContainerID: state.TaskID,
Expand Down

0 comments on commit 84c2a4d

Please sign in to comment.