Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WebSocket support and e2e test to verify. #3240

Merged
merged 2 commits into from
Feb 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ limitations under the License.
package handler

import (
"bufio"
"fmt"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"time"

"github.com/knative/pkg/websocket"
"github.com/knative/serving/pkg/activator"
"github.com/knative/serving/pkg/activator/util"
pkghttp "github.com/knative/serving/pkg/http"
Expand Down Expand Up @@ -116,3 +119,10 @@ func (s *statusCapture) WriteHeader(statusCode int) {
s.statusCode = statusCode
s.ResponseWriter.WriteHeader(statusCode)
}

// Hijack calls Hijack() on the wrapped http.ResponseWriter if it implements
// http.Hijacker interface, which is required for net/http/httputil/reverseproxy
// to handle connection upgrade/switching protocol. Otherwise returns an error.
func (s *statusCapture) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return websocket.HijackIfPossible(s.ResponseWriter)
}
11 changes: 11 additions & 0 deletions pkg/queue/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ limitations under the License.
package queue

import (
"bufio"
"context"
"io"
"net"
"net/http"
"sync"
"time"

"github.com/knative/pkg/websocket"
)

var defaultTimeoutBody = "<html><head><title>Timeout</title></head><body><h1>Timeout</h1></body></html>"
Expand Down Expand Up @@ -120,6 +124,13 @@ type timeoutWriter struct {

var _ http.ResponseWriter = (*timeoutWriter)(nil)

// Hijack calls Hijack() on the wrapped http.ResponseWriter if it implements
// http.Hijacker interface, which is required for net/http/httputil/reverseproxy
// to handle connection upgrade/switching protocol. Otherwise returns an error.
func (tw *timeoutWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return websocket.HijackIfPossible(tw.w)
}

func (tw *timeoutWriter) Header() http.Header { return tw.w.Header() }

func (tw *timeoutWriter) Write(p []byte) (int, error) {
Expand Down
151 changes: 151 additions & 0 deletions test/e2e/websocket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// +build e2e

/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"bytes"
"fmt"
"net/http"
"net/url"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/knative/pkg/test/logging"
"github.com/knative/serving/test"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)

const (
connectRetryInterval = 1 * time.Second
connectTimeout = 6 * time.Minute
)

// connect attempts to establish WebSocket connection with the Service.
// It will retry until reaching `connectTimeout` duration.
func connect(logger *logging.BaseLogger, ingressIP string, domain string) (*websocket.Conn, error) {
u := url.URL{Scheme: "ws", Host: ingressIP, Path: "/"}
var conn *websocket.Conn
waitErr := wait.PollImmediate(connectRetryInterval, connectTimeout, func() (bool, error) {
logger.Infof("Connecting using websocket: url=%s, host=%s", u.String(), domain)
c, resp, err := websocket.DefaultDialer.Dial(u.String(), http.Header{"Host": []string{domain}})
if err == nil {
logger.Info("WebSocket connection established.")
conn = c
return true, nil
}
if resp == nil {
// We don't have an HTTP response, probably TCP errors.
logger.Infof("Connection failed: %v", err)
return false, nil
}
body := &bytes.Buffer{}
defer resp.Body.Close()
if _, readErr := body.ReadFrom(resp.Body); readErr != nil {
logger.Infof("Connection failed: %v. Failed to read HTTP response: %v", err, readErr)
return false, nil
}
logger.Infof("HTTP connection failed: %v. Response=%+v. ResponseBody=%q", err, resp, body.String())
return false, nil
})
return conn, waitErr
}

// While we do have similar logic in knative/pkg, it is deeply buried
// inside the SpoofClient which is very HTTP centric.
//
// TODO(tcnghia): Extract the GatewayIP logic out from SpoofClient.
// Also, we should deduce this information from the child
// ClusterIngress's Status.
func getGatewayIP(kube *kubernetes.Clientset) (string, error) {
const ingressName = "istio-ingressgateway"
const ingressNamespace = "istio-system"

ingress, err := kube.CoreV1().Services(ingressNamespace).Get(ingressName, metav1.GetOptions{})
if err != nil {
return "", err
}
if len(ingress.Status.LoadBalancer.Ingress) != 1 {
return "", fmt.Errorf("expected exactly one ingress load balancer, instead had %d: %s",
len(ingress.Status.LoadBalancer.Ingress), ingress.Status.LoadBalancer.Ingress)
}
if ingress.Status.LoadBalancer.Ingress[0].IP == "" {
return "", fmt.Errorf("expected ingress loadbalancer IP for %s to be set, instead was empty", ingressName)
}
return ingress.Status.LoadBalancer.Ingress[0].IP, nil
}

func validateWebSocketConnection(logger *logging.BaseLogger, clients *test.Clients, names test.ResourceNames) error {
// Get the gatewayIP.
gatewayIP, err := getGatewayIP(clients.KubeClient.Kube)
if err != nil {
return err
}

// Establish the websocket connection.
conn, err := connect(logger, gatewayIP, names.Domain)
if err != nil {
return err
}
defer conn.Close()

// Send a message.
const sent = "Hello, websocket"
logger.Infof("Sending message %q to server.", sent)
if err = conn.WriteMessage(websocket.TextMessage, []byte(sent)); err != nil {
return err
}
logger.Info("Message sent.")

// Read back the echoed message and compared with sent.
if _, recv, err := conn.ReadMessage(); err != nil {
return err
} else if sent != string(recv) {
return fmt.Errorf("expected to receive back the message: %q but received %q", sent, string(recv))
} else {
tcnghia marked this conversation as resolved.
Show resolved Hide resolved
logger.Infof("Received message %q from echo server.", recv)
}
return nil
}

// TestWebSocket (1) creates a service based on the `wsserver` image,
// (2) connects to the service using websocket, (3) sends a message, and
// (4) verifies that we receive back the same message.
func TestWebSocket(t *testing.T) {
logger = logging.GetContextLogger(t.Name())
clients = Setup(t)

names := test.ResourceNames{
Service: test.AppendRandomString("websocket-server-", logger),
Image: "wsserver",
}

// Clean up in both abnormal and normal exits.
defer TearDown(clients, names, logger)
test.CleanupOnInterrupt(func() { TearDown(clients, names, logger) }, logger)

// Setup a WebSocket server.
if _, err := test.CreateRunLatestServiceReady(logger, clients, &names, &test.Options{}); err != nil {
t.Fatalf("Failed to create WebSocket server: %v", err)
}

// Validate the websocket connection.
if err := validateWebSocketConnection(logger, clients, names); err != nil {
t.Error(err)
}
}
8 changes: 6 additions & 2 deletions test/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func CreateRunLatestServiceReady(logger *logging.BaseLogger, clients *Clients, n
return nil, fmt.Errorf("expected non-empty Service and Image name; got Service=%v, Image=%v", names.Service, names.Image)
}

logger.Info("Creating a new Service as RunLatest.")
logger.Infof("Creating a new Service %s as RunLatest.", names.Service)
svc, err := CreateLatestService(logger, clients, *names, options, fopt...)
if err != nil {
return nil, err
Expand All @@ -164,7 +164,11 @@ func CreateRunLatestServiceReady(logger *logging.BaseLogger, clients *Clients, n
}

logger.Info("Getting latest objects Created by Service.")
return GetResourceObjects(clients, *names)
resources, err := GetResourceObjects(clients, *names)
if err == nil {
logger.Infof("Successfully created Service %s.", names.Domain)
}
return resources, err
}

// CreateReleaseService creates a service in namespace with the name names.Service and names.Image,
Expand Down
11 changes: 11 additions & 0 deletions test/test_images/wsserver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Echo WebSocket test image

A simple WebSocket server adapted from
https://github.com/gorilla/WebSocket/blob/master/examples/echo/server.go . The
server simply echoes messages sent to it. We use this server in testing that
all our proxies on request path can handle WebSocket upgrades.

## Building

For details about building and adding new images, see the
[section about test images](/test/README.md#test-images).
69 changes: 69 additions & 0 deletions test/test_images/wsserver/echo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main

import (
"flag"
"log"
"net/http"

"github.com/gorilla/websocket"
"github.com/knative/serving/test"
)

var addr = flag.String("addr", "localhost:8080", "http service address")

var upgrader = websocket.Upgrader{
// Allow any origin, since we are spoofing requests anyway.
CheckOrigin: func(r *http.Request) bool {
return true
},
}

func handler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("Error upgrading websocket:", err)
return
}
defer conn.Close()
log.Println("Connection upgraded to WebSocket. Entering receive loop.")
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
// We close abnormally, because we're just closing the connection in the client,
// which is okay. There's no value delaying closure of the connection unnecessarily.
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
log.Println("Client disconnected.")
} else {
log.Println("Handler exiting on error:", err)
}
return
}
log.Printf("Successfully received: %q", message)
if err = conn.WriteMessage(messageType, message); err != nil {
log.Println("Failed to write message:", err)
return
}
log.Printf("Successfully wrote: %q", message)
}
}

func main() {
flag.Parse()
log.SetFlags(0)
test.ListenAndServeGracefully(*addr, handler)
}
25 changes: 25 additions & 0 deletions test/test_images/wsserver/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
name: websocket-server
namespace: default
spec:
runLatest:
configuration:
revisionTemplate:
spec:
container:
image: github.com/knative/serving/test/test_images/wsserver