Skip to content

Commit

Permalink
Merge pull request #1741 from Starnop/cri-stream-context
Browse files Browse the repository at this point in the history
style: alert the name of struct context to streamContext
  • Loading branch information
allencloud authored Jul 16, 2018
2 parents 165899c + f647fbd commit 852541f
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 45 deletions.
14 changes: 7 additions & 7 deletions cri/stream/portforward/httpstream.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package portforward

import (
gocontext "context"
"context"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -49,7 +49,7 @@ func httpStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream,
}
}

func handleHTTPStreams(goctx gocontext.Context, w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, idleTimeout, streamCreationTimeout time.Duration, supportedPortForwardProtocols []string) error {
func handleHTTPStreams(ctx context.Context, w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, idleTimeout, streamCreationTimeout time.Duration, supportedPortForwardProtocols []string) error {
_, err := httpstream.Handshake(w, req, supportedPortForwardProtocols)
// Negotiated protocol isn't currently used server side, but could be in the future.
if err != nil {
Expand Down Expand Up @@ -77,7 +77,7 @@ func handleHTTPStreams(goctx gocontext.Context, w http.ResponseWriter, req *http
pod: podName,
forwarder: portForwarder,
}
h.run(goctx)
h.run(ctx)

return nil
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func (h *httpStreamHandler) requestID(stream httpstream.Stream) string {
// run is the main loop for the httpStreamHandler. It process new streams,
// invoking portForward for each complete stream pair. The loop exits
// when the httpstream.Connection is closed.
func (h *httpStreamHandler) run(goctx gocontext.Context) {
func (h *httpStreamHandler) run(ctx context.Context) {
logrus.Infof("(conn=%p) waiting for port forward streams", h.conn)

for {
Expand All @@ -172,23 +172,23 @@ func (h *httpStreamHandler) run(goctx gocontext.Context) {
msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
p.printError(msg)
} else if complete {
go h.portForward(goctx, p)
go h.portForward(ctx, p)
}
}
}
}

// portForward invokes the httpStreamHandler's forwarder.PortForward
// function for the given stream pair.
func (h *httpStreamHandler) portForward(goctx gocontext.Context, p *httpStreamPair) {
func (h *httpStreamHandler) portForward(ctx context.Context, p *httpStreamPair) {
defer p.dataStream.Close()
defer p.errorStream.Close()

portString := p.dataStream.Headers().Get(constant.PortHeader)
port, _ := strconv.ParseInt(portString, 10, 32)

logrus.Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
err := h.forwarder.PortForward(goctx, h.pod, int32(port), p.dataStream)
err := h.forwarder.PortForward(ctx, h.pod, int32(port), p.dataStream)
logrus.Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)

if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions cri/stream/portforward/portforward.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package portforward

import (
gocontext "context"
"context"
"io"
"net/http"
"time"
Expand All @@ -13,17 +13,17 @@ import (
// in a pod.
type PortForwarder interface {
// PortForwarder copies data between a data stream and a port in a pod.
PortForward(goctx gocontext.Context, name string, port int32, stream io.ReadWriteCloser) error
PortForward(ctx context.Context, name string, port int32, stream io.ReadWriteCloser) error
}

// ServePortForward handles a port forwarding request. A single request is
// kept alive as long as the client is still alive and the connection has not
// been timed out due to idleness. This function handles multiple forwarded
// connections; i.e., multiple `curl http://localhost:8888/` requests will be
// handled by a single invocation of ServePortForward.
func ServePortForward(goctx gocontext.Context, w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
func ServePortForward(ctx context.Context, w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
// TODO: support web socket stream.
err := handleHTTPStreams(goctx, w, req, portForwarder, podName, idleTimeout, streamCreationTimeout, supportedProtocols)
err := handleHTTPStreams(ctx, w, req, portForwarder, podName, idleTimeout, streamCreationTimeout, supportedProtocols)
if err != nil {
logrus.Errorf("failed to serve port forward: %v", err)
return
Expand Down
22 changes: 11 additions & 11 deletions cri/stream/remotecommand/attach.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package remotecommand

import (
gocontext "context"
"context"
"fmt"
"net/http"
"time"
Expand All @@ -10,30 +10,30 @@ import (
// Attacher knows how to attach a running container in a pod.
type Attacher interface {
// Attach attaches to the running container in the pod.
Attach(goctx gocontext.Context, containerID string, streamOpts *Options, streams *Streams) error
Attach(ctx context.Context, containerID string, streamOpts *Options, streams *Streams) error
}

// ServeAttach handles requests to attach to a container. After creating/receiving the required
// streams, it delegates the actual attaching to attacher.
func ServeAttach(goctx gocontext.Context, w http.ResponseWriter, req *http.Request, attacher Attacher, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
ctx, ok := createStreams(w, req, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
func ServeAttach(ctx context.Context, w http.ResponseWriter, req *http.Request, attacher Attacher, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
streamCtx, ok := createStreams(w, req, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
if !ok {
// Error is handled by createStreams.
return
}
defer ctx.conn.Close()
defer streamCtx.conn.Close()

err := attacher.Attach(goctx, container, streamOpts, &Streams{
err := attacher.Attach(ctx, container, streamOpts, &Streams{
StreamCh: make(chan struct{}, 1),
StdinStream: ctx.stdinStream,
StdoutStream: ctx.stdoutStream,
StderrStream: ctx.stderrStream,
StdinStream: streamCtx.stdinStream,
StdoutStream: streamCtx.stdoutStream,
StderrStream: streamCtx.stderrStream,
})
if err != nil {
err = fmt.Errorf("error attaching to container: %v", err)
ctx.writeStatus(NewInternalError(err))
streamCtx.writeStatus(NewInternalError(err))
} else {
ctx.writeStatus(&StatusError{ErrStatus: Status{
streamCtx.writeStatus(&StatusError{ErrStatus: Status{
Status: StatusSuccess,
}})
}
Expand Down
24 changes: 12 additions & 12 deletions cri/stream/remotecommand/exec.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package remotecommand

import (
gocontext "context"
"context"
"fmt"
"net/http"
"time"
Expand All @@ -10,30 +10,30 @@ import (
// Executor knows how to execute a command in a container of the pod.
type Executor interface {
// Exec executes a command in a container of the pod.
Exec(goctx gocontext.Context, containerID string, cmd []string, streamOpts *Options, streams *Streams) (uint32, error)
Exec(ctx context.Context, containerID string, cmd []string, streamOpts *Options, streams *Streams) (uint32, error)
}

// ServeExec handles requests to execute a command in a container. After
// creating/receiving the required streams, it delegates the actual execution
// to the executor.
func ServeExec(goctx gocontext.Context, w http.ResponseWriter, req *http.Request, executor Executor, container string, cmd []string, streamOpts *Options, supportedProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) {
ctx, ok := createStreams(w, req, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
func ServeExec(ctx context.Context, w http.ResponseWriter, req *http.Request, executor Executor, container string, cmd []string, streamOpts *Options, supportedProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) {
streamCtx, ok := createStreams(w, req, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
if !ok {
// Error is handled by createStreams.
return
}
defer ctx.conn.Close()
defer streamCtx.conn.Close()

exitCode, err := executor.Exec(goctx, container, cmd, streamOpts, &Streams{
StdinStream: ctx.stdinStream,
StdoutStream: ctx.stdoutStream,
StderrStream: ctx.stderrStream,
exitCode, err := executor.Exec(ctx, container, cmd, streamOpts, &Streams{
StdinStream: streamCtx.stdinStream,
StdoutStream: streamCtx.stdoutStream,
StderrStream: streamCtx.stderrStream,
})
if err != nil {
err = fmt.Errorf("error executing command in container: %v", err)
ctx.writeStatus(NewInternalError(err))
streamCtx.writeStatus(NewInternalError(err))
} else if exitCode != 0 {
ctx.writeStatus(&StatusError{ErrStatus: Status{
streamCtx.writeStatus(&StatusError{ErrStatus: Status{
Status: StatusFailure,
Reason: NonZeroExitCodeReason,
Details: &StatusDetails{
Expand All @@ -47,7 +47,7 @@ func ServeExec(goctx gocontext.Context, w http.ResponseWriter, req *http.Request
Message: fmt.Sprintf("command terminated with non-zero exit code"),
}})
} else {
ctx.writeStatus(&StatusError{ErrStatus: Status{
streamCtx.writeStatus(&StatusError{ErrStatus: Status{
Status: StatusSuccess,
}})
}
Expand Down
18 changes: 9 additions & 9 deletions cri/stream/remotecommand/httpstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Streams struct {

// context contains the connection and streams used when
// forwarding an attach or execute session into a container.
type context struct {
type streamContext struct {
conn io.Closer
stdinStream io.ReadCloser
stdoutStream io.WriteCloser
Expand All @@ -55,8 +55,8 @@ type streamAndReply struct {
replySent <-chan struct{}
}

func createStreams(w http.ResponseWriter, req *http.Request, opts *Options, supportedStreamProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) (*context, bool) {
var ctx *context
func createStreams(w http.ResponseWriter, req *http.Request, opts *Options, supportedStreamProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) (*streamContext, bool) {
var ctx *streamContext
var ok bool
if wsstream.IsWebSocketRequest(req) {
ctx, ok = createWebSocketStreams(w, req, opts, idleTimeout)
Expand All @@ -72,7 +72,7 @@ func createStreams(w http.ResponseWriter, req *http.Request, opts *Options, supp
return ctx, true
}

func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Options, supportedStreamProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) (*context, bool) {
func createHTTPStreamStreams(w http.ResponseWriter, req *http.Request, opts *Options, supportedStreamProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) (*streamContext, bool) {
protocol, err := httpstream.Handshake(w, req, supportedStreamProtocols)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -148,14 +148,14 @@ func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-c
type protocolHandler interface {
// waitForStreams waits for the expected streams or a timeout, returning a
// remoteCommandContext if all the streams were received, or an error if not.
waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error)
waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*streamContext, error)
}

// v2ProtocolHandler implements the V2 protocol version for streaming command execution.
type v2ProtocolHandler struct{}

func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*streamContext, error) {
ctx := &streamContext{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
Expand Down Expand Up @@ -199,8 +199,8 @@ WaitForStreams:
// v1ProtocolHandler implements the V1 protocol version for streaming command execution.
type v1ProtocolHandler struct{}

func (*v1ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
func (*v1ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*streamContext, error) {
ctx := &streamContext{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
Expand Down
4 changes: 2 additions & 2 deletions cri/stream/remotecommand/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func writeChannel(real bool) wsstream.ChannelType {

// createWebSocketStreams returns a context containing the websocket connection and
// streams needed to perform an exec or an attach.
func createWebSocketStreams(w http.ResponseWriter, req *http.Request, opts *Options, idleTimeout time.Duration) (*context, bool) {
func createWebSocketStreams(w http.ResponseWriter, req *http.Request, opts *Options, idleTimeout time.Duration) (*streamContext, bool) {
channels := createChannels(opts)
conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
"": {
Expand Down Expand Up @@ -95,7 +95,7 @@ func createWebSocketStreams(w http.ResponseWriter, req *http.Request, opts *Opti
streams[errorChannel].Write([]byte{})
}

ctx := &context{
ctx := &streamContext{
conn: conn,
stdinStream: streams[stdinChannel],
stdoutStream: streams[stdoutChannel],
Expand Down

0 comments on commit 852541f

Please sign in to comment.