Skip to content

Commit

Permalink
agent: add configurable container pipe size cmdline option
Browse files Browse the repository at this point in the history
Adds a cmdline option to configure the stdout/stderr pipe sizes.
Uses `F_SETPIPE_SZ` to resize the write side of the pipe after
creation.

Example Cmdline option: `agent.container_pipe_size=2097152`

fixes kata-containers#755

Signed-off-by: Alex Price <aprice@atlassian.com>
  • Loading branch information
awprice committed Mar 12, 2020
1 parent db22a98 commit b9be1ad
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 2 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,16 @@ Set `agent.unified_cgroup_hierarchy` to `0` or `false` to disable cgroups v2. Fo
example, `agent.unified_cgroup_hierarchy=0` will disable cgroups v2 in the guest.
By default cgroups v2 is disabled.

## Container Pipe Size

The agent will configure a [Pipe][3] for stdio (stdout, stderr, stdin) for each container. By default,
this will use the OS' defaults in terms of pipe capacity. However, some workloads may require a larger pipe
when writing to stdout/stderr in non-blocking mode.

The pipe's capacity for stdout/stderr can be modified by specifying the `agent.container_pipe_size` flag
to the guest kernel command line. For example, `agent.container_pipe_size=2097152` will set the stdout and stderr
pipes to 2097152 bytes.

[1]: https://github.com/firecracker-microvm/firecracker/blob/master/docs/vsock.md
[2]: https://golang.org/pkg/time/#ParseDuration
[3]: http://man7.org/linux/man-pages/man7/pipe.7.html
3 changes: 3 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ var logLevel = defaultLogLevel
// Specify whether the agent has to use cgroups v2 or not.
var unifiedCgroupHierarchy = false

// Size of the stdout/stderr pipes created for each container.
var containerPipeSize = uint32(0)

// commType is used to denote the communication channel type used.
type commType int

Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
debugConsoleVPortFlag = optionPrefix + "debug_console_vport"
hotplugTimeoutFlag = optionPrefix + "hotplug_timeout"
unifiedCgroupHierarchyFlag = optionPrefix + "unified_cgroup_hierarchy"
containerPipeSizeFlag = optionPrefix + "container_pipe_size"
traceModeStatic = "static"
traceModeDynamic = "dynamic"
traceTypeIsolated = "isolated"
Expand Down Expand Up @@ -123,6 +124,12 @@ func parseCmdlineOption(option string) error {
if timeout > 0 {
hotplugTimeout = timeout
}
case containerPipeSizeFlag:
size, err := strconv.ParseUint(split[valuePosition], 10, 32)
if err != nil {
return err
}
containerPipeSize = uint32(size)
case traceModeFlag:
switch split[valuePosition] {
case traceTypeIsolated:
Expand Down
38 changes: 38 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,41 @@ func TestParseCmdlineOptionUnifiedCgroupHierarchy(t *testing.T) {
assert.Equal(d.expected, unifiedCgroupHierarchy)
}
}

func TestParseCmdlineOptionContainerPipeSize(t *testing.T) {
assert := assert.New(t)

type testData struct {
option string
shouldErr bool
expectedContainerPipeSize uint32
}

data := []testData{
{"", false, 0},
{"container_pip_siz", false, 0},
{"container_pipe_size", false, 0},
{"container_pipe_size=3", false, 0},
{"agnt.container_pipe_size=3", false, 0},
{"agent.container_pipe_size=3", false, 3},
{"agent.container_pipe_size=2097152", false, 2097152},
{"agent.container_pipe_size=-1", true, 0},
{"agent.container_pipe_size=foobar", true, 0},
{"agent.container_pipe_size=5.0", true, 0},
{"agent.container_pipe_size=0", false, 0},
}

for i, d := range data {
// reset the container pipe size
containerPipeSize = uint32(0)

err := parseCmdlineOption(d.option)
if d.shouldErr {
assert.Error(err)
} else {
assert.NoError(err)
}

assert.Equal(d.expectedContainerPipeSize, containerPipeSize, "test %d (%+v)", i, d)
}
}
25 changes: 23 additions & 2 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,12 @@ func buildProcess(agentProcess *pb.Process, procID string, init bool) (*process,
return nil, err
}

rStdout, wStdout, err := os.Pipe()
rStdout, wStdout, err := createExtendedPipe()
if err != nil {
return nil, err
}

rStderr, wStderr, err := os.Pipe()
rStderr, wStderr, err := createExtendedPipe()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1791,3 +1791,24 @@ func (a *agentGRPC) StopTracing(ctx context.Context, req *pb.StopTracingRequest)

return emptyResp, nil
}

// createExtendedPipe creates a pipe.
// Optionally extends the pipe if containerPipeSize is positive.
func createExtendedPipe() (*os.File, *os.File, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, nil, err
}
if containerPipeSize > 0 {
extendPipe(r, w)
}
return r, w, nil
}

// extendPipe extends the write side of the pipe to value of containerPipeSize
func extendPipe(r, w *os.File) {
_, _, errNo := syscall.Syscall(syscall.SYS_FCNTL, w.Fd(), syscall.F_SETPIPE_SZ, uintptr(containerPipeSize))
if errNo != 0 {
agentLog.WithField("size", containerPipeSize).WithError(errNo).Error("Could not extend write side of pipe")
}
}
40 changes: 40 additions & 0 deletions grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"reflect"
"sort"
"strconv"
"strings"
"syscall"
"testing"
"time"
Expand Down Expand Up @@ -1800,3 +1801,42 @@ func TestLoadKernelModule(t *testing.T) {
err = loadKernelModule(m)
assert.NoError(err)
}

func TestCreateExtendedPipe(t *testing.T) {
assert := assert.New(t)

// Test the default
containerPipeSize = 0
_, _, err := createExtendedPipe()
assert.NoError(err)

// Test setting to the max size
maxSize, err := getPipeMaxSize()
assert.NoError(err)
containerPipeSize = maxSize
_, w, err := createExtendedPipe()
assert.NoError(err)
size, err := getPipeSize(w)
assert.Equal(syscall.Errno(0), err)
assert.Equal(containerPipeSize, size)
}

func getPipeSize(f *os.File) (uint32, error) {
r1, _, err := syscall.Syscall(syscall.SYS_FCNTL, f.Fd(), syscall.F_GETPIPE_SZ, 0)
return uint32(r1), err
}

func getPipeMaxSize() (uint32, error) {
f, err := os.Open("/proc/sys/fs/pipe-max-size")
if err != nil {
return 0, err
}
defer f.Close()
b, err := ioutil.ReadAll(f)
if err != nil {
return 0, err
}
s := strings.Trim(string(b), "\n")
u, err := strconv.ParseUint(s, 10, 32)
return uint32(u), err
}

0 comments on commit b9be1ad

Please sign in to comment.