Skip to content

Commit

Permalink
Merge "[FAB-4305] Implement gRPC keepalive support"
Browse files Browse the repository at this point in the history
  • Loading branch information
yacovm authored and Gerrit Code Review committed Jun 2, 2017
2 parents 49538b4 + 2bd0e6d commit 1c8dc30
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 0 deletions.
66 changes: 66 additions & 0 deletions core/comm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ SPDX-License-Identifier: Apache-2.0
package comm

import (
"time"

"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

var (
Expand All @@ -18,8 +22,34 @@ var (
// Max send and receive bytes for grpc clients and servers
maxRecvMsgSize = 100 * 1024 * 1024
maxSendMsgSize = 100 * 1024 * 1024
// Default keepalive options
keepaliveOptions = KeepaliveOptions{
ClientKeepaliveTime: 300, // 5 min
ClientKeepaliveTimeout: 20, // 20 sec - gRPC default
ServerKeepaliveTime: 7200, // 2 hours - gRPC default
ServerKeepaliveTimeout: 20, // 20 sec - gRPC default
}
)

// KeepAliveOptions is used to set the gRPC keepalive settings for both
// clients and servers
type KeepaliveOptions struct {
// ClientKeepaliveTime is the duration in seconds after which if the client
// does not see any activity from the server it pings the server to see
// if it is alive
ClientKeepaliveTime int
// ClientKeepaliveTimeout is the duration the client waits for a response
// from the server after sending a ping before closing the connection
ClientKeepaliveTimeout int
// ServerKeepaliveTime is the duration in seconds after which if the server
// does not see any activity from the client it pings the client to see
// if it is alive
ServerKeepaliveTime int
// ServerKeepaliveTimeout is the duration the server waits for a response
// from the client after sending a ping before closing the connection
ServerKeepaliveTimeout int
}

// cacheConfiguration caches common package scoped variables
func cacheConfiguration() {
if !configurationCached {
Expand Down Expand Up @@ -59,3 +89,39 @@ func MaxSendMsgSize() int {
func SetMaxSendMsgSize(size int) {
maxSendMsgSize = size
}

// SetKeepaliveOptions sets the gRPC keepalive options for both clients and
// servers
func SetKeepaliveOptions(ka KeepaliveOptions) {
keepaliveOptions = ka
}

// ServerKeepaliveOptions returns the gRPC keepalive options for servers
func ServerKeepaliveOptions() []grpc.ServerOption {
var serverOpts []grpc.ServerOption
kap := keepalive.ServerParameters{
Time: time.Duration(keepaliveOptions.ServerKeepaliveTime) * time.Second,
Timeout: time.Duration(keepaliveOptions.ServerKeepaliveTimeout) * time.Second,
}
serverOpts = append(serverOpts, grpc.KeepaliveParams(kap))
kep := keepalive.EnforcementPolicy{
// needs to match clientKeepalive
MinTime: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second,
// allow keepalive w/o rpc
PermitWithoutStream: true,
}
serverOpts = append(serverOpts, grpc.KeepaliveEnforcementPolicy(kep))
return serverOpts
}

// ClientKeepaliveOptions returns the gRPC keepalive options for clients
func ClientKeepaliveOptions() []grpc.DialOption {
var dialOpts []grpc.DialOption
kap := keepalive.ClientParameters{
Time: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second,
Timeout: time.Duration(keepaliveOptions.ClientKeepaliveTimeout) * time.Second,
PermitWithoutStream: true,
}
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kap))
return dialOpts
}
16 changes: 16 additions & 0 deletions core/comm/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ func TestConfig(t *testing.T) {
assert.EqualValues(t, size, MaxRecvMsgSize())
assert.EqualValues(t, size, MaxSendMsgSize())

// set keepalive options
timeout := 1000
ka := KeepaliveOptions{
ClientKeepaliveTime: timeout,
ClientKeepaliveTimeout: timeout + 1,
ServerKeepaliveTime: timeout + 2,
ServerKeepaliveTimeout: timeout + 3,
}
SetKeepaliveOptions(ka)
assert.EqualValues(t, timeout, keepaliveOptions.ClientKeepaliveTime)
assert.EqualValues(t, timeout+1, keepaliveOptions.ClientKeepaliveTimeout)
assert.EqualValues(t, timeout+2, keepaliveOptions.ServerKeepaliveTime)
assert.EqualValues(t, timeout+3, keepaliveOptions.ServerKeepaliveTimeout)
assert.EqualValues(t, 2, len(ServerKeepaliveOptions()))
assert.Equal(t, 1, len(ClientKeepaliveOptions()))

// reset cache
configurationCached = false
viper.Set("peer.tls.enabled", true)
Expand Down
3 changes: 3 additions & 0 deletions core/comm/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ func NewGRPCServerFromListener(listener net.Listener, secureConfig SecureServerC
// set max send and recv msg sizes
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(MaxSendMsgSize()))
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(MaxRecvMsgSize()))
// set the keepalive options
serverOpts = append(serverOpts, ServerKeepaliveOptions()...)

grpcServer.server = grpc.NewServer(serverOpts...)

return grpcServer, nil
Expand Down
60 changes: 60 additions & 0 deletions core/comm/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
Expand All @@ -34,6 +35,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/transport"

"github.com/hyperledger/fabric/core/comm"
testpb "github.com/hyperledger/fabric/core/comm/testdata/grpc"
Expand Down Expand Up @@ -1357,3 +1359,61 @@ func TestSetClientRootCAs(t *testing.T) {
}

}

func TestKeepaliveNoClientResponse(t *testing.T) {
t.Parallel()
// set up GRPCServer instance
kap := comm.KeepaliveOptions{
ServerKeepaliveTime: 2,
ServerKeepaliveTimeout: 1,
}
comm.SetKeepaliveOptions(kap)
testAddress := "localhost:9400"
srv, err := comm.NewGRPCServer(testAddress, comm.SecureServerConfig{})
assert.NoError(t, err, "Unexpected error starting GRPCServer")
go srv.Start()
defer srv.Stop()

// test connection close if client does not response to ping
// net client will not response to keepalive
client, err := net.Dial("tcp", testAddress)
assert.NoError(t, err, "Unexpected error dialing GRPCServer")
defer client.Close()
// sleep past keepalive timeout
time.Sleep(4 * time.Second)
data := make([]byte, 24)
for {
_, err = client.Read(data)
if err == nil {
continue
}
assert.EqualError(t, err, io.EOF.Error(), "Expected io.EOF")
break
}
}

func TestKeepaliveClientResponse(t *testing.T) {
t.Parallel()
// set up GRPCServer instance
kap := comm.KeepaliveOptions{
ServerKeepaliveTime: 2,
ServerKeepaliveTimeout: 1,
}
comm.SetKeepaliveOptions(kap)
testAddress := "localhost:9401"
srv, err := comm.NewGRPCServer(testAddress, comm.SecureServerConfig{})
assert.NoError(t, err, "Unexpected error starting GRPCServer")
go srv.Start()
defer srv.Stop()

// test that connection does not close with response to ping
clientTransport, err := transport.NewClientTransport(context.Background(),
transport.TargetInfo{Addr: testAddress}, transport.ConnectOptions{})
assert.NoError(t, err, "Unexpected error creating client transport")
defer clientTransport.Close()
// sleep past keepalive timeout
time.Sleep(4 * time.Second)
// try to create a stream
_, err = clientTransport.NewStream(context.Background(), &transport.CallHdr{})
assert.NoError(t, err, "Unexpected error creating stream")
}
2 changes: 2 additions & 0 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func DefaultConnectionFactory(endpoint string) (*grpc.ClientConn, error) {
// set max send/recv msg sizes
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()),
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize())))
// set the keepalive options
dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...)

if comm.TLSEnabled() {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetDeliverServiceCredentials()))
Expand Down
3 changes: 3 additions & 0 deletions peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ func serve(args []string) error {
// set max send/recv msg sizes
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()),
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize())))
// set the keepalive options
dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...)

if comm.TLSEnabled() {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetPeerCredentials()))
} else {
Expand Down

0 comments on commit 1c8dc30

Please sign in to comment.