Skip to content

Commit

Permalink
feature: error stream for cri stream server
Browse files Browse the repository at this point in the history
Signed-off-by: YaoZengzeng <yaozengzeng@zju.edu.cn>
  • Loading branch information
YaoZengzeng committed Mar 29, 2018
1 parent 8ac4b7c commit 5d979ab
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 13 deletions.
7 changes: 7 additions & 0 deletions cri/stream/docs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package stream

// NOTE: the code in this package and its subpackage is almost copy from
// kubernete's official code base and make some modification to satisify
// our need.
// The reason why we do this is not to vendor so many packages which most
// code is useless and we want to keep pouch simple and clean :)
11 changes: 10 additions & 1 deletion cri/stream/remotecommand/attach.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package remotecommand

import (
"fmt"
"net/http"
"time"
)
Expand All @@ -21,10 +22,18 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, co
}
defer ctx.conn.Close()

attacher.Attach(container, streamOpts, &Streams{
err := attacher.Attach(container, streamOpts, &Streams{
StreamCh: make(chan struct{}, 1),
StdinStream: ctx.stdinStream,
StdoutStream: ctx.stdoutStream,
StderrStream: ctx.stderrStream,
})
if err != nil {
err = fmt.Errorf("error attaching to container: %v", err)
ctx.writeStatus(NewInternalError(err))
} else {
ctx.writeStatus(&StatusError{ErrStatus: Status{
Status: StatusSuccess,
}})
}
}
122 changes: 122 additions & 0 deletions cri/stream/remotecommand/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package remotecommand

import (
"fmt"
"net/http"
)

// Values of Status.Status
const (
StatusSuccess = "Success"
StatusFailure = "Failure"
)

// StatusReason is an enumeration of possible failure causes. Each StatusReason
// must map to a single HTTP status code, but multiple reasons may map
// to the same HTTP status code.
// TODO: move to apiserver
type StatusReason string

const (
// NonZeroExitCodeReason indicates that the command executing failed with non zero exit code.
NonZeroExitCodeReason StatusReason = "NonZeroExitCode"

// StatusReasonInternalError indicates that an internal error occurred, it is unexpected
// and the outcome of the call is unknown.
// Details (optional):
// "causes" - The original error
// Status code 500
StatusReasonInternalError StatusReason = "InternalError"
)

// CauseType is a machine readable value providing more detail about what
// occurred in a status response. An operation may have multiple causes for a
// status (whether Failure or Success).
type CauseType string

const (
// ExitCodeCauseType indicates that the status cause is the command's exit code is not zero.
ExitCodeCauseType CauseType = "ExitCode"
)

// StatusCause provides more information about an api.Status failure, including
// cases when multiple errors are encountered.
type StatusCause struct {
// A machine-readable description of the cause of the error. If this value is
// empty there is no information available.
// +optional
Type CauseType `json:"reason,omitempty" protobuf:"bytes,1,opt,name=reason,casttype=CauseType"`
// A human-readable description of the cause of the error. This field may be
// presented as-is to a reader.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,2,opt,name=message"`
}

// StatusDetails is a set of additional properties that MAY be set by the
// server to provide additional information about a response. The Reason
// field of a Status object defines what attributes will be set. Clients
// must ignore fields that do not match the defined type of each attribute,
// and should assume that any attribute may be empty, invalid, or under
// defined.
type StatusDetails struct {
Causes []StatusCause `json:"causes,omitempty" protobuf:"bytes,4,rep,name=causes"`
}

// Status is a return value for calls that don't return other objects.
type Status struct {
// Status of the operation.
// One of: "Success" or "Failure".
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
// +optional
Status string `json:"status,omitempty" protobuf:"bytes,2,opt,name=status"`
// A human-readable description of the status of this operation.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"`
// A machine-readable description of why this operation is in the
// "Failure" status. If this value is empty there
// is no information available. A Reason clarifies an HTTP status
// code but does not override it.
// +optional
Reason StatusReason `json:"reason,omitempty" protobuf:"bytes,4,opt,name=reason,casttype=StatusReason"`
// Extended data associated with the reason. Each reason may define its
// own extended details. This field is optional and the data returned
// is not guaranteed to conform to any schema except that defined by
// the reason type.
// +optional
Details *StatusDetails `json:"details,omitempty" protobuf:"bytes,5,opt,name=details"`
// Suggested HTTP return code for this status, 0 if not set.
// +optional
Code int32 `json:"code,omitempty" protobuf:"varint,6,opt,name=code"`
}

// StatusError is an error intended for consumption by a REST API server; it can also be
// reconstructed by clients from a REST response. Public to allow easy type switches.
type StatusError struct {
ErrStatus Status
}

var _ error = &StatusError{}

// Error implements the Error interface.
func (e *StatusError) Error() string {
return e.ErrStatus.Message
}

// Status allows access to e's status without having to know the detailed workings
// of StatusError.
func (e *StatusError) Status() Status {
return e.ErrStatus
}

// NewInternalError returns an error indicating the item is invalid and cannot be processed.
func NewInternalError(err error) *StatusError {
return &StatusError{Status{
Status: StatusFailure,
Code: http.StatusInternalServerError,
Reason: StatusReasonInternalError,
Details: &StatusDetails{
Causes: []StatusCause{{Message: err.Error()}},
},
Message: fmt.Sprintf("Internal error occurred: %v", err),
}}
}
27 changes: 25 additions & 2 deletions cri/stream/remotecommand/exec.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package remotecommand

import (
"fmt"
"net/http"
"time"
)

// Executor knows how to execute a command in a container of the pod.
type Executor interface {
// Exec executes a command in a container of the pod.
Exec(containerID string, cmd []string, streamOpts *Options, streams *Streams) error
Exec(containerID string, cmd []string, streamOpts *Options, streams *Streams) (uint32, error)
}

// ServeExec handles requests to execute a command in a container. After
Expand All @@ -22,9 +23,31 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, cont
}
defer ctx.conn.Close()

executor.Exec(container, cmd, streamOpts, &Streams{
exitCode, err := executor.Exec(container, cmd, streamOpts, &Streams{
StdinStream: ctx.stdinStream,
StdoutStream: ctx.stdoutStream,
StderrStream: ctx.stderrStream,
})
if err != nil {
err = fmt.Errorf("error executing command in container: %v", err)
ctx.writeStatus(NewInternalError(err))
} else if exitCode != 0 {
ctx.writeStatus(&StatusError{ErrStatus: Status{
Status: StatusFailure,
Reason: NonZeroExitCodeReason,
Details: &StatusDetails{
Causes: []StatusCause{
{
Type: ExitCodeCauseType,
Message: fmt.Sprintf("%d", exitCode),
},
},
},
Message: fmt.Sprintf("command terminated with non-zero exit code"),
}})
} else {
ctx.writeStatus(&StatusError{ErrStatus: Status{
Status: StatusSuccess,
}})
}
}
28 changes: 27 additions & 1 deletion cri/stream/remotecommand/httpstream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package remotecommand

import (
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -41,6 +42,7 @@ type context struct {
stdoutStream io.WriteCloser
stderrStream io.WriteCloser
resizeStream io.ReadCloser
writeStatus func(status *StatusError) error
tty bool
}

Expand Down Expand Up @@ -165,6 +167,7 @@ WaitForStreams:
streamType := stream.Headers().Get(constant.StreamType)
switch streamType {
case constant.StreamTypeError:
ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStdin:
ctx.stdinStream = stream
Expand Down Expand Up @@ -209,7 +212,7 @@ WaitForStreams:
streamType := stream.Headers().Get(constant.StreamType)
switch streamType {
case constant.StreamTypeError:
// ctx.writeStatus = v1WriteStatusFunc(stream)
ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop)
case constant.StreamTypeStdin:
ctx.stdinStream = stream
Expand Down Expand Up @@ -241,3 +244,26 @@ WaitForStreams:

return ctx, nil
}

func v1WriteStatusFunc(stream io.Writer) func(status *StatusError) error {
return func(status *StatusError) error {
if status.Status().Status == StatusSuccess {
return nil
}

_, err := stream.Write([]byte(status.Error()))
return err
}
}

func v4WriteStatusFunc(stream io.Writer) func(status *StatusError) error {
return func(status *StatusError) error {
bs, err := json.Marshal(status.Status())
if err != nil {
return err
}

_, err = stream.Write(bs)
return err
}
}
9 changes: 7 additions & 2 deletions cri/stream/remotecommand/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func createWebSocketStreams(w http.ResponseWriter, req *http.Request, opts *Opti
},
})
conn.SetIdleTimeout(idleTimeout)
_, streams, err := conn.Open(w, req)
negotiatedProtocol, streams, err := conn.Open(w, req)
if err != nil {
runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err))
return nil, false
Expand All @@ -104,7 +104,12 @@ func createWebSocketStreams(w http.ResponseWriter, req *http.Request, opts *Opti
tty: opts.TTY,
}

// TODO: handle write status function.
switch negotiatedProtocol {
case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol:
ctx.writeStatus = v4WriteStatusFunc(streams[errorChannel])
default:
ctx.writeStatus = v1WriteStatusFunc(streams[errorChannel])
}

return ctx, true
}
2 changes: 1 addition & 1 deletion cri/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Server interface {
// Runtime is the interface to execute the commands and provide the streams.
type Runtime interface {
// Exec executes the command in pod.
Exec(containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error
Exec(containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error)

// Attach attaches to pod.
Attach(containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error
Expand Down
12 changes: 6 additions & 6 deletions daemon/mgr/cri_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newStreamRuntime(ctrMgr ContainerMgr) stream.Runtime {
}

// Exec executes a command inside the container.
func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error {
func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) {
createConfig := &apitypes.ExecCreateConfig{
Cmd: cmd,
AttachStdin: streamOpts.Stdin,
Expand All @@ -45,7 +45,7 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remot

execid, err := s.containerMgr.CreateExec(ctx, containerID, createConfig)
if err != nil {
return fmt.Errorf("failed to create exec for container %q: %v", containerID, err)
return 0, fmt.Errorf("failed to create exec for container %q: %v", containerID, err)
}

startConfig := &apitypes.ExecStartConfig{}
Expand All @@ -55,18 +55,18 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remot

err = s.containerMgr.StartExec(ctx, execid, startConfig, attachConfig)
if err != nil {
return fmt.Errorf("failed to start exec for container %q: %v", containerID, err)
return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err)
}

ei, err := s.containerMgr.InspectExec(ctx, execid)
if err != nil {
return fmt.Errorf("failed to inspect exec for container %q: %v", containerID, err)
return 0, fmt.Errorf("failed to inspect exec for container %q: %v", containerID, err)
}

// Not return until exec finished.
<-ei.ExitCh
result := <-ei.ExitCh

return nil
return result.ExitCode(), nil
}

// Attach attaches to a running container.
Expand Down

0 comments on commit 5d979ab

Please sign in to comment.