diff --git a/internal/example/main/main.go b/internal/example/main/main.go index 59bbb16fa7..cbb6a12090 100644 --- a/internal/example/main/main.go +++ b/internal/example/main/main.go @@ -67,5 +67,5 @@ func main() { ctx := context.Background() cb := &test.Callbacks{Debug: l.Debug} srv := server.NewServer(ctx, cache, cb) - example.RunServer(ctx, srv, port) + example.RunServer(srv, port) } diff --git a/internal/example/server.go b/internal/example/server.go index b907c30faf..09134bc907 100644 --- a/internal/example/server.go +++ b/internal/example/server.go @@ -31,7 +31,9 @@ import ( routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3" runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/envoyproxy/go-control-plane/pkg/test/v3" ) const ( @@ -41,6 +43,58 @@ const ( grpcMaxConcurrentStreams = 1000000 ) +type Server struct { + xdsserver server.Server +} + +func NewServer(ctx context.Context, cache cache.Cache, cb *test.Callbacks) *Server { + srv := server.NewServer(ctx, cache, cb) + return &Server{srv} +} + +func (s *Server) registerServer(grpcServer *grpc.Server) { + // register services + discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, s.xdsserver) + endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, s.xdsserver) + clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, s.xdsserver) + routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, s.xdsserver) + listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, s.xdsserver) + secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, s.xdsserver) + runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, s.xdsserver) +} + +func (s *Server) Run(port uint) { + // gRPC golang library sets a very small upper bound for the number gRPC/h2 + // streams over a single TCP connection. If a proxy multiplexes requests over + // a single connection to the management server, then it might lead to + // availability problems. Keepalive timeouts based on connection_keepalive parameter https://www.envoyproxy.io/docs/envoy/latest/configuration/overview/examples#dynamic + var grpcOptions []grpc.ServerOption + grpcOptions = append(grpcOptions, + grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams), + grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: grpcKeepaliveTime, + Timeout: grpcKeepaliveTimeout, + }), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: grpcKeepaliveMinTime, + PermitWithoutStream: true, + }), + ) + grpcServer := grpc.NewServer(grpcOptions...) + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + log.Fatal(err) + } + + s.registerServer(grpcServer) + + log.Printf("management server listening on %d\n", port) + if err = grpcServer.Serve(lis); err != nil { + log.Println(err) + } +} + func registerServer(grpcServer *grpc.Server, server server.Server) { // register services discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, server) @@ -53,7 +107,7 @@ func registerServer(grpcServer *grpc.Server, server server.Server) { } // RunServer starts an xDS server at the given port. -func RunServer(ctx context.Context, srv server.Server, port uint) { +func RunServer(srv server.Server, port uint) { // gRPC golang library sets a very small upper bound for the number gRPC/h2 // streams over a single TCP connection. If a proxy multiplexes requests over // a single connection to the management server, then it might lead to