Skip to content
This repository has been archived by the owner on Feb 27, 2020. It is now read-only.

Implement video feature in docker engine #383

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions engines/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
)

type configType struct {
DockerSocket string `json:"dockerSocket"`
Privileged string `json:"privileged"`
DockerSocket string `json:"dockerSocket"`
Privileged string `json:"privileged"`
EnableDevices bool `json:"enableDevices"`
}

const (
Expand Down Expand Up @@ -49,6 +50,13 @@ var configSchema = schematypes.Object{
privilegedNever,
},
},
"enableDevices": schematypes.Boolean{
Title: "Enable host devices",
Description: util.Markdown(`
When true, this enables the support for host devices inside the container,
such as video and sound.
`),
},
},
Required: []string{
"privileged",
Expand Down
28 changes: 28 additions & 0 deletions engines/docker/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type engine struct {
config configType
networks *network.Pool
imageCache *imagecache.ImageCache
video *videoDeviceManager
}

type engineProvider struct {
Expand All @@ -42,6 +43,8 @@ func (p engineProvider) NewEngine(options engines.EngineOptions) (engines.Engine
var c configType
schematypes.MustValidateAndMap(configSchema, options.Config, &c)

debug(fmt.Sprintf("Devices enabled = %v", c.EnableDevices))

if c.DockerSocket == "" {
c.DockerSocket = "unix:///var/run/docker.sock" // default docker socket
}
Expand All @@ -54,20 +57,30 @@ func (p engineProvider) NewEngine(options engines.EngineOptions) (engines.Engine

env := options.Environment
monitor := options.Monitor
var video *videoDeviceManager
if c.EnableDevices {
video, err = newVideoDeviceManager()
if err != nil {
return nil, err
}
}

return &engine{
config: c,
docker: client,
Environment: env,
monitor: monitor,
networks: network.NewPool(client, monitor.WithPrefix("network-pool")),
imageCache: imagecache.New(client, env.GarbageCollector, monitor.WithPrefix("image-cache")),
video: video,
}, nil
}

type payloadType struct {
Image interface{} `json:"image"`
Command []string `json:"command"`
Privileged bool `json:"privileged"`
Devices []string `json:"devices"`
}

func (e *engine) PayloadSchema() schematypes.Object {
Expand All @@ -79,6 +92,16 @@ func (e *engine) PayloadSchema() schematypes.Object {
Description: "Command to run inside the container.",
Items: schematypes.String{},
},
"devices": schematypes.Array{
Title: "Devices",
Description: "List of host devices required.",
Items: schematypes.StringEnum{
Options: []string{
"video",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could just make the option of video be dependent on whether devices are enabled or not... anyways, nit... I'm still not sure what's ideal in either case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would suggest that we do:

devices: [
  {type: 'video'},
],

That way, if in some future we decide we want additional options, we can add them to the object...

},
},
Unique: true,
},
},
Required: []string{
"image",
Expand Down Expand Up @@ -107,6 +130,11 @@ func (e *engine) NewSandboxBuilder(options engines.SandboxOptions) (engines.Sand
var p payloadType
schematypes.MustValidateAndMap(e.PayloadSchema(), options.Payload, &p)

if len(p.Devices) > 0 && e.config.EnableDevices == false {
options.TaskContext.LogError(fmt.Sprintf("Task requests device %v, but device support is not enabled", p.Devices))
return nil, runtime.NewMalformedPayloadError("Devices feature is not enabled")
}

// Check if privileged == true is allowed
switch e.config.Privileged {
case privilegedAllow: // Check scope if p.Privileged is true
Expand Down
25 changes: 25 additions & 0 deletions engines/docker/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/runtime/atomics"
"github.com/taskcluster/taskcluster-worker/runtime/ioext"
funk "github.com/thoas/go-funk"
)

const dockerEngineKillTimeout = 5 * time.Second
Expand All @@ -32,6 +33,7 @@ type sandbox struct {
taskCtx *runtime.TaskContext
networkHandle *network.Handle
imageHandle *imagecache.ImageHandle
videoDev *device
}

func newSandbox(sb *sandboxBuilder) (*sandbox, error) {
Expand All @@ -52,6 +54,21 @@ func newSandbox(sb *sandboxBuilder) (*sandbox, error) {
return nil, errors.Wrap(err, "docker.CreateNetwork failed")
}

devices := []docker.Device{}
var dev *device
if funk.InStrings(sb.payload.Devices, "video") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for loop + a switch is likely better...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe when we add more options.

dev = sb.e.video.claim()
if dev == nil {
return nil, errors.New("No video device available")
}
debug(fmt.Sprintf("Claimed %s", dev.path))
devices = append(devices, docker.Device{
PathOnHost: dev.path,
PathInContainer: dev.path,
CgroupPermissions: "rwm",
})
}

// Create the container
container, err := sb.e.docker.CreateContainer(docker.CreateContainerOptions{
Config: &docker.Config{
Expand All @@ -70,6 +87,7 @@ func newSandbox(sb *sandboxBuilder) (*sandbox, error) {
// to the proxies added to proxyMux above..
ExtraHosts: []string{fmt.Sprintf("taskcluster:%s", networkHandle.Gateway())},
Mounts: sb.mounts,
Devices: devices,
},
NetworkingConfig: &docker.NetworkingConfig{
EndpointsConfig: map[string]*docker.EndpointConfig{
Expand All @@ -79,6 +97,7 @@ func newSandbox(sb *sandboxBuilder) (*sandbox, error) {
})
if err != nil {
imageHandle.Release()
sb.e.video.release(dev)
return nil, runtime.NewMalformedPayloadError(
"could not create container: " + err.Error())
}
Expand All @@ -87,6 +106,7 @@ func newSandbox(sb *sandboxBuilder) (*sandbox, error) {
storage, err := sb.e.Environment.TemporaryStorage.NewFolder()
if err != nil {
imageHandle.Release()
sb.e.video.release(dev)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably too soon to release, the container still exists...

monitor.ReportError(err, "failed to create temporary folder")
return nil, runtime.ErrFatalInternalError
}
Expand All @@ -101,6 +121,7 @@ func newSandbox(sb *sandboxBuilder) (*sandbox, error) {
"containerId": container.ID,
"networkId": networkHandle.NetworkID(),
}),
videoDev: dev,
}

// attach to the container before starting so that we get all the logs
Expand Down Expand Up @@ -277,5 +298,9 @@ func (s *sandbox) dispose() error {
if hasErr {
return runtime.ErrNonFatalInternalError
}

if s.videoDev != nil {
s.videoDev.claimed = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably best to decide what is ecapsulated where... You have a method videoDeviceManager.release

}
return nil
}
51 changes: 51 additions & 0 deletions engines/docker/video.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package dockerengine

import (
"path/filepath"
"regexp"

"github.com/pkg/errors"
funk "github.com/thoas/go-funk"
)

type device struct {
path string
claimed bool
}

type videoDeviceManager struct {
devices []device
}

func newVideoDeviceManager() (*videoDeviceManager, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest something like:

package dockerengine

import (
	"errors"
	"sync"

	"github.com/taskcluster/taskcluster-worker/runtime/atomics"
)

type deviceHandle interface {
	Path() string
	Release()
}

type videoDeviceManager struct {
	m       sync.Mutex
	devices []string
}

// newVideoDeviceManager creates a new videoDeviceManager
func newVideoDeviceManager() (*videoDeviceManager, error) {
	// TODO: Find or create devices
	devices := []string{"/dev/video0"}
	return &videoDeviceManager{devices: devices}, nil
}

// GetVideoDevice returns an idle video device
func (vdm *videoDeviceManager) GetVideoDevice() deviceHandle {
	vdm.m.Lock()
	defer vdm.m.Unlock()

	// ensure we have enough devices
	if len(vdm.devices) < 1 {
		panic(errors.New("insufficent number of video devices available"))
	}

	// Create a device handle, ensuring we can't release twice
	vdh := &videoDeviceHandle{owner: vdm}
	vdh.path, vdm.devices = vdm.devices[0], vdm.devices[1:]

	return vdh
}

// Dispose releases all resources assocated with the videoDeviceManager
func (vdm *videoDeviceManager) Dispose() error {
	vdm.m.Lock()
	defer vdm.m.Unlock()

	return nil // TODO: remove devices if any was created
}

type videoDeviceHandle struct {
	owner    *videoDeviceManager
	path     string
	released atomics.Once
}

func (vdh *videoDeviceHandle) Path() string {
	if vdh.released.IsDone() {
		panic(errors.New("videoDeviceHandle.Path() was called after release"))
	}
	return vdh.path
}

func (vdh *videoDeviceHandle) Release() string {
	ok := vdh.released.Do(func() {
		vdh.owner.m.Lock()
		defer vdh.owner.m.Unlock()

		vdh.owner.devices = append(vdh.owner.devices, vdh.path)
	})
	if !ok {
		debug("WARNING: harmless double release() of videoDeviceHandle")
	}
}

In the sandbox and resultset code you just release() the deviceHandle the same way we release imageHandle and pass it around the same way.

While techincally all this is in the same package, I only use methods starting with capital outside this file... the only exception being: newVideoDeviceManager(). This makes it easier to track life-cycles, and it is a motivation for moving device management into separate package, though that's a lot less important..

matches, err := filepath.Glob("/dev/video*")
if err != nil {
return nil, errors.Wrap(err, "Failed to call filepath.Glob function")
}

r := regexp.MustCompile("/dev/video[0-9]+")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to make the devices in here? Also I think a devices sub-package would be neat.

That way we can hide some of the locking and internals away, and the engine just deals with a device which is something that has a path... and is created when the array from payload is passed to the device manager...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know if I followed the idea, but let's keep things simple for now.

matches = funk.FilterString(matches, r.MatchString)

devices := make([]device, len(matches))
for i := range devices {
devices[i].path = matches[i]
}

return &videoDeviceManager{
devices: devices,
}, nil
}

func (d *videoDeviceManager) claim() *device {
for i := range d.devices {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

locking?

if !d.devices[i].claimed {
return &d.devices[i]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returning a reference to an object stored in a list... it's probably better to make a list pointers to devices..

Just so we don't have to think too much about happens when if we mutate the device... or something like that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At no point do you set, dev.claimed = true

}
}

return nil
}

func (d *videoDeviceManager) release(dev *device) {
dev.claimed = false
}
36 changes: 36 additions & 0 deletions engines/docker/video_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// +build dockervideo

package dockerengine

import (
"testing"

"github.com/taskcluster/taskcluster-worker/engines/enginetest"
)

// Image and tag used in test cases below
const (
videoDockerImageName = "alpine:3.6"
)

var videoProvider = &enginetest.EngineProvider{
Engine: "docker",
Config: `{
"privileged": "allow",
"enableDevices": true
}`,
}

func TestVideo(t *testing.T) {
c := enginetest.LoggingTestCase{
EngineProvider: videoProvider,
Target: "/dev/video0",
TargetPayload: `{
"command": ["sh", "-c", "ls /dev/video0"],
"devices": ["video"],
"image": "` + videoDockerImageName + `"
}`,
}

c.Test()
}
12 changes: 9 additions & 3 deletions engines/enginetest/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ func (c *LoggingTestCase) TestSilentTask() {

// Test will run all logging tests
func (c *LoggingTestCase) Test() {
c.TestLogTarget()
c.TestLogTargetWhenFailing()
c.TestSilentTask()
if len(c.TargetPayload) > 0 {
c.TestLogTarget()
}
if len(c.FailingPayload) > 0 {
c.TestLogTargetWhenFailing()
}
if len(c.SilentPayload) > 0 {
c.TestSilentTask()
}
}
6 changes: 6 additions & 0 deletions vendor/vendor.json
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,12 @@
"revision": "d4fa08268f573f25df477fedc813f26bfb833761",
"revisionTime": "2016-08-05T02:05:43Z"
},
{
"checksumSHA1": "UHPK5Fi61Zqvf/ctF4s9hwjHFGU=",
"path": "github.com/thoas/go-funk",
"revision": "d2deeb5709c1da54d5da8c76b2f65421f5ff8de4",
"revisionTime": "2018-05-05T20:14:24Z"
},
{
"checksumSHA1": "HN3pLd5cC+QXkX8j8FsCCB3FzSI=",
"path": "github.com/tinylib/msgp/msgp",
Expand Down