Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: seperate stdout & stderr of container io and support host network mode for sandbox #945

Merged
merged 2 commits into from
Mar 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion daemon/containerio/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ type Backend interface {
// Init initializes the backend io.
Init(opt *Option) error

// Out returns the stdout/stderr.
// Out returns the stdout.
Out() io.Writer

// In returns the stdin.
In() io.Reader

// Err returns the stderr.
Err() io.Writer

// Close closes the io.
Close() error
}
Expand Down
32 changes: 23 additions & 9 deletions daemon/containerio/container_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func create(opt *Option, typ stdioType, backends map[string]containerBackend) *C
// For backend with stdin, close it if stdin finished.
io.converge(b.backend.Name(), opt.id, b.backend.In())
b.backend.Close()
b.ring.Close()
}(b)
break
}
Expand Down Expand Up @@ -149,16 +148,20 @@ func createBackend(opt *Option) map[string]containerBackend {

backends[backend.Name()] = containerBackend{
backend: backend,
ring: ringbuff.New(10),
outRing: ringbuff.New(10),
errRing: ringbuff.New(10),
}
}

// start to subscribe ring buffer.
// start to subscribe stdout and stderr ring buffer.
for _, b := range backends {

// the goroutine don't exit forever.
go func(b containerBackend) {
subscribe(b.backend.Name(), opt.id, b.ring, b.backend.Out())
subscribe(b.backend.Name(), opt.id, b.outRing, b.backend.Out())
}(b)
go func(b containerBackend) {
subscribe(b.backend.Name(), opt.id, b.errRing, b.backend.Err())
}(b)
}

Expand Down Expand Up @@ -208,9 +211,18 @@ func (cio *ContainerIO) Write(data []byte) (int, error) {
return len(data), nil
}

for _, b := range cio.backends {
if cover := b.ring.Push(data); cover {
logrus.Warnf("cover data, backend: %s, id: %s", b.backend.Name(), cio.id)
switch cio.typ {
case stdout:
for _, b := range cio.backends {
if cover := b.outRing.Push(data); cover {
logrus.Warnf("cover data, backend: %s, id: %s", b.backend.Name(), cio.id)
}
}
case stderr:
for _, b := range cio.backends {
if cover := b.errRing.Push(data); cover {
logrus.Warnf("cover data, backend: %s, id: %s", b.backend.Name(), cio.id)
}
}
}

Expand All @@ -223,7 +235,8 @@ func (cio *ContainerIO) Close() error {
// we need to close ringbuf before close backend, because close ring will flush
// the remain data into backend.
name := b.backend.Name()
b.ring.Close()
b.outRing.Close()
b.errRing.Close()
b.backend.Close()

logrus.Infof("close containerio backend: %s, id: %s", name, cio.id)
Expand All @@ -235,7 +248,8 @@ func (cio *ContainerIO) Close() error {

type containerBackend struct {
backend Backend
ring *ringbuff.RingBuff
outRing *ringbuff.RingBuff
errRing *ringbuff.RingBuff
}

// subscribe be called in a groutine.
Expand Down
28 changes: 19 additions & 9 deletions daemon/containerio/cri_log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ func init() {
}

type criLogFile struct {
file *os.File
pipeWriter *io.PipeWriter
pipeReader *io.PipeReader
closed bool
file *os.File
outPipeWriter *io.PipeWriter
outPipeReader *io.PipeReader
errPipeWriter *io.PipeWriter
errPipeReader *io.PipeReader
closed bool
}

func (c *criLogFile) Name() string {
Expand All @@ -55,9 +57,10 @@ func (c *criLogFile) Name() string {

func (c *criLogFile) Init(opt *Option) error {
c.file = opt.criLogFile
c.pipeReader, c.pipeWriter = io.Pipe()
// TODO: redirect stderr.
go redirectLogs(c.file, c.pipeReader, Stdout)
c.outPipeReader, c.outPipeWriter = io.Pipe()
c.errPipeReader, c.errPipeWriter = io.Pipe()
go redirectLogs(c.file, c.outPipeReader, Stdout)
go redirectLogs(c.file, c.errPipeReader, Stderr)
return nil
}

Expand Down Expand Up @@ -86,6 +89,7 @@ func redirectLogs(w io.WriteCloser, r io.ReadCloser, stream StreamType) {
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes)
data = append(data, eol)
// TODO: maybe lock here?
_, err = w.Write(data)
if err != nil {
logrus.Errorf("failed to write %q log to log file: %v", stream, err)
Expand All @@ -94,7 +98,11 @@ func redirectLogs(w io.WriteCloser, r io.ReadCloser, stream StreamType) {
}

func (c *criLogFile) Out() io.Writer {
return c.pipeWriter
return c.outPipeWriter
}

func (c *criLogFile) Err() io.Writer {
return c.errPipeWriter
}

func (c *criLogFile) In() io.Reader {
Expand All @@ -107,5 +115,7 @@ func (c *criLogFile) Close() error {
return nil
}
c.closed = true
return c.pipeWriter.Close()
c.outPipeWriter.Close()
c.errPipeWriter.Close()
return nil
}
4 changes: 4 additions & 0 deletions daemon/containerio/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (d *discardIO) Out() io.Writer {
return d
}

func (d *discardIO) Err() io.Writer {
return d
}

func (d *discardIO) In() io.Reader {
return d
}
Expand Down
4 changes: 4 additions & 0 deletions daemon/containerio/hijack_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (h *hijackConn) In() io.Reader {
return h
}

func (h *hijackConn) Err() io.Writer {
return h
}

func (h *hijackConn) Close() error {
if h.closed {
return nil
Expand Down
4 changes: 4 additions & 0 deletions daemon/containerio/mem_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (b *memBuffer) In() io.Reader {
return b.buffer
}

func (b *memBuffer) Err() io.Writer {
return b.buffer
}

func (b *memBuffer) Close() error {
// Don't need to close bytes.Buffer.
return nil
Expand Down
4 changes: 4 additions & 0 deletions daemon/containerio/raw_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (r *rawFile) In() io.Reader {
return r.file
}

func (r *rawFile) Err() io.Writer {
return r.file
}

func (r *rawFile) Close() error {
if r.closed {
return nil
Expand Down
4 changes: 4 additions & 0 deletions daemon/containerio/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (s *streamIO) In() io.Reader {
return s.streams.StdinStream
}

func (s *streamIO) Err() io.Writer {
return s.streams.StderrStream
}

func (s *streamIO) Close() error {
if s.closed {
return nil
Expand Down
9 changes: 9 additions & 0 deletions daemon/mgr/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ func (c *CriManager) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
return nil, fmt.Errorf("failed to setup sandbox files: %v", err)
}

securityContext := config.GetLinux().GetSecurityContext()
hostNet := securityContext.GetNamespaceOptions().GetHostNetwork()
// If it is in host network, no need to configure the network of sandbox.
if hostNet {
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
}

// Step 4: Setup networking for the sandbox.
container, err := c.ContainerMgr.Get(ctx, id)
if err != nil {
Expand Down Expand Up @@ -234,6 +241,8 @@ func (c *CriManager) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandb
return nil, fmt.Errorf("failed to parse metadata of sandbox %q from container name: %v", podSandboxID, err)
}

// TODO: how to figure out if the network is in host mode?
// Maybe we need to store some configuration of sandbox.
err = c.CniMgr.TearDownPodNetwork(&ocicni.PodNetwork{
Name: metadata.GetName(),
Namespace: metadata.GetNamespace(),
Expand Down
4 changes: 2 additions & 2 deletions hack/cri-test/test-cri.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ POUCH_SOCK="/var/run/pouchcri.sock"

# CRI_FOCUS focuses the test to run.
# With the CRI manager completes its function, we may need to expand this field.
CRI_FOCUS=${CRI_FOCUS:-"PodSandbox|AppArmor|Runtime info|Container|Networking|Streaming|Security Context"}
CRI_FOCUS=${CRI_FOCUS:-}

# CRI_SKIP skips the test to skip.
CRI_SKIP=${CRI_SKIP:-"RunAsUserName|HostNetwork|ReadOnlyRootfs is true|seccomp localhost"}
CRI_SKIP=${CRI_SKIP:-"RunAsUserName|seccomp localhost|SELinux|public image with digest|listImage should get exactly 2 repoTags"}
# REPORT_DIR is the the directory to store test logs.
REPORT_DIR=${REPORT_DIR:-"/tmp/test-cri"}

Expand Down