Skip to content

Commit

Permalink
Enhance locking/serialization for concurrent exec (vmware#8180)
Browse files Browse the repository at this point in the history
  • Loading branch information
lcastellano authored Aug 10, 2018
1 parent b5678ad commit 67b0892
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 23 deletions.
28 changes: 22 additions & 6 deletions lib/apiservers/engine/backends/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,13 @@ func (c *ContainerBackend) ContainerExecCreate(name string, config *types.ExecCo

var eid string
operation := func() error {
if vc.TryLock(APITimeout) {
op.Debugf("ContainerExecCreate trying to lock vc: %s", name)
if vc.TryLock(time.Second) {
op.Debugf("ContainerExecCreate successfully locked vc: %s", name)
defer vc.Unlock()
} else {
op.Debugf("ContainerExecCreate cannot lock vc, retrying ..: %s", name)
return engerr.NewLockTimeoutError("ContainerExecCreate")
}

handle, err := c.Handle(id, name)
Expand Down Expand Up @@ -250,10 +255,11 @@ func (c *ContainerBackend) ContainerExecCreate(name string, config *types.ExecCo

// configure custom exec back off configure
backoffConf := retry.NewBackoffConfig()
backoffConf.MaxInterval = 2 * time.Second
backoffConf.MaxInterval = 5 * time.Second
backoffConf.InitialInterval = 500 * time.Millisecond
backoffConf.MaxElapsedTime = 20 * time.Minute

if err := retry.DoWithConfig(operation, engerr.IsConflictError, backoffConf); err != nil {
if err := retry.DoWithConfig(operation, engerr.IsLockTimeoutOrConflictError, backoffConf); err != nil {
op.Errorf("Failed to start Exec task for container(%s) due to error (%s)", id, err)
return "", err
}
Expand Down Expand Up @@ -351,7 +357,7 @@ func (c *ContainerBackend) ContainerExecResize(eid string, height, width int) er
// attachHelper performs some basic type transformation and makes blocking call to AttachStreams
// autoclose determines if stdin will be closed when both stdout and stderr have closed
func (c *ContainerBackend) attachHelper(op trace.Operation, ec *models.TaskInspectResponse, stdin io.ReadCloser, stdout, stderr io.Writer, autoclose bool) error {
defer trace.End(trace.Begin(ec.ID))
defer trace.End(trace.Begin(ec.ID, op))

ca := &backend.ContainerAttachConfig{
UseStdin: ec.OpenStdin,
Expand Down Expand Up @@ -382,6 +388,8 @@ func processAttachError(op trace.Operation, actor *eventtypes.Actor, err error)
return false, nil
}

defer trace.End(trace.Begin(err.Error(), op))

if _, ok := err.(engerr.DetachError); ok {
op.Infof("Detach detected")

Expand All @@ -406,6 +414,8 @@ func processAttachError(op trace.Operation, actor *eventtypes.Actor, err error)
// task data
// error if any
func (c *ContainerBackend) taskStartHelper(op trace.Operation, id, eid, name string) (*models.TaskInspectResponse, error) {
defer trace.End(trace.Begin(fmt.Sprintf("%s.%s", id, eid), op))

handle, err := c.Handle(id, name)
if err != nil {
op.Errorf("Failed to obtain handle during exec start for container(%s) due to error: %s", id, err)
Expand Down Expand Up @@ -509,6 +519,7 @@ func (c *ContainerBackend) ContainerExecStart(ctx context.Context, eid string, s
backoffConf := retry.NewBackoffConfig()
backoffConf.MaxInterval = 2 * time.Second
backoffConf.InitialInterval = 500 * time.Millisecond
backoffConf.MaxElapsedTime = 20 * time.Minute

// Look up the container name in the metadata cache to get long ID
vc := cache.ContainerCache().GetContainerFromExec(eid)
Expand All @@ -522,16 +533,21 @@ func (c *ContainerBackend) ContainerExecStart(ctx context.Context, eid string, s

var ec *models.TaskInspectResponse
operation := func() error {
if vc.TryLock(APITimeout) {
op.Debugf("ContainerExecStart trying to lock vc: %s", name)
if vc.TryLock(time.Second) {
op.Debugf("ContainerExecStart successfully locked vc: %s", name)
defer vc.Unlock()
} else {
op.Debugf("ContainerExecStart cannot lock vc, retryng ..: %s", name)
return engerr.NewLockTimeoutError("ContainerExecStart")
}

var err error
ec, err = c.taskStartHelper(op, id, eid, name)
return err
}

if err := retry.DoWithConfig(operation, engerr.IsConflictError, backoffConf); err != nil {
if err := retry.DoWithConfig(operation, engerr.IsLockTimeoutOrConflictError, backoffConf); err != nil {
op.Errorf("Failed to start Exec task for container(%s) due to error (%s)", id, err)
return err
}
Expand Down
22 changes: 22 additions & 0 deletions lib/apiservers/engine/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,25 @@ type DetachError struct{}
func (DetachError) Error() string {
return "detached from container"
}

// LockTimeoutError is returned when a tryLock operation times out
type LockTimeoutError struct {
Desc string
}

func (e LockTimeoutError) Error() string {
return fmt.Sprintf("LockTimeoutError error: %s", e.Desc)
}

func NewLockTimeoutError(desc string) error {
return LockTimeoutError{Desc: desc}
}

func IsLockTimeoutOrConflictError(err error) bool {
// Is Error is due to Timeout or a Conflict return true
if _, ok := err.(LockTimeoutError); ok {
return true
}

return IsConflictError(err)
}
7 changes: 0 additions & 7 deletions lib/portlayer/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,6 @@ func toggleActive(op *trace.Operation, h interface{}, id string, active bool) (i
return nil, fmt.Errorf("Type assertion failed for %#+v", handle)
}

op.Debugf("target task ID: %s", id)
op.Debugf("session tasks during inspect: %s", handle.ExecConfig.Sessions)
// print all of them, otherwise we will have to assemble the id list regardless of
// the log level at the moment. If there is a way to check the log level we should
// do that. since the other approach will slow down all calls to toggleActive.
op.Debugf("exec tasks during inspect: %s", handle.ExecConfig.Execs)

var task *executor.SessionConfig
if taskS, okS := handle.ExecConfig.Sessions[id]; okS {
task = taskS
Expand Down
4 changes: 0 additions & 4 deletions lib/portlayer/task/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ func Inspect(op *trace.Operation, h interface{}, id string) (*executor.SessionCo
stasks := handle.ExecConfig.Sessions
etasks := handle.ExecConfig.Execs

op.Debugf("target task ID: %s", id)
op.Debugf("session tasks during inspect: %s", stasks)
op.Debugf("exec tasks during inspect: %s", etasks)

if _, ok := stasks[id]; ok {
return stasks[id], nil
}
Expand Down
12 changes: 6 additions & 6 deletions tests/test-cases/Group1-Docker-Commands/1-38-Docker-Exec.robot
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ Concurrent Simple Exec
${rc} ${id}= Run And Return Rc And Output docker %{VCH-PARAMS} run -itd --name ${ExecSimpleContainer} ${busybox} /bin/top
Should Be Equal As Integers ${rc} 0

:FOR ${idx} IN RANGE 1 20
:FOR ${idx} IN RANGE 1 50
\ Start Process docker %{VCH-PARAMS} exec ${id} /bin/ls alias=exec-simple-%{VCH-NAME}-${idx} shell=true

:FOR ${idx} IN RANGE 1 20
\ ${result}= Wait For Process exec-simple-%{VCH-NAME}-${idx} timeout=40s
:FOR ${idx} IN RANGE 1 50
\ ${result}= Wait For Process exec-simple-%{VCH-NAME}-${idx} timeout=300s
\ Should Be Equal As Integers ${result.rc} 0
\ Verify LS Output For Busybox ${result.stdout}
# stop the container now that we have a successful series of concurrent execs
Expand All @@ -248,11 +248,11 @@ Concurrent Simple Exec Detached
${rc} ${id}= Run And Return Rc And Output docker %{VCH-PARAMS} run -itd --name ${ExecSimpleContainer} ${busybox} /bin/top
Should Be Equal As Integers ${rc} 0

:FOR ${idx} IN RANGE 1 20
:FOR ${idx} IN RANGE 1 50
\ Start Process docker %{VCH-PARAMS} exec -d ${id} touch /tmp/${idx} alias=exec-simple-detached-%{VCH-NAME}-${idx} shell=true

:FOR ${idx} IN RANGE 1 20
\ ${result}= Wait For Process exec-simple-detached-%{VCH-NAME}-${idx} timeout=40s
:FOR ${idx} IN RANGE 1 50
\ ${result}= Wait For Process exec-simple-detached-%{VCH-NAME}-${idx} timeout=300s
\ Should Be Equal As Integers ${result.rc} 0

### TODO: check inspect status and wait for execs to stop
Expand Down

0 comments on commit 67b0892

Please sign in to comment.