Skip to content

Commit

Permalink
Merge pull request #392 from ecordell/grpc-buffcon-and-client
Browse files Browse the repository at this point in the history
support buffconn for grpc server config
  • Loading branch information
ecordell authored Feb 3, 2022
2 parents 28ecd8f + 67f61fd commit a906b7c
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 22 deletions.
13 changes: 13 additions & 0 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/authzed/grpcutil"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/rs/cors"
Expand Down Expand Up @@ -204,6 +205,7 @@ func (c *Config) Complete() (RunnableServer, error) {
dashboardServer: dashboardServer,
unaryMiddleware: c.UnaryMiddleware,
streamingMiddleware: c.StreamingMiddleware,
presharedKey: c.PresharedKey,
}, nil
}

Expand All @@ -212,6 +214,7 @@ type RunnableServer interface {
Run(ctx context.Context) error
Middleware() ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor)
SetMiddleware(unaryInterceptors []grpc.UnaryServerInterceptor, streamingInterceptors []grpc.StreamServerInterceptor) RunnableServer
GRPCDialContext(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error)
}

// completedServerConfig holds the full configuration to run a spicedb server,
Expand All @@ -226,6 +229,7 @@ type completedServerConfig struct {

unaryMiddleware []grpc.UnaryServerInterceptor
streamingMiddleware []grpc.StreamServerInterceptor
presharedKey string
}

func (c *completedServerConfig) Middleware() ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) {
Expand All @@ -238,6 +242,15 @@ func (c *completedServerConfig) SetMiddleware(unaryInterceptors []grpc.UnaryServ
return c
}

func (c *completedServerConfig) GRPCDialContext(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
if c.gRPCServer.Insecure() {
opts = append(opts, grpcutil.WithInsecureBearerToken(c.presharedKey))
} else {
opts = append(opts, grpcutil.WithBearerToken(c.presharedKey))
}
return c.gRPCServer.DialContext(ctx, opts...)
}

func (c *completedServerConfig) Run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)

Expand Down
158 changes: 136 additions & 22 deletions pkg/cmd/util/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package util

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net"
Expand All @@ -13,16 +16,24 @@ import (
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/test/bufconn"

"github.com/authzed/spicedb/pkg/x509util"
)

const BufferedNetwork string = "buffnet"

type GRPCServerConfig struct {
Address string
Network string
TLSCertPath string
TLSKeyPath string
MaxConnAge time.Duration
Enabled bool
Address string
Network string
TLSCertPath string
TLSKeyPath string
MaxConnAge time.Duration
Enabled bool
BufferSize int
ClientCAPath string

flagPrefix string
}
Expand All @@ -47,22 +58,31 @@ func RegisterGRPCServerFlags(flags *pflag.FlagSet, config *GRPCServerConfig, fla
flags.BoolVar(&config.Enabled, flagPrefix+"-enabled", defaultEnabled, "enable "+serviceName+" gRPC server")
}

type DialFunc func(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error)

// Complete takes a set of default options and returns a completed server
func (c *GRPCServerConfig) Complete(level zerolog.Level, svcRegistrationFn func(server *grpc.Server), opts ...grpc.ServerOption) (RunnableGRPCServer, error) {
if !c.Enabled {
return &disabledGrpcServer{}, nil
}
if c.BufferSize == 0 {
c.BufferSize = 1024 * 1024
}
opts = append(opts, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: c.MaxConnAge,
}))
switch {
case c.TLSCertPath == "" && c.TLSKeyPath == "":
log.Warn().Str("prefix", c.flagPrefix).Msg("grpc server serving plaintext")
case c.TLSCertPath != "" && c.TLSKeyPath != "":
creds, err := credentials.NewServerTLSFromFile(c.TLSCertPath, c.TLSKeyPath)
if err != nil {
return nil, err
}
opts = append(opts, grpc.Creds(creds))
tlsOpts, err := c.tlsOpts()
if err != nil {
return nil, err
}
l, err := net.Listen(c.Network, c.Address)
opts = append(opts, tlsOpts...)

clientCreds, err := c.clientCreds()
if err != nil {
return nil, err
}

l, dial, err := c.listenerAndDialer()
if err != nil {
return nil, fmt.Errorf("failed to listen on addr for gRPC server: %w", err)
}
Expand All @@ -78,29 +98,90 @@ func (c *GRPCServerConfig) Complete(level zerolog.Level, svcRegistrationFn func(
listenFunc: func() error {
return srv.Serve(l)
},
dial: dial,
prestopFunc: func() {
log.WithLevel(level).Str("addr", c.Address).Str("network", c.Network).
Str("prefix", c.flagPrefix).Msg("grpc server stopped listening")
},
stopFunc: srv.GracefulStop,
enabled: c.Enabled,
creds: clientCreds,
}, nil
}

func (c *GRPCServerConfig) listenerAndDialer() (net.Listener, DialFunc, error) {
if c.Network == BufferedNetwork {
bl := bufconn.Listen(c.BufferSize)
return bl, func(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
return bl.DialContext(ctx)
}))
return grpc.DialContext(ctx, BufferedNetwork, opts...)
}, nil
}
l, err := net.Listen(c.Network, c.Address)
if err != nil {
return nil, nil, err
}
return l, func(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, c.Address, opts...)
}, nil
}

func (c *GRPCServerConfig) tlsOpts() ([]grpc.ServerOption, error) {
switch {
case c.TLSCertPath == "" && c.TLSKeyPath == "":
log.Warn().Str("prefix", c.flagPrefix).Msg("grpc server serving plaintext")
return nil, nil
case c.TLSCertPath != "" && c.TLSKeyPath != "":
creds, err := credentials.NewServerTLSFromFile(c.TLSCertPath, c.TLSKeyPath)
if err != nil {
return nil, err
}
return []grpc.ServerOption{grpc.Creds(creds)}, nil
default:
return nil, nil
}
}

func (c *GRPCServerConfig) clientCreds() (credentials.TransportCredentials, error) {
switch {
case c.TLSCertPath == "" && c.TLSKeyPath == "":
return insecure.NewCredentials(), nil
case c.TLSCertPath != "" && c.TLSKeyPath != "":
var err error
var pool *x509.CertPool
if c.ClientCAPath != "" {
pool, err = x509util.CustomCertPool(c.ClientCAPath)
} else {
pool, err = x509.SystemCertPool()
}
if err != nil {
return nil, err
}

return credentials.NewTLS(&tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12}), nil
default:
return nil, nil
}
}

type RunnableGRPCServer interface {
WithOpts(opts ...grpc.ServerOption) RunnableGRPCServer
Listen() error
DialContext(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error)
Insecure() bool
GracefulStop()
}

type completedGRPCServer struct {
opts []grpc.ServerOption
listener net.Listener
svcRegistrationFn func(server *grpc.Server)
svcRegistrationFn func(*grpc.Server)
listenFunc func() error
prestopFunc func()
stopFunc func()
enabled bool
dial func(context.Context, ...grpc.DialOption) (*grpc.ClientConn, error)
creds credentials.TransportCredentials
}

// WithOpts adds to the options for running the server
Expand All @@ -117,18 +198,51 @@ func (c *completedGRPCServer) WithOpts(opts ...grpc.ServerOption) RunnableGRPCSe

// Listen runs a configured server
func (c *completedGRPCServer) Listen() error {
if !c.enabled {
return nil
}
return c.listenFunc()
}

// DialContext starts a connection to grpc server
func (c *completedGRPCServer) DialContext(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts = append(opts, grpc.WithTransportCredentials(c.creds))
return c.dial(ctx, opts...)
}

// Insecure returns true if the server is configured without TLS enabled
func (c *completedGRPCServer) Insecure() bool {
return c.creds.Info().SecurityProtocol == "insecure"
}

// GracefulStop stops a running server
func (c *completedGRPCServer) GracefulStop() {
c.prestopFunc()
c.stopFunc()
}

type disabledGrpcServer struct{}

// WithOpts adds to the options for running the server
func (d *disabledGrpcServer) WithOpts(opts ...grpc.ServerOption) RunnableGRPCServer {
return d
}

// Listen runs a configured server
func (d *disabledGrpcServer) Listen() error {
return nil
}

// Insecure returns true if the server is configured without TLS enabled
func (d *disabledGrpcServer) Insecure() bool {
return true
}

// DialContext starts a connection to grpc server
func (d *disabledGrpcServer) DialContext(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return nil, nil
}

// GracefulStop stops a running server
func (d *disabledGrpcServer) GracefulStop() {}

type HTTPServerConfig struct {
Address string
TLSCertPath string
Expand Down

0 comments on commit a906b7c

Please sign in to comment.