Skip to content

Commit

Permalink
FAB-16695 Re-use metrics for comm GRPCServer
Browse files Browse the repository at this point in the history
Today, the comm.NewGRPCServer code creates and registers metrics, given
a supplied metrics provider.  This is problematic because we should only
have one metrics provider, and re-registering a metric causes a panic.

Thus, if the admin service, or the Raft cluster service are put onto
different ports when Prometheus is enabled, the result is a crash.  This
CR fixes both peer and orderer initialization to re-use the metric
components across servers.

Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
Change-Id: I80a33c1a3bd4cb25b08ca09297b098b11963dbba
  • Loading branch information
Jason Yellick committed Sep 27, 2019
1 parent 0ac4069 commit ad1b514
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 32 deletions.
4 changes: 2 additions & 2 deletions core/comm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ type ServerConfig struct {
UnaryInterceptors []grpc.UnaryServerInterceptor
// Logger specifies the logger the server will use
Logger *flogging.FabricLogger
// Metrics Provider
MetricsProvider metrics.Provider
// ServerStatsHandler should be set if metrics on connections are to be reported.
ServerStatsHandler *ServerStatsHandler
}

// ClientConfig defines the parameters for configuring a GRPCClient instance
Expand Down
5 changes: 2 additions & 3 deletions core/comm/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ func NewGRPCServerFromListener(listener net.Listener, serverConfig ServerConfig)
)
}

if serverConfig.MetricsProvider != nil {
sh := NewServerStatsHandler(serverConfig.MetricsProvider)
serverOpts = append(serverOpts, grpc.StatsHandler(sh))
if serverConfig.ServerStatsHandler != nil {
serverOpts = append(serverOpts, grpc.StatsHandler(serverConfig.ServerStatsHandler))
}

grpcServer.server = grpc.NewServer(serverOpts...)
Expand Down
4 changes: 2 additions & 2 deletions core/comm/serverstatshandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestConnMetricsGRPCServer(t *testing.T) {
srv, err := comm.NewGRPCServerFromListener(
listener,
comm.ServerConfig{
SecOpts: &comm.SecureOptions{UseTLS: false},
MetricsProvider: fakeProvider,
SecOpts: &comm.SecureOptions{UseTLS: false},
ServerStatsHandler: comm.NewServerStatsHandler(fakeProvider),
},
)
gt.Expect(err).NotTo(HaveOccurred())
Expand Down
13 changes: 7 additions & 6 deletions orderer/common/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func configureClusterListener(conf *localconfig.TopLevel, generalConf comm.Serve
StreamInterceptors: generalConf.StreamInterceptors,
UnaryInterceptors: generalConf.UnaryInterceptors,
ConnectionTimeout: generalConf.ConnectionTimeout,
MetricsProvider: generalConf.MetricsProvider,
ServerStatsHandler: generalConf.ServerStatsHandler,
Logger: generalConf.Logger,
KaOpts: generalConf.KaOpts,
SecOpts: &comm.SecureOptions{
Expand Down Expand Up @@ -488,16 +488,17 @@ func initializeServerConfig(conf *localconfig.TopLevel, metricsProvider metrics.
kaOpts.ServerTimeout = conf.General.Keepalive.ServerTimeout

commLogger := flogging.MustGetLogger("core.comm").With("server", "Orderer")

if metricsProvider == nil {
metricsProvider = &disabled.Provider{}
}

return comm.ServerConfig{
SecOpts: secureOpts,
KaOpts: kaOpts,
Logger: commLogger,
MetricsProvider: metricsProvider,
ConnectionTimeout: conf.General.ConnectionTimeout,
SecOpts: secureOpts,
KaOpts: kaOpts,
Logger: commLogger,
ServerStatsHandler: comm.NewServerStatsHandler(metricsProvider),
ConnectionTimeout: conf.General.ConnectionTimeout,
StreamInterceptors: []grpc.StreamServerInterceptor{
grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
Expand Down
4 changes: 2 additions & 2 deletions orderer/common/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ func TestInitializeServerConfig(t *testing.T) {

sc = initializeServerConfig(conf, nil)
assert.NotNil(t, sc.Logger)
assert.Equal(t, &disabled.Provider{}, sc.MetricsProvider)
assert.Equal(t, comm.NewServerStatsHandler(&disabled.Provider{}), sc.ServerStatsHandler)
assert.Len(t, sc.UnaryInterceptors, 2)
assert.Len(t, sc.StreamInterceptors, 2)

sc = initializeServerConfig(conf, &prometheus.Provider{})
assert.Equal(t, &prometheus.Provider{}, sc.MetricsProvider)
assert.NotNil(t, sc.ServerStatsHandler)

goodFile := "main.go"
badFile := "does_not_exist"
Expand Down
24 changes: 7 additions & 17 deletions peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func serve(args []string) error {

throttle := comm.NewThrottle(grpcMaxConcurrency)
serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
serverConfig.MetricsProvider = metricsProvider
serverConfig.ServerStatsHandler = comm.NewServerStatsHandler(metricsProvider)
serverConfig.UnaryInterceptors = append(
serverConfig.UnaryInterceptors,
grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
Expand Down Expand Up @@ -273,7 +273,7 @@ func serve(args []string) error {
logger.Debugf("Running peer")

// Start the Admin server
startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
startAdminServer(listenAddr, peerServer.Server(), serverConfig)

privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
Expand Down Expand Up @@ -798,7 +798,7 @@ func adminHasSeparateListener(peerListenAddr string, adminListenAddress string)
return adminPort != peerPort
}

func startAdminServer(peerListenAddr string, peerServer *grpc.Server, metricsProvider metrics.Provider) {
func startAdminServer(peerListenAddr string, peerServer *grpc.Server, baseServerConfig comm.ServerConfig) {
adminListenAddress := viper.GetString("peer.adminService.listenAddress")
separateLsnrForAdmin := adminHasSeparateListener(peerListenAddr, adminListenAddress)
mspID := viper.GetString("peer.localMspId")
Expand All @@ -810,21 +810,11 @@ func startAdminServer(peerListenAddr string, peerServer *grpc.Server, metricsPro
if err != nil {
logger.Fatalf("Error loading secure config for admin service (%s)", err)
}
throttle := comm.NewThrottle(grpcMaxConcurrency)
serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "AdminServer")
serverConfig.MetricsProvider = metricsProvider
serverConfig.UnaryInterceptors = append(
serverConfig.UnaryInterceptors,
grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
throttle.UnaryServerIntercptor,
)
serverConfig.StreamInterceptors = append(
serverConfig.StreamInterceptors,
grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
throttle.StreamServerInterceptor,
)
serverConfig.ServerStatsHandler = baseServerConfig.ServerStatsHandler
serverConfig.UnaryInterceptors = baseServerConfig.UnaryInterceptors
serverConfig.StreamInterceptors = baseServerConfig.StreamInterceptors

adminServer, err := peer.NewPeerServer(adminListenAddress, serverConfig)
if err != nil {
logger.Fatalf("Failed to create admin server (%s)", err)
Expand Down

0 comments on commit ad1b514

Please sign in to comment.