Skip to content

Commit

Permalink
Changing the allocator API to gRPC (#1314)
Browse files Browse the repository at this point in the history
Co-authored-by: Mark Mandel <mark.mandel@gmail.com>
  • Loading branch information
pooneh-m and markmandel authored Feb 10, 2020
1 parent bb05ab0 commit 8da5d31
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 345 deletions.
167 changes: 67 additions & 100 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package main

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
Expand All @@ -39,13 +39,17 @@ import (
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
"github.com/heptiolabs/healthcheck"
"github.com/pkg/errors"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"gopkg.in/fsnotify.v1"
k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
Expand All @@ -72,9 +76,6 @@ func init() {
registerMetricViews()
}

// handler is the function signature of a request handler served by the
// allocator's web server: it receives the response writer and the request.
type handler func(w http.ResponseWriter, r *http.Request)

func main() {
conf := parseEnvFlags()

Expand Down Expand Up @@ -103,22 +104,17 @@ func main() {

h := newServiceHandler(kubeClient, agonesClient, health)

// mux for https server to serve gameserver allocations
httpsMux := http.NewServeMux()
httpsMux.HandleFunc("/v1alpha1/gameserverallocation", h.postOnly(h.allocateHandler))

// creates a new file watcher for client certificate folder
watcher, _ := fsnotify.NewWatcher()
defer watcher.Close() // nolint: errcheck
if err := watcher.Add(certDir); err != nil {
logger.WithError(err).Fatalf("cannot watch folder %s for secret changes", certDir)
}

tlsCer, err := tls.LoadX509KeyPair(tlsDir+"tls.crt", tlsDir+"tls.key")
listener, err := net.Listen("tcp", fmt.Sprintf(":%s", sslPort))
if err != nil {
logger.WithError(err).Fatal("server TLS could not be loaded")
logger.WithError(err).Fatalf("failed to listen on TCP port %s", sslPort)
}
caCertPool := loadCACertPool()

// Watching for the events in certificate directory for updating certificates, when there is a change
go func() {
Expand All @@ -127,7 +123,11 @@ func main() {
// watch for events
case event := <-watcher.Events:
h.certMutex.Lock()
caCertPool = loadCACertPool()
caCertPool, err := getCACertPool(certDir)
if err != nil {
logger.WithError(err).Error("could not load CA certs.")
}
h.caCertPool = caCertPool
logger.Infof("Certificate directory change event %v", event)
h.certMutex.Unlock()

Expand All @@ -138,33 +138,14 @@ func main() {
}
}()

cfg := &tls.Config{
Certificates: []tls.Certificate{tlsCer},
ClientAuth: tls.RequireAndVerifyClientCert,
GetConfigForClient: func(*tls.ClientHelloInfo) (*tls.Config, error) {
h.certMutex.RLock()
defer h.certMutex.RUnlock()
return &tls.Config{
Certificates: []tls.Certificate{tlsCer},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: caCertPool,
}, nil
},
}
opts := h.getServerOptions()

srv := &http.Server{
Addr: ":" + sslPort,
TLSConfig: cfg,
// add http OC metrics (opencensus.io/http/server/*)
Handler: &ochttp.Handler{
Handler: httpsMux,
},
}
grpcServer := grpc.NewServer(opts...)
pb.RegisterAllocationServiceServer(grpcServer, h)

// listen on https to serve allocations
// serve GRPC for allocation
go func() {
// The certs are set on the config so passing empty as the cert path
err := srv.ListenAndServeTLS("", "")
err := grpcServer.Serve(listener)
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
}()
Expand All @@ -175,15 +156,7 @@ func main() {
logger.WithError(err).Fatal("allocation service crashed")
}

// loadCACertPool builds the client CA pool from the certificates found in
// certDir. It never returns an error: a failure to build the pool is logged
// at Fatal level.
func loadCACertPool() *x509.CertPool {
	pool, err := getCACertPool(certDir)
	if err == nil {
		return pool
	}
	logger.WithError(err).Fatal("could not get CA certs")
	return pool
}

func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler) *httpHandler {
func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler) *serviceHandler {
defaultResync := 30 * time.Second
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
Expand All @@ -196,7 +169,7 @@ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.I
gameserverallocations.NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), gsCounter, health))

stop := signals.NewStopChannel()
h := httpHandler{
h := serviceHandler{
allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) {
return allocator.Allocate(gsa, stop)
},
Expand All @@ -208,9 +181,40 @@ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.I
logger.WithError(err).Fatal("starting allocator failed.")
}

caCertPool, err := getCACertPool(certDir)
if err != nil {
logger.WithError(err).Fatal("could not load CA certs.")
}
h.caCertPool = caCertPool

return &h
}

// getServerOptions returns the list of gRPC server options used by the
// allocator: mutual-TLS transport credentials and the OpenCensus stats
// handler.
//
// The server key pair (tls.crt / tls.key) is read once from tlsDir; failing
// to load it is logged at Fatal level. The client CA pool, in contrast, is
// read from h.caCertPool under h.certMutex on every handshake, so a pool
// stored on the handler while the server is running applies to new
// connections.
func (h *serviceHandler) getServerOptions() []grpc.ServerOption {
	// filepath.Join yields a correct path whether or not tlsDir ends with a
	// path separator.
	certFile := filepath.Join(tlsDir, "tls.crt")
	keyFile := filepath.Join(tlsDir, "tls.key")

	tlsCer, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		logger.WithError(err).Fatal("failed to generate credentials")
	}

	cfg := &tls.Config{
		Certificates: []tls.Certificate{tlsCer},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		GetConfigForClient: func(*tls.ClientHelloInfo) (*tls.Config, error) {
			h.certMutex.RLock()
			defer h.certMutex.RUnlock()

			// Fail closed when no client CA pool is available. With a nil
			// ClientCAs, certificate verification falls back to the host's
			// system roots, which would accept client certificates issued by
			// any publicly trusted CA instead of only the configured ones.
			if h.caCertPool == nil {
				return nil, errors.New("client CA pool is not loaded")
			}

			return &tls.Config{
				Certificates: []tls.Certificate{tlsCer},
				ClientAuth:   tls.RequireAndVerifyClientCert,
				ClientCAs:    h.caCertPool,
			}, nil
		},
	}

	// Add options for creds and OpenCensus stats handler to enable stats and tracing.
	return []grpc.ServerOption{
		grpc.Creds(credentials.NewTLS(cfg)),
		grpc.StatsHandler(&ocgrpc.ServerHandler{}),
	}
}

// Set up our client which we will use to call the API
func getClients() (*kubernetes.Clientset, *versioned.Clientset, error) {
// Create the in-cluster config
Expand Down Expand Up @@ -249,7 +253,7 @@ func getCACertPool(path string) (*x509.CertPool, error) {
certFile := filepath.Join(path, file.Name())
caCert, err := ioutil.ReadFile(certFile)
if err != nil {
logger.Errorf("ca cert is not readable or missing: %s", err.Error())
logger.Errorf("CA cert is not readable or missing: %s", err.Error())
continue
}
if !caCertPool.AppendCertsFromPEM(caCert) {
Expand All @@ -262,72 +266,35 @@ func getCACertPool(path string) (*x509.CertPool, error) {
return caCertPool, nil
}

// postOnly wraps in so that it is invoked only for POST requests; a request
// with any other method is answered with 405 "method not allowed".
func (h *httpHandler) postOnly(in handler) handler {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != "POST" {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		in(w, r)
	}
}

type httpHandler struct {
type serviceHandler struct {
allocationCallback func(*allocationv1.GameServerAllocation) (k8sruntime.Object, error)
certMutex sync.RWMutex
caCertPool *x509.CertPool
}

func (h *httpHandler) allocateHandler(w http.ResponseWriter, r *http.Request) {
request := pb.AllocationRequest{}
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
http.Error(w, "invalid request", http.StatusBadRequest)
logger.WithError(err).Info("bad request")
return
}
logger.WithField("request", request).Infof("allocation request received")

gsa := converters.ConvertAllocationRequestV1Alpha1ToGSAV1(&request)
// PostAllocate implements the PostAllocate gRPC method definition
func (h *serviceHandler) PostAllocate(ctx context.Context, in *pb.AllocationRequest) (*pb.AllocationResponse, error) {
logger.WithField("request", in).Infof("allocation request received.")
gsa := converters.ConvertAllocationRequestV1Alpha1ToGSAV1(in)
resultObj, err := h.allocationCallback(gsa)
if err != nil {
http.Error(w, err.Error(), httpCode(err))
logger.WithField("gsa", gsa).WithError(err).Info("allocation failed")
return
return nil, err
}

w.Header().Set("Content-Type", "application/json")
if status, ok := resultObj.(*metav1.Status); ok {
w.WriteHeader(int(status.Code))
err = json.NewEncoder(w).Encode(status)
if err != nil {
http.Error(w, "internal server error", http.StatusInternalServerError)
logger.WithError(err).Error("Unable to encode status in json")
return
}
if s, ok := resultObj.(*metav1.Status); ok {
return nil, status.Errorf(codes.Code(s.Code), s.Message, resultObj)
}

allocatedGsa, ok := resultObj.(*allocationv1.GameServerAllocation)
if !ok {
http.Error(w, "internal server error", http.StatusInternalServerError)
logger.Errorf("internal server error - Bad GSA format %v", resultObj)
return
return nil, status.Errorf(codes.Internal, "internal server error- Bad GSA format %v", resultObj)
}
response := converters.ConvertGSAV1ToAllocationResponseV1Alpha1(allocatedGsa)
logger.WithField("response", response).Infof("allocation response is being sent")

err = json.NewEncoder(w).Encode(response)
if err != nil {
http.Error(w, "internal server error", http.StatusInternalServerError)
logger.WithError(err).Error("Unable to encode status in json")
return
}
}

func httpCode(err error) int {
code := http.StatusInternalServerError
if t, ok := err.(k8serror.APIStatus); ok {
code = int(t.Status().Code)
}
return code
return response, nil
}

type config struct {
Expand Down Expand Up @@ -370,7 +337,7 @@ func parseEnvFlags() config {
}

func registerMetricViews() {
if err := view.Register(ochttp.DefaultServerViews...); err != nil {
if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
logger.WithError(err).Error("could not register view")
}
}
Expand Down
Loading

0 comments on commit 8da5d31

Please sign in to comment.