From a063c8a0332b7d1e29e38e14be411a32f093f913 Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Mon, 19 Nov 2018 12:49:29 -0500 Subject: [PATCH] FAB-9568 Add metrics to Broadcast This CR introduces three metrics for the broadcast service. The validation duration, enqueue duration, and processed count. The validation duration is the amount of time it takes to conclude a message is invalid or is valid and should be ordered. The enqueue duration is the amount of time it takes to pass a valid message to the consenter for ordering. The processed count is the total number of transactions processed by the broadcast interface. Change-Id: If328931ed492933981d22d23d9b39e058fa7e907 Signed-off-by: Jason Yellick --- common/grpcmetrics/interceptor.go | 4 +- orderer/common/broadcast/broadcast.go | 174 ++++++++++---- .../common/broadcast/broadcast_suite_test.go | 16 ++ orderer/common/broadcast/broadcast_test.go | 83 ++++++- orderer/common/broadcast/metrics.go | 47 ++++ orderer/common/broadcast/metrics_test.go | 38 +++ .../common/broadcast/mock/metrics_counter.go | 127 ++++++++++ .../broadcast/mock/metrics_histogram.go | 127 ++++++++++ .../common/broadcast/mock/metrics_provider.go | 218 ++++++++++++++++++ orderer/common/server/main.go | 2 +- orderer/common/server/server.go | 11 +- 11 files changed, 785 insertions(+), 62 deletions(-) create mode 100644 orderer/common/broadcast/metrics.go create mode 100644 orderer/common/broadcast/metrics_test.go create mode 100644 orderer/common/broadcast/mock/metrics_counter.go create mode 100644 orderer/common/broadcast/mock/metrics_histogram.go create mode 100644 orderer/common/broadcast/mock/metrics_provider.go diff --git a/common/grpcmetrics/interceptor.go b/common/grpcmetrics/interceptor.go index 1e426fd4e75..875cb044611 100644 --- a/common/grpcmetrics/interceptor.go +++ b/common/grpcmetrics/interceptor.go @@ -32,7 +32,7 @@ func UnaryServerInterceptor(um *UnaryMetrics) grpc.UnaryServerInterceptor { um.RequestDuration.With( "service", service, "method", method, "code", grpc.Code(err).String(), - ).Observe(float64(duration) / float64(time.Second)) + ).Observe(duration.Seconds()) um.RequestsCompleted.With("service", service, "method", method, "code", grpc.Code(err).String()).Add(1) return resp, err @@ -65,7 +65,7 @@ func StreamServerInterceptor(sm *StreamMetrics) grpc.StreamServerInterceptor { sm.RequestDuration.With( "service", service, "method", method, "code", grpc.Code(err).String(), - ).Observe(float64(duration) / float64(time.Second)) + ).Observe(duration.Seconds()) sm.RequestsCompleted.With("service", service, "method", method, "code", grpc.Code(err).String()).Add(1) return err diff --git a/orderer/common/broadcast/broadcast.go b/orderer/common/broadcast/broadcast.go index d25812f4592..9cb626d4ae9 100644 --- a/orderer/common/broadcast/broadcast.go +++ b/orderer/common/broadcast/broadcast.go @@ -8,6 +8,7 @@ package broadcast import ( "io" + "time" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/util" @@ -19,8 +20,9 @@ import ( var logger = flogging.MustGetLogger("orderer.common.broadcast") -// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a channel //go:generate counterfeiter -o mock/channel_support_registrar.go --fake-name ChannelSupportRegistrar . ChannelSupportRegistrar + +// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a channel type ChannelSupportRegistrar interface { // BroadcastChannelSupport returns the message channel header, whether the message is a config update // and the channel resources for a message or an error if the message is not a message which can @@ -28,8 +30,9 @@ type ChannelSupportRegistrar interface { BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, ChannelSupport, error) } -// ChannelSupport provides the backing resources needed to support broadcast on a channel //go:generate counterfeiter -o mock/channel_support.go --fake-name ChannelSupport . ChannelSupport + +// ChannelSupport provides the backing resources needed to support broadcast on a channel type ChannelSupport interface { msgprocessor.Processor Consenter @@ -53,18 +56,13 @@ type Consenter interface { WaitReady() error } +// Handler is designed to handle connections from Broadcast AB gRPC service type Handler struct { - sm ChannelSupportRegistrar + SupportRegistrar ChannelSupportRegistrar + Metrics *Metrics } -// NewHandlerImpl constructs a new implementation of the Handler interface -func NewHandlerImpl(sm ChannelSupportRegistrar) *Handler { - return &Handler{ - sm: sm, - } -} - -// Handle starts a service thread for a given gRPC connection and services the broadcast connection +// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { addr := util.ExtractRemoteAddress(srv.Context()) logger.Debugf("Starting new broadcast loop for %s", addr) @@ -79,59 +77,133 @@ func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { return err } - chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg) + resp := bh.ProcessMessage(msg, addr) + err = srv.Send(resp) + if resp.Status != cb.Status_SUCCESS { + return err + } + + if err != nil { + logger.Warningf("Error sending to %s: %s", addr, err) + return err + } + } + +} + +type MetricsTracker struct { + ValidateStartTime time.Time + EnqueueStartTime time.Time + ValidateDuration time.Duration + ChannelID string + TxType string + Metrics *Metrics +} + +func (mt *MetricsTracker) Record(resp *ab.BroadcastResponse) { + labels := []string{ + "status", resp.Status.String(), + "channel", mt.ChannelID, + "type", mt.TxType, + } + + if mt.ValidateDuration == 0 { + mt.EndValidate() + } + mt.Metrics.ValidateDuration.With(labels...).Observe(mt.ValidateDuration.Seconds()) + + if mt.EnqueueStartTime != (time.Time{}) { + enqueueDuration := time.Since(mt.EnqueueStartTime) + mt.Metrics.EnqueueDuration.With(labels...).Observe(enqueueDuration.Seconds()) + } + + mt.Metrics.ProcessedCount.With(labels...).Add(1) +} + +func (mt *MetricsTracker) BeginValidate() { + mt.ValidateStartTime = time.Now() +} + +func (mt *MetricsTracker) EndValidate() { + mt.ValidateDuration = time.Since(mt.ValidateStartTime) +} + +func (mt *MetricsTracker) BeginEnqueue() { + mt.EnqueueStartTime = time.Now() +} + +// ProcessMessage validates and enqueues a single message +func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) { + tracker := &MetricsTracker{ + ChannelID: "unknown", + TxType: "unknown", + Metrics: bh.Metrics, + } + defer func() { + // This looks a little unnecessary, but if done directly as + // a defer, resp gets the (always nil) current state of resp + // and not the return value + tracker.Record(resp) + }() + tracker.BeginValidate() + + chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg) + if chdr != nil { + tracker.ChannelID = chdr.ChannelId + tracker.TxType = cb.HeaderType(chdr.Type).String() + } + if err != nil { + logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err) + return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()} + } + + if !isConfig { + logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type]) + + configSeq, err := processor.ProcessNormalMsg(msg) if err != nil { - channelID := "" - if chdr != nil { - channelID = chdr.ChannelId - } - logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", channelID, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}) + logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()} } + tracker.EndValidate() + tracker.BeginEnqueue() if err = processor.WaitReady(); err != nil { logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}) + return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} + } + + err = processor.Order(msg, configSeq) + if err != nil { + logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} } + } else { // isConfig + logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr) - if !isConfig { - logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type]) - - configSeq, err := processor.ProcessNormalMsg(msg) - if err != nil { - logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}) - } - - err = processor.Order(msg, configSeq) - if err != nil { - logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}) - } - } else { // isConfig - logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr) - - config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) - if err != nil { - logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}) - } - - err = processor.Configure(config, configSeq) - if err != nil { - logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}) - } + config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) + if err != nil { + logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()} } + tracker.EndValidate() - logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr) + tracker.BeginEnqueue() + if err = processor.WaitReady(); err != nil { + logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} + } - err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) + err = processor.Configure(config, configSeq) if err != nil { - logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err) - return err + logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} } } + + logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr) + + return &ab.BroadcastResponse{Status: cb.Status_SUCCESS} } // ClassifyError converts an error type into a status code. diff --git a/orderer/common/broadcast/broadcast_suite_test.go b/orderer/common/broadcast/broadcast_suite_test.go index 72ccff66175..684d8ddd3bb 100644 --- a/orderer/common/broadcast/broadcast_suite_test.go +++ b/orderer/common/broadcast/broadcast_suite_test.go @@ -9,6 +9,7 @@ package broadcast_test import ( "testing" + "github.com/hyperledger/fabric/common/metrics" ab "github.com/hyperledger/fabric/protos/orderer" . "github.com/onsi/ginkgo" @@ -20,6 +21,21 @@ type abServer interface { ab.AtomicBroadcast_BroadcastServer } +//go:generate counterfeiter -o mock/metrics_histogram.go --fake-name MetricsHistogram . metricsHistogram +type metricsHistogram interface { + metrics.Histogram +} + +//go:generate counterfeiter -o mock/metrics_counter.go --fake-name MetricsCounter . metricsCounter +type metricsCounter interface { + metrics.Counter +} + +//go:generate counterfeiter -o mock/metrics_provider.go --fake-name MetricsProvider . metricsProvider +type metricsProvider interface { + metrics.Provider +} + func TestBroadcast(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Broadcast Suite") diff --git a/orderer/common/broadcast/broadcast_test.go b/orderer/common/broadcast/broadcast_test.go index 53c3c988681..ddb77dcb17e 100644 --- a/orderer/common/broadcast/broadcast_test.go +++ b/orderer/common/broadcast/broadcast_test.go @@ -24,13 +24,33 @@ import ( var _ = Describe("Broadcast", func() { var ( - fakeSupportRegistrar *mock.ChannelSupportRegistrar - handler *broadcast.Handler + fakeSupportRegistrar *mock.ChannelSupportRegistrar + handler *broadcast.Handler + fakeValidateHistogram *mock.MetricsHistogram + fakeEnqueueHistogram *mock.MetricsHistogram + fakeProcessedCounter *mock.MetricsCounter ) BeforeEach(func() { fakeSupportRegistrar = &mock.ChannelSupportRegistrar{} - handler = broadcast.NewHandlerImpl(fakeSupportRegistrar) + + fakeValidateHistogram = &mock.MetricsHistogram{} + fakeValidateHistogram.WithReturns(fakeValidateHistogram) + + fakeEnqueueHistogram = &mock.MetricsHistogram{} + fakeEnqueueHistogram.WithReturns(fakeEnqueueHistogram) + + fakeProcessedCounter = &mock.MetricsCounter{} + fakeProcessedCounter.WithReturns(fakeProcessedCounter) + + handler = &broadcast.Handler{ + SupportRegistrar: fakeSupportRegistrar, + Metrics: &broadcast.Metrics{ + ValidateDuration: fakeValidateHistogram, + EnqueueDuration: fakeEnqueueHistogram, + ProcessedCount: fakeProcessedCounter, + }, + } }) Describe("Handle", func() { @@ -52,7 +72,7 @@ var _ = Describe("Broadcast", func() { fakeSupport.ProcessNormalMsgReturns(5, nil) fakeSupportRegistrar.BroadcastChannelSupportReturns(&cb.ChannelHeader{ - Type: 1, + Type: 3, ChannelId: "fake-channel", }, false, fakeSupport, nil) }) @@ -75,6 +95,35 @@ var _ = Describe("Broadcast", func() { Expect(orderedMsg).To(Equal(fakeMsg)) Expect(seq).To(Equal(uint64(5))) + Expect(fakeValidateHistogram.WithCallCount()).To(Equal(1)) + Expect(fakeValidateHistogram.WithArgsForCall(0)).To(Equal([]string{ + "status", "SUCCESS", + "channel", "fake-channel", + "type", "ENDORSER_TRANSACTION", + })) + Expect(fakeValidateHistogram.ObserveCallCount()).To(Equal(1)) + Expect(fakeValidateHistogram.ObserveArgsForCall(0)).To(BeNumerically(">", 0)) + Expect(fakeValidateHistogram.ObserveArgsForCall(0)).To(BeNumerically("<", 1)) + + Expect(fakeEnqueueHistogram.WithCallCount()).To(Equal(1)) + Expect(fakeEnqueueHistogram.WithArgsForCall(0)).To(Equal([]string{ + "status", "SUCCESS", + "channel", "fake-channel", + "type", "ENDORSER_TRANSACTION", + })) + Expect(fakeEnqueueHistogram.ObserveCallCount()).To(Equal(1)) + Expect(fakeEnqueueHistogram.ObserveArgsForCall(0)).To(BeNumerically(">", 0)) + Expect(fakeEnqueueHistogram.ObserveArgsForCall(0)).To(BeNumerically("<", 1)) + + Expect(fakeProcessedCounter.WithCallCount()).To(Equal(1)) + Expect(fakeProcessedCounter.WithArgsForCall(0)).To(Equal([]string{ + "status", "SUCCESS", + "channel", "fake-channel", + "type", "ENDORSER_TRANSACTION", + })) + Expect(fakeProcessedCounter.AddCallCount()).To(Equal(1)) + Expect(fakeProcessedCounter.AddArgsForCall(0)).To(Equal(float64(1))) + Expect(fakeABServer.SendCallCount()).To(Equal(1)) Expect(proto.Equal(fakeABServer.SendArgsForCall(0), &ab.BroadcastResponse{Status: cb.Status_SUCCESS})).To(BeTrue()) }) @@ -106,6 +155,15 @@ var _ = Describe("Broadcast", func() { It("does not crash", func() { err := handler.Handle(fakeABServer) Expect(err).NotTo(HaveOccurred()) + + Expect(fakeValidateHistogram.WithCallCount()).To(Equal(1)) + Expect(fakeValidateHistogram.WithArgsForCall(0)).To(Equal([]string{ + "status", "BAD_REQUEST", + "channel", "unknown", + "type", "unknown", + })) + Expect(fakeEnqueueHistogram.WithCallCount()).To(Equal(0)) + Expect(fakeProcessedCounter.WithCallCount()).To(Equal(1)) }) }) @@ -254,6 +312,23 @@ var _ = Describe("Broadcast", func() { Expect(proto.Equal(fakeABServer.SendArgsForCall(0), &ab.BroadcastResponse{Status: cb.Status_SUCCESS})).To(BeTrue()) }) + Context("when the consenter is not ready for the request", func() { + BeforeEach(func() { + fakeSupport.WaitReadyReturns(fmt.Errorf("not-ready")) + }) + + It("returns the error to the client with a service unavailable status", func() { + err := handler.Handle(fakeABServer) + Expect(err).NotTo(HaveOccurred()) + + Expect(fakeABServer.SendCallCount()).To(Equal(1)) + Expect(proto.Equal( + fakeABServer.SendArgsForCall(0), + &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: "not-ready"}), + ).To(BeTrue()) + }) + }) + Context("when the consenter cannot enqueue the message", func() { BeforeEach(func() { fakeSupport.ConfigureReturns(fmt.Errorf("consenter-error")) diff --git a/orderer/common/broadcast/metrics.go b/orderer/common/broadcast/metrics.go new file mode 100644 index 00000000000..1d9cfdf59cc --- /dev/null +++ b/orderer/common/broadcast/metrics.go @@ -0,0 +1,47 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package broadcast + +import "github.com/hyperledger/fabric/common/metrics" + +var ( + validateDuration = metrics.HistogramOpts{ + Namespace: "broadcast", + Name: "validate_duration", + Help: "The time to validate a transaction in seconds.", + LabelNames: []string{"channel", "type", "status"}, + StatsdFormat: "%{#fqname}.%{channel}.%{type}.%{status}", + } + enqueueDuration = metrics.HistogramOpts{ + Namespace: "broadcast", + Name: "enqueue_duration", + Help: "The time to enqueue a transaction in seconds.", + LabelNames: []string{"channel", "type", "status"}, + StatsdFormat: "%{#fqname}.%{channel}.%{type}.%{status}", + } + processedCount = metrics.CounterOpts{ + Namespace: "broadcast", + Name: "processed_count", + Help: "The number of transactions processed.", + LabelNames: []string{"channel", "type", "status"}, + StatsdFormat: "%{#fqname}.%{channel}.%{type}.%{status}", + } +) + +type Metrics struct { + ValidateDuration metrics.Histogram + EnqueueDuration metrics.Histogram + ProcessedCount metrics.Counter +} + +func NewMetrics(p metrics.Provider) *Metrics { + return &Metrics{ + ValidateDuration: p.NewHistogram(validateDuration), + EnqueueDuration: p.NewHistogram(enqueueDuration), + ProcessedCount: p.NewCounter(processedCount), + } +} diff --git a/orderer/common/broadcast/metrics_test.go b/orderer/common/broadcast/metrics_test.go new file mode 100644 index 00000000000..f811e42ee11 --- /dev/null +++ b/orderer/common/broadcast/metrics_test.go @@ -0,0 +1,38 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package broadcast_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/hyperledger/fabric/orderer/common/broadcast" + "github.com/hyperledger/fabric/orderer/common/broadcast/mock" +) + +var _ = Describe("Metrics", func() { + var ( + fakeProvider *mock.MetricsProvider + ) + + BeforeEach(func() { + fakeProvider = &mock.MetricsProvider{} + fakeProvider.NewHistogramReturns(&mock.MetricsHistogram{}) + fakeProvider.NewCounterReturns(&mock.MetricsCounter{}) + }) + + It("uses the provider to initialize all fields", func() { + metrics := broadcast.NewMetrics(fakeProvider) + Expect(metrics).NotTo(BeNil()) + Expect(metrics.ValidateDuration).To(Equal(&mock.MetricsHistogram{})) + Expect(metrics.EnqueueDuration).To(Equal(&mock.MetricsHistogram{})) + Expect(metrics.ProcessedCount).To(Equal(&mock.MetricsCounter{})) + + Expect(fakeProvider.NewHistogramCallCount()).To(Equal(2)) + Expect(fakeProvider.NewCounterCallCount()).To(Equal(1)) + }) +}) diff --git a/orderer/common/broadcast/mock/metrics_counter.go b/orderer/common/broadcast/mock/metrics_counter.go new file mode 100644 index 00000000000..dc38e8ea1aa --- /dev/null +++ b/orderer/common/broadcast/mock/metrics_counter.go @@ -0,0 +1,127 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mock + +import ( + "sync" + + "github.com/hyperledger/fabric/common/metrics" +) + +type MetricsCounter struct { + WithStub func(labelValues ...string) metrics.Counter + withMutex sync.RWMutex + withArgsForCall []struct { + labelValues []string + } + withReturns struct { + result1 metrics.Counter + } + withReturnsOnCall map[int]struct { + result1 metrics.Counter + } + AddStub func(delta float64) + addMutex sync.RWMutex + addArgsForCall []struct { + delta float64 + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *MetricsCounter) With(labelValues ...string) metrics.Counter { + fake.withMutex.Lock() + ret, specificReturn := fake.withReturnsOnCall[len(fake.withArgsForCall)] + fake.withArgsForCall = append(fake.withArgsForCall, struct { + labelValues []string + }{labelValues}) + fake.recordInvocation("With", []interface{}{labelValues}) + fake.withMutex.Unlock() + if fake.WithStub != nil { + return fake.WithStub(labelValues...) + } + if specificReturn { + return ret.result1 + } + return fake.withReturns.result1 +} + +func (fake *MetricsCounter) WithCallCount() int { + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + return len(fake.withArgsForCall) +} + +func (fake *MetricsCounter) WithArgsForCall(i int) []string { + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + return fake.withArgsForCall[i].labelValues +} + +func (fake *MetricsCounter) WithReturns(result1 metrics.Counter) { + fake.WithStub = nil + fake.withReturns = struct { + result1 metrics.Counter + }{result1} +} + +func (fake *MetricsCounter) WithReturnsOnCall(i int, result1 metrics.Counter) { + fake.WithStub = nil + if fake.withReturnsOnCall == nil { + fake.withReturnsOnCall = make(map[int]struct { + result1 metrics.Counter + }) + } + fake.withReturnsOnCall[i] = struct { + result1 metrics.Counter + }{result1} +} + +func (fake *MetricsCounter) Add(delta float64) { + fake.addMutex.Lock() + fake.addArgsForCall = append(fake.addArgsForCall, struct { + delta float64 + }{delta}) + fake.recordInvocation("Add", []interface{}{delta}) + fake.addMutex.Unlock() + if fake.AddStub != nil { + fake.AddStub(delta) + } +} + +func (fake *MetricsCounter) AddCallCount() int { + fake.addMutex.RLock() + defer fake.addMutex.RUnlock() + return len(fake.addArgsForCall) +} + +func (fake *MetricsCounter) AddArgsForCall(i int) float64 { + fake.addMutex.RLock() + defer fake.addMutex.RUnlock() + return fake.addArgsForCall[i].delta +} + +func (fake *MetricsCounter) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + fake.addMutex.RLock() + defer fake.addMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *MetricsCounter) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/orderer/common/broadcast/mock/metrics_histogram.go b/orderer/common/broadcast/mock/metrics_histogram.go new file mode 100644 index 00000000000..1c1884c50d0 --- /dev/null +++ b/orderer/common/broadcast/mock/metrics_histogram.go @@ -0,0 +1,127 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mock + +import ( + "sync" + + "github.com/hyperledger/fabric/common/metrics" +) + +type MetricsHistogram struct { + WithStub func(labelValues ...string) metrics.Histogram + withMutex sync.RWMutex + withArgsForCall []struct { + labelValues []string + } + withReturns struct { + result1 metrics.Histogram + } + withReturnsOnCall map[int]struct { + result1 metrics.Histogram + } + ObserveStub func(value float64) + observeMutex sync.RWMutex + observeArgsForCall []struct { + value float64 + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *MetricsHistogram) With(labelValues ...string) metrics.Histogram { + fake.withMutex.Lock() + ret, specificReturn := fake.withReturnsOnCall[len(fake.withArgsForCall)] + fake.withArgsForCall = append(fake.withArgsForCall, struct { + labelValues []string + }{labelValues}) + fake.recordInvocation("With", []interface{}{labelValues}) + fake.withMutex.Unlock() + if fake.WithStub != nil { + return fake.WithStub(labelValues...) + } + if specificReturn { + return ret.result1 + } + return fake.withReturns.result1 +} + +func (fake *MetricsHistogram) WithCallCount() int { + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + return len(fake.withArgsForCall) +} + +func (fake *MetricsHistogram) WithArgsForCall(i int) []string { + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + return fake.withArgsForCall[i].labelValues +} + +func (fake *MetricsHistogram) WithReturns(result1 metrics.Histogram) { + fake.WithStub = nil + fake.withReturns = struct { + result1 metrics.Histogram + }{result1} +} + +func (fake *MetricsHistogram) WithReturnsOnCall(i int, result1 metrics.Histogram) { + fake.WithStub = nil + if fake.withReturnsOnCall == nil { + fake.withReturnsOnCall = make(map[int]struct { + result1 metrics.Histogram + }) + } + fake.withReturnsOnCall[i] = struct { + result1 metrics.Histogram + }{result1} +} + +func (fake *MetricsHistogram) Observe(value float64) { + fake.observeMutex.Lock() + fake.observeArgsForCall = append(fake.observeArgsForCall, struct { + value float64 + }{value}) + fake.recordInvocation("Observe", []interface{}{value}) + fake.observeMutex.Unlock() + if fake.ObserveStub != nil { + fake.ObserveStub(value) + } +} + +func (fake *MetricsHistogram) ObserveCallCount() int { + fake.observeMutex.RLock() + defer fake.observeMutex.RUnlock() + return len(fake.observeArgsForCall) +} + +func (fake *MetricsHistogram) ObserveArgsForCall(i int) float64 { + fake.observeMutex.RLock() + defer fake.observeMutex.RUnlock() + return fake.observeArgsForCall[i].value +} + +func (fake *MetricsHistogram) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + fake.observeMutex.RLock() + defer fake.observeMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *MetricsHistogram) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/orderer/common/broadcast/mock/metrics_provider.go b/orderer/common/broadcast/mock/metrics_provider.go new file mode 100644 index 00000000000..6c0eafa95b4 --- /dev/null +++ b/orderer/common/broadcast/mock/metrics_provider.go @@ -0,0 +1,218 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mock + +import ( + "sync" + + "github.com/hyperledger/fabric/common/metrics" +) + +type MetricsProvider struct { + NewCounterStub func(metrics.CounterOpts) metrics.Counter + newCounterMutex sync.RWMutex + newCounterArgsForCall []struct { + arg1 metrics.CounterOpts + } + newCounterReturns struct { + result1 metrics.Counter + } + newCounterReturnsOnCall map[int]struct { + result1 metrics.Counter + } + NewGaugeStub func(metrics.GaugeOpts) metrics.Gauge + newGaugeMutex sync.RWMutex + newGaugeArgsForCall []struct { + arg1 metrics.GaugeOpts + } + newGaugeReturns struct { + result1 metrics.Gauge + } + newGaugeReturnsOnCall map[int]struct { + result1 metrics.Gauge + } + NewHistogramStub func(metrics.HistogramOpts) metrics.Histogram + newHistogramMutex sync.RWMutex + newHistogramArgsForCall []struct { + arg1 metrics.HistogramOpts + } + newHistogramReturns struct { + result1 metrics.Histogram + } + newHistogramReturnsOnCall map[int]struct { + result1 metrics.Histogram + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *MetricsProvider) NewCounter(arg1 metrics.CounterOpts) metrics.Counter { + fake.newCounterMutex.Lock() + ret, specificReturn := fake.newCounterReturnsOnCall[len(fake.newCounterArgsForCall)] + fake.newCounterArgsForCall = append(fake.newCounterArgsForCall, struct { + arg1 metrics.CounterOpts + }{arg1}) + fake.recordInvocation("NewCounter", []interface{}{arg1}) + fake.newCounterMutex.Unlock() + if fake.NewCounterStub != nil { + return fake.NewCounterStub(arg1) + } + if specificReturn { + return ret.result1 + } + return fake.newCounterReturns.result1 +} + +func (fake *MetricsProvider) NewCounterCallCount() int { + fake.newCounterMutex.RLock() + defer fake.newCounterMutex.RUnlock() + return len(fake.newCounterArgsForCall) +} + +func (fake *MetricsProvider) NewCounterArgsForCall(i int) metrics.CounterOpts { + fake.newCounterMutex.RLock() + defer fake.newCounterMutex.RUnlock() + return fake.newCounterArgsForCall[i].arg1 +} + +func (fake *MetricsProvider) NewCounterReturns(result1 metrics.Counter) { + fake.NewCounterStub = nil + fake.newCounterReturns = struct { + result1 metrics.Counter + }{result1} +} + +func (fake *MetricsProvider) NewCounterReturnsOnCall(i int, result1 metrics.Counter) { + fake.NewCounterStub = nil + if fake.newCounterReturnsOnCall == nil { + fake.newCounterReturnsOnCall = make(map[int]struct { + result1 metrics.Counter + }) + } + fake.newCounterReturnsOnCall[i] = struct { + result1 metrics.Counter + }{result1} +} + +func (fake *MetricsProvider) NewGauge(arg1 metrics.GaugeOpts) metrics.Gauge { + fake.newGaugeMutex.Lock() + ret, specificReturn := fake.newGaugeReturnsOnCall[len(fake.newGaugeArgsForCall)] + fake.newGaugeArgsForCall = append(fake.newGaugeArgsForCall, struct { + arg1 metrics.GaugeOpts + }{arg1}) + fake.recordInvocation("NewGauge", []interface{}{arg1}) + fake.newGaugeMutex.Unlock() + if fake.NewGaugeStub != nil { + return fake.NewGaugeStub(arg1) + } + if specificReturn { + return ret.result1 + } + return fake.newGaugeReturns.result1 +} + +func (fake *MetricsProvider) NewGaugeCallCount() int { + fake.newGaugeMutex.RLock() + defer fake.newGaugeMutex.RUnlock() + return len(fake.newGaugeArgsForCall) +} + +func (fake *MetricsProvider) NewGaugeArgsForCall(i int) metrics.GaugeOpts { + fake.newGaugeMutex.RLock() + defer fake.newGaugeMutex.RUnlock() + return fake.newGaugeArgsForCall[i].arg1 +} + +func (fake *MetricsProvider) NewGaugeReturns(result1 metrics.Gauge) { + fake.NewGaugeStub = nil + fake.newGaugeReturns = struct { + result1 metrics.Gauge + }{result1} +} + +func (fake *MetricsProvider) NewGaugeReturnsOnCall(i int, result1 metrics.Gauge) { + fake.NewGaugeStub = nil + if fake.newGaugeReturnsOnCall == nil { + fake.newGaugeReturnsOnCall = make(map[int]struct { + result1 metrics.Gauge + }) + } + fake.newGaugeReturnsOnCall[i] = struct { + result1 metrics.Gauge + }{result1} +} + +func (fake *MetricsProvider) NewHistogram(arg1 metrics.HistogramOpts) metrics.Histogram { + fake.newHistogramMutex.Lock() + ret, specificReturn := fake.newHistogramReturnsOnCall[len(fake.newHistogramArgsForCall)] + fake.newHistogramArgsForCall = append(fake.newHistogramArgsForCall, struct { + arg1 metrics.HistogramOpts + }{arg1}) + fake.recordInvocation("NewHistogram", []interface{}{arg1}) + fake.newHistogramMutex.Unlock() + if fake.NewHistogramStub != nil { + return fake.NewHistogramStub(arg1) + } + if specificReturn { + return ret.result1 + } + return fake.newHistogramReturns.result1 +} + +func (fake *MetricsProvider) NewHistogramCallCount() int { + fake.newHistogramMutex.RLock() + defer fake.newHistogramMutex.RUnlock() + return len(fake.newHistogramArgsForCall) +} + +func (fake *MetricsProvider) NewHistogramArgsForCall(i int) metrics.HistogramOpts { + fake.newHistogramMutex.RLock() + defer fake.newHistogramMutex.RUnlock() + return fake.newHistogramArgsForCall[i].arg1 +} + +func (fake *MetricsProvider) NewHistogramReturns(result1 metrics.Histogram) { + fake.NewHistogramStub = nil + fake.newHistogramReturns = struct { + result1 metrics.Histogram + }{result1} +} + +func (fake *MetricsProvider) NewHistogramReturnsOnCall(i int, result1 metrics.Histogram) { + fake.NewHistogramStub = nil + if fake.newHistogramReturnsOnCall == nil { + fake.newHistogramReturnsOnCall = make(map[int]struct { + result1 metrics.Histogram + }) + } + fake.newHistogramReturnsOnCall[i] = struct { + result1 metrics.Histogram + }{result1} +} + +func (fake *MetricsProvider) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.newCounterMutex.RLock() + defer fake.newCounterMutex.RUnlock() + fake.newGaugeMutex.RLock() + defer fake.newGaugeMutex.RUnlock() + fake.newHistogramMutex.RLock() + defer fake.newHistogramMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *MetricsProvider) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/orderer/common/server/main.go b/orderer/common/server/main.go index 18c37aaac11..0e5b6946c05 100644 --- a/orderer/common/server/main.go +++ b/orderer/common/server/main.go @@ -128,7 +128,7 @@ func Start(cmd string, conf *localconfig.TopLevel) { manager := initializeMultichannelRegistrar(clusterType, clusterDialer, serverConfig, grpcServer, conf, signer, tlsCallback) mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert - server := NewServer(manager, signer, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS) + server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS) switch cmd { case start.FullCommand(): // "start" command diff --git a/orderer/common/server/server.go b/orderer/common/server/server.go index 1b1d43cc076..0d6aa95296e 100644 --- a/orderer/common/server/server.go +++ b/orderer/common/server/server.go @@ -14,8 +14,8 @@ import ( "time" "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/common/crypto" "github.com/hyperledger/fabric/common/deliver" + "github.com/hyperledger/fabric/common/metrics" "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/broadcast" localconfig "github.com/hyperledger/fabric/orderer/common/localconfig" @@ -72,10 +72,13 @@ func (rs *responseSender) SendBlockResponse(block *cb.Block) error { } // NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader -func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { +func NewServer(r *multichannel.Registrar, metricsProvider metrics.Provider, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { s := &server{ - dh: deliver.NewHandler(deliverSupport{Registrar: r}, timeWindow, mutualTLS), - bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}), + dh: deliver.NewHandler(deliverSupport{Registrar: r}, timeWindow, mutualTLS), + bh: &broadcast.Handler{ + SupportRegistrar: broadcastSupport{Registrar: r}, + Metrics: broadcast.NewMetrics(metricsProvider), + }, debug: debug, Registrar: r, }