Allow multiple CreateContainer operations at the same time.
Prior to this change, GCS allowed only one CreateContainer operation
at a time. This isn't an issue in the general case, but it doesn't
interact properly with synchronization via OCI runtime hooks.

Synchronization via runtime hooks was introduced in:
microsoft#1258
It injects a CreateRuntime OCI hook when the security policy provides
wait paths. This allows container-A to run after container-B: container-B
writes to an empty directory volume shared between the two containers to
signal that it has finished some setup that container-A depends on.
However, container-A can be started before container-B, and this results
in a deadlock: container-A's CreateContainer request blocks inside the
hook while holding the lock on the map that tracks running containers, so
container-B's CreateContainer request can never proceed and the signal is
never written.
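
To make the failure mode concrete, here is a minimal, hypothetical Go
sketch of the old behavior (illustrative names only, not the actual GCS
code): the creating goroutine holds the containers-map lock across the
blocking CreateRuntime hook, so the container that would produce the
signal can never be created, and the program hangs.

// Hypothetical sketch: coarse locking deadlocks with wait-path hooks.
package main

import (
	"fmt"
	"os"
	"sync"
	"time"
)

// containersMu stands in for the lock guarding the containers map.
var containersMu sync.Mutex

func createContainer(id, waitPath string) {
	containersMu.Lock()
	defer containersMu.Unlock() // held for the whole creation, hook included

	if waitPath != "" {
		// CreateRuntime hook: poll until the signal file appears.
		for {
			if _, err := os.Stat(waitPath); err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
	}
	fmt.Println("created", id)
}

func main() {
	// container-A arrives first and waits for container-B's signal...
	go createContainer("container-A", "/tmp/shared/ready")
	time.Sleep(50 * time.Millisecond)
	// ...but container-B blocks on the lock held by container-A's create,
	// so the signal file is never written: a deadlock.
	createContainer("container-B", "")
}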

To resolve the issue, the code now does more granular locking when
reading/updating the containers map:
  - Add a new "Status" field to the Container object, which can be either
    "Running" or "Creating".
  - Remove the coarse locking from the CreateContainer function.
  - Rework "GetContainer" into "GetRunningContainer", which returns the
    container object only when it's in the "Running" state; otherwise it
    returns either a `gcserr.HrVmcomputeSystemNotFound` or a
    `gcserr.HrVmcomputeInvalidState` error.
  - Add a new "AddContainer(id, container)" function, which updates the
    containers map with new container instances.
  - Rework CreateContainer to add the new container object to the
    containers map with "Status" set to "Creating" at the start of the
    function, and to set the status to "Running" only once the container
    successfully starts. (A simplified sketch of the resulting flow
    follows this list.)
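
As an illustration only (hypothetical names, simplified from the actual
diff below), the resulting pattern looks roughly like this: the map lock
protects only short add/lookup operations, and the blocking hook runs
with no lock held, so the scenario sketched above can make progress.

// Hypothetical sketch of the granular-locking create flow.
package sketch

import (
	"errors"
	"sync"
)

type containerStatus string

const (
	containerCreating containerStatus = "Creating"
	containerRunning  containerStatus = "Running"
)

type container struct {
	id     string
	status containerStatus
}

type host struct {
	mu         sync.Mutex // held only for short map operations
	containers map[string]*container
}

// addContainer registers a container; loosely mirrors the new AddContainer.
func (h *host) addContainer(id string, c *container) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.containers[id]; ok {
		return errors.New("already exists") // HrVmcomputeSystemAlreadyExists in GCS
	}
	h.containers[id] = c
	return nil
}

// getRunningContainer returns containers only in the "Running" state;
// loosely mirrors the new GetRunningContainer.
func (h *host) getRunningContainer(id string) (*container, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	c, ok := h.containers[id]
	if !ok {
		return nil, errors.New("not found") // HrVmcomputeSystemNotFound in GCS
	}
	if c.status != containerRunning {
		return nil, errors.New("invalid state") // HrVmcomputeInvalidState in GCS
	}
	return c, nil
}

// createContainer registers the container as "Creating", runs the
// (possibly blocking) hooks without holding the lock, then flips the
// status to "Running".
func (h *host) createContainer(id string, runHooks func() error) (*container, error) {
	c := &container{id: id, status: containerCreating}
	if err := h.addContainer(id, c); err != nil {
		return nil, err
	}
	// The lock is NOT held here, so a CreateRuntime hook may block waiting
	// on another container while other CreateContainer requests proceed.
	if err := runHooks(); err != nil {
		h.mu.Lock()
		delete(h.containers, id) // undo registration on failure
		h.mu.Unlock()
		return nil, err
	}
	c.status = containerRunning
	return c, nil
}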

Reworking "GetContainer" into "GetRunningContainer" seemed to be the
least invasive change, since it limits the updates needed at the affected
call sites. If "GetContainer" were left unchanged, every caller would
have to handle containers in the "Creating" status, e.g. the case where a
modification request is sent to a container that isn't running yet.

Signed-off-by: Maksim An <maksiman@microsoft.com>
anmaxvl committed Apr 15, 2022
1 parent ccec73f commit 2fba6ce
Showing 5 changed files with 66 additions and 35 deletions.
14 changes: 7 additions & 7 deletions internal/guest/bridge/bridge_v2.go
@@ -199,7 +199,7 @@ func (b *Bridge) execProcessV2(r *Request) (_ RequestResponse, err error) {
 	var c *hcsv2.Container
 	if params.IsExternal || request.ContainerID == hcsv2.UVMContainerID {
 		pid, err = b.hostState.RunExternalProcess(ctx, params, conSettings)
-	} else if c, err = b.hostState.GetContainer(request.ContainerID); err == nil {
+	} else if c, err = b.hostState.GetRunningContainer(request.ContainerID); err == nil {
 		// We found a V2 container. Treat this as a V2 process.
 		if params.OCIProcess == nil {
 			pid, err = c.Start(ctx, conSettings)
@@ -267,7 +267,7 @@ func (b *Bridge) signalContainerV2(ctx context.Context, span *trace.Span, r *Req
 		b.quitChan <- true
 		b.hostState.Shutdown()
 	} else {
-		c, err := b.hostState.GetContainer(request.ContainerID)
+		c, err := b.hostState.GetRunningContainer(request.ContainerID)
 		if err != nil {
 			return nil, err
 		}
@@ -296,7 +296,7 @@ func (b *Bridge) signalProcessV2(r *Request) (_ RequestResponse, err error) {
 		trace.Int64Attribute("pid", int64(request.ProcessID)),
 		trace.Int64Attribute("signal", int64(request.Options.Signal)))
 
-	c, err := b.hostState.GetContainer(request.ContainerID)
+	c, err := b.hostState.GetRunningContainer(request.ContainerID)
 	if err != nil {
 		return nil, err
 	}
@@ -344,7 +344,7 @@ func (b *Bridge) getPropertiesV2(r *Request) (_ RequestResponse, err error) {
 		return nil, errors.New("getPropertiesV2 is not supported against the UVM")
 	}
 
-	c, err := b.hostState.GetContainer(request.ContainerID)
+	c, err := b.hostState.GetRunningContainer(request.ContainerID)
 	if err != nil {
 		return nil, err
 	}
@@ -407,7 +407,7 @@ func (b *Bridge) waitOnProcessV2(r *Request) (_ RequestResponse, err error) {
 		}
 		exitCodeChan, doneChan = p.Wait()
 	} else {
-		c, err := b.hostState.GetContainer(request.ContainerID)
+		c, err := b.hostState.GetRunningContainer(request.ContainerID)
 		if err != nil {
 			return nil, err
 		}
@@ -453,7 +453,7 @@ func (b *Bridge) resizeConsoleV2(r *Request) (_ RequestResponse, err error) {
 		trace.Int64Attribute("height", int64(request.Height)),
 		trace.Int64Attribute("width", int64(request.Width)))
 
-	c, err := b.hostState.GetContainer(request.ContainerID)
+	c, err := b.hostState.GetRunningContainer(request.ContainerID)
 	if err != nil {
 		return nil, err
 	}
@@ -514,7 +514,7 @@ func (b *Bridge) deleteContainerStateV2(r *Request) (_ RequestResponse, err erro
 		return nil, errors.Wrapf(err, "failed to unmarshal JSON in message \"%s\"", r.Message)
 	}
 
-	c, err := b.hostState.GetContainer(request.ContainerID)
+	c, err := b.hostState.GetRunningContainer(request.ContainerID)
 	if err != nil {
 		return nil, err
 	}

8 changes: 4 additions & 4 deletions internal/guest/gcserr/errors.go
@@ -87,14 +87,14 @@ func BaseStackTrace(e error) errors.StackTrace {
 	return tracer.StackTrace()
 }
 
-type baseHresultError struct {
+type BaseHresultError struct {
 	hresult Hresult
 }
 
-func (e *baseHresultError) Error() string {
+func (e *BaseHresultError) Error() string {
 	return fmt.Sprintf("HRESULT: 0x%x", uint32(e.Hresult()))
 }
-func (e *baseHresultError) Hresult() Hresult {
+func (e *BaseHresultError) Hresult() Hresult {
 	return e.hresult
 }
 
@@ -139,7 +139,7 @@ func (e *wrappingHresultError) StackTrace() errors.StackTrace {
 
 // NewHresultError produces a new error with the given HRESULT.
 func NewHresultError(hresult Hresult) error {
-	return &baseHresultError{hresult: hresult}
+	return &BaseHresultError{hresult: hresult}
 }
 
 // WrapHresult produces a new error with the given HRESULT and wrapping the

9 changes: 9 additions & 0 deletions internal/guest/runtime/hcsv2/container.go
@@ -27,6 +27,13 @@ import (
 	"github.com/Microsoft/hcsshim/internal/protocol/guestresource"
 )
 
+type containerStatus string
+
+const (
+	containerCreating containerStatus = "Creating"
+	containerRunning  containerStatus = "Running"
+)
+
 type Container struct {
 	id    string
 	vsock transport.Transport
@@ -42,6 +49,8 @@ type Container struct {
 
 	processesMutex sync.Mutex
 	processes      map[uint32]*containerProcess
+
+	Status containerStatus
 }
 
 func (c *Container) Start(ctx context.Context, conSettings stdio.ConnectionSettings) (int, error) {

62 changes: 42 additions & 20 deletions internal/guest/runtime/hcsv2/uvm.go
@@ -117,19 +117,29 @@ func (h *Host) RemoveContainer(id string) {
 	delete(h.containers, id)
 }
 
-func (h *Host) getContainerLocked(id string) (*Container, error) {
+func (h *Host) GetRunningContainer(id string) (*Container, error) {
+	h.containersMutex.Lock()
+	defer h.containersMutex.Unlock()
+
 	if c, ok := h.containers[id]; !ok {
 		return nil, gcserr.NewHresultError(gcserr.HrVmcomputeSystemNotFound)
 	} else {
+		if c.Status != containerRunning {
+			return nil, gcserr.NewHresultError(gcserr.HrVmcomputeInvalidState)
+		}
 		return c, nil
 	}
 }
 
-func (h *Host) GetContainer(id string) (*Container, error) {
+func (h *Host) AddContainer(id string, c *Container) error {
 	h.containersMutex.Lock()
 	defer h.containersMutex.Unlock()
 
-	return h.getContainerLocked(id)
+	if _, ok := h.containers[id]; ok {
+		return gcserr.NewHresultError(gcserr.HrVmcomputeSystemAlreadyExists)
+	}
+	h.containers[id] = c
+	return nil
 }
 
 func setupSandboxMountsPath(id string) (err error) {
@@ -156,11 +166,13 @@ func setupSandboxHugePageMountsPath(id string) error {
 }
 
 func (h *Host) CreateContainer(ctx context.Context, id string, settings *prot.VMHostedContainerSettingsV2) (_ *Container, err error) {
-	h.containersMutex.Lock()
-	defer h.containersMutex.Unlock()
-
-	if _, ok := h.containers[id]; ok {
+	if _, err := h.GetRunningContainer(id); err == nil {
 		return nil, gcserr.NewHresultError(gcserr.HrVmcomputeSystemAlreadyExists)
+	} else {
+		herr := err.(*gcserr.BaseHresultError)
+		if herr.Hresult() == gcserr.HrVmcomputeInvalidState {
+			return nil, gcserr.NewHresultError(gcserr.HrVmcomputeSystemAlreadyExists)
+		}
 	}
 
 	err = h.securityPolicyEnforcer.EnforceCreateContainerPolicy(
@@ -174,8 +186,26 @@ func (h *Host) CreateContainer(ctx context.Context, id string, settings *prot.VM
 		return nil, errors.Wrapf(err, "container creation denied due to policy")
 	}
 
-	var namespaceID string
 	criType, isCRI := settings.OCISpecification.Annotations[annotations.KubernetesContainerType]
+	c := &Container{
+		id:        id,
+		vsock:     h.vsock,
+		spec:      settings.OCISpecification,
+		isSandbox: criType == "sandbox",
+		exitType:  prot.NtUnexpectedExit,
+		processes: make(map[uint32]*containerProcess),
+		Status:    containerCreating,
+	}
+	if err := h.AddContainer(id, c); err != nil {
+		return nil, err
+	}
+	defer func() {
+		if err != nil {
+			h.RemoveContainer(id)
+		}
+	}()
+
+	var namespaceID string
 	if isCRI {
 		switch criType {
 		case "sandbox":
@@ -271,15 +301,7 @@ func (h *Host) CreateContainer(ctx context.Context, id string, settings *prot.VM
 		return nil, errors.Wrapf(err, "failed to get container init process")
 	}
 
-	c := &Container{
-		id:        id,
-		vsock:     h.vsock,
-		spec:      settings.OCISpecification,
-		isSandbox: criType == "sandbox",
-		container: con,
-		exitType:  prot.NtUnexpectedExit,
-		processes: make(map[uint32]*containerProcess),
-	}
+	c.container = con
 	c.initProcess = newProcess(c, settings.OCISpecification.Process, init, uint32(c.container.Pid()), true)
 
 	// Sandbox or standalone, move the networks to the container namespace
@@ -299,7 +321,7 @@ func (h *Host) CreateContainer(ctx context.Context, id string, settings *prot.VM
 		}
 	}
 
-	h.containers[id] = c
+	c.Status = containerRunning
 	return c, nil
 }
 
@@ -318,7 +340,7 @@ func (h *Host) modifyHostSettings(ctx context.Context, containerID string, req *
 	case guestresource.ResourceTypeVPCIDevice:
 		return modifyMappedVPCIDevice(ctx, req.RequestType, req.Settings.(*guestresource.LCOWMappedVPCIDevice))
 	case guestresource.ResourceTypeContainerConstraints:
-		c, err := h.GetContainer(containerID)
+		c, err := h.GetRunningContainer(containerID)
 		if err != nil {
 			return err
 		}
@@ -336,7 +358,7 @@ }
 }
 
 func (h *Host) modifyContainerSettings(ctx context.Context, containerID string, req *guestrequest.ModificationRequest) error {
-	c, err := h.GetContainer(containerID)
+	c, err := h.GetRunningContainer(containerID)
 	if err != nil {
 		return err
 	}

8 changes: 4 additions & 4 deletions test/cri-containerd/policy_test.go
@@ -203,13 +203,13 @@ func Test_RunContainers_WithSyncHooks_ValidWaitPath(t *testing.T) {
 	cidWriter := createContainer(t, client, ctx, writerReq)
 	cidWaiter := createContainer(t, client, ctx, waiterReq)
 
-	startContainer(t, client, ctx, cidWriter)
-	defer removeContainer(t, client, ctx, cidWriter)
-	defer stopContainer(t, client, ctx, cidWriter)
-
 	startContainer(t, client, ctx, cidWaiter)
 	defer removeContainer(t, client, ctx, cidWaiter)
 	defer stopContainer(t, client, ctx, cidWaiter)
+
+	startContainer(t, client, ctx, cidWriter)
+	defer removeContainer(t, client, ctx, cidWriter)
+	defer stopContainer(t, client, ctx, cidWriter)
 }
 
 func Test_RunContainers_WithSyncHooks_InvalidWaitPath(t *testing.T) {
