Skip to content

Commit

Permalink
feature: implement attach method of stream server
Browse files Browse the repository at this point in the history
Signed-off-by: YaoZengzeng <yaozengzeng@zju.edu.cn>
  • Loading branch information
YaoZengzeng committed Mar 13, 2018
1 parent ce053c5 commit 23acbf1
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 74 deletions.
14 changes: 7 additions & 7 deletions cri/stream/remotecommand/attach.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package remotecommand

import (
"fmt"
"net/http"
"time"
)

// Attacher knows how to attach a running container in a pod.
type Attacher interface {
// Attach attaches to the running container in the pod.
Attach() error
Attach(containerID string, streamOpts *Options, streams *Streams) error
}

// ServeAttach handles requests to attach to a container. After creating/receiving the required
Expand All @@ -22,9 +21,10 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, co
}
defer ctx.conn.Close()

// Hardcode to pass CI, implement it later.
fmt.Fprintf(ctx.stdoutStream, "hello\n")

// Actuall it's a bug of cri-tools v1.0.0-alpha.0, workaround it.
time.Sleep(1 * time.Second)
attacher.Attach(container, streamOpts, &Streams{
StreamCh: make(chan struct{}, 1),
StdinStream: ctx.stdinStream,
StdoutStream: ctx.stdoutStream,
StderrStream: ctx.stderrStream,
})
}
6 changes: 3 additions & 3 deletions cri/stream/remotecommand/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, cont
defer ctx.conn.Close()

executor.Exec(container, cmd, streamOpts, &Streams{
StdinStream: ctx.stdinStream,
StdoutStream: ctx.stdoutStream,
StderrStream: ctx.stderrStream,
StdinStream: ctx.stdinStream,
StdoutStream: ctx.stdoutStream,
StderrStream: ctx.stderrStream,
})
}
54 changes: 51 additions & 3 deletions cri/stream/remotecommand/httpstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ type Options struct {
// Streams contains all the streams used to stdio for
// remote command execution.
type Streams struct {
StdinStream io.ReadCloser
StdoutStream io.WriteCloser
StderrStream io.WriteCloser
// Notified from StreamCh if streams broken.
StreamCh chan struct{}
StdinStream io.ReadCloser
StdoutStream io.WriteCloser
StderrStream io.WriteCloser
}

// context contains the connection and streams used when
Expand Down Expand Up @@ -90,6 +92,8 @@ func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Opt
case "":
logrus.Infof("Client did not request protocol negotiation. Falling back to %q", constant.StreamProtocolV1Name)
fallthrough
case constant.StreamProtocolV2Name:
handler = &v2ProtocolHandler{}
case constant.StreamProtocolV1Name:
handler = &v1ProtocolHandler{}
}
Expand Down Expand Up @@ -137,6 +141,50 @@ type protocolHandler interface {
waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error)
}

// v2ProtocolHandler implements the V2 protocol version for streaming command execution.
type v2ProtocolHandler struct{}

func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(constant.StreamType)
switch streamType {
case constant.StreamTypeError:
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
logrus.Errorf("Unexpected stream type: %q", streamType)
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, fmt.Errorf("timed out waiting for client to create streams")
}
}

return ctx, nil
}

// v1ProtocolHandler implements the V1 protocol version for streaming command execution.
type v1ProtocolHandler struct{}

Expand Down
4 changes: 2 additions & 2 deletions cri/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
// TODO: StreamProtocolV2Name, StreamProtocolV3Name, StreamProtocolV4Name support.

// SupportedStreamingProtocols is the streaming protocols which server supports.
var SupportedStreamingProtocols = []string{constant.StreamProtocolV1Name}
var SupportedStreamingProtocols = []string{constant.StreamProtocolV1Name, constant.StreamProtocolV2Name}

// SupportedPortForwardProtocols is the portforward protocols which server supports.
var SupportedPortForwardProtocols = []string{constant.PortForwardProtocolV1Name}
Expand All @@ -56,7 +56,7 @@ type Runtime interface {
Exec(containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error

// Attach attaches to pod.
Attach() error
Attach(containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error

// PortForward forward port to pod.
PortForward(name string, port int32, stream io.ReadWriteCloser) error
Expand Down
132 changes: 98 additions & 34 deletions daemon/containerio/container_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@ func NewIO(opt *Option) *IO {
}
}

// AddBackend adds more backends to container's stdio.
func (io *IO) AddBackend(opt *Option) {
backends := createBackend(opt)

for t, s := range map[stdioType]*ContainerIO{
stdin: io.Stdin,
stdout: io.Stdout,
stderr: io.Stderr,
} {
s.add(opt, t, backends)
}
}

// Close closes the container's io.
func (io *IO) Close() error {
io.Stderr.Close()
Expand All @@ -61,30 +74,59 @@ func (io *IO) Close() error {
// ContainerIO used to control the container's stdio.
type ContainerIO struct {
Option
backends map[string]containerBackend
backends []containerBackend
total int64
typ stdioType
closed bool
// The stdin of all backends should put into ring first.
ring *ringbuff.RingBuff
}

func (cio *ContainerIO) add(opt *Option, typ stdioType, backends map[string]containerBackend) {
if typ == stdin {
for _, b := range backends {
if b.backend.Name() == opt.stdinBackend {
cio.backends = append(cio.backends, b)
go func(b containerBackend) {
cio.converge(b.backend.Name(), opt.id, b.backend.In())
b.backend.Close()
}(b)
break
}
}
} else {
for _, b := range backends {
cio.backends = append(cio.backends, b)
}
}
}

func create(opt *Option, typ stdioType, backends map[string]containerBackend) *ContainerIO {
io := &ContainerIO{
backends: backends,
total: 0,
typ: typ,
closed: false,
Option: *opt,
total: 0,
typ: typ,
closed: false,
Option: *opt,
}

if typ == stdin {
io.backends = make(map[string]containerBackend)

io.ring = ringbuff.New(10)
for _, b := range backends {
if b.backend.Name() == opt.stdinBackend {
io.backends[opt.stdinBackend] = b
io.backends = append(io.backends, b)
go func(b containerBackend) {
// For backend with stdin, close it if stdin finished.
io.converge(b.backend.Name(), opt.id, b.backend.In())
b.backend.Close()
b.ring.Close()
}(b)
break
}
}
} else {
for _, b := range backends {
io.backends = append(io.backends, b)
}
}

return io
Expand Down Expand Up @@ -124,69 +166,70 @@ func createBackend(opt *Option) map[string]containerBackend {
}

// OpenStdin returns open container's stdin or not.
func (io *ContainerIO) OpenStdin() bool {
if io.typ != stdin {
func (cio *ContainerIO) OpenStdin() bool {
if cio.typ != stdin {
return false
}
if io.closed {
if cio.closed {
return false
}
return len(io.backends) != 0
return len(cio.backends) != 0
}

// Read implements the standard Read interface.
func (io *ContainerIO) Read(p []byte) (int, error) {
if io.typ != stdin {
return 0, fmt.Errorf("invalid container io type: %s, id: %s", io.typ, io.id)
func (cio *ContainerIO) Read(p []byte) (int, error) {
if cio.typ != stdin {
return 0, fmt.Errorf("invalid container io type: %s, id: %s", cio.typ, cio.id)
}
if io.closed {
if cio.closed {
return 0, fmt.Errorf("container io is closed")
}

if len(io.backends) == 0 {
block := make(chan struct{})
<-block
value, _ := cio.ring.Pop()
data, ok := value.([]byte)
if !ok {
return 0, nil
}
n := copy(p, data)

backend := io.backends[io.stdinBackend]

return backend.backend.In().Read(p)
return n, nil
}

// Write implements the standard Write interface.
func (io *ContainerIO) Write(data []byte) (int, error) {
if io.typ == stdin {
return 0, fmt.Errorf("invalid container io type: %s, id: %s", io.typ, io.id)
func (cio *ContainerIO) Write(data []byte) (int, error) {
if cio.typ == stdin {
return 0, fmt.Errorf("invalid container io type: %s, id: %s", cio.typ, cio.id)
}
if io.closed {
if cio.closed {
return 0, fmt.Errorf("container io is closed")
}

if io.typ == discard {
if cio.typ == discard {
return len(data), nil
}

for _, b := range io.backends {
for _, b := range cio.backends {
if cover := b.ring.Push(data); cover {
logrus.Warnf("cover data, backend: %s, id: %s", b.backend.Name(), io.id)
logrus.Warnf("cover data, backend: %s, id: %s", b.backend.Name(), cio.id)
}
}

return len(data), nil
}

// Close implements the standard Close interface.
func (io *ContainerIO) Close() error {
for name, b := range io.backends {
func (cio *ContainerIO) Close() error {
for _, b := range cio.backends {
// we need to close ringbuf before close backend, because close ring will flush
// the remain data into backend.
name := b.backend.Name()
b.ring.Close()
b.backend.Close()

logrus.Infof("close containerio backend: %s, id: %s", name, io.id)
logrus.Infof("close containerio backend: %s, id: %s", name, cio.id)
}

io.closed = true
cio.closed = true
return nil
}

Expand Down Expand Up @@ -215,3 +258,24 @@ func subscribe(name, id string, ring *ringbuff.RingBuff, out io.Writer) {

logrus.Infof("finished to subscribe io, backend: %s, id: %s", name, id)
}

// converge be called in a goroutine.
func (cio *ContainerIO) converge(name, id string, in io.Reader) {
// TODO: we should implement this function more elegant and robust.
logrus.Infof("start to converge io, backend: %s, id: %s", name, id)

data := make([]byte, 128)
for {
n, err := in.Read(data)
if err != nil {
logrus.Errorf("failed to read from backend: %s, id: %s, %v", name, id, err)
break
}
cover := cio.ring.Push(data[:n])
if cover {
logrus.Warnf("cover data, backend: %s, id: %s", name, id)
}
}

logrus.Infof("finished to converge io, backend: %s, id: %s", name, id)
}
2 changes: 1 addition & 1 deletion daemon/containerio/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Option struct {
hijackUpgrade bool
stdinBackend string
memBuffer *bytes.Buffer
streams *remotecommand.Streams
streams *remotecommand.Streams
}

// NewOption creates the Option instance.
Expand Down
Loading

0 comments on commit 23acbf1

Please sign in to comment.