From 05b557a8c0f6fb940dff595a207e1672b4728016 Mon Sep 17 00:00:00 2001 From: Alex Price Date: Mon, 2 Mar 2020 13:59:48 +1100 Subject: [PATCH] agent: add configurable container pipe size cmdline option Adds a cmdline option to configure the stdout/stderr pipe sizes. Uses `F_SETPIPE_SZ` to resize the write side of the pipe after creation. Example Cmdline option: `agent.container_pipe_size=2097152` fixes #755 Signed-off-by: Alex Price --- README.md | 11 +++++++++++ agent.go | 3 +++ config.go | 7 +++++++ config_test.go | 38 ++++++++++++++++++++++++++++++++++++++ grpc.go | 25 +++++++++++++++++++++++-- grpc_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 122 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8cc42f3f4..cec65a4c0 100644 --- a/README.md +++ b/README.md @@ -88,5 +88,16 @@ Set `agent.unified_cgroup_hierarchy` to `0` or `false` to disable cgroups v2. Fo example, `agent.unified_cgroup_hierarchy=0` will disable cgroups v2 in the guest. By default cgroups v2 is disabled. +## Container Pipe Size + +The agent will configure a [Pipe][3] for stdio (stdout, stderr, stdin) for each container. By default, +this will use the OS' defaults in terms of pipe capacity. However, some workloads may require a larger pipe +when writing to stdout/stderr in non-blocking mode. + +The pipe's capacity for stdout/stderr can be modified by specifying the `agent.container_pipe_size` flag +to the guest kernel command line. For example, `agent.container_pipe_size=2097152` will set the stdout and stderr +pipes to 2097152 bytes. + [1]: https://github.com/firecracker-microvm/firecracker/blob/master/docs/vsock.md [2]: https://golang.org/pkg/time/#ParseDuration +[3]: http://man7.org/linux/man-pages/man7/pipe.7.html diff --git a/agent.go b/agent.go index 3393f8b2f..ce16f6d00 100644 --- a/agent.go +++ b/agent.go @@ -187,6 +187,9 @@ var logLevel = defaultLogLevel // Specify whether the agent has to use cgroups v2 or not. var unifiedCgroupHierarchy = false +// Size in bytes of the stdout/stderr pipes created for each container. +var containerPipeSize = uint32(0) + // commType is used to denote the communication channel type used. type commType int diff --git a/config.go b/config.go index a53978939..45300961b 100644 --- a/config.go +++ b/config.go @@ -28,6 +28,7 @@ const ( debugConsoleVPortFlag = optionPrefix + "debug_console_vport" hotplugTimeoutFlag = optionPrefix + "hotplug_timeout" unifiedCgroupHierarchyFlag = optionPrefix + "unified_cgroup_hierarchy" + containerPipeSizeFlag = optionPrefix + "container_pipe_size" traceModeStatic = "static" traceModeDynamic = "dynamic" traceTypeIsolated = "isolated" @@ -123,6 +124,12 @@ func parseCmdlineOption(option string) error { if timeout > 0 { hotplugTimeout = timeout } + case containerPipeSizeFlag: + size, err := strconv.ParseUint(split[valuePosition], 10, 32) + if err != nil { + return err + } + containerPipeSize = uint32(size) case traceModeFlag: switch split[valuePosition] { case traceTypeIsolated: diff --git a/config_test.go b/config_test.go index c9a743031..2a231336d 100644 --- a/config_test.go +++ b/config_test.go @@ -448,3 +448,41 @@ func TestParseCmdlineOptionUnifiedCgroupHierarchy(t *testing.T) { assert.Equal(d.expected, unifiedCgroupHierarchy) } } + +func TestParseCmdlineOptionContainerPipeSize(t *testing.T) { + assert := assert.New(t) + + type testData struct { + option string + shouldErr bool + expectedContainerPipeSize uint32 + } + + data := []testData{ + {"", false, 0}, + {"container_pip_siz", false, 0}, + {"container_pipe_size", false, 0}, + {"container_pipe_size=3", false, 0}, + {"agnt.container_pipe_size=3", false, 0}, + {"agent.container_pipe_size=3", false, 3}, + {"agent.container_pipe_size=2097152", false, 2097152}, + {"agent.container_pipe_size=-1", true, 0}, + {"agent.container_pipe_size=foobar", true, 0}, + {"agent.container_pipe_size=5.0", true, 0}, + {"agent.container_pipe_size=0", false, 0}, + } + + for i, d := range data { + // reset the container pipe size + containerPipeSize = uint32(0) + + err := parseCmdlineOption(d.option) + if d.shouldErr { + assert.Error(err) + } else { + assert.NoError(err) + } + + assert.Equal(d.expectedContainerPipeSize, containerPipeSize, "test %d (%+v)", i, d) + } +} diff --git a/grpc.go b/grpc.go index 4c0dea39e..59572314c 100644 --- a/grpc.go +++ b/grpc.go @@ -344,12 +344,12 @@ func buildProcess(agentProcess *pb.Process, procID string, init bool) (*process, return nil, err } - rStdout, wStdout, err := os.Pipe() + rStdout, wStdout, err := createExtendedPipe() if err != nil { return nil, err } - rStderr, wStderr, err := os.Pipe() + rStderr, wStderr, err := createExtendedPipe() if err != nil { return nil, err } @@ -1791,3 +1791,24 @@ func (a *agentGRPC) StopTracing(ctx context.Context, req *pb.StopTracingRequest) return emptyResp, nil } + +// createExtendedPipe creates a pipe. +// Optionally extends the pipe if containerPipeSize is positive. +func createExtendedPipe() (*os.File, *os.File, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, nil, err + } + if containerPipeSize > 0 { + extendPipe(r, w) + } + return r, w, nil +} + +// extendPipe extends the write side of the pipe to value of containerPipeSize +func extendPipe(r, w *os.File) { + _, _, errNo := syscall.Syscall(syscall.SYS_FCNTL, w.Fd(), syscall.F_SETPIPE_SZ, uintptr(containerPipeSize)) + if errNo != 0 { + agentLog.WithField("size", containerPipeSize).WithError(errNo).Error("Could not extend write side of pipe") + } +} diff --git a/grpc_test.go b/grpc_test.go index 23e264074..e69102b05 100644 --- a/grpc_test.go +++ b/grpc_test.go @@ -16,6 +16,7 @@ import ( "reflect" "sort" "strconv" + "strings" "syscall" "testing" "time" @@ -1800,3 +1801,42 @@ func TestLoadKernelModule(t *testing.T) { err = loadKernelModule(m) assert.NoError(err) } + +func TestCreateExtendedPipe(t *testing.T) { + assert := assert.New(t) + + // Test the default + containerPipeSize = 0 + _, _, err := createExtendedPipe() + assert.NoError(err) + + // Test setting to the max size + maxSize, err := getPipeMaxSize() + assert.NoError(err) + containerPipeSize = maxSize + _, w, err := createExtendedPipe() + assert.NoError(err) + size, err := getPipeSize(w) + assert.Equal(syscall.Errno(0), err) + assert.Equal(containerPipeSize, size) +} + +func getPipeSize(f *os.File) (uint32, error) { + r1, _, err := syscall.Syscall(syscall.SYS_FCNTL, f.Fd(), syscall.F_GETPIPE_SZ, 0) + return uint32(r1), err +} + +func getPipeMaxSize() (uint32, error) { + f, err := os.Open("/proc/sys/fs/pipe-max-size") + if err != nil { + return 0, err + } + defer f.Close() + b, err := ioutil.ReadAll(f) + if err != nil { + return 0, err + } + s := strings.Trim(string(b), "\n") + u, err := strconv.ParseUint(s, 10, 32) + return uint32(u), err +}