Skip to content

Commit

Permalink
Allow multiple CreateContainer operations at the same time. (microsof…
Browse files Browse the repository at this point in the history
…t#1355)

Prior to this change, GCS allowed only one CreateContainer operation
at a time. This isn't an issue in general case, however this doesn't
work properly with synchronization via OCI runtime hook.

Synchronization via runtime hook was introduced in:
microsoft#1258
It injects a `CreateRuntime` OCI hook, if security policy provides
wait paths.
This allows container-A to run after container-B, where container-B
writes to an empty directory volume shared between the two containers
to signal that it's done some setup container-A depends on.
In general case, container-A can be started before container-B which
results in a deadlock, because `CreateContainer` request holds a lock
to a map, which keeps track of running containers.

To resolve the issue, the code has been updated to do a more granular
locking when reading/updating the containers map:
  - Add a new "status" field to Container object and atomic setter/getter,
    which can be either "Created" or "Creating". New `uint32` type alias
    and constants were added to represent the values (`containerCreated`
    and `containerCreating`)
  - Remove locking from `CreateContainer` function
  - Rework `GetContainer` to `GetCreatedContainer`, which returns
    the container object only when it's in `containerCreated` state,
    otherwise either `gcserr.HrVmcomputeSystemNotFound` or
    `gcserr.HrVmcomputeInvalidState` error returned.
  - Add new `AddContainer(id, container)` function, which updates the
    containers map with new container instances.
  - Rework `CreateContainer` to initially add new container objects into
    the containers map and set the "status" to `containerCreating` at the
    start of the function and set it to `containerCreated` only when the
    container is successfully created in runtime.

Reworking `GetContainer` to `GetCreatedContainer` seemed to be the least
invasive change, which allows us to limit updates in the affected places.
If `GetContainer` is left unchanged, then handling of containers in status
"Creating" needs to take place and this requires handling cases when (e.g.)
a modification request is sent to a container which isn't yet running.

Additionally update synchronization CRI tests to use go routines
to properly reproduce the scenario.

Signed-off-by: Maksim An <maksiman@microsoft.com>
  • Loading branch information
anmaxvl authored Apr 22, 2022
1 parent 17607be commit 91f6771
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 32 deletions.
14 changes: 7 additions & 7 deletions guest/bridge/bridge_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (b *Bridge) execProcessV2(r *Request) (_ RequestResponse, err error) {
var c *hcsv2.Container
if params.IsExternal || request.ContainerID == hcsv2.UVMContainerID {
pid, err = b.hostState.RunExternalProcess(ctx, params, conSettings)
} else if c, err = b.hostState.GetContainer(request.ContainerID); err == nil {
} else if c, err = b.hostState.GetCreatedContainer(request.ContainerID); err == nil {
// We found a V2 container. Treat this as a V2 process.
if params.OCIProcess == nil {
pid, err = c.Start(ctx, conSettings)
Expand Down Expand Up @@ -267,7 +267,7 @@ func (b *Bridge) signalContainerV2(ctx context.Context, span *trace.Span, r *Req
b.quitChan <- true
b.hostState.Shutdown()
} else {
c, err := b.hostState.GetContainer(request.ContainerID)
c, err := b.hostState.GetCreatedContainer(request.ContainerID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -296,7 +296,7 @@ func (b *Bridge) signalProcessV2(r *Request) (_ RequestResponse, err error) {
trace.Int64Attribute("pid", int64(request.ProcessID)),
trace.Int64Attribute("signal", int64(request.Options.Signal)))

c, err := b.hostState.GetContainer(request.ContainerID)
c, err := b.hostState.GetCreatedContainer(request.ContainerID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -344,7 +344,7 @@ func (b *Bridge) getPropertiesV2(r *Request) (_ RequestResponse, err error) {
return nil, errors.New("getPropertiesV2 is not supported against the UVM")
}

c, err := b.hostState.GetContainer(request.ContainerID)
c, err := b.hostState.GetCreatedContainer(request.ContainerID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -407,7 +407,7 @@ func (b *Bridge) waitOnProcessV2(r *Request) (_ RequestResponse, err error) {
}
exitCodeChan, doneChan = p.Wait()
} else {
c, err := b.hostState.GetContainer(request.ContainerID)
c, err := b.hostState.GetCreatedContainer(request.ContainerID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -453,7 +453,7 @@ func (b *Bridge) resizeConsoleV2(r *Request) (_ RequestResponse, err error) {
trace.Int64Attribute("height", int64(request.Height)),
trace.Int64Attribute("width", int64(request.Width)))

c, err := b.hostState.GetContainer(request.ContainerID)
c, err := b.hostState.GetCreatedContainer(request.ContainerID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -514,7 +514,7 @@ func (b *Bridge) deleteContainerStateV2(r *Request) (_ RequestResponse, err erro
return nil, errors.Wrapf(err, "failed to unmarshal JSON in message \"%s\"", r.Message)
}

c, err := b.hostState.GetContainer(request.ContainerID)
c, err := b.hostState.GetCreatedContainer(request.ContainerID)
if err != nil {
return nil, err
}
Expand Down
25 changes: 25 additions & 0 deletions guest/runtime/hcsv2/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package hcsv2
import (
"context"
"sync"
"sync/atomic"
"syscall"

"github.com/containerd/cgroups"
Expand All @@ -28,6 +29,18 @@ import (
"github.com/Microsoft/hcsshim/internal/protocol/guestresource"
)

// containerStatus has been introduced to enable parallel container creation
type containerStatus uint32

const (
// containerCreating is the default status set on a Container object, when
// no underlying runtime container or init process has been assigned
containerCreating containerStatus = iota
// containerCreated is the status when a runtime container and init process
// have been assigned, but runtime start command has not been issued yet
containerCreated
)

type Container struct {
id string
vsock transport.Transport
Expand All @@ -43,6 +56,9 @@ type Container struct {

processesMutex sync.Mutex
processes map[uint32]*containerProcess

// Only access atomically through getStatus/setStatus.
status containerStatus
}

func (c *Container) Start(ctx context.Context, conSettings stdio.ConnectionSettings) (int, error) {
Expand Down Expand Up @@ -220,3 +236,12 @@ func (c *Container) GetStats(ctx context.Context) (*v1.Metrics, error) {
func (c *Container) modifyContainerConstraints(ctx context.Context, rt guestrequest.RequestType, cc *guestresource.LCOWContainerConstraints) (err error) {
return c.Update(ctx, cc.Linux)
}

func (c *Container) getStatus() containerStatus {
val := atomic.LoadUint32((*uint32)(&c.status))
return containerStatus(val)
}

func (c *Container) setStatus(st containerStatus) {
atomic.StoreUint32((*uint32)(&c.status), uint32(st))
}
63 changes: 38 additions & 25 deletions guest/runtime/hcsv2/uvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ import (
"syscall"
"time"

"github.com/Microsoft/hcsshim/internal/guest/policy"
"github.com/mattn/go-shellwords"
"github.com/pkg/errors"

"github.com/Microsoft/hcsshim/internal/guest/gcserr"
"github.com/Microsoft/hcsshim/internal/guest/policy"
"github.com/Microsoft/hcsshim/internal/guest/prot"
"github.com/Microsoft/hcsshim/internal/guest/runtime"
"github.com/Microsoft/hcsshim/internal/guest/spec"
Expand All @@ -36,6 +33,8 @@ import (
"github.com/Microsoft/hcsshim/internal/protocol/guestresource"
"github.com/Microsoft/hcsshim/pkg/annotations"
"github.com/Microsoft/hcsshim/pkg/securitypolicy"
"github.com/mattn/go-shellwords"
"github.com/pkg/errors"
)

// UVMContainerID is the ContainerID that will be sent on any prot.MessageBase
Expand Down Expand Up @@ -123,19 +122,30 @@ func (h *Host) RemoveContainer(id string) {
delete(h.containers, id)
}

func (h *Host) getContainerLocked(id string) (*Container, error) {
func (h *Host) GetCreatedContainer(id string) (*Container, error) {
h.containersMutex.Lock()
defer h.containersMutex.Unlock()

if c, ok := h.containers[id]; !ok {
return nil, gcserr.NewHresultError(gcserr.HrVmcomputeSystemNotFound)
} else {
if c.getStatus() != containerCreated {
return nil, fmt.Errorf("container is not in state \"created\": %w",
gcserr.NewHresultError(gcserr.HrVmcomputeInvalidState))
}
return c, nil
}
}

func (h *Host) GetContainer(id string) (*Container, error) {
func (h *Host) AddContainer(id string, c *Container) error {
h.containersMutex.Lock()
defer h.containersMutex.Unlock()

return h.getContainerLocked(id)
if _, ok := h.containers[id]; ok {
return gcserr.NewHresultError(gcserr.HrVmcomputeSystemAlreadyExists)
}
h.containers[id] = c
return nil
}

func setupSandboxMountsPath(id string) (err error) {
Expand All @@ -162,26 +172,37 @@ func setupSandboxHugePageMountsPath(id string) error {
}

func (h *Host) CreateContainer(ctx context.Context, id string, settings *prot.VMHostedContainerSettingsV2) (_ *Container, err error) {
h.containersMutex.Lock()
defer h.containersMutex.Unlock()
criType, isCRI := settings.OCISpecification.Annotations[annotations.KubernetesContainerType]
c := &Container{
id: id,
vsock: h.vsock,
spec: settings.OCISpecification,
isSandbox: criType == "sandbox",
exitType: prot.NtUnexpectedExit,
processes: make(map[uint32]*containerProcess),
status: containerCreating,
}

if _, ok := h.containers[id]; ok {
return nil, gcserr.NewHresultError(gcserr.HrVmcomputeSystemAlreadyExists)
if err := h.AddContainer(id, c); err != nil {
return nil, err
}
defer func() {
if err != nil {
h.RemoveContainer(id)
}
}()

err = h.securityPolicyEnforcer.EnforceCreateContainerPolicy(
id,
settings.OCISpecification.Process.Args,
settings.OCISpecification.Process.Env,
settings.OCISpecification.Process.Cwd,
)

if err != nil {
return nil, errors.Wrapf(err, "container creation denied due to policy")
}

var namespaceID string
criType, isCRI := settings.OCISpecification.Annotations[annotations.KubernetesContainerType]
// for sandbox container sandboxID is same as container id
sandboxID := id
if isCRI {
Expand Down Expand Up @@ -290,15 +311,7 @@ func (h *Host) CreateContainer(ctx context.Context, id string, settings *prot.VM
return nil, errors.Wrapf(err, "failed to get container init process")
}

c := &Container{
id: id,
vsock: h.vsock,
spec: settings.OCISpecification,
isSandbox: criType == "sandbox",
container: con,
exitType: prot.NtUnexpectedExit,
processes: make(map[uint32]*containerProcess),
}
c.container = con
c.initProcess = newProcess(c, settings.OCISpecification.Process, init, uint32(c.container.Pid()), true)

// Sandbox or standalone, move the networks to the container namespace
Expand All @@ -318,7 +331,7 @@ func (h *Host) CreateContainer(ctx context.Context, id string, settings *prot.VM
}
}

h.containers[id] = c
c.setStatus(containerCreated)
return c, nil
}

Expand All @@ -337,7 +350,7 @@ func (h *Host) modifyHostSettings(ctx context.Context, containerID string, req *
case guestresource.ResourceTypeVPCIDevice:
return modifyMappedVPCIDevice(ctx, req.RequestType, req.Settings.(*guestresource.LCOWMappedVPCIDevice))
case guestresource.ResourceTypeContainerConstraints:
c, err := h.GetContainer(containerID)
c, err := h.GetCreatedContainer(containerID)
if err != nil {
return err
}
Expand All @@ -355,7 +368,7 @@ func (h *Host) modifyHostSettings(ctx context.Context, containerID string, req *
}

func (h *Host) modifyContainerSettings(ctx context.Context, containerID string, req *guestrequest.ModificationRequest) error {
c, err := h.GetContainer(containerID)
c, err := h.GetCreatedContainer(containerID)
if err != nil {
return err
}
Expand Down

0 comments on commit 91f6771

Please sign in to comment.