Skip to content

Commit

Permalink
FAB-12593 Wire in the stats handler
Browse files Browse the repository at this point in the history
Wire in the stats handler to the server to start
tracking connections

Change-Id: Ibb8e6c9be2b2013ae93293400df8da39011556e2
Signed-off-by: Saad Karim <skarim@us.ibm.com>
  • Loading branch information
Saad Karim authored and mastersingh24 committed Oct 31, 2018
1 parent f15c6d8 commit 6ab3eeb
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 14 deletions.
10 changes: 10 additions & 0 deletions core/comm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"crypto/x509"
"time"

"github.com/go-kit/kit/metrics"
"github.com/hyperledger/fabric/common/flogging"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -59,6 +60,8 @@ type ServerConfig struct {
UnaryInterceptors []grpc.UnaryServerInterceptor
// Logger specifies the logger the server will use
Logger *flogging.FabricLogger
// Metrics specifies the different metrics that are tracked for the server
Metrics *Metrics
}

// ClientConfig defines the parameters for configuring a GRPCClient instance
Expand Down Expand Up @@ -117,6 +120,13 @@ type KeepaliveOptions struct {
ServerMinInterval time.Duration
}

type Metrics struct {
// OpenConnCounter keeps track of number of open connections
OpenConnCounter metrics.Counter
// ClosedConnCounter keeps track of number connections closed
ClosedConnCounter metrics.Counter
}

// ServerKeepaliveOptions returns gRPC keepalive options for server. If
// opts is nil, the default keepalive options are returned
func ServerKeepaliveOptions(ka *KeepaliveOptions) []grpc.ServerOption {
Expand Down
8 changes: 8 additions & 0 deletions core/comm/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ func NewGRPCServerFromListener(listener net.Listener, serverConfig ServerConfig)
)
}

if serverConfig.Metrics != nil {
sh := &ServerStatsHandler{
OpenConnCounter: serverConfig.Metrics.OpenConnCounter,
ClosedConnCounter: serverConfig.Metrics.ClosedConnCounter,
}
serverOpts = append(serverOpts, grpc.StatsHandler(sh))
}

grpcServer.server = grpc.NewServer(serverOpts...)

return grpcServer, nil
Expand Down
18 changes: 9 additions & 9 deletions core/comm/serverstatshandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ func (h *ServerStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
return ctx
}

func (h *ServerStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
switch s.(type) {
case *stats.Begin:
h.OpenConnCounter.Add(1)
case *stats.End:
h.ClosedConnCounter.Add(1)
}
}
func (h *ServerStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {}

func (h *ServerStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return ctx
}

func (h *ServerStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {}
func (h *ServerStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
switch s.(type) {
case *stats.ConnBegin:
h.OpenConnCounter.Add(1)
case *stats.ConnEnd:
h.ClosedConnCounter.Add(1)
}
}
72 changes: 67 additions & 5 deletions core/comm/serverstatshandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,22 @@ package comm_test

import (
"context"
"net"
"testing"
"time"

"github.com/hyperledger/fabric/common/metrics/metricsfakes"
"github.com/hyperledger/fabric/core/comm"
"github.com/stretchr/testify/assert"
testpb "github.com/hyperledger/fabric/core/comm/testdata/grpc"
. "github.com/onsi/gomega"
"google.golang.org/grpc"
"google.golang.org/grpc/stats"
)

func TestConnectionCounters(t *testing.T) {
t.Parallel()
gt := NewGomegaWithT(t)

openConn := &metricsfakes.Counter{}
closedConn := &metricsfakes.Counter{}
sh := &comm.ServerStatsHandler{
Expand All @@ -25,13 +32,68 @@ func TestConnectionCounters(t *testing.T) {
}

for i := 1; i <= 10; i++ {
sh.HandleRPC(context.Background(), &stats.Begin{})
sh.HandleConn(context.Background(), &stats.ConnBegin{})
gt.Expect(openConn.AddCallCount()).To(Equal(i))
}
assert.Equal(t, 10, openConn.AddCallCount())

for i := 1; i <= 5; i++ {
sh.HandleRPC(context.Background(), &stats.End{})
sh.HandleConn(context.Background(), &stats.ConnEnd{})
gt.Expect(closedConn.AddCallCount()).To(Equal(i))
}
assert.Equal(t, 5, closedConn.AddCallCount())
}

func TestConnMetricsGRPCServer(t *testing.T) {
t.Parallel()
gt := NewGomegaWithT(t)

openConn := &metricsfakes.Counter{}
closedConn := &metricsfakes.Counter{}

listener, err := net.Listen("tcp", "localhost:0")
gt.Expect(err).NotTo(HaveOccurred())
srv, err := comm.NewGRPCServerFromListener(
listener,
comm.ServerConfig{
SecOpts: &comm.SecureOptions{UseTLS: false},
Metrics: &comm.Metrics{
OpenConnCounter: openConn,
ClosedConnCounter: closedConn,
},
},
)
gt.Expect(err).NotTo(HaveOccurred())

// register the GRPC test server
testpb.RegisterEmptyServiceServer(srv.Server(), &emptyServiceServer{})

// start the server
go srv.Start()
defer srv.Stop()

// test grpc connection counts
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

gt.Expect(openConn.AddCallCount()).To(Equal(0))
gt.Expect(closedConn.AddCallCount()).To(Equal(0))

//create GRPC client conn
var clientConns []*grpc.ClientConn
for i := 1; i <= 3; i++ {
clientConn, err := grpc.DialContext(ctx, listener.Addr().String(), grpc.WithInsecure())
gt.Expect(err).NotTo(HaveOccurred())
clientConns = append(clientConns, clientConn)

//invoke service
client := testpb.NewEmptyServiceClient(clientConn)
_, err = client.EmptyCall(context.Background(), &testpb.Empty{})
gt.Expect(err).NotTo(HaveOccurred())
gt.Expect(openConn.AddCallCount()).To(Equal(i))
}

for i, conn := range clientConns {
gt.Expect(closedConn.AddCallCount()).Should(Equal(i))
conn.Close()
gt.Eventually(closedConn.AddCallCount, time.Second).Should(Equal(i + 1))
}
}

0 comments on commit 6ab3eeb

Please sign in to comment.