diff --git a/core/comm/config.go b/core/comm/config.go index 40e37990e35..dc3ed33b9e4 100644 --- a/core/comm/config.go +++ b/core/comm/config.go @@ -7,7 +7,11 @@ SPDX-License-Identifier: Apache-2.0 package comm import ( + "time" + "github.com/spf13/viper" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) var ( @@ -18,8 +22,34 @@ var ( // Max send and receive bytes for grpc clients and servers maxRecvMsgSize = 100 * 1024 * 1024 maxSendMsgSize = 100 * 1024 * 1024 + // Default keepalive options + keepaliveOptions = KeepaliveOptions{ + ClientKeepaliveTime: 300, // 5 min + ClientKeepaliveTimeout: 20, // 20 sec - gRPC default + ServerKeepaliveTime: 7200, // 2 hours - gRPC default + ServerKeepaliveTimeout: 20, // 20 sec - gRPC default + } ) +// KeepAliveOptions is used to set the gRPC keepalive settings for both +// clients and servers +type KeepaliveOptions struct { + // ClientKeepaliveTime is the duration in seconds after which if the client + // does not see any activity from the server it pings the server to see + // if it is alive + ClientKeepaliveTime int + // ClientKeepaliveTimeout is the duration the client waits for a response + // from the server after sending a ping before closing the connection + ClientKeepaliveTimeout int + // ServerKeepaliveTime is the duration in seconds after which if the server + // does not see any activity from the client it pings the client to see + // if it is alive + ServerKeepaliveTime int + // ServerKeepaliveTimeout is the duration the server waits for a response + // from the client after sending a ping before closing the connection + ServerKeepaliveTimeout int +} + // cacheConfiguration caches common package scoped variables func cacheConfiguration() { if !configurationCached { @@ -59,3 +89,39 @@ func MaxSendMsgSize() int { func SetMaxSendMsgSize(size int) { maxSendMsgSize = size } + +// SetKeepaliveOptions sets the gRPC keepalive options for both clients and +// servers +func SetKeepaliveOptions(ka KeepaliveOptions) { + keepaliveOptions = ka +} + +// ServerKeepaliveOptions returns the gRPC keepalive options for servers +func ServerKeepaliveOptions() []grpc.ServerOption { + var serverOpts []grpc.ServerOption + kap := keepalive.ServerParameters{ + Time: time.Duration(keepaliveOptions.ServerKeepaliveTime) * time.Second, + Timeout: time.Duration(keepaliveOptions.ServerKeepaliveTimeout) * time.Second, + } + serverOpts = append(serverOpts, grpc.KeepaliveParams(kap)) + kep := keepalive.EnforcementPolicy{ + // needs to match clientKeepalive + MinTime: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second, + // allow keepalive w/o rpc + PermitWithoutStream: true, + } + serverOpts = append(serverOpts, grpc.KeepaliveEnforcementPolicy(kep)) + return serverOpts +} + +// ClientKeepaliveOptions returns the gRPC keepalive options for clients +func ClientKeepaliveOptions() []grpc.DialOption { + var dialOpts []grpc.DialOption + kap := keepalive.ClientParameters{ + Time: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second, + Timeout: time.Duration(keepaliveOptions.ClientKeepaliveTimeout) * time.Second, + PermitWithoutStream: true, + } + dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kap)) + return dialOpts +} diff --git a/core/comm/config_test.go b/core/comm/config_test.go index 55dd545ca9d..b3bc4807402 100644 --- a/core/comm/config_test.go +++ b/core/comm/config_test.go @@ -27,6 +27,22 @@ func TestConfig(t *testing.T) { assert.EqualValues(t, size, MaxRecvMsgSize()) assert.EqualValues(t, size, MaxSendMsgSize()) + // set keepalive options + timeout := 1000 + ka := KeepaliveOptions{ + ClientKeepaliveTime: timeout, + ClientKeepaliveTimeout: timeout + 1, + ServerKeepaliveTime: timeout + 2, + ServerKeepaliveTimeout: timeout + 3, + } + SetKeepaliveOptions(ka) + assert.EqualValues(t, timeout, keepaliveOptions.ClientKeepaliveTime) + assert.EqualValues(t, timeout+1, keepaliveOptions.ClientKeepaliveTimeout) + assert.EqualValues(t, timeout+2, keepaliveOptions.ServerKeepaliveTime) + assert.EqualValues(t, timeout+3, keepaliveOptions.ServerKeepaliveTimeout) + assert.EqualValues(t, 2, len(ServerKeepaliveOptions())) + assert.Equal(t, 1, len(ClientKeepaliveOptions())) + // reset cache configurationCached = false viper.Set("peer.tls.enabled", true) diff --git a/core/comm/server.go b/core/comm/server.go index 05c4cf349c3..a767b53cf76 100644 --- a/core/comm/server.go +++ b/core/comm/server.go @@ -168,6 +168,9 @@ func NewGRPCServerFromListener(listener net.Listener, secureConfig SecureServerC // set max send and recv msg sizes serverOpts = append(serverOpts, grpc.MaxSendMsgSize(MaxSendMsgSize())) serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(MaxRecvMsgSize())) + // set the keepalive options + serverOpts = append(serverOpts, ServerKeepaliveOptions()...) + grpcServer.server = grpc.NewServer(serverOpts...) return grpcServer, nil diff --git a/core/comm/server_test.go b/core/comm/server_test.go index 225eb7cfce6..0a0b3a28de3 100644 --- a/core/comm/server_test.go +++ b/core/comm/server_test.go @@ -21,6 +21,7 @@ import ( "crypto/x509" "errors" "fmt" + "io" "io/ioutil" "log" "net" @@ -34,6 +35,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/transport" "github.com/hyperledger/fabric/core/comm" testpb "github.com/hyperledger/fabric/core/comm/testdata/grpc" @@ -1357,3 +1359,61 @@ func TestSetClientRootCAs(t *testing.T) { } } + +func TestKeepaliveNoClientResponse(t *testing.T) { + t.Parallel() + // set up GRPCServer instance + kap := comm.KeepaliveOptions{ + ServerKeepaliveTime: 2, + ServerKeepaliveTimeout: 1, + } + comm.SetKeepaliveOptions(kap) + testAddress := "localhost:9400" + srv, err := comm.NewGRPCServer(testAddress, comm.SecureServerConfig{}) + assert.NoError(t, err, "Unexpected error starting GRPCServer") + go srv.Start() + defer srv.Stop() + + // test connection close if client does not response to ping + // net client will not response to keepalive + client, err := net.Dial("tcp", testAddress) + assert.NoError(t, err, "Unexpected error dialing GRPCServer") + defer client.Close() + // sleep past keepalive timeout + time.Sleep(4 * time.Second) + data := make([]byte, 24) + for { + _, err = client.Read(data) + if err == nil { + continue + } + assert.EqualError(t, err, io.EOF.Error(), "Expected io.EOF") + break + } +} + +func TestKeepaliveClientResponse(t *testing.T) { + t.Parallel() + // set up GRPCServer instance + kap := comm.KeepaliveOptions{ + ServerKeepaliveTime: 2, + ServerKeepaliveTimeout: 1, + } + comm.SetKeepaliveOptions(kap) + testAddress := "localhost:9401" + srv, err := comm.NewGRPCServer(testAddress, comm.SecureServerConfig{}) + assert.NoError(t, err, "Unexpected error starting GRPCServer") + go srv.Start() + defer srv.Stop() + + // test that connection does not close with response to ping + clientTransport, err := transport.NewClientTransport(context.Background(), + transport.TargetInfo{Addr: testAddress}, transport.ConnectOptions{}) + assert.NoError(t, err, "Unexpected error creating client transport") + defer clientTransport.Close() + // sleep past keepalive timeout + time.Sleep(4 * time.Second) + // try to create a stream + _, err = clientTransport.NewStream(context.Background(), &transport.CallHdr{}) + assert.NoError(t, err, "Unexpected error creating stream") +} diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index d778d9bf946..4e61933717b 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -194,6 +194,8 @@ func DefaultConnectionFactory(endpoint string) (*grpc.ClientConn, error) { // set max send/recv msg sizes dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()), grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) + // set the keepalive options + dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...) if comm.TLSEnabled() { dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetDeliverServiceCredentials())) diff --git a/peer/node/start.go b/peer/node/start.go index e70a3fa2805..b64135a3b97 100644 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -157,6 +157,9 @@ func serve(args []string) error { // set max send/recv msg sizes dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()), grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) + // set the keepalive options + dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...) + if comm.TLSEnabled() { dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetPeerCredentials())) } else {