Skip to content

Commit

Permalink
feature: exec resize
Browse files Browse the repository at this point in the history
Signed-off-by: YaoZengzeng <yaozengzeng@zju.edu.cn>
  • Loading branch information
YaoZengzeng committed Aug 7, 2018
1 parent 3afedd0 commit 3148807
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 11 deletions.
4 changes: 2 additions & 2 deletions cri/stream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ const (
DefaultStreamCreationTimeout = 30 * time.Second
)

// TODO: StreamProtocolV2Name, StreamProtocolV3Name, StreamProtocolV4Name support.
// TODO: StreamProtocolV4Name support.

// SupportedStreamingProtocols is the streaming protocols which server supports.
var SupportedStreamingProtocols = []string{constant.StreamProtocolV1Name, constant.StreamProtocolV2Name}
var SupportedStreamingProtocols = []string{constant.StreamProtocolV1Name, constant.StreamProtocolV2Name, constant.StreamProtocolV3Name}

// SupportedPortForwardProtocols is the portforward protocols which server supports.
var SupportedPortForwardProtocols = []string{constant.PortForwardProtocolV1Name}
Expand Down
6 changes: 4 additions & 2 deletions cri/stream/remotecommand/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"net/http"
"time"

apitypes "github.com/alibaba/pouch/apis/types"
)

// Executor knows how to execute a command in a container of the pod.
type Executor interface {
// Exec executes a command in a container of the pod.
Exec(ctx context.Context, containerID string, cmd []string, streamOpts *Options, streams *Streams) (uint32, error)
Exec(ctx context.Context, containerID string, cmd []string, resizeChan <-chan apitypes.ResizeOptions, streamOpts *Options, streams *Streams) (uint32, error)
}

// ServeExec handles requests to execute a command in a container. After
Expand All @@ -24,7 +26,7 @@ func ServeExec(ctx context.Context, w http.ResponseWriter, req *http.Request, ex
}
defer streamCtx.conn.Close()

exitCode, err := executor.Exec(ctx, container, cmd, streamOpts, &Streams{
exitCode, err := executor.Exec(ctx, container, cmd, streamCtx.resizeChan, streamOpts, &Streams{
StdinStream: streamCtx.stdinStream,
StdoutStream: streamCtx.stdoutStream,
StderrStream: streamCtx.stderrStream,
Expand Down
89 changes: 85 additions & 4 deletions cri/stream/remotecommand/httpstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/cri/stream/constant"
"github.com/alibaba/pouch/cri/stream/httpstream"
"github.com/alibaba/pouch/cri/stream/httpstream/spdy"
Expand Down Expand Up @@ -42,6 +43,7 @@ type streamContext struct {
stdoutStream io.WriteCloser
stderrStream io.WriteCloser
resizeStream io.ReadCloser
resizeChan chan apitypes.ResizeOptions
writeStatus func(status *StatusError) error
tty bool
}
Expand All @@ -67,11 +69,27 @@ func createStreams(w http.ResponseWriter, req *http.Request, opts *Options, supp
return nil, false
}

// TODO: resizeStream support.
if ctx.resizeStream != nil {
ctx.resizeChan = make(chan apitypes.ResizeOptions)
go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
}

return ctx, true
}

func handleResizeEvents(stream io.Reader, channel chan<- apitypes.ResizeOptions) {
decoder := json.NewDecoder(stream)
for {
size := apitypes.ResizeOptions{}
err := decoder.Decode(&size)
if err != nil {
close(channel)
break
}
channel <- size
}
}

func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Options, supportedStreamProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) (*streamContext, bool) {
protocol, err := httpstream.Handshake(w, req, supportedStreamProtocols)
if err != nil {
Expand Down Expand Up @@ -102,10 +120,12 @@ func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Opt
case "":
logrus.Infof("Client did not request protocol negotiation. Falling back to %q", constant.StreamProtocolV1Name)
fallthrough
case constant.StreamProtocolV2Name:
handler = &v2ProtocolHandler{}
case constant.StreamProtocolV1Name:
handler = &v1ProtocolHandler{}
case constant.StreamProtocolV2Name:
handler = &v2ProtocolHandler{}
case constant.StreamProtocolV3Name:
handler = &v3ProtocolHandler{}
}

// Count the streams client asked for, starting with 1.
Expand All @@ -119,7 +139,9 @@ func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Opt
if opts.Stderr {
expectedStreams++
}
// TODO: handle opts.TTY
if opts.TTY && handler.supportsTerminalResizing() {
expectedStreams++
}

expired := time.NewTimer(streamCreationTimeout)
defer expired.Stop()
Expand Down Expand Up @@ -149,8 +171,61 @@ type protocolHandler interface {
// waitForStreams waits for the expected streams or a timeout, returning a
// remoteCommandContext if all the streams were received, or an error if not.
waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*streamContext, error)
// supportsTerminalResizing returns true if the protocol handler supports terminal resizing.
supportsTerminalResizing() bool
}

// v3ProtocolHandler implements the V3 protocol version for streaming command execution.
type v3ProtocolHandler struct{}

func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*streamContext, error) {
ctx := &streamContext{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(constant.StreamType)
switch streamType {
case constant.StreamTypeError:
ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeResize:
ctx.resizeStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
logrus.Errorf("Unexpected stream type: %q", streamType)
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, fmt.Errorf("timed out waiting for client to create streams")
}
}

return ctx, nil
}

// supportsTerminalResizing returns true because v3ProtocolHandler supports it.
func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true }

// v2ProtocolHandler implements the V2 protocol version for streaming command execution.
type v2ProtocolHandler struct{}

Expand Down Expand Up @@ -196,6 +271,9 @@ WaitForStreams:
return ctx, nil
}

// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it.
func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false }

// v1ProtocolHandler implements the V1 protocol version for streaming command execution.
type v1ProtocolHandler struct{}

Expand Down Expand Up @@ -245,6 +323,9 @@ WaitForStreams:
return ctx, nil
}

// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }

func v1WriteStatusFunc(stream io.Writer) func(status *StatusError) error {
return func(status *StatusError) error {
if status.Status().Status == StatusSuccess {
Expand Down
29 changes: 27 additions & 2 deletions cri/stream/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// Runtime is the interface to execute the commands and provide the streams.
type Runtime interface {
// Exec executes the command in pod.
Exec(ctx context.Context, containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error)
Exec(ctx context.Context, containerID string, cmd []string, resizeChan <-chan apitypes.ResizeOptions, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error)

// Attach attaches to pod.
Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error
Expand All @@ -38,7 +38,7 @@ func NewStreamRuntime(ctrMgr mgr.ContainerMgr) Runtime {
}

// Exec executes a command inside the container.
func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) {
func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []string, resizeChan <-chan apitypes.ResizeOptions, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) {
createConfig := &apitypes.ExecCreateConfig{
Cmd: cmd,
AttachStdin: streamOpts.Stdin,
Expand All @@ -63,6 +63,13 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri
return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err)
}

handleResizing(resizeChan, func(size apitypes.ResizeOptions) {
err := s.containerMgr.ResizeExec(ctx, execid, size)
if err != nil {
logrus.Errorf("failed to resize process %q console for container %q: %v", execid, containerID, err)
}
})

// TODO Find a better way instead of the dead loop
var ei *apitypes.ContainerExecInspect
for {
Expand All @@ -80,6 +87,24 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri
return uint32(ei.ExitCode), nil
}

// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each
// remotecommand.TerminalSize received from the channel. The resize channel must be closed elsewhere to stop the
// goroutine.
func handleResizing(resizeChan <-chan apitypes.ResizeOptions, resizeFunc func(size apitypes.ResizeOptions)) {
if resizeChan == nil {
return
}
go func() {
for {
size, ok := <-resizeChan
if !ok {
return
}
resizeFunc(size)
}
}()
}

// Attach attaches to a running container.
func (s *streamRuntime) Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error {
attachConfig := &mgr.AttachConfig{
Expand Down
18 changes: 17 additions & 1 deletion ctrd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,22 @@ func (c *Client) ExecContainer(ctx context.Context, process *Process) error {
return nil
}

// ResizeExec changes the size of the TTY of the exec process running
// in the container to the given height and width.
func (c *Client) ResizeExec(ctx context.Context, id string, execid string, opts types.ResizeOptions) error {
pack, err := c.watch.get(id)
if err != nil {
return err
}

execProcess, err := pack.task.LoadProcess(ctx, execid, nil)
if err != nil {
return err
}

return execProcess.Resize(ctx, uint32(opts.Width), uint32(opts.Height))
}

// ContainerPID returns the container's init process id.
func (c *Client) ContainerPID(ctx context.Context, id string) (int, error) {
pack, err := c.watch.get(id)
Expand Down Expand Up @@ -525,7 +541,7 @@ func (c *Client) ResizeContainer(ctx context.Context, id string, opts types.Resi
return err
}

return pack.task.Resize(ctx, uint32(opts.Height), uint32(opts.Width))
return pack.task.Resize(ctx, uint32(opts.Width), uint32(opts.Height))
}

// WaitContainer waits until container's status is stopped.
Expand Down
3 changes: 3 additions & 0 deletions ctrd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type ContainerAPIClient interface {
ContainerStats(ctx context.Context, id string) (*containerdtypes.Metric, error)
// ExecContainer executes a process in container.
ExecContainer(ctx context.Context, process *Process) error
// ResizeContainer changes the size of the TTY of the exec process running
// in the container to the given height and width.
ResizeExec(ctx context.Context, id string, execid string, opts types.ResizeOptions) error
// RecoverContainer reload the container from metadata and watch it, if program be restarted.
RecoverContainer(ctx context.Context, id string, io *containerio.IO) error
// PauseContainer pause container.
Expand Down
3 changes: 3 additions & 0 deletions daemon/mgr/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type ContainerMgr interface {
// CheckExecExist check if exec process `name` exist
CheckExecExist(ctx context.Context, name string) error

// ResizeExec resizes the size of exec process's tty.
ResizeExec(ctx context.Context, execid string, opts types.ResizeOptions) error

// 3. The following two function is related to network management.
// TODO: inconsistency, Connect/Disconnect operation is in newtork_bridge.go in upper API layer.
// Here we encapsualted them in container manager, inconsistency exists.
Expand Down
10 changes: 10 additions & 0 deletions daemon/mgr/container_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ func (mgr *ContainerManager) CreateExec(ctx context.Context, name string, config
return execid, nil
}

// ResizeExec resizes the size of exec process's tty.
func (mgr *ContainerManager) ResizeExec(ctx context.Context, execid string, opts types.ResizeOptions) error {
execConfig, err := mgr.GetExecConfig(ctx, execid)
if err != nil {
return err
}

return mgr.Client.ResizeExec(ctx, execConfig.ContainerID, execid, opts)
}

// StartExec executes a new process in container.
func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, attach *AttachConfig) (err error) {
// GetExecConfig should not error, since we have done this before call StartExec
Expand Down

0 comments on commit 3148807

Please sign in to comment.