Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove improbable grpcWeb and cmux. #6190

Merged
merged 2 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions cmd/kubeapps-apis/core/plugins/v1alpha1/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ const (
// a plugin's RegisterWithGRPCServer function must accept. This allows
// the arguments to be defined (or modified) in the one place.
type GRPCPluginRegistrationOptions struct {
Registrar grpc.ServiceRegistrar
ConfigGetter core.KubernetesConfigGetter
ClustersConfig kube.ClustersConfig
PluginConfigPath string
Expand Down Expand Up @@ -72,7 +71,7 @@ type PluginsServer struct {
clustersConfig kube.ClustersConfig
}

func NewPluginsServer(serveOpts core.ServeOptions, registrar grpc.ServiceRegistrar, gwArgs core.GatewayHandlerArgs, mux *http.ServeMux) (*PluginsServer, error) {
func NewPluginsServer(serveOpts core.ServeOptions, gwArgs core.GatewayHandlerArgs, mux *http.ServeMux) (*PluginsServer, error) {
// Store the serveOptions in the global 'pluginsServeOpts' variable

// Find all .so plugins in the specified plugins directory.
Expand All @@ -90,7 +89,7 @@ func NewPluginsServer(serveOpts core.ServeOptions, registrar grpc.ServiceRegistr
}
ps.clustersConfig = clustersConfig

err = ps.registerPlugins(pluginPaths, registrar, gwArgs, serveOpts, mux)
err = ps.registerPlugins(pluginPaths, gwArgs, serveOpts, mux)
if err != nil {
return nil, fmt.Errorf("failed to register plugins: %w", err)
}
Expand Down Expand Up @@ -122,7 +121,7 @@ func (s *PluginsServer) GetConfiguredPlugins(ctx context.Context, in *connect.Re
}

// registerPlugins opens each plugin, looks up the register function and calls it with the registrar.
func (s *PluginsServer) registerPlugins(pluginPaths []string, grpcReg grpc.ServiceRegistrar, gwArgs core.GatewayHandlerArgs, serveOpts core.ServeOptions, mux *http.ServeMux) error {
func (s *PluginsServer) registerPlugins(pluginPaths []string, gwArgs core.GatewayHandlerArgs, serveOpts core.ServeOptions, mux *http.ServeMux) error {
pluginsWithServers := []PluginWithServer{}

configGetter, err := createConfigGetter(serveOpts, s.clustersConfig)
Expand All @@ -141,7 +140,7 @@ func (s *PluginsServer) registerPlugins(pluginPaths []string, grpcReg grpc.Servi
return err
}

if grpcServer, err := s.registerGRPC(p, pluginDetail, grpcReg, configGetter, serveOpts, mux); err != nil {
if grpcServer, err := s.registerGRPC(p, pluginDetail, configGetter, serveOpts, mux); err != nil {
return err
} else {
pluginsWithServers = append(pluginsWithServers, PluginWithServer{
Expand All @@ -165,8 +164,7 @@ func (s *PluginsServer) registerPlugins(pluginPaths []string, grpcReg grpc.Servi
}

// registerGRPC finds and calls the required function for registering the plugin for the GRPC server.
func (s *PluginsServer) registerGRPC(p *plugin.Plugin, pluginDetail *plugins.Plugin, registrar grpc.ServiceRegistrar,
configGetter core.KubernetesConfigGetter, serveOpts core.ServeOptions, mux *http.ServeMux) (interface{}, error) {
func (s *PluginsServer) registerGRPC(p *plugin.Plugin, pluginDetail *plugins.Plugin, configGetter core.KubernetesConfigGetter, serveOpts core.ServeOptions, mux *http.ServeMux) (interface{}, error) {
grpcRegFn, err := p.Lookup(grpcRegisterFunction)
if err != nil {
return nil, fmt.Errorf("unable to lookup %q for %v: %w", grpcRegisterFunction, pluginDetail, err)
Expand All @@ -182,7 +180,6 @@ func (s *PluginsServer) registerGRPC(p *plugin.Plugin, pluginDetail *plugins.Plu
}

server, err := grpcFn(GRPCPluginRegistrationOptions{
Registrar: registrar,
ConfigGetter: configGetter,
ClustersConfig: s.clustersConfig,
PluginConfigPath: serveOpts.PluginConfigPath,
Expand Down
192 changes: 22 additions & 170 deletions cmd/kubeapps-apis/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@ package server

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/http/httputil"
"reflect"
"strconv"
"strings"
"time"

Expand All @@ -21,8 +17,6 @@ import (
"k8s.io/client-go/rest"

grpchealth "github.com/bufbuild/connect-grpchealth-go"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/soheilhy/cmux"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/core"
Expand All @@ -34,7 +28,6 @@ import (
pluginsConnect "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/plugins/v1alpha1/v1alpha1connect"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
klogv2 "k8s.io/klog/v2"
Expand Down Expand Up @@ -80,63 +73,58 @@ func LogRequest(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo
// It runs the gRPC service, registering the configured plugins.
func Serve(serveOpts core.ServeOptions) error {
listenAddr := fmt.Sprintf(":%d", serveOpts.Port)
// Note: Currently transitioning from the un-maintained improbable-eng grpc library
// to the connect one. During the transition, some gRPC services are running on the
// improbable grpc server. Those calls are proxied through, but in a few PRs we'll have
// all services on the new server and can remove the proxy.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcSrv, gwArgs, listenerCMux, err := createImprobableGRPCServer(ctx, listenAddr)

gw, err := gatewayMux()
if err != nil {
return fmt.Errorf("failed to create gRPC server: %w", err)
return fmt.Errorf("failed to create gRPC gateway: %w", err)
}

// The connect service handler automatically handles grpc-web, connect and
// grpc for us, so we won't need all the extra code below once all services
// have been transitioned to the new mux (and we can remove the use of cmux
// once connect is used for all requests).
// Note: we point the gateway at our *new* gRPC handler, so that we can continue to use
// the gateway for a ReST-ish API
gwArgs := core.GatewayHandlerArgs{
Ctx: ctx,
Mux: gw,
Addr: listenAddr,
DialOptions: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())},
}

// During the transition we use the connect grpc mux by default and any unhandled paths
// are proxied to the old cmux handler's listener.
mux_connect := http.NewServeMux()
mux := http.NewServeMux()

// Create the core.plugins.v1alpha1 server which handles registration of
// plugins, and register it for both grpc and http.
pluginsServer, err := pluginsv1alpha1.NewPluginsServer(serveOpts, grpcSrv, gwArgs, mux_connect)
pluginsServer, err := pluginsv1alpha1.NewPluginsServer(serveOpts, gwArgs, mux)
if err != nil {
return fmt.Errorf("failed to initialize plugins server: %v", err)
}
if err := registerPluginsServiceServer(mux_connect, pluginsServer, gwArgs); err != nil {
if err := registerPluginsServiceServer(mux, pluginsServer, gwArgs); err != nil {
return fmt.Errorf("failed to register plugins server: %v", err)
}
if err := registerPackagesServiceServer(mux_connect, pluginsServer, gwArgs); err != nil {
if err := registerPackagesServiceServer(mux, pluginsServer, gwArgs); err != nil {
return err
}
if err := registerRepositoriesServiceServer(mux_connect, pluginsServer, gwArgs); err != nil {
if err := registerRepositoriesServiceServer(mux, pluginsServer, gwArgs); err != nil {
return err
}

// The gRPC Health checker reports on all connected services.
checker := grpchealth.NewStaticChecker(
pluginsConnect.PluginsServiceName,
)
mux_connect.Handle(grpchealth.NewHandler(checker))
mux.Handle(grpchealth.NewHandler(checker))

port, err := startImprobableHandler(pluginsServer, *listenerCMux, grpcSrv, gwArgs)
if err != nil {
return err
}
// Finally, link the new mux so that all other requests are handled by the gateway
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gwArgs.Mux.ServeHTTP(w, r)
}))

if serveOpts.UnsafeLocalDevKubeconfig {
klogv2.Warning("Using the local Kubeconfig file instead of the actual in-cluster's config. This is not recommended except for development purposes.")
}

// Finally, link the new mux so that all other requests are proxied to the port on which
// the improbable gRPC server is listening.
mux_connect.Handle("/", createProxyToImprobableHandler(port))

klogv2.Infof("Starting server on %q", listenAddr)
if err := http.ListenAndServe(listenAddr, h2c.NewHandler(mux_connect, &http2.Server{})); err != nil {
if err := http.ListenAndServe(listenAddr, h2c.NewHandler(mux, &http2.Server{})); err != nil {
klogv2.Fatalf("failed to server: %+v", err)
}

Expand Down Expand Up @@ -253,142 +241,6 @@ func gatewayMux() (*runtime.ServeMux, error) {
return gwmux, nil
}

// createProxyToImprobableHandler returns a handler func that proxies requests
// through to the improbable handler listening on a different port.
//
// It creates two reverse proxies, one with an h2c transport, the other with an http1 transport,
// so that, depending on the request being handled, the request can be sent on the correct
// transport.
//
// This function is temporary and will be removed once all code is switched to the connect
// gRPC library.
func createProxyToImprobableHandler(port int) http.HandlerFunc {
h2cProxy := &httputil.ReverseProxy{
Director: func(r *http.Request) {
r.URL.Scheme = "http"
r.URL.Host = fmt.Sprintf("127.0.0.1:%d", port)
},
Transport: &http2.Transport{
AllowHTTP: true,
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.Dial(network, addr)
},
},
}
http1Proxy := httputil.ReverseProxy{
Director: func(r *http.Request) {
r.URL.Scheme = "http"
r.URL.Host = fmt.Sprintf("127.0.0.1:%d", port)
},
}

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor == 2 {
h2cProxy.ServeHTTP(w, r)
} else {
http1Proxy.ServeHTTP(w, r)
}
})
}

// createImprobableGRPCServer returns the created listener as well as the server and gateway arges.
//
// The latter are still required when registering plugins (though will be removed soon).
func createImprobableGRPCServer(ctx context.Context, listenAddr string) (*grpc.Server, core.GatewayHandlerArgs, *net.Listener, error) {
// Create the grpc server and register the reflection server (for now, useful for discovery
// using grpcurl) or similar.
grpcSrv := grpc.NewServer(grpc.ChainUnaryInterceptor(LogRequest))
reflection.Register(grpcSrv)

gw, err := gatewayMux()
if err != nil {
return nil, core.GatewayHandlerArgs{}, nil, err
}

// During the transition to the connect gRPC handlers, we'll continue to proxy unhandled
// gRPC requests through to the old improbable-eng-based handlers which used the cmux
// library to multiplex requests based on headers. The cmux listen address
// will be a random port. We'll send traffic through to this port from the main http.mux
// used by connect.
listenerCMux, err := net.Listen("tcp", ":0")
if err != nil {
return nil, core.GatewayHandlerArgs{}, nil, err
}

// Note: we point the gateway at our *new* gRPC handler, so that we can continue to use
// the gateway for a ReST-ish API
gwArgs := core.GatewayHandlerArgs{
Ctx: ctx,
Mux: gw,
Addr: listenAddr,
DialOptions: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())},
}

return grpcSrv, gwArgs, &listenerCMux, nil
}

// startImprobableHandler returns the port on which the improbable gRPC handler is listening.
func startImprobableHandler(pluginsServer *pluginsv1alpha1.PluginsServer, listenerCMux net.Listener, grpcSrv *grpc.Server, gwArgs core.GatewayHandlerArgs) (int, error) {

// Multiplex the connection between grpc and http.
// Note: due to a change in the grpc protocol, it's no longer possible to just match
// on the simpler cmux.HTTP2HeaderField("content-type", "application/grpc"). More details
// at https://github.com/soheilhy/cmux/issues/64
mux := cmux.New(listenerCMux)
grpcListener := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
grpcWebListener := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc-web"))
httpListener := mux.Match(cmux.Any())

webRpcProxy := grpcweb.WrapServer(grpcSrv,
grpcweb.WithOriginFunc(func(origin string) bool { return true }),
grpcweb.WithWebsockets(true),
grpcweb.WithWebsocketOriginFunc(func(req *http.Request) bool { return true }),
)

httpSrv := &http.Server{
ReadHeaderTimeout: 60 * time.Second, // mitigate slowloris attacks, set to nginx's default
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if webRpcProxy.IsGrpcWebRequest(r) || webRpcProxy.IsAcceptableGrpcCorsRequest(r) || webRpcProxy.IsGrpcWebSocketRequest(r) {
webRpcProxy.ServeHTTP(w, r)
} else {
gwArgs.Mux.ServeHTTP(w, r)
}
},
),
}

go func() {
err := grpcSrv.Serve(grpcListener)
if err != nil {
klogv2.Fatalf("failed to serve: %v", err)
}
}()
go func() {
err := grpcSrv.Serve(grpcWebListener)
if err != nil {
klogv2.Fatalf("failed to serve: %v", err)
}
}()
go func() {
err := httpSrv.Serve(httpListener)
if err != nil {
klogv2.Fatalf("failed to serve: %v", err)
}
}()
go func() {
if err := mux.Serve(); err != nil {
klogv2.Fatalf("failed to serve: %v", err)
}
}()

parts := strings.SplitAfter(listenerCMux.Addr().String(), ":")
port, err := strconv.Atoi(parts[len(parts)-1])
if err != nil {
return 0, err
}
return port, nil
}

// Registers the pluginsServer with the mux and gateway.
func registerPluginsServiceServer(mux *http.ServeMux, pluginsServer *pluginsv1alpha1.PluginsServer, gwArgs core.GatewayHandlerArgs) error {
mux.Handle(pluginsConnect.NewPluginsServiceHandler(pluginsServer))
Expand Down
2 changes: 0 additions & 2 deletions dashboard/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
"@cds/core": "^6.2.0",
"@cds/react": "^6.2.0",
"@clr/ui": "^15.0.3",
"@improbable-eng/grpc-web": "^0.15.0",
"@paciolan/remote-component": "^2.13.0",
"@tanstack/match-sorter-utils": "^8.7.6",
"@tanstack/react-table": "^8.7.8",
Expand Down Expand Up @@ -81,7 +80,6 @@
"@bufbuild/protoc-gen-es": "^1.1.1",
"@craco/craco": "^7.0.0",
"@formatjs/cli": "^6.0.4",
"@improbable-eng/grpc-web-fake-transport": "^0.15.0",
"@testing-library/react": "^12.1.5",
"@types/enzyme": "^3.10.12",
"@types/jest": "^29.5.0",
Expand Down
14 changes: 0 additions & 14 deletions dashboard/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1477,20 +1477,6 @@
resolved "https://registry.yarnpkg.com/@humanwhocodes/object-schema/-/object-schema-1.2.1.tgz#b520529ec21d8e5945a1851dfd1c32e94e39ff45"
integrity sha512-ZnQMnLV4e7hDlUvw8H+U8ASL02SS2Gn6+9Ac3wGGLIe7+je2AeAOxPY+izIPJDfFDb7eDjev0Us8MO1iFRN8hA==

"@improbable-eng/grpc-web-fake-transport@^0.15.0":
version "0.15.0"
resolved "https://registry.yarnpkg.com/@improbable-eng/grpc-web-fake-transport/-/grpc-web-fake-transport-0.15.0.tgz#af137f93c87eab988dec4291f933decd405f3774"
integrity sha512-onrErq5KoP4+QnuuUox8rRFcLolgXvBRVVTIDoJvvIBY6n11eYHlXOMAU2ZfkrP7+sGnXgGn+FBoCgVQ0/NKVQ==
dependencies:
lodash.assignin "^4.2.0"

"@improbable-eng/grpc-web@^0.15.0":
version "0.15.0"
resolved "https://registry.yarnpkg.com/@improbable-eng/grpc-web/-/grpc-web-0.15.0.tgz#3e47e9fdd90381a74abd4b7d26e67422a2a04bef"
integrity sha512-ERft9/0/8CmYalqOVnJnpdDry28q+j+nAlFFARdjyxXDJ+Mhgv9+F600QC8BR9ygOfrXRlAk6CvST2j+JCpQPg==
dependencies:
browser-headers "^0.4.1"

"@istanbuljs/load-nyc-config@^1.0.0":
version "1.1.0"
resolved "https://registry.yarnpkg.com/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz#fd3db1d59ecf7cf121e80650bb86712f9b55eced"
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,11 @@ require (
github.com/google/go-containerregistry v0.14.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2
github.com/improbable-eng/grpc-web v0.15.0
github.com/itchyny/gojq v0.12.12
github.com/jinzhu/copier v0.3.5
github.com/lib/pq v1.10.7
github.com/mitchellh/go-homedir v1.1.0
github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.7.0
github.com/spf13/cobra-cli v1.3.0
github.com/spf13/pflag v1.0.5
Expand Down
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,6 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/improbable-eng/grpc-web v0.15.0 h1:BN+7z6uNXZ1tQGcNAuaU1YjsLTApzkjt2tzCixLaUPQ=
github.com/improbable-eng/grpc-web v0.15.0/go.mod h1:1sy9HKV4Jt9aEs9JSnkWlRJPuPtwNr0l57L4f878wP8=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
Expand Down Expand Up @@ -915,9 +913,6 @@ github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
Expand Down
Loading