Add containerd-shim plumbing for job containers
* Add the necessary plumbing in the containerd shim to be able to create a job container
if requested via the annotation.

* Rework the jobcontainers package a bit to return a resources struct to avoid some hacks during cleanup.
This way resource cleanup for WCOW/LCOW is the exact same as for job containers in the shim.

* Change some of the layer code to accept a volume mount point.

Signed-off-by: Daniel Canter <dcanter@microsoft.com>
dcantah committed Jun 23, 2021
1 parent 62680e0 commit bbf5589
Showing 9 changed files with 194 additions and 137 deletions.
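The net effect of the jobcontainers rework is that the shim can create and clean up every container flavor through one path. A minimal sketch of the shape this enables, reusing createContainer and resources.ReleaseResources from the hunks below; the surrounding scaffolding here is illustrative, not the commit's code:

	// Sketch (not part of the commit): with jobcontainers.Create now returning a
	// *resources.Resources, WCOW, LCOW and job containers share one create/release path.
	container, containerResources, err := createContainer(ctx, req.ID, owner, netNS, s, parent, shimOpts)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			// Identical release path for WCOW, LCOW and job containers.
			_ = resources.ReleaseResources(ctx, containerResources, parent, true)
		}
	}()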
52 changes: 46 additions & 6 deletions cmd/containerd-shim-runhcs-v1/pod.go
@@ -83,6 +83,11 @@ func createPod(ctx context.Context, events publisher, req *task.CreateTaskReques
owner := filepath.Base(os.Args[0])
isWCOW := oci.IsWCOW(s)

p := pod{
events: events,
id: req.ID,
}

var parent *uvm.UtilityVM
if oci.IsIsolated(s) {
// Create the UVM parent
@@ -125,22 +130,41 @@ func createPod(ctx context.Context, events publisher, req *task.CreateTaskReques
parent.Close()
return nil, err
}
} else if oci.IsJobContainer(s) {
// If we're making a job container, fake a task (i.e. reuse the wcowPodSandbox logic)
p.sandboxTask = newWcowPodSandboxTask(ctx, events, req.ID, req.Bundle, parent, "")
if err := events.publishEvent(
ctx,
runtime.TaskCreateEventTopic,
&eventstypes.TaskCreate{
ContainerID: req.ID,
Bundle: req.Bundle,
Rootfs: req.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: req.Stdin,
Stdout: req.Stdout,
Stderr: req.Stderr,
Terminal: req.Terminal,
},
Checkpoint: "",
Pid: 0,
}); err != nil {
return nil, err
}
p.jobContainer = true
return &p, nil
} else if !isWCOW {
return nil, errors.Wrap(errdefs.ErrFailedPrecondition, "oci spec does not contain WCOW or LCOW spec")
}

defer func() {
// clean up the uvm if we fail any further operations
if err != nil && parent != nil {
parent.Close()
}
}()

p := pod{
events: events,
id: req.ID,
host: parent,
}

p.host = parent
if parent != nil {
cid := req.ID
if id, ok := s.Annotations[oci.AnnotationNcproxyContainerID]; ok {
@@ -232,6 +256,11 @@ type pod struct {
// It MUST be treated as read only in the lifetime of the pod.
host *uvm.UtilityVM

// jobContainer specifies whether this pod is for WCOW job containers only.
//
// It MUST be treated as read only in the lifetime of the pod.
jobContainer bool

workloadTasks sync.Map
}

@@ -263,6 +292,17 @@ func (p *pod) CreateTask(ctx context.Context, req *task.CreateTaskRequest, s *sp
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "task with id: '%s' already exists in pod: '%s'", req.ID, p.id)
}

if p.jobContainer {
// This is a short circuit to make sure that all containers in a pod will have
// the same IP address/be added to the same compartment.
//
// OS work will be needed to support this scenario, so for now we need to block on
// this.
if !oci.IsJobContainer(s) {
return nil, errors.New("cannot create a normal process isolated container if the pod sandbox is a job container")
}
}

ct, sid, err := oci.GetSandboxTypeAndID(s.Annotations)
if err != nil {
return nil, err
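Whether a request takes the job container path is decided entirely by oci.IsJobContainer(s), which reads an annotation on the OCI spec; the annotation constant itself lives in the oci package and is not one of the hunks shown on this page. A hedged sketch of a spec opting in (the key below is a placeholder, not the real constant):

	// Sketch only: placeholder annotation key; the actual name is defined in
	// internal/oci and is not part of this diff.
	spec := &specs.Spec{
		Annotations: map[string]string{
			// With the real key set to "true", oci.IsJobContainer(spec) reports true
			// and createPod/CreateTask take the job container branches above.
			"example.com/job-container-placeholder": "true",
		},
	}
	_ = spec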
52 changes: 37 additions & 15 deletions cmd/containerd-shim-runhcs-v1/task_hcs.go
@@ -29,6 +29,7 @@ import (
"github.com/Microsoft/hcsshim/internal/hcs/schema1"
hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2"
"github.com/Microsoft/hcsshim/internal/hcsoci"
"github.com/Microsoft/hcsshim/internal/jobcontainers"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/oci"
"github.com/Microsoft/hcsshim/internal/processorinfo"
@@ -113,6 +114,39 @@ func newHcsStandaloneTask(ctx context.Context, events publisher, req *task.Creat
return shim, nil
}

// createContainer is a generic call that returns either a process/hypervisor isolated container or a job container
// based on what is set in the OCI spec.
func createContainer(ctx context.Context, id, owner, netNS string, s *specs.Spec, parent *uvm.UtilityVM, shimOpts *runhcsopts.Options) (cow.Container, *resources.Resources, error) {
var (
err error
container cow.Container
resources *resources.Resources
)

if oci.IsJobContainer(s) {
container, resources, err = jobcontainers.Create(ctx, id, s)
if err != nil {
return nil, nil, err
}
} else {
opts := &hcsoci.CreateOptions{
ID: id,
Owner: owner,
Spec: s,
HostingSystem: parent,
NetworkNamespace: netNS,
}
if shimOpts != nil {
opts.ScaleCPULimitsToSandbox = shimOpts.ScaleCpuLimitsToSandbox
}
container, resources, err = hcsoci.CreateContainer(ctx, opts)
if err != nil {
return nil, nil, err
}
}
return container, resources, nil
}

// newHcsTask creates a container within `parent` and its init exec process in
// the `shimExecCreated` state and returns the task that tracks its lifetime.
//
@@ -152,19 +186,7 @@ func newHcsTask(
shimOpts = v.(*runhcsopts.Options)
}

opts := hcsoci.CreateOptions{
ID: req.ID,
Owner: owner,
Spec: s,
HostingSystem: parent,
NetworkNamespace: netNS,
}

if shimOpts != nil {
opts.ScaleCPULimitsToSandbox = shimOpts.ScaleCpuLimitsToSandbox
}

system, resources, err := hcsoci.CreateContainer(ctx, &opts)
container, resources, err := createContainer(ctx, req.ID, owner, netNS, s, parent, shimOpts)
if err != nil {
return nil, err
}
@@ -173,7 +195,7 @@ func newHcsTask(
events: events,
id: req.ID,
isWCOW: oci.IsWCOW(s),
c: system,
c: container,
cr: resources,
ownsHost: ownsParent,
host: parent,
@@ -186,7 +208,7 @@ func newHcsTask(
events,
req.ID,
parent,
system,
container,
req.ID,
req.Bundle,
ht.isWCOW,
3 changes: 3 additions & 0 deletions cmd/containerd-shim-runhcs-v1/task_wcow_podsandbox.go
@@ -284,6 +284,9 @@ func (wpst *wcowPodSandboxTask) Share(ctx context.Context, req *shimdiag.ShareRe

func (wpst *wcowPodSandboxTask) Stats(ctx context.Context) (*stats.Statistics, error) {
stats := &stats.Statistics{}
if wpst.host == nil {
return stats, nil
}
vmStats, err := wpst.host.Stats(ctx)
if err != nil && !isStatsNotFound(err) {
return nil, err
4 changes: 2 additions & 2 deletions internal/hcsoci/resources_lcow.go
@@ -42,12 +42,12 @@ func allocateLinuxResources(ctx context.Context, coi *createOptionsInternal, r *
containerRootInUVM := r.ContainerRootInUVM()
if coi.Spec.Windows != nil && len(coi.Spec.Windows.LayerFolders) > 0 {
log.G(ctx).Debug("hcsshim::allocateLinuxResources mounting storage")
rootPath, err := layers.MountContainerLayers(ctx, coi.Spec.Windows.LayerFolders, containerRootInUVM, coi.HostingSystem)
rootPath, err := layers.MountContainerLayers(ctx, coi.Spec.Windows.LayerFolders, containerRootInUVM, "", coi.HostingSystem)
if err != nil {
return errors.Wrap(err, "failed to mount container storage")
}
coi.Spec.Root.Path = rootPath
layers := layers.NewImageLayers(coi.HostingSystem, containerRootInUVM, coi.Spec.Windows.LayerFolders, isSandbox)
layers := layers.NewImageLayers(coi.HostingSystem, containerRootInUVM, coi.Spec.Windows.LayerFolders, "", isSandbox)
r.SetLayers(layers)
} else if coi.Spec.Root.Path != "" {
// This is the "Plan 9" root filesystem.
4 changes: 2 additions & 2 deletions internal/hcsoci/resources_wcow.go
@@ -52,12 +52,12 @@ func allocateWindowsResources(ctx context.Context, coi *createOptionsInternal, r
if coi.Spec.Root.Path == "" && (coi.HostingSystem != nil || coi.Spec.Windows.HyperV == nil) {
log.G(ctx).Debug("hcsshim::allocateWindowsResources mounting storage")
containerRootInUVM := r.ContainerRootInUVM()
containerRootPath, err := layers.MountContainerLayers(ctx, coi.Spec.Windows.LayerFolders, containerRootInUVM, coi.HostingSystem)
containerRootPath, err := layers.MountContainerLayers(ctx, coi.Spec.Windows.LayerFolders, containerRootInUVM, "", coi.HostingSystem)
if err != nil {
return errors.Wrap(err, "failed to mount container storage")
}
coi.Spec.Root.Path = containerRootPath
layers := layers.NewImageLayers(coi.HostingSystem, containerRootInUVM, coi.Spec.Windows.LayerFolders, isSandbox)
layers := layers.NewImageLayers(coi.HostingSystem, containerRootInUVM, coi.Spec.Windows.LayerFolders, "", isSandbox)
r.SetLayers(layers)
}

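Both allocateLinuxResources and allocateWindowsResources now pass an extra (empty) argument into layers.MountContainerLayers and layers.NewImageLayers; the layers package itself is one of the files not expanded on this page. Inferred from these call sites and the jobcontainers usage below, the updated signatures presumably take a host volume mount path, roughly as follows (parameter names are guesses, declarations shown without bodies):

	// Assumed shape of the updated internal/layers API, inferred from the call
	// sites in this diff; the real names and ordering in layers.go may differ.
	//
	// volumeMountPath is "" for UVM/process isolated containers, and is the host
	// directory the sandbox volume gets mounted at for job containers.
	func MountContainerLayers(ctx context.Context, layerFolders []string, guestRoot string, volumeMountPath string, vm *uvm.UtilityVM) (string, error)

	func NewImageLayers(vm *uvm.UtilityVM, containerRootInUVM string, layerFolders []string, volumeMountPath string, isSandbox bool) *ImageLayers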
90 changes: 26 additions & 64 deletions internal/jobcontainers/jobcontainer.go
@@ -21,10 +21,10 @@ import (
"github.com/Microsoft/hcsshim/internal/layers"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/queue"
"github.com/Microsoft/hcsshim/internal/resources"
"github.com/Microsoft/hcsshim/internal/winapi"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/windows"
)

@@ -66,7 +66,6 @@ type JobContainer struct {
spec *specs.Spec // OCI spec used to create the container
job *jobobject.JobObject // Object representing the job object the container owns
sandboxMount string // Path to where the sandbox is mounted on the host
m sync.Mutex
closedWaitOnce sync.Once
init initProc
startTimestamp time.Time
@@ -89,33 +88,21 @@ func newJobContainer(id string, s *specs.Spec) *JobContainer {
}

// Create creates a new JobContainer from `s`.
func Create(ctx context.Context, id string, s *specs.Spec) (_ cow.Container, err error) {
func Create(ctx context.Context, id string, s *specs.Spec) (_ cow.Container, _ *resources.Resources, err error) {
log.G(ctx).WithField("id", id).Debug("Creating job container")

if s == nil {
return nil, errors.New("Spec must be supplied")
return nil, nil, errors.New("Spec must be supplied")
}

if id == "" {
g, err := guid.NewV4()
if err != nil {
return nil, err
return nil, nil, err
}
id = g.String()
}

if err := mountLayers(ctx, s); err != nil {
return nil, errors.Wrap(err, "failed to mount container layers")
}

volumeGUIDRegex := `^\\\\\?\\(Volume)\{{0,1}[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}(\}){0,1}\}(|\\)$`
if matched, err := regexp.MatchString(volumeGUIDRegex, s.Root.Path); !matched || err != nil {
return nil, fmt.Errorf(`invalid container spec - Root.Path '%s' must be a volume GUID path in the format '\\?\Volume{GUID}\'`, s.Root.Path)
}
if s.Root.Path[len(s.Root.Path)-1] != '\\' {
s.Root.Path += `\` // Be nice to clients and make sure well-formed for back-compat
}

container := newJobContainer(id, s)

// Create the job object all processes will run in.
@@ -125,52 +112,50 @@ func Create(ctx context.Context, id string, s *specs.Spec) (_ cow.Container, err
}
job, err := jobobject.Create(ctx, options)
if err != nil {
return nil, errors.Wrap(err, "failed to create job object")
return nil, nil, errors.Wrap(err, "failed to create job object")
}

// Parity with how we handle process isolated containers. We set the same flag which
// behaves the same way for a silo.
if err := job.SetTerminateOnLastHandleClose(); err != nil {
return nil, errors.Wrap(err, "failed to set terminate on last handle close on job container")
return nil, nil, errors.Wrap(err, "failed to set terminate on last handle close on job container")
}
container.job = job

var path string
r := resources.NewContainerResources(id)
defer func() {
if err != nil {
container.Close()
if path != "" {
_ = removeSandboxMountPoint(ctx, path)
}
_ = resources.ReleaseResources(ctx, r, nil, true)
}
}()

limits, err := specToLimits(ctx, id, s)
if err != nil {
return nil, errors.Wrap(err, "failed to convert OCI spec to job object limits")
sandboxPath := fmt.Sprintf(sandboxMountFormat, id)
if err := mountLayers(ctx, s, sandboxPath); err != nil {
return nil, nil, errors.Wrap(err, "failed to mount container layers")
}
container.sandboxMount = sandboxPath

// Set resource limits on the job object based off of oci spec.
if err := job.SetResourceLimits(limits); err != nil {
return nil, errors.Wrap(err, "failed to set resource limits")
layers := layers.NewImageLayers(nil, "", s.Windows.LayerFolders, sandboxPath, false)
r.SetLayers(layers)

volumeGUIDRegex := `^\\\\\?\\(Volume)\{{0,1}[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}(\}){0,1}\}(|\\)$`
if matched, err := regexp.MatchString(volumeGUIDRegex, s.Root.Path); !matched || err != nil {
return nil, nil, fmt.Errorf(`invalid container spec - Root.Path '%s' must be a volume GUID path in the format '\\?\Volume{GUID}\'`, s.Root.Path)
}

// Setup directory sandbox volume will be mounted
sandboxPath := fmt.Sprintf(sandboxMountFormat, id)
if _, err := os.Stat(sandboxPath); os.IsNotExist(err) {
if err := os.MkdirAll(sandboxPath, 0777); err != nil {
return nil, errors.Wrap(err, "failed to create mounted folder")
}
limits, err := specToLimits(ctx, id, s)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to convert OCI spec to job object limits")
}
path = sandboxPath

if err := mountSandboxVolume(ctx, path, s.Root.Path); err != nil {
return nil, errors.Wrap(err, "failed to bind payload directory on host")
// Set resource limits on the job object based off of oci spec.
if err := job.SetResourceLimits(limits); err != nil {
return nil, nil, errors.Wrap(err, "failed to set resource limits")
}

container.sandboxMount = path
go container.waitBackground(ctx)
return container, nil
return container, r, nil
}

// CreateProcess creates a process on the host, starts it, adds it to the containers
@@ -283,29 +268,6 @@ func (c *JobContainer) Modify(ctx context.Context, config interface{}) (err erro
return errors.New("modify not supported for job containers")
}

// Release unmounts all of the container layers. Safe to call multiple times, if no storage
// is mounted this call will just return nil.
func (c *JobContainer) Release(ctx context.Context) error {
c.m.Lock()
defer c.m.Unlock()

log.G(ctx).WithFields(logrus.Fields{
"id": c.id,
"path": c.sandboxMount,
}).Warn("removing sandbox volume mount")

if c.sandboxMount != "" {
if err := removeSandboxMountPoint(ctx, c.sandboxMount); err != nil {
return errors.Wrap(err, "failed to remove sandbox volume mount path")
}
if err := layers.UnmountContainerLayers(ctx, c.spec.Windows.LayerFolders, "", nil, layers.UnmountOperationAll); err != nil {
return errors.Wrap(err, "failed to unmount container layers")
}
c.sandboxMount = ""
}
return nil
}

// Start starts the container. There's nothing to "start" for job containers, so this just
// sets the start timestamp.
func (c *JobContainer) Start(ctx context.Context) error {
@@ -484,7 +446,7 @@ func (c *JobContainer) waitBackground(ctx context.Context) {
// them to exit.
<-c.init.proc.waitBlock

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := c.Shutdown(ctx); err != nil {
_ = c.Terminate(ctx)
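The volume GUID validation of s.Root.Path survives the reshuffle in Create; it simply runs after the layers are mounted instead of before. A small standalone sketch of what that check accepts, runnable outside the shim (the regex is copied verbatim from the diff; the sample paths are invented):

	package main

	import (
		"fmt"
		"regexp"
	)

	// Same pattern jobcontainers.Create uses to validate Root.Path.
	const volumeGUIDRegex = `^\\\\\?\\(Volume)\{{0,1}[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}(\}){0,1}\}(|\\)$`

	func main() {
		re := regexp.MustCompile(volumeGUIDRegex)
		for _, p := range []string{
			`\\?\Volume{5af5b244-1d30-4f02-90a7-60b7a1e87d14}\`, // volume GUID path: accepted
			`C:\some\dir`, // plain directory path: rejected
		} {
			fmt.Printf("%-60q valid=%v\n", p, re.MatchString(p))
		}
	}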