diff --git a/cri/stream/config.go b/cri/stream/config.go index ff091acdb3..4bc46c67cf 100644 --- a/cri/stream/config.go +++ b/cri/stream/config.go @@ -17,10 +17,10 @@ const ( DefaultStreamCreationTimeout = 30 * time.Second ) -// TODO: StreamProtocolV2Name, StreamProtocolV3Name, StreamProtocolV4Name support. +// TODO: StreamProtocolV4Name support. // SupportedStreamingProtocols is the streaming protocols which server supports. -var SupportedStreamingProtocols = []string{constant.StreamProtocolV1Name, constant.StreamProtocolV2Name} +var SupportedStreamingProtocols = []string{constant.StreamProtocolV1Name, constant.StreamProtocolV2Name, constant.StreamProtocolV3Name} // SupportedPortForwardProtocols is the portforward protocols which server supports. var SupportedPortForwardProtocols = []string{constant.PortForwardProtocolV1Name} diff --git a/cri/stream/remotecommand/exec.go b/cri/stream/remotecommand/exec.go index 441a3dcf48..40a8096aac 100644 --- a/cri/stream/remotecommand/exec.go +++ b/cri/stream/remotecommand/exec.go @@ -5,12 +5,14 @@ import ( "fmt" "net/http" "time" + + apitypes "github.com/alibaba/pouch/apis/types" ) // Executor knows how to execute a command in a container of the pod. type Executor interface { // Exec executes a command in a container of the pod. - Exec(ctx context.Context, containerID string, cmd []string, streamOpts *Options, streams *Streams) (uint32, error) + Exec(ctx context.Context, containerID string, cmd []string, resizeChan <-chan apitypes.ResizeOptions, streamOpts *Options, streams *Streams) (uint32, error) } // ServeExec handles requests to execute a command in a container. After @@ -24,7 +26,7 @@ func ServeExec(ctx context.Context, w http.ResponseWriter, req *http.Request, ex } defer streamCtx.conn.Close() - exitCode, err := executor.Exec(ctx, container, cmd, streamOpts, &Streams{ + exitCode, err := executor.Exec(ctx, container, cmd, streamCtx.resizeChan, streamOpts, &Streams{ StdinStream: streamCtx.stdinStream, StdoutStream: streamCtx.stdoutStream, StderrStream: streamCtx.stderrStream, diff --git a/cri/stream/remotecommand/httpstream.go b/cri/stream/remotecommand/httpstream.go index f18e59fc7d..03fe459897 100644 --- a/cri/stream/remotecommand/httpstream.go +++ b/cri/stream/remotecommand/httpstream.go @@ -7,6 +7,7 @@ import ( "net/http" "time" + apitypes "github.com/alibaba/pouch/apis/types" "github.com/alibaba/pouch/cri/stream/constant" "github.com/alibaba/pouch/cri/stream/httpstream" "github.com/alibaba/pouch/cri/stream/httpstream/spdy" @@ -42,6 +43,7 @@ type streamContext struct { stdoutStream io.WriteCloser stderrStream io.WriteCloser resizeStream io.ReadCloser + resizeChan chan apitypes.ResizeOptions writeStatus func(status *StatusError) error tty bool } @@ -67,11 +69,27 @@ func createStreams(w http.ResponseWriter, req *http.Request, opts *Options, supp return nil, false } - // TODO: resizeStream support. + if ctx.resizeStream != nil { + ctx.resizeChan = make(chan apitypes.ResizeOptions) + go handleResizeEvents(ctx.resizeStream, ctx.resizeChan) + } return ctx, true } +func handleResizeEvents(stream io.Reader, channel chan<- apitypes.ResizeOptions) { + decoder := json.NewDecoder(stream) + for { + size := apitypes.ResizeOptions{} + err := decoder.Decode(&size) + if err != nil { + close(channel) + break + } + channel <- size + } +} + func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Options, supportedStreamProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) (*streamContext, bool) { protocol, err := httpstream.Handshake(w, req, supportedStreamProtocols) if err != nil { @@ -102,10 +120,12 @@ func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Opt case "": logrus.Infof("Client did not request protocol negotiation. Falling back to %q", constant.StreamProtocolV1Name) fallthrough - case constant.StreamProtocolV2Name: - handler = &v2ProtocolHandler{} case constant.StreamProtocolV1Name: handler = &v1ProtocolHandler{} + case constant.StreamProtocolV2Name: + handler = &v2ProtocolHandler{} + case constant.StreamProtocolV3Name: + handler = &v3ProtocolHandler{} } // Count the streams client asked for, starting with 1. @@ -119,7 +139,9 @@ func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Opt if opts.Stderr { expectedStreams++ } - // TODO: handle opts.TTY + if opts.TTY && handler.supportsTerminalResizing() { + expectedStreams++ + } expired := time.NewTimer(streamCreationTimeout) defer expired.Stop() @@ -149,8 +171,61 @@ type protocolHandler interface { // waitForStreams waits for the expected streams or a timeout, returning a // remoteCommandContext if all the streams were received, or an error if not. waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*streamContext, error) + // supportsTerminalResizing returns true if the protocol handler supports terminal resizing. + supportsTerminalResizing() bool } +// v3ProtocolHandler implements the V3 protocol version for streaming command execution. +type v3ProtocolHandler struct{} + +func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*streamContext, error) { + ctx := &streamContext{} + receivedStreams := 0 + replyChan := make(chan struct{}) + stop := make(chan struct{}) + defer close(stop) +WaitForStreams: + for { + select { + case stream := <-streams: + streamType := stream.Headers().Get(constant.StreamType) + switch streamType { + case constant.StreamTypeError: + ctx.writeStatus = v1WriteStatusFunc(stream) + go waitStreamReply(stream.replySent, replyChan, stop) + case constant.StreamTypeStdin: + ctx.stdinStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case constant.StreamTypeStdout: + ctx.stdoutStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case constant.StreamTypeStderr: + ctx.stderrStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case constant.StreamTypeResize: + ctx.resizeStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + default: + logrus.Errorf("Unexpected stream type: %q", streamType) + } + case <-replyChan: + receivedStreams++ + if receivedStreams == expectedStreams { + break WaitForStreams + } + case <-expired: + // TODO find a way to return the error to the user. Maybe use a separate + // stream to report errors? + return nil, fmt.Errorf("timed out waiting for client to create streams") + } + } + + return ctx, nil +} + +// supportsTerminalResizing returns true because v3ProtocolHandler supports it. +func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true } + // v2ProtocolHandler implements the V2 protocol version for streaming command execution. type v2ProtocolHandler struct{} @@ -196,6 +271,9 @@ WaitForStreams: return ctx, nil } +// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it. +func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false } + // v1ProtocolHandler implements the V1 protocol version for streaming command execution. type v1ProtocolHandler struct{} @@ -245,6 +323,9 @@ WaitForStreams: return ctx, nil } +// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it. +func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false } + func v1WriteStatusFunc(stream io.Writer) func(status *StatusError) error { return func(status *StatusError) error { if status.Status().Status == StatusSuccess { diff --git a/cri/stream/runtime.go b/cri/stream/runtime.go index 46f523089a..3a5ca5a03e 100644 --- a/cri/stream/runtime.go +++ b/cri/stream/runtime.go @@ -19,7 +19,7 @@ import ( // Runtime is the interface to execute the commands and provide the streams. type Runtime interface { // Exec executes the command in pod. - Exec(ctx context.Context, containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) + Exec(ctx context.Context, containerID string, cmd []string, resizeChan <-chan apitypes.ResizeOptions, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) // Attach attaches to pod. Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error @@ -38,7 +38,7 @@ func NewStreamRuntime(ctrMgr mgr.ContainerMgr) Runtime { } // Exec executes a command inside the container. -func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) { +func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []string, resizeChan <-chan apitypes.ResizeOptions, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) { createConfig := &apitypes.ExecCreateConfig{ Cmd: cmd, AttachStdin: streamOpts.Stdin, @@ -63,6 +63,13 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err) } + handleResizing(resizeChan, func(size apitypes.ResizeOptions) { + err := s.containerMgr.ResizeExec(ctx, execid, size) + if err != nil { + logrus.Errorf("failed to resize process %q console for container %q: %v", execid, containerID, err) + } + }) + // TODO Find a better way instead of the dead loop var ei *apitypes.ContainerExecInspect for { @@ -80,6 +87,24 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri return uint32(ei.ExitCode), nil } +// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each +// remotecommand.TerminalSize received from the channel. The resize channel must be closed elsewhere to stop the +// goroutine. +func handleResizing(resizeChan <-chan apitypes.ResizeOptions, resizeFunc func(size apitypes.ResizeOptions)) { + if resizeChan == nil { + return + } + go func() { + for { + size, ok := <-resizeChan + if !ok { + return + } + resizeFunc(size) + } + }() +} + // Attach attaches to a running container. func (s *streamRuntime) Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error { attachConfig := &mgr.AttachConfig{ diff --git a/ctrd/container.go b/ctrd/container.go index 86c88cad83..148778ee89 100644 --- a/ctrd/container.go +++ b/ctrd/container.go @@ -121,6 +121,22 @@ func (c *Client) ExecContainer(ctx context.Context, process *Process) error { return nil } +// ResizeExec changes the size of the TTY of the exec process running +// in the container to the given height and width. +func (c *Client) ResizeExec(ctx context.Context, id string, execid string, opts types.ResizeOptions) error { + pack, err := c.watch.get(id) + if err != nil { + return err + } + + execProcess, err := pack.task.LoadProcess(ctx, execid, nil) + if err != nil { + return err + } + + return execProcess.Resize(ctx, uint32(opts.Width), uint32(opts.Height)) +} + // ContainerPID returns the container's init process id. func (c *Client) ContainerPID(ctx context.Context, id string) (int, error) { pack, err := c.watch.get(id) @@ -525,7 +541,7 @@ func (c *Client) ResizeContainer(ctx context.Context, id string, opts types.Resi return err } - return pack.task.Resize(ctx, uint32(opts.Height), uint32(opts.Width)) + return pack.task.Resize(ctx, uint32(opts.Width), uint32(opts.Height)) } // WaitContainer waits until container's status is stopped. diff --git a/ctrd/interface.go b/ctrd/interface.go index e0c92dba21..d784602f40 100644 --- a/ctrd/interface.go +++ b/ctrd/interface.go @@ -42,6 +42,9 @@ type ContainerAPIClient interface { ContainerStats(ctx context.Context, id string) (*containerdtypes.Metric, error) // ExecContainer executes a process in container. ExecContainer(ctx context.Context, process *Process) error + // ResizeContainer changes the size of the TTY of the exec process running + // in the container to the given height and width. + ResizeExec(ctx context.Context, id string, execid string, opts types.ResizeOptions) error // RecoverContainer reload the container from metadata and watch it, if program be restarted. RecoverContainer(ctx context.Context, id string, io *containerio.IO) error // PauseContainer pause container. diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go index 85b2c8209a..7249c4bdeb 100644 --- a/daemon/mgr/container.go +++ b/daemon/mgr/container.go @@ -121,6 +121,9 @@ type ContainerMgr interface { // CheckExecExist check if exec process `name` exist CheckExecExist(ctx context.Context, name string) error + // ResizeExec resizes the size of exec process's tty. + ResizeExec(ctx context.Context, execid string, opts types.ResizeOptions) error + // 3. The following two function is related to network management. // TODO: inconsistency, Connect/Disconnect operation is in newtork_bridge.go in upper API layer. // Here we encapsualted them in container manager, inconsistency exists. diff --git a/daemon/mgr/container_exec.go b/daemon/mgr/container_exec.go index 1e0100b971..60910e15ba 100644 --- a/daemon/mgr/container_exec.go +++ b/daemon/mgr/container_exec.go @@ -39,6 +39,16 @@ func (mgr *ContainerManager) CreateExec(ctx context.Context, name string, config return execid, nil } +// ResizeExec resizes the size of exec process's tty. +func (mgr *ContainerManager) ResizeExec(ctx context.Context, execid string, opts types.ResizeOptions) error { + execConfig, err := mgr.GetExecConfig(ctx, execid) + if err != nil { + return err + } + + return mgr.Client.ResizeExec(ctx, execConfig.ContainerID, execid, opts) +} + // StartExec executes a new process in container. func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, attach *AttachConfig) (err error) { // GetExecConfig should not error, since we have done this before call StartExec