From 2bd0e6de10b7f2eb1aa5d17d1174c7e7ec504f78 Mon Sep 17 00:00:00 2001 From: Gari Singh Date: Thu, 1 Jun 2017 14:07:19 -0400 Subject: [PATCH] [FAB-4305] Implement gRPC keepalive support There are a couple of defects related to long running gRPC connections being broken by intermediaries due to inactivity. The latest version of the gRPC libraries added support for keepliaves to deal with this situation. This change adds keepalive support for gRPC server and client connections. The defaults are currently being used and no user configuration has been exposed. The defaults are sane (5min inactivity and 20sec ping response) for typical environments. Will submit a separate change for gRPC configuration in general, although we can decide whether or not to take that on at this point. Change-Id: Idf63e37eb219e7961a2217193d6c00a69d259998 Signed-off-by: Gari Singh --- core/comm/config.go | 66 +++++++++++++++++++++++++++ core/comm/config_test.go | 16 +++++++ core/comm/server.go | 3 ++ core/comm/server_test.go | 60 ++++++++++++++++++++++++ core/deliverservice/deliveryclient.go | 2 + peer/node/start.go | 3 ++ 6 files changed, 150 insertions(+) diff --git a/core/comm/config.go b/core/comm/config.go index 40e37990e35..dc3ed33b9e4 100644 --- a/core/comm/config.go +++ b/core/comm/config.go @@ -7,7 +7,11 @@ SPDX-License-Identifier: Apache-2.0 package comm import ( + "time" + "github.com/spf13/viper" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) var ( @@ -18,8 +22,34 @@ var ( // Max send and receive bytes for grpc clients and servers maxRecvMsgSize = 100 * 1024 * 1024 maxSendMsgSize = 100 * 1024 * 1024 + // Default keepalive options + keepaliveOptions = KeepaliveOptions{ + ClientKeepaliveTime: 300, // 5 min + ClientKeepaliveTimeout: 20, // 20 sec - gRPC default + ServerKeepaliveTime: 7200, // 2 hours - gRPC default + ServerKeepaliveTimeout: 20, // 20 sec - gRPC default + } ) +// KeepAliveOptions is used to set the gRPC keepalive settings for both +// clients and servers +type KeepaliveOptions struct { + // ClientKeepaliveTime is the duration in seconds after which if the client + // does not see any activity from the server it pings the server to see + // if it is alive + ClientKeepaliveTime int + // ClientKeepaliveTimeout is the duration the client waits for a response + // from the server after sending a ping before closing the connection + ClientKeepaliveTimeout int + // ServerKeepaliveTime is the duration in seconds after which if the server + // does not see any activity from the client it pings the client to see + // if it is alive + ServerKeepaliveTime int + // ServerKeepaliveTimeout is the duration the server waits for a response + // from the client after sending a ping before closing the connection + ServerKeepaliveTimeout int +} + // cacheConfiguration caches common package scoped variables func cacheConfiguration() { if !configurationCached { @@ -59,3 +89,39 @@ func MaxSendMsgSize() int { func SetMaxSendMsgSize(size int) { maxSendMsgSize = size } + +// SetKeepaliveOptions sets the gRPC keepalive options for both clients and +// servers +func SetKeepaliveOptions(ka KeepaliveOptions) { + keepaliveOptions = ka +} + +// ServerKeepaliveOptions returns the gRPC keepalive options for servers +func ServerKeepaliveOptions() []grpc.ServerOption { + var serverOpts []grpc.ServerOption + kap := keepalive.ServerParameters{ + Time: time.Duration(keepaliveOptions.ServerKeepaliveTime) * time.Second, + Timeout: time.Duration(keepaliveOptions.ServerKeepaliveTimeout) * time.Second, + } + serverOpts = append(serverOpts, grpc.KeepaliveParams(kap)) + kep := keepalive.EnforcementPolicy{ + // needs to match clientKeepalive + MinTime: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second, + // allow keepalive w/o rpc + PermitWithoutStream: true, + } + serverOpts = append(serverOpts, grpc.KeepaliveEnforcementPolicy(kep)) + return serverOpts +} + +// ClientKeepaliveOptions returns the gRPC keepalive options for clients +func ClientKeepaliveOptions() []grpc.DialOption { + var dialOpts []grpc.DialOption + kap := keepalive.ClientParameters{ + Time: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second, + Timeout: time.Duration(keepaliveOptions.ClientKeepaliveTimeout) * time.Second, + PermitWithoutStream: true, + } + dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kap)) + return dialOpts +} diff --git a/core/comm/config_test.go b/core/comm/config_test.go index 55dd545ca9d..b3bc4807402 100644 --- a/core/comm/config_test.go +++ b/core/comm/config_test.go @@ -27,6 +27,22 @@ func TestConfig(t *testing.T) { assert.EqualValues(t, size, MaxRecvMsgSize()) assert.EqualValues(t, size, MaxSendMsgSize()) + // set keepalive options + timeout := 1000 + ka := KeepaliveOptions{ + ClientKeepaliveTime: timeout, + ClientKeepaliveTimeout: timeout + 1, + ServerKeepaliveTime: timeout + 2, + ServerKeepaliveTimeout: timeout + 3, + } + SetKeepaliveOptions(ka) + assert.EqualValues(t, timeout, keepaliveOptions.ClientKeepaliveTime) + assert.EqualValues(t, timeout+1, keepaliveOptions.ClientKeepaliveTimeout) + assert.EqualValues(t, timeout+2, keepaliveOptions.ServerKeepaliveTime) + assert.EqualValues(t, timeout+3, keepaliveOptions.ServerKeepaliveTimeout) + assert.EqualValues(t, 2, len(ServerKeepaliveOptions())) + assert.Equal(t, 1, len(ClientKeepaliveOptions())) + // reset cache configurationCached = false viper.Set("peer.tls.enabled", true) diff --git a/core/comm/server.go b/core/comm/server.go index 05c4cf349c3..a767b53cf76 100644 --- a/core/comm/server.go +++ b/core/comm/server.go @@ -168,6 +168,9 @@ func NewGRPCServerFromListener(listener net.Listener, secureConfig SecureServerC // set max send and recv msg sizes serverOpts = append(serverOpts, grpc.MaxSendMsgSize(MaxSendMsgSize())) serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(MaxRecvMsgSize())) + // set the keepalive options + serverOpts = append(serverOpts, ServerKeepaliveOptions()...) + grpcServer.server = grpc.NewServer(serverOpts...) return grpcServer, nil diff --git a/core/comm/server_test.go b/core/comm/server_test.go index 225eb7cfce6..0a0b3a28de3 100644 --- a/core/comm/server_test.go +++ b/core/comm/server_test.go @@ -21,6 +21,7 @@ import ( "crypto/x509" "errors" "fmt" + "io" "io/ioutil" "log" "net" @@ -34,6 +35,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/transport" "github.com/hyperledger/fabric/core/comm" testpb "github.com/hyperledger/fabric/core/comm/testdata/grpc" @@ -1357,3 +1359,61 @@ func TestSetClientRootCAs(t *testing.T) { } } + +func TestKeepaliveNoClientResponse(t *testing.T) { + t.Parallel() + // set up GRPCServer instance + kap := comm.KeepaliveOptions{ + ServerKeepaliveTime: 2, + ServerKeepaliveTimeout: 1, + } + comm.SetKeepaliveOptions(kap) + testAddress := "localhost:9400" + srv, err := comm.NewGRPCServer(testAddress, comm.SecureServerConfig{}) + assert.NoError(t, err, "Unexpected error starting GRPCServer") + go srv.Start() + defer srv.Stop() + + // test connection close if client does not response to ping + // net client will not response to keepalive + client, err := net.Dial("tcp", testAddress) + assert.NoError(t, err, "Unexpected error dialing GRPCServer") + defer client.Close() + // sleep past keepalive timeout + time.Sleep(4 * time.Second) + data := make([]byte, 24) + for { + _, err = client.Read(data) + if err == nil { + continue + } + assert.EqualError(t, err, io.EOF.Error(), "Expected io.EOF") + break + } +} + +func TestKeepaliveClientResponse(t *testing.T) { + t.Parallel() + // set up GRPCServer instance + kap := comm.KeepaliveOptions{ + ServerKeepaliveTime: 2, + ServerKeepaliveTimeout: 1, + } + comm.SetKeepaliveOptions(kap) + testAddress := "localhost:9401" + srv, err := comm.NewGRPCServer(testAddress, comm.SecureServerConfig{}) + assert.NoError(t, err, "Unexpected error starting GRPCServer") + go srv.Start() + defer srv.Stop() + + // test that connection does not close with response to ping + clientTransport, err := transport.NewClientTransport(context.Background(), + transport.TargetInfo{Addr: testAddress}, transport.ConnectOptions{}) + assert.NoError(t, err, "Unexpected error creating client transport") + defer clientTransport.Close() + // sleep past keepalive timeout + time.Sleep(4 * time.Second) + // try to create a stream + _, err = clientTransport.NewStream(context.Background(), &transport.CallHdr{}) + assert.NoError(t, err, "Unexpected error creating stream") +} diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index d778d9bf946..4e61933717b 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -194,6 +194,8 @@ func DefaultConnectionFactory(endpoint string) (*grpc.ClientConn, error) { // set max send/recv msg sizes dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()), grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) + // set the keepalive options + dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...) if comm.TLSEnabled() { dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetDeliverServiceCredentials())) diff --git a/peer/node/start.go b/peer/node/start.go index e70a3fa2805..b64135a3b97 100644 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -157,6 +157,9 @@ func serve(args []string) error { // set max send/recv msg sizes dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()), grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) + // set the keepalive options + dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...) + if comm.TLSEnabled() { dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetPeerCredentials())) } else {