Skip to content

Commit

Permalink
[FAB-6017] enable GRPC keepalive on chaincode
Browse files Browse the repository at this point in the history
Link level disconnects on the chaincode-peer connection could cause
either side to wait indefinitely for the other side to restablish
connectivity. This would cause leaks
   . chaincode container stays up after peer dies
   . peer holds chaincode handlers after chaincode dies
Note that the above condition requires a disconnect to happen before
the containers go away. Container deaths under normal network connectivity
is cleanly detected and cleaned up on either side (which is why we normally
don't notice this issue.

gRPC keepalive enabled in FAB-4305 provides an elegant solution to this
problem. This CR enables client-side keep-alives for chaincode client by
setting keepalive.ClientParameters.  To contain wide changes, non-chaincode
connections are left as is.

With this fix chaincode and server send gRPC pings once a min when there's
no activity on the connection. Ping failures are detected by gRPC which
terminate the connection. This is detected by the higher layers which
clean up normally.

This should also fix FAB-6333. Another nice side-effect is we may not need
the app level KEEPALIVE pings any more. But this can be revisited in future.

Change-Id: Icc04391cf114713e464cfd59a6bb4b5dea0a4806
Signed-off-by: Srinivasan Muralidharan <srinivasan.muralidharan99@gmail.com>
  • Loading branch information
muralisrini committed Oct 4, 2017
1 parent 7500e9b commit 780c31a
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 22 deletions.
4 changes: 2 additions & 2 deletions core/chaincode/shim/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ func getPeerAddress() string {
func newPeerClientConnection() (*grpc.ClientConn, error) {
var peerAddress = getPeerAddress()
if comm.TLSEnabled() {
return comm.NewClientConnectionWithAddress(peerAddress, true, true, comm.InitTLSForShim(key, cert))
return comm.NewChaincodeClientConnectionWithAddress(peerAddress, true, true, comm.InitTLSForShim(key, cert))
}
return comm.NewClientConnectionWithAddress(peerAddress, true, false, nil)
return comm.NewChaincodeClientConnectionWithAddress(peerAddress, true, false, nil)
}

func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error {
Expand Down
28 changes: 22 additions & 6 deletions core/comm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,21 @@ var (
// Max send and receive bytes for grpc clients and servers
maxRecvMsgSize = 100 * 1024 * 1024
maxSendMsgSize = 100 * 1024 * 1024
// Default keepalive options
// Default peer keepalive options
keepaliveOptions = KeepaliveOptions{
ClientKeepaliveTime: 60, // 1 min
ClientKeepaliveTimeout: 20, // 20 sec - gRPC default
ServerKeepaliveTime: 7200, // 2 hours - gRPC default
ServerKeepaliveTimeout: 20, // 20 sec - gRPC default
}
// chaincode keepalive options separate from peer keepalive
// options above (for flexibility)
chaincodeKeepaliveOptions = KeepaliveOptions{
ClientKeepaliveTime: 60, // 1 min
ClientKeepaliveTimeout: 20, // 20 sec - gRPC default
ServerKeepaliveTime: 60, // 1 min
ServerKeepaliveTimeout: 20, // 20 sec - gRPC default
}
)

// KeepAliveOptions is used to set the gRPC keepalive settings for both
Expand Down Expand Up @@ -98,15 +106,19 @@ func SetKeepaliveOptions(ka KeepaliveOptions) {

// ServerKeepaliveOptions returns the gRPC keepalive options for servers
func ServerKeepaliveOptions() []grpc.ServerOption {
return serverKeepaliveOptionsWithKa(&keepaliveOptions)
}

func serverKeepaliveOptionsWithKa(ka *KeepaliveOptions) []grpc.ServerOption {
var serverOpts []grpc.ServerOption
kap := keepalive.ServerParameters{
Time: time.Duration(keepaliveOptions.ServerKeepaliveTime) * time.Second,
Timeout: time.Duration(keepaliveOptions.ServerKeepaliveTimeout) * time.Second,
Time: time.Duration(ka.ServerKeepaliveTime) * time.Second,
Timeout: time.Duration(ka.ServerKeepaliveTimeout) * time.Second,
}
serverOpts = append(serverOpts, grpc.KeepaliveParams(kap))
kep := keepalive.EnforcementPolicy{
// needs to match clientKeepalive
MinTime: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second,
MinTime: time.Duration(ka.ClientKeepaliveTime) * time.Second,
// allow keepalive w/o rpc
PermitWithoutStream: true,
}
Expand All @@ -116,10 +128,14 @@ func ServerKeepaliveOptions() []grpc.ServerOption {

// ClientKeepaliveOptions returns the gRPC keepalive options for clients
func ClientKeepaliveOptions() []grpc.DialOption {
return clientKeepaliveOptionsWithKa(&keepaliveOptions)
}

func clientKeepaliveOptionsWithKa(ka *KeepaliveOptions) []grpc.DialOption {
var dialOpts []grpc.DialOption
kap := keepalive.ClientParameters{
Time: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second,
Timeout: time.Duration(keepaliveOptions.ClientKeepaliveTimeout) * time.Second,
Time: time.Duration(ka.ClientKeepaliveTime) * time.Second,
Timeout: time.Duration(ka.ClientKeepaliveTimeout) * time.Second,
PermitWithoutStream: true,
}
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kap))
Expand Down
25 changes: 24 additions & 1 deletion core/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,32 @@ func GetPeerTestingAddress(port string) string {
return getEnv("UNIT_TEST_PEER_IP", "localhost") + ":" + port
}

// NewClientConnectionWithAddress Returns a new grpc.ClientConn to the given address.
// NewClientConnectionWithAddress Returns a new grpc.ClientConn to the given address
func NewClientConnectionWithAddress(peerAddress string, block bool, tslEnabled bool, creds credentials.TransportCredentials) (*grpc.ClientConn, error) {
return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, nil)
}

// NewChaincodeClientConnectionWithAddress Returns a new chaincode type grpc.ClientConn to the given address
func NewChaincodeClientConnectionWithAddress(peerAddress string, block bool, tslEnabled bool, creds credentials.TransportCredentials) (*grpc.ClientConn, error) {
ka := chaincodeKeepaliveOptions
//client side's keepalive parameter better be greater than EnforcementPolicies MinTime
//to prevent server killing the connection due to timing issues. Just increase by a min
ka.ClientKeepaliveTime += 60

return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, &ka)
}

// newClientConnectionWithAddressWithKa Returns a new grpc.ClientConn to the given address using specied keepalive options
func newClientConnectionWithAddressWithKa(peerAddress string, block bool, tslEnabled bool, creds credentials.TransportCredentials, ka *KeepaliveOptions) (*grpc.ClientConn, error) {
var opts []grpc.DialOption

//preserve old behavior for non chaincode. We probably
//want to change this in future to have peer client
//send keepalives too
if ka != nil {
opts = clientKeepaliveOptionsWithKa(ka)
}

if tslEnabled {
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
Expand Down
35 changes: 35 additions & 0 deletions core/comm/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,23 @@ func TestConnection_Correct(t *testing.T) {
tmpConn.Close()
}

func TestChaincodeConnection_Correct(t *testing.T) {
testutil.SetupTestConfig()
viper.Set("ledger.blockchain.deploy-system-chaincode", "false")
peerAddress := GetPeerTestingAddress("7052")
var tmpConn *grpc.ClientConn
var err error
if TLSEnabled() {
tmpConn, err = NewChaincodeClientConnectionWithAddress(peerAddress, true, true, InitTLSForPeer())
}
tmpConn, err = NewChaincodeClientConnectionWithAddress(peerAddress, true, false, nil)
if err != nil {
t.Fatalf("error connection to server at host:port = %s\n", peerAddress)
}

tmpConn.Close()
}

func TestConnection_WrongAddress(t *testing.T) {
testutil.SetupTestConfig()
viper.Set("ledger.blockchain.deploy-system-chaincode", "false")
Expand All @@ -91,6 +108,24 @@ func TestConnection_WrongAddress(t *testing.T) {
}
}

func TestChaincodeConnection_WrongAddress(t *testing.T) {
testutil.SetupTestConfig()
viper.Set("ledger.blockchain.deploy-system-chaincode", "false")
//some random port
peerAddress := GetPeerTestingAddress("10287")
var tmpConn *grpc.ClientConn
var err error
if TLSEnabled() {
tmpConn, err = NewChaincodeClientConnectionWithAddress(peerAddress, true, true, InitTLSForPeer())
}
tmpConn, err = NewChaincodeClientConnectionWithAddress(peerAddress, true, false, nil)
if err == nil {
fmt.Printf("error connection to server - at host:port = %s\n", peerAddress)
t.Error("error connection to server - connection should fail")
tmpConn.Close()
}
}

// utility function to load up our test root certificates from testdata/certs
func loadRootCAs() [][]byte {

Expand Down
31 changes: 24 additions & 7 deletions core/comm/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,26 @@ type grpcServerImpl struct {
}

//NewGRPCServer creates a new implementation of a GRPCServer given a
//listen address.
//listen address
func NewGRPCServer(address string, secureConfig SecureServerConfig) (GRPCServer, error) {
return newGRPCServerWithKa(address, secureConfig, &keepaliveOptions)
}

//NewChaincodeGRPCServer creates a new implementation of a chaincode GRPCServer given a
//listen address
func NewChaincodeGRPCServer(address string, secureConfig SecureServerConfig) (GRPCServer, error) {
return newGRPCServerWithKa(address, secureConfig, &chaincodeKeepaliveOptions)
}

//NewGRPCServerFromListener creates a new implementation of a GRPCServer given
//an existing net.Listener instance using default keepalive
func NewGRPCServerFromListener(listener net.Listener, secureConfig SecureServerConfig) (GRPCServer, error) {
return newGRPCServerFromListenerWithKa(listener, secureConfig, &keepaliveOptions)
}

//newGRPCServerWithKa creates a new implementation of a GRPCServer given a
//listen address with specified keepalive options
func newGRPCServerWithKa(address string, secureConfig SecureServerConfig, ka *KeepaliveOptions) (GRPCServer, error) {

if address == "" {
return nil, errors.New("Missing address parameter")
Expand All @@ -103,14 +121,13 @@ func NewGRPCServer(address string, secureConfig SecureServerConfig) (GRPCServer,
return nil, err
}

return NewGRPCServerFromListener(lis, secureConfig)
return newGRPCServerFromListenerWithKa(lis, secureConfig, ka)

}

//NewGRPCServerFromListener creates a new implementation of a GRPCServer given
//an existing net.Listener instance.
func NewGRPCServerFromListener(listener net.Listener, secureConfig SecureServerConfig) (GRPCServer, error) {

//newGRPCServerFromListenerWithKa creates a new implementation of a GRPCServer given
//an existing net.Listener instance with specfied keepalive
func newGRPCServerFromListenerWithKa(listener net.Listener, secureConfig SecureServerConfig, ka *KeepaliveOptions) (GRPCServer, error) {
grpcServer := &grpcServerImpl{
address: listener.Addr().String(),
listener: listener,
Expand Down Expand Up @@ -169,7 +186,7 @@ func NewGRPCServerFromListener(listener net.Listener, secureConfig SecureServerC
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(MaxSendMsgSize()))
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(MaxRecvMsgSize()))
// set the keepalive options
serverOpts = append(serverOpts, ServerKeepaliveOptions()...)
serverOpts = append(serverOpts, serverKeepaliveOptionsWithKa(ka)...)

grpcServer.server = grpc.NewServer(serverOpts...)

Expand Down
17 changes: 12 additions & 5 deletions core/comm/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,11 +475,9 @@ func TestNewGRPCServerInvalidParameters(t *testing.T) {
}
}

func TestNewGRPCServer(t *testing.T) {

t.Parallel()
testAddress := "localhost:9053"
srv, err := comm.NewGRPCServer(testAddress,
//pass a server func to test different types of servers
func testNewGRPCServer(t *testing.T, testAddress string, svrfunc func(string, comm.SecureServerConfig) (comm.GRPCServer, error)) {
srv, err := svrfunc(testAddress,
comm.SecureServerConfig{UseTLS: false})
//check for error
if err != nil {
Expand Down Expand Up @@ -518,7 +516,16 @@ func TestNewGRPCServer(t *testing.T) {
} else {
t.Log("GRPC client successfully invoked the EmptyCall service: " + testAddress)
}
}

func TestNewGRPCServer(t *testing.T) {
t.Parallel()
testNewGRPCServer(t, "localhost:9053", comm.NewGRPCServer)
}

func TestNewChaincodeGRPCServer(t *testing.T) {
t.Parallel()
testNewGRPCServer(t, "localhost:9059", comm.NewChaincodeGRPCServer)
}

func TestNewGRPCServerFromListener(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func createChaincodeServer(caCert []byte, peerHostname string) (comm.GRPCServer,
config.ClientRootCAs = append(config.ClientRootCAs, caCert)
}

srv, err = comm.NewGRPCServer(cclistenAddress, config)
srv, err = comm.NewChaincodeGRPCServer(cclistenAddress, config)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 780c31a

Please sign in to comment.