Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support resize the tty of cri exec process #2063

Merged
merged 1 commit into from
Aug 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cri/stream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ const (
DefaultStreamCreationTimeout = 30 * time.Second
)

// TODO: StreamProtocolV2Name, StreamProtocolV3Name, StreamProtocolV4Name support.
// TODO: StreamProtocolV4Name support.

// SupportedStreamingProtocols is the streaming protocols which server supports.
var SupportedStreamingProtocols = []string{constant.StreamProtocolV1Name, constant.StreamProtocolV2Name}
var SupportedStreamingProtocols = []string{
constant.StreamProtocolV1Name,
constant.StreamProtocolV2Name,
constant.StreamProtocolV3Name,
}

// SupportedPortForwardProtocols is the portforward protocols which server supports.
var SupportedPortForwardProtocols = []string{constant.PortForwardProtocolV1Name}
Expand Down
6 changes: 4 additions & 2 deletions cri/stream/remotecommand/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"net/http"
"time"

apitypes "github.com/alibaba/pouch/apis/types"
)

// Executor knows how to execute a command in a container of the pod.
type Executor interface {
// Exec executes a command in a container of the pod.
Exec(ctx context.Context, containerID string, cmd []string, streamOpts *Options, streams *Streams) (uint32, error)
Exec(ctx context.Context, containerID string, cmd []string, resizeChan <-chan apitypes.ResizeOptions, streamOpts *Options, streams *Streams) (uint32, error)
}

// ServeExec handles requests to execute a command in a container. After
Expand All @@ -24,7 +26,7 @@ func ServeExec(ctx context.Context, w http.ResponseWriter, req *http.Request, ex
}
defer streamCtx.conn.Close()

exitCode, err := executor.Exec(ctx, container, cmd, streamOpts, &Streams{
exitCode, err := executor.Exec(ctx, container, cmd, streamCtx.resizeChan, streamOpts, &Streams{
StdinStream: streamCtx.stdinStream,
StdoutStream: streamCtx.stdoutStream,
StderrStream: streamCtx.stderrStream,
Expand Down
101 changes: 93 additions & 8 deletions cri/stream/remotecommand/httpstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/cri/stream/constant"
"github.com/alibaba/pouch/cri/stream/httpstream"
"github.com/alibaba/pouch/cri/stream/httpstream/spdy"
Expand Down Expand Up @@ -42,6 +43,7 @@ type streamContext struct {
stdoutStream io.WriteCloser
stderrStream io.WriteCloser
resizeStream io.ReadCloser
resizeChan chan apitypes.ResizeOptions
writeStatus func(status *StatusError) error
tty bool
}
Expand All @@ -67,11 +69,31 @@ func createStreams(w http.ResponseWriter, req *http.Request, opts *Options, supp
return nil, false
}

// TODO: resizeStream support.
if ctx.resizeStream != nil {
ctx.resizeChan = make(chan apitypes.ResizeOptions)
go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
}

return ctx, true
}

func handleResizeEvents(stream io.Reader, channel chan<- apitypes.ResizeOptions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add the ctx or event id for log stuff?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we can't provide enough useful information in this function

So I do the log in function handleResizing from which we could know the container and exec id

var err error
decoder := json.NewDecoder(stream)
for {
size := apitypes.ResizeOptions{}
err = decoder.Decode(&size)
if err != nil {
break
}
channel <- size
}
if err != io.EOF {
logrus.Errorf("failed to decode resize request from resize stream: %v", err)
}
close(channel)
}

func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Options, supportedStreamProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) (*streamContext, bool) {
protocol, err := httpstream.Handshake(w, req, supportedStreamProtocols)
if err != nil {
Expand Down Expand Up @@ -102,10 +124,12 @@ func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Opt
case "":
logrus.Infof("Client did not request protocol negotiation. Falling back to %q", constant.StreamProtocolV1Name)
fallthrough
case constant.StreamProtocolV2Name:
handler = &v2ProtocolHandler{}
case constant.StreamProtocolV1Name:
handler = &v1ProtocolHandler{}
case constant.StreamProtocolV2Name:
handler = &v2ProtocolHandler{}
case constant.StreamProtocolV3Name:
handler = &v3ProtocolHandler{}
}

// Count the streams client asked for, starting with 1.
Expand All @@ -119,7 +143,9 @@ func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Opt
if opts.Stderr {
expectedStreams++
}
// TODO: handle opts.TTY
if opts.TTY && handler.supportsTerminalResizing() {
expectedStreams++
}

expired := time.NewTimer(streamCreationTimeout)
defer expired.Stop()
Expand Down Expand Up @@ -149,8 +175,61 @@ type protocolHandler interface {
// waitForStreams waits for the expected streams or a timeout, returning a
// remoteCommandContext if all the streams were received, or an error if not.
waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*streamContext, error)
// supportsTerminalResizing returns true if the protocol handler supports terminal resizing.
supportsTerminalResizing() bool
}

// v3ProtocolHandler implements the V3 protocol version for streaming command execution.
type v3ProtocolHandler struct{}

func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*streamContext, error) {
ctx := &streamContext{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
done:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(constant.StreamType)
switch streamType {
case constant.StreamTypeError:
ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeResize:
ctx.resizeStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
logrus.Errorf("Unexpected stream type: %q", streamType)
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break done
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, fmt.Errorf("timed out waiting for client to create streams")
}
}

return ctx, nil
}

// supportsTerminalResizing returns true because v3ProtocolHandler supports it.
func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true }

// v2ProtocolHandler implements the V2 protocol version for streaming command execution.
type v2ProtocolHandler struct{}

Expand All @@ -160,7 +239,7 @@ func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expected
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
done:
for {
select {
case stream := <-streams:
Expand All @@ -184,7 +263,7 @@ WaitForStreams:
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
break done
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
Expand All @@ -196,6 +275,9 @@ WaitForStreams:
return ctx, nil
}

// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it.
func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false }

// v1ProtocolHandler implements the V1 protocol version for streaming command execution.
type v1ProtocolHandler struct{}

Expand All @@ -205,7 +287,7 @@ func (*v1ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expected
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
done:
for {
select {
case stream := <-streams:
Expand All @@ -229,7 +311,7 @@ WaitForStreams:
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
break done
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
Expand All @@ -245,6 +327,9 @@ WaitForStreams:
return ctx, nil
}

// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }

func v1WriteStatusFunc(stream io.Writer) func(status *StatusError) error {
return func(status *StatusError) error {
if status.Status().Status == StatusSuccess {
Expand Down
32 changes: 30 additions & 2 deletions cri/stream/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// Runtime is the interface to execute the commands and provide the streams.
type Runtime interface {
// Exec executes the command in pod.
Exec(ctx context.Context, containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error)
Exec(ctx context.Context, containerID string, cmd []string, resizeChan <-chan apitypes.ResizeOptions, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error)

// Attach attaches to pod.
Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error
Expand All @@ -38,7 +38,7 @@ func NewStreamRuntime(ctrMgr mgr.ContainerMgr) Runtime {
}

// Exec executes a command inside the container.
func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) {
func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []string, resizeChan <-chan apitypes.ResizeOptions, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) {
createConfig := &apitypes.ExecCreateConfig{
Cmd: cmd,
AttachStdin: streamOpts.Stdin,
Expand All @@ -63,6 +63,13 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri
return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err)
}

handleResizing(containerID, execid, resizeChan, func(size apitypes.ResizeOptions) {
err := s.containerMgr.ResizeExec(ctx, execid, size)
if err != nil {
logrus.Errorf("failed to resize process %q console for container %q: %v", execid, containerID, err)
}
})

// TODO Find a better way instead of the dead loop
var ei *apitypes.ContainerExecInspect
for {
Expand All @@ -80,6 +87,27 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri
return uint32(ei.ExitCode), nil
}

// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each
// remotecommand.TerminalSize received from the channel. The resize channel must be closed elsewhere to stop the
// goroutine.
func handleResizing(containerID, execID string, resizeChan <-chan apitypes.ResizeOptions, resizeFunc func(size apitypes.ResizeOptions)) {
if resizeChan == nil {
return
}
go func() {
for {
size, ok := <-resizeChan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the size is always valid?

if !ok {
return
}
if size.Height <= 0 || size.Width <= 0 {
continue
}
resizeFunc(size)
}
}()
}

// Attach attaches to a running container.
func (s *streamRuntime) Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error {
attachConfig := &mgr.AttachConfig{
Expand Down
18 changes: 17 additions & 1 deletion ctrd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,22 @@ func (c *Client) ExecContainer(ctx context.Context, process *Process) error {
return nil
}

// ResizeExec changes the size of the TTY of the exec process running
// in the container to the given height and width.
func (c *Client) ResizeExec(ctx context.Context, id string, execid string, opts types.ResizeOptions) error {
pack, err := c.watch.get(id)
if err != nil {
return err
}

execProcess, err := pack.task.LoadProcess(ctx, execid, nil)
if err != nil {
return err
}

return execProcess.Resize(ctx, uint32(opts.Width), uint32(opts.Height))
}

// ContainerPID returns the container's init process id.
func (c *Client) ContainerPID(ctx context.Context, id string) (int, error) {
pack, err := c.watch.get(id)
Expand Down Expand Up @@ -525,7 +541,7 @@ func (c *Client) ResizeContainer(ctx context.Context, id string, opts types.Resi
return err
}

return pack.task.Resize(ctx, uint32(opts.Height), uint32(opts.Width))
return pack.task.Resize(ctx, uint32(opts.Width), uint32(opts.Height))
}

// WaitContainer waits until container's status is stopped.
Expand Down
3 changes: 3 additions & 0 deletions ctrd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type ContainerAPIClient interface {
ContainerStats(ctx context.Context, id string) (*containerdtypes.Metric, error)
// ExecContainer executes a process in container.
ExecContainer(ctx context.Context, process *Process) error
// ResizeContainer changes the size of the TTY of the exec process running
// in the container to the given height and width.
ResizeExec(ctx context.Context, id string, execid string, opts types.ResizeOptions) error
// RecoverContainer reload the container from metadata and watch it, if program be restarted.
RecoverContainer(ctx context.Context, id string, io *containerio.IO) error
// PauseContainer pause container.
Expand Down
3 changes: 3 additions & 0 deletions daemon/mgr/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type ContainerMgr interface {
// CheckExecExist check if exec process `name` exist
CheckExecExist(ctx context.Context, name string) error

// ResizeExec resizes the size of exec process's tty.
ResizeExec(ctx context.Context, execid string, opts types.ResizeOptions) error

// 3. The following two function is related to network management.
// TODO: inconsistency, Connect/Disconnect operation is in newtork_bridge.go in upper API layer.
// Here we encapsualted them in container manager, inconsistency exists.
Expand Down
10 changes: 10 additions & 0 deletions daemon/mgr/container_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ func (mgr *ContainerManager) CreateExec(ctx context.Context, name string, config
return execid, nil
}

// ResizeExec resizes the size of exec process's tty.
func (mgr *ContainerManager) ResizeExec(ctx context.Context, execid string, opts types.ResizeOptions) error {
execConfig, err := mgr.GetExecConfig(ctx, execid)
if err != nil {
return err
}

return mgr.Client.ResizeExec(ctx, execConfig.ContainerID, execid, opts)
}

// StartExec executes a new process in container.
func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, attach *AttachConfig) (err error) {
// GetExecConfig should not error, since we have done this before call StartExec
Expand Down