Skip to content

Commit

Permalink
Better error handling for Tap (#177)
Browse files Browse the repository at this point in the history
Previously, running `$conduit tap` would return a `Unexpected EOF` error when the server wasn't available. This was due to a few problems with the way we were handling errors all the way down the tap server. This change fixes that and cleans some of the protobuf-over-HTTP code.

- first step towards #49
- closes #106
  • Loading branch information
pcalcado authored Jan 25, 2018
1 parent d0a0bb2 commit 9410da4
Show file tree
Hide file tree
Showing 13 changed files with 1,176 additions and 414 deletions.
2 changes: 1 addition & 1 deletion cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func init() {

func addControlPlaneNetworkingArgs(cmd *cobra.Command) {
// Use the same argument name as `kubectl` (see the output of `kubectl options`).
//TODO: move these to init() as they are globally applicable
cmd.PersistentFlags().StringVar(&kubeconfigPath, "kubeconfig", "", "Path to the kubeconfig file to use for CLI requests")

cmd.PersistentFlags().StringVar(&apiAddr, "api-addr", "", "Override kubeconfig and communicate directly with the control plane at host:port (mostly for testing)")
}

Expand Down
6 changes: 3 additions & 3 deletions cli/cmd/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var tapCmd = &cobra.Command{
Valid targets include:
* Pods (default/hello-world-h4fb2)
* Deployments (default/hello-world)`,
RunE: func(cmd *cobra.Command, args []string) error {
Run: exitSilentlyOnError(func(cmd *cobra.Command, args []string) error {
if len(args) != 2 {
return errors.New("please specify a target")
}
Expand Down Expand Up @@ -68,7 +68,7 @@ Valid targets include:
}

return requestTapFromApi(os.Stdout, client, args[1], validatedResourceType, partialReq)
},
}),
}

func init() {
Expand Down Expand Up @@ -102,6 +102,7 @@ func requestTapFromApi(w io.Writer, client pb.ApiClient, targetName string, reso

rsp, err := client.Tap(context.Background(), req)
if err != nil {
fmt.Fprintln(w, err.Error())
return err
}

Expand All @@ -117,7 +118,6 @@ func renderTap(w io.Writer, tapClient pb.Api_TapClient) error {
tableWriter.Flush()

return nil

}

func writeTapEventsToBuffer(tapClient pb.Api_TapClient, w *tabwriter.Writer) error {
Expand Down
137 changes: 65 additions & 72 deletions controller/api/public/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package public
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"net/http"
"net/url"

Expand All @@ -23,10 +21,7 @@ import (
const (
ApiRoot = "/" // Must be absolute (with a leading slash).
ApiVersion = "v1"
JsonContentType = "application/json"
ApiPrefix = "api/" + ApiVersion + "/" // Must be relative (without a leading slash).
ProtobufContentType = "application/octet-stream"
ErrorHeader = "conduit-error"
ConduitApiSubsystemName = "conduit-api"
)

Expand All @@ -35,11 +30,6 @@ type grpcOverHttpClient struct {
httpClient *http.Client
}

type tapClient struct {
ctx context.Context
reader *bufio.Reader
}

func (c *grpcOverHttpClient) Stat(ctx context.Context, req *pb.MetricRequest, _ ...grpc.CallOption) (*pb.MetricResponse, error) {
var msg pb.MetricResponse
err := c.apiRequest(ctx, "Stat", req, &msg)
Expand All @@ -66,54 +56,47 @@ func (c *grpcOverHttpClient) ListPods(ctx context.Context, req *pb.Empty, _ ...g

func (c *grpcOverHttpClient) Tap(ctx context.Context, req *pb.TapRequest, _ ...grpc.CallOption) (pb.Api_TapClient, error) {
url := c.endpointNameToPublicApiUrl("Tap")
log.Debugf("Making streaming gRPC-over-HTTP call to [%s]", url.String())
rsp, err := c.post(ctx, url, req)
httpRsp, err := c.post(ctx, url, req)
if err != nil {
return nil, err
}

if err = checkIfResponseHasConduitError(httpRsp); err != nil {
httpRsp.Body.Close()
return nil, err
}

go func() {
<-ctx.Done()
log.Debug("Closing response body after context marked as done")
rsp.Body.Close()
httpRsp.Body.Close()
}()

return &tapClient{ctx: ctx, reader: bufio.NewReader(rsp.Body)}, nil
return &tapClient{ctx: ctx, reader: bufio.NewReader(httpRsp.Body)}, nil
}
func (c tapClient) Recv() (*common.TapEvent, error) {
var msg common.TapEvent
err := fromByteStreamToProtocolBuffers(c.reader, "", &msg)
return &msg, err
}

// satisfy the pb.Api_TapClient interface
func (c tapClient) Header() (metadata.MD, error) { return nil, nil }
func (c tapClient) Trailer() metadata.MD { return nil }
func (c tapClient) CloseSend() error { return nil }
func (c tapClient) Context() context.Context { return c.ctx }
func (c tapClient) SendMsg(interface{}) error { return nil }
func (c tapClient) RecvMsg(interface{}) error { return nil }

func (c *grpcOverHttpClient) apiRequest(ctx context.Context, endpoint string, req proto.Message, rsp proto.Message) error {
func (c *grpcOverHttpClient) apiRequest(ctx context.Context, endpoint string, req proto.Message, protoResponse proto.Message) error {
url := c.endpointNameToPublicApiUrl(endpoint)

log.Debugf("Making gRPC-over-HTTP call to [%s]", url.String())
httpRsp, err := c.post(ctx, url, req)
if err != nil {
return err
}

defer httpRsp.Body.Close()
log.Debugf("gRPC-over-HTTP call returned status [%s] and content length [%d]", httpRsp.Status, httpRsp.ContentLength)

clientSideErrorStatusCode := httpRsp.StatusCode >= 400 && httpRsp.StatusCode <= 499
if clientSideErrorStatusCode {
return fmt.Errorf("POST to Conduit API endpoint [%s] returned HTTP status [%s]", url, httpRsp.Status)
}

defer httpRsp.Body.Close()
if err = checkIfResponseHasConduitError(httpRsp); err != nil {
return err
}

reader := bufio.NewReader(httpRsp.Body)
errorMsg := httpRsp.Header.Get(ErrorHeader)
return fromByteStreamToProtocolBuffers(reader, errorMsg, rsp)
return fromByteStreamToProtocolBuffers(reader, protoResponse)
}

func (c *grpcOverHttpClient) post(ctx context.Context, url *url.URL, req proto.Message) (*http.Response, error) {
Expand All @@ -131,35 +114,51 @@ func (c *grpcOverHttpClient) post(ctx context.Context, url *url.URL, req proto.M
return nil, err
}

return c.httpClient.Do(httpReq.WithContext(ctx))
rsp, err := c.httpClient.Do(httpReq.WithContext(ctx))
if err != nil {
log.Debugf("Error invoking [%s]: %v", url.String(), err)
} else {
log.Debugf("Response from [%s] had headers: %v", url.String(), rsp.Header)
}

return rsp, err
}

func (c *grpcOverHttpClient) endpointNameToPublicApiUrl(endpoint string) *url.URL {
return c.serverURL.ResolveReference(&url.URL{Path: endpoint})
}

func NewInternalClient(kubernetesApiHost string) (pb.ApiClient, error) {
apiURL := &url.URL{
Scheme: "http",
Host: kubernetesApiHost,
Path: "/",
}
type tapClient struct {
ctx context.Context
reader *bufio.Reader
}

return newClient(apiURL, http.DefaultClient)
func (c tapClient) Recv() (*common.TapEvent, error) {
var msg common.TapEvent
err := fromByteStreamToProtocolBuffers(c.reader, &msg)
return &msg, err
}

func NewExternalClient(controlPlaneNamespace string, kubeApi k8s.KubernetesApi) (pb.ApiClient, error) {
apiURL, err := kubeApi.UrlFor(controlPlaneNamespace, "/services/http:api:http/proxy/")
// satisfy the pb.Api_TapClient interface
func (c tapClient) Header() (metadata.MD, error) { return nil, nil }
func (c tapClient) Trailer() metadata.MD { return nil }
func (c tapClient) CloseSend() error { return nil }
func (c tapClient) Context() context.Context { return c.ctx }
func (c tapClient) SendMsg(interface{}) error { return nil }
func (c tapClient) RecvMsg(interface{}) error { return nil }

func fromByteStreamToProtocolBuffers(byteStreamContainingMessage *bufio.Reader, out proto.Message) error {
messageAsBytes, err := deserializePayloadFromReader(byteStreamContainingMessage)
if err != nil {
return nil, err
return fmt.Errorf("error reading byte stream header: %v", err)
}

httpClientToUse, err := kubeApi.NewClient()
err = proto.Unmarshal(messageAsBytes, out)
if err != nil {
return nil, err
return fmt.Errorf("error unmarshalling array of [%d] bytes error: %v", len(messageAsBytes), err)
}

return newClient(apiURL, httpClientToUse)
return nil
}

func newClient(apiURL *url.URL, httpClientToUse *http.Client) (pb.ApiClient, error) {
Expand All @@ -168,42 +167,36 @@ func newClient(apiURL *url.URL, httpClientToUse *http.Client) (pb.ApiClient, err
return nil, fmt.Errorf("server URL must be absolute, was [%s]", apiURL.String())
}

serverUrl := apiURL.ResolveReference(&url.URL{Path: ApiPrefix})

log.Debugf("Expecting Conduit Public API to be served over [%s]", serverUrl)

return &grpcOverHttpClient{
serverURL: apiURL.ResolveReference(&url.URL{Path: ApiPrefix}),
serverURL: serverUrl,
httpClient: httpClientToUse,
}, nil
}

func fromByteStreamToProtocolBuffers(byteStreamContainingMessage *bufio.Reader, errorMessageReturnedAsMetadata string, out proto.Message) error {
//TODO: why the magic number 4?
byteSize := make([]byte, 4)

//TODO: why is this necessary?
_, err := byteStreamContainingMessage.Read(byteSize)
if err != nil {
return fmt.Errorf("error reading byte stream header: %v", err)
func NewInternalClient(kubernetesApiHost string) (pb.ApiClient, error) {
apiURL := &url.URL{
Scheme: "http",
Host: kubernetesApiHost,
Path: "/",
}

size := binary.LittleEndian.Uint32(byteSize)
bytes := make([]byte, size)
_, err = io.ReadFull(byteStreamContainingMessage, bytes)
if err != nil {
return fmt.Errorf("error reading byte stream content: %v", err)
}
return newClient(apiURL, http.DefaultClient)
}

if errorMessageReturnedAsMetadata != "" {
var apiError pb.ApiError
err = proto.Unmarshal(bytes, &apiError)
if err != nil {
return fmt.Errorf("error unmarshalling error from byte stream: %v", err)
}
return fmt.Errorf("%s: %s", errorMessageReturnedAsMetadata, apiError.Error)
func NewExternalClient(controlPlaneNamespace string, kubeApi k8s.KubernetesApi) (pb.ApiClient, error) {
apiURL, err := kubeApi.UrlFor(controlPlaneNamespace, "/services/http:api:http/proxy/")
if err != nil {
return nil, err
}

err = proto.Unmarshal(bytes, out)
httpClientToUse, err := kubeApi.NewClient()
if err != nil {
return fmt.Errorf("error unmarshalling bytes: %v", err)
} else {
return nil
return nil, err
}

return newClient(apiURL, httpClientToUse)
}
Loading

0 comments on commit 9410da4

Please sign in to comment.