diff --git a/common/grpcmetrics/interceptor.go b/common/grpcmetrics/interceptor.go index 1e426fd4e75..875cb044611 100644 --- a/common/grpcmetrics/interceptor.go +++ b/common/grpcmetrics/interceptor.go @@ -32,7 +32,7 @@ func UnaryServerInterceptor(um *UnaryMetrics) grpc.UnaryServerInterceptor { um.RequestDuration.With( "service", service, "method", method, "code", grpc.Code(err).String(), - ).Observe(float64(duration) / float64(time.Second)) + ).Observe(duration.Seconds()) um.RequestsCompleted.With("service", service, "method", method, "code", grpc.Code(err).String()).Add(1) return resp, err @@ -65,7 +65,7 @@ func StreamServerInterceptor(sm *StreamMetrics) grpc.StreamServerInterceptor { sm.RequestDuration.With( "service", service, "method", method, "code", grpc.Code(err).String(), - ).Observe(float64(duration) / float64(time.Second)) + ).Observe(duration.Seconds()) sm.RequestsCompleted.With("service", service, "method", method, "code", grpc.Code(err).String()).Add(1) return err diff --git a/orderer/common/broadcast/broadcast.go b/orderer/common/broadcast/broadcast.go index d25812f4592..9cb626d4ae9 100644 --- a/orderer/common/broadcast/broadcast.go +++ b/orderer/common/broadcast/broadcast.go @@ -8,6 +8,7 @@ package broadcast import ( "io" + "time" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/util" @@ -19,8 +20,9 @@ import ( var logger = flogging.MustGetLogger("orderer.common.broadcast") -// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a channel //go:generate counterfeiter -o mock/channel_support_registrar.go --fake-name ChannelSupportRegistrar . ChannelSupportRegistrar + +// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a channel type ChannelSupportRegistrar interface { // BroadcastChannelSupport returns the message channel header, whether the message is a config update // and the channel resources for a message or an error if the message is not a message which can @@ -28,8 +30,9 @@ type ChannelSupportRegistrar interface { BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, ChannelSupport, error) } -// ChannelSupport provides the backing resources needed to support broadcast on a channel //go:generate counterfeiter -o mock/channel_support.go --fake-name ChannelSupport . ChannelSupport + +// ChannelSupport provides the backing resources needed to support broadcast on a channel type ChannelSupport interface { msgprocessor.Processor Consenter @@ -53,18 +56,13 @@ type Consenter interface { WaitReady() error } +// Handler is designed to handle connections from Broadcast AB gRPC service type Handler struct { - sm ChannelSupportRegistrar + SupportRegistrar ChannelSupportRegistrar + Metrics *Metrics } -// NewHandlerImpl constructs a new implementation of the Handler interface -func NewHandlerImpl(sm ChannelSupportRegistrar) *Handler { - return &Handler{ - sm: sm, - } -} - -// Handle starts a service thread for a given gRPC connection and services the broadcast connection +// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { addr := util.ExtractRemoteAddress(srv.Context()) logger.Debugf("Starting new broadcast loop for %s", addr) @@ -79,59 +77,133 @@ func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { return err } - chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg) + resp := bh.ProcessMessage(msg, addr) + err = srv.Send(resp) + if resp.Status != cb.Status_SUCCESS { + return err + } + + if err != nil { + logger.Warningf("Error sending to %s: %s", addr, err) + return err + } + } + +} + +type MetricsTracker struct { + ValidateStartTime time.Time + EnqueueStartTime time.Time + ValidateDuration time.Duration + ChannelID string + TxType string + Metrics *Metrics +} + +func (mt *MetricsTracker) Record(resp *ab.BroadcastResponse) { + labels := []string{ + "status", resp.Status.String(), + "channel", mt.ChannelID, + "type", mt.TxType, + } + + if mt.ValidateDuration == 0 { + mt.EndValidate() + } + mt.Metrics.ValidateDuration.With(labels...).Observe(mt.ValidateDuration.Seconds()) + + if mt.EnqueueStartTime != (time.Time{}) { + enqueueDuration := time.Since(mt.EnqueueStartTime) + mt.Metrics.EnqueueDuration.With(labels...).Observe(enqueueDuration.Seconds()) + } + + mt.Metrics.ProcessedCount.With(labels...).Add(1) +} + +func (mt *MetricsTracker) BeginValidate() { + mt.ValidateStartTime = time.Now() +} + +func (mt *MetricsTracker) EndValidate() { + mt.ValidateDuration = time.Since(mt.ValidateStartTime) +} + +func (mt *MetricsTracker) BeginEnqueue() { + mt.EnqueueStartTime = time.Now() +} + +// ProcessMessage validates and enqueues a single message +func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) { + tracker := &MetricsTracker{ + ChannelID: "unknown", + TxType: "unknown", + Metrics: bh.Metrics, + } + defer func() { + // This looks a little unnecessary, but if done directly as + // a defer, resp gets the (always nil) current state of resp + // and not the return value + tracker.Record(resp) + }() + tracker.BeginValidate() + + chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg) + if chdr != nil { + tracker.ChannelID = chdr.ChannelId + tracker.TxType = cb.HeaderType(chdr.Type).String() + } + if err != nil { + logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err) + return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()} + } + + if !isConfig { + logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type]) + + configSeq, err := processor.ProcessNormalMsg(msg) if err != nil { - channelID := "" - if chdr != nil { - channelID = chdr.ChannelId - } - logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", channelID, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}) + logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()} } + tracker.EndValidate() + tracker.BeginEnqueue() if err = processor.WaitReady(); err != nil { logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}) + return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} + } + + err = processor.Order(msg, configSeq) + if err != nil { + logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} } + } else { // isConfig + logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr) - if !isConfig { - logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type]) - - configSeq, err := processor.ProcessNormalMsg(msg) - if err != nil { - logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}) - } - - err = processor.Order(msg, configSeq) - if err != nil { - logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}) - } - } else { // isConfig - logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr) - - config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) - if err != nil { - logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}) - } - - err = processor.Configure(config, configSeq) - if err != nil { - logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err) - return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}) - } + config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) + if err != nil { + logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()} } + tracker.EndValidate() - logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr) + tracker.BeginEnqueue() + if err = processor.WaitReady(); err != nil { + logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} + } - err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) + err = processor.Configure(config, configSeq) if err != nil { - logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err) - return err + logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err) + return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} } } + + logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr) + + return &ab.BroadcastResponse{Status: cb.Status_SUCCESS} } // ClassifyError converts an error type into a status code. diff --git a/orderer/common/broadcast/broadcast_suite_test.go b/orderer/common/broadcast/broadcast_suite_test.go index 72ccff66175..684d8ddd3bb 100644 --- a/orderer/common/broadcast/broadcast_suite_test.go +++ b/orderer/common/broadcast/broadcast_suite_test.go @@ -9,6 +9,7 @@ package broadcast_test import ( "testing" + "github.com/hyperledger/fabric/common/metrics" ab "github.com/hyperledger/fabric/protos/orderer" . "github.com/onsi/ginkgo" @@ -20,6 +21,21 @@ type abServer interface { ab.AtomicBroadcast_BroadcastServer } +//go:generate counterfeiter -o mock/metrics_histogram.go --fake-name MetricsHistogram . metricsHistogram +type metricsHistogram interface { + metrics.Histogram +} + +//go:generate counterfeiter -o mock/metrics_counter.go --fake-name MetricsCounter . metricsCounter +type metricsCounter interface { + metrics.Counter +} + +//go:generate counterfeiter -o mock/metrics_provider.go --fake-name MetricsProvider . metricsProvider +type metricsProvider interface { + metrics.Provider +} + func TestBroadcast(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Broadcast Suite") diff --git a/orderer/common/broadcast/broadcast_test.go b/orderer/common/broadcast/broadcast_test.go index 53c3c988681..ddb77dcb17e 100644 --- a/orderer/common/broadcast/broadcast_test.go +++ b/orderer/common/broadcast/broadcast_test.go @@ -24,13 +24,33 @@ import ( var _ = Describe("Broadcast", func() { var ( - fakeSupportRegistrar *mock.ChannelSupportRegistrar - handler *broadcast.Handler + fakeSupportRegistrar *mock.ChannelSupportRegistrar + handler *broadcast.Handler + fakeValidateHistogram *mock.MetricsHistogram + fakeEnqueueHistogram *mock.MetricsHistogram + fakeProcessedCounter *mock.MetricsCounter ) BeforeEach(func() { fakeSupportRegistrar = &mock.ChannelSupportRegistrar{} - handler = broadcast.NewHandlerImpl(fakeSupportRegistrar) + + fakeValidateHistogram = &mock.MetricsHistogram{} + fakeValidateHistogram.WithReturns(fakeValidateHistogram) + + fakeEnqueueHistogram = &mock.MetricsHistogram{} + fakeEnqueueHistogram.WithReturns(fakeEnqueueHistogram) + + fakeProcessedCounter = &mock.MetricsCounter{} + fakeProcessedCounter.WithReturns(fakeProcessedCounter) + + handler = &broadcast.Handler{ + SupportRegistrar: fakeSupportRegistrar, + Metrics: &broadcast.Metrics{ + ValidateDuration: fakeValidateHistogram, + EnqueueDuration: fakeEnqueueHistogram, + ProcessedCount: fakeProcessedCounter, + }, + } }) Describe("Handle", func() { @@ -52,7 +72,7 @@ var _ = Describe("Broadcast", func() { fakeSupport.ProcessNormalMsgReturns(5, nil) fakeSupportRegistrar.BroadcastChannelSupportReturns(&cb.ChannelHeader{ - Type: 1, + Type: 3, ChannelId: "fake-channel", }, false, fakeSupport, nil) }) @@ -75,6 +95,35 @@ var _ = Describe("Broadcast", func() { Expect(orderedMsg).To(Equal(fakeMsg)) Expect(seq).To(Equal(uint64(5))) + Expect(fakeValidateHistogram.WithCallCount()).To(Equal(1)) + Expect(fakeValidateHistogram.WithArgsForCall(0)).To(Equal([]string{ + "status", "SUCCESS", + "channel", "fake-channel", + "type", "ENDORSER_TRANSACTION", + })) + Expect(fakeValidateHistogram.ObserveCallCount()).To(Equal(1)) + Expect(fakeValidateHistogram.ObserveArgsForCall(0)).To(BeNumerically(">", 0)) + Expect(fakeValidateHistogram.ObserveArgsForCall(0)).To(BeNumerically("<", 1)) + + Expect(fakeEnqueueHistogram.WithCallCount()).To(Equal(1)) + Expect(fakeEnqueueHistogram.WithArgsForCall(0)).To(Equal([]string{ + "status", "SUCCESS", + "channel", "fake-channel", + "type", "ENDORSER_TRANSACTION", + })) + Expect(fakeEnqueueHistogram.ObserveCallCount()).To(Equal(1)) + Expect(fakeEnqueueHistogram.ObserveArgsForCall(0)).To(BeNumerically(">", 0)) + Expect(fakeEnqueueHistogram.ObserveArgsForCall(0)).To(BeNumerically("<", 1)) + + Expect(fakeProcessedCounter.WithCallCount()).To(Equal(1)) + Expect(fakeProcessedCounter.WithArgsForCall(0)).To(Equal([]string{ + "status", "SUCCESS", + "channel", "fake-channel", + "type", "ENDORSER_TRANSACTION", + })) + Expect(fakeProcessedCounter.AddCallCount()).To(Equal(1)) + Expect(fakeProcessedCounter.AddArgsForCall(0)).To(Equal(float64(1))) + Expect(fakeABServer.SendCallCount()).To(Equal(1)) Expect(proto.Equal(fakeABServer.SendArgsForCall(0), &ab.BroadcastResponse{Status: cb.Status_SUCCESS})).To(BeTrue()) }) @@ -106,6 +155,15 @@ var _ = Describe("Broadcast", func() { It("does not crash", func() { err := handler.Handle(fakeABServer) Expect(err).NotTo(HaveOccurred()) + + Expect(fakeValidateHistogram.WithCallCount()).To(Equal(1)) + Expect(fakeValidateHistogram.WithArgsForCall(0)).To(Equal([]string{ + "status", "BAD_REQUEST", + "channel", "unknown", + "type", "unknown", + })) + Expect(fakeEnqueueHistogram.WithCallCount()).To(Equal(0)) + Expect(fakeProcessedCounter.WithCallCount()).To(Equal(1)) }) }) @@ -254,6 +312,23 @@ var _ = Describe("Broadcast", func() { Expect(proto.Equal(fakeABServer.SendArgsForCall(0), &ab.BroadcastResponse{Status: cb.Status_SUCCESS})).To(BeTrue()) }) + Context("when the consenter is not ready for the request", func() { + BeforeEach(func() { + fakeSupport.WaitReadyReturns(fmt.Errorf("not-ready")) + }) + + It("returns the error to the client with a service unavailable status", func() { + err := handler.Handle(fakeABServer) + Expect(err).NotTo(HaveOccurred()) + + Expect(fakeABServer.SendCallCount()).To(Equal(1)) + Expect(proto.Equal( + fakeABServer.SendArgsForCall(0), + &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: "not-ready"}), + ).To(BeTrue()) + }) + }) + Context("when the consenter cannot enqueue the message", func() { BeforeEach(func() { fakeSupport.ConfigureReturns(fmt.Errorf("consenter-error")) diff --git a/orderer/common/broadcast/metrics.go b/orderer/common/broadcast/metrics.go new file mode 100644 index 00000000000..1d9cfdf59cc --- /dev/null +++ b/orderer/common/broadcast/metrics.go @@ -0,0 +1,47 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package broadcast + +import "github.com/hyperledger/fabric/common/metrics" + +var ( + validateDuration = metrics.HistogramOpts{ + Namespace: "broadcast", + Name: "validate_duration", + Help: "The time to validate a transaction in seconds.", + LabelNames: []string{"channel", "type", "status"}, + StatsdFormat: "%{#fqname}.%{channel}.%{type}.%{status}", + } + enqueueDuration = metrics.HistogramOpts{ + Namespace: "broadcast", + Name: "enqueue_duration", + Help: "The time to enqueue a transaction in seconds.", + LabelNames: []string{"channel", "type", "status"}, + StatsdFormat: "%{#fqname}.%{channel}.%{type}.%{status}", + } + processedCount = metrics.CounterOpts{ + Namespace: "broadcast", + Name: "processed_count", + Help: "The number of transactions processed.", + LabelNames: []string{"channel", "type", "status"}, + StatsdFormat: "%{#fqname}.%{channel}.%{type}.%{status}", + } +) + +type Metrics struct { + ValidateDuration metrics.Histogram + EnqueueDuration metrics.Histogram + ProcessedCount metrics.Counter +} + +func NewMetrics(p metrics.Provider) *Metrics { + return &Metrics{ + ValidateDuration: p.NewHistogram(validateDuration), + EnqueueDuration: p.NewHistogram(enqueueDuration), + ProcessedCount: p.NewCounter(processedCount), + } +} diff --git a/orderer/common/broadcast/metrics_test.go b/orderer/common/broadcast/metrics_test.go new file mode 100644 index 00000000000..f811e42ee11 --- /dev/null +++ b/orderer/common/broadcast/metrics_test.go @@ -0,0 +1,38 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package broadcast_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/hyperledger/fabric/orderer/common/broadcast" + "github.com/hyperledger/fabric/orderer/common/broadcast/mock" +) + +var _ = Describe("Metrics", func() { + var ( + fakeProvider *mock.MetricsProvider + ) + + BeforeEach(func() { + fakeProvider = &mock.MetricsProvider{} + fakeProvider.NewHistogramReturns(&mock.MetricsHistogram{}) + fakeProvider.NewCounterReturns(&mock.MetricsCounter{}) + }) + + It("uses the provider to initialize all fields", func() { + metrics := broadcast.NewMetrics(fakeProvider) + Expect(metrics).NotTo(BeNil()) + Expect(metrics.ValidateDuration).To(Equal(&mock.MetricsHistogram{})) + Expect(metrics.EnqueueDuration).To(Equal(&mock.MetricsHistogram{})) + Expect(metrics.ProcessedCount).To(Equal(&mock.MetricsCounter{})) + + Expect(fakeProvider.NewHistogramCallCount()).To(Equal(2)) + Expect(fakeProvider.NewCounterCallCount()).To(Equal(1)) + }) +}) diff --git a/orderer/common/broadcast/mock/metrics_counter.go b/orderer/common/broadcast/mock/metrics_counter.go new file mode 100644 index 00000000000..dc38e8ea1aa --- /dev/null +++ b/orderer/common/broadcast/mock/metrics_counter.go @@ -0,0 +1,127 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mock + +import ( + "sync" + + "github.com/hyperledger/fabric/common/metrics" +) + +type MetricsCounter struct { + WithStub func(labelValues ...string) metrics.Counter + withMutex sync.RWMutex + withArgsForCall []struct { + labelValues []string + } + withReturns struct { + result1 metrics.Counter + } + withReturnsOnCall map[int]struct { + result1 metrics.Counter + } + AddStub func(delta float64) + addMutex sync.RWMutex + addArgsForCall []struct { + delta float64 + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *MetricsCounter) With(labelValues ...string) metrics.Counter { + fake.withMutex.Lock() + ret, specificReturn := fake.withReturnsOnCall[len(fake.withArgsForCall)] + fake.withArgsForCall = append(fake.withArgsForCall, struct { + labelValues []string + }{labelValues}) + fake.recordInvocation("With", []interface{}{labelValues}) + fake.withMutex.Unlock() + if fake.WithStub != nil { + return fake.WithStub(labelValues...) + } + if specificReturn { + return ret.result1 + } + return fake.withReturns.result1 +} + +func (fake *MetricsCounter) WithCallCount() int { + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + return len(fake.withArgsForCall) +} + +func (fake *MetricsCounter) WithArgsForCall(i int) []string { + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + return fake.withArgsForCall[i].labelValues +} + +func (fake *MetricsCounter) WithReturns(result1 metrics.Counter) { + fake.WithStub = nil + fake.withReturns = struct { + result1 metrics.Counter + }{result1} +} + +func (fake *MetricsCounter) WithReturnsOnCall(i int, result1 metrics.Counter) { + fake.WithStub = nil + if fake.withReturnsOnCall == nil { + fake.withReturnsOnCall = make(map[int]struct { + result1 metrics.Counter + }) + } + fake.withReturnsOnCall[i] = struct { + result1 metrics.Counter + }{result1} +} + +func (fake *MetricsCounter) Add(delta float64) { + fake.addMutex.Lock() + fake.addArgsForCall = append(fake.addArgsForCall, struct { + delta float64 + }{delta}) + fake.recordInvocation("Add", []interface{}{delta}) + fake.addMutex.Unlock() + if fake.AddStub != nil { + fake.AddStub(delta) + } +} + +func (fake *MetricsCounter) AddCallCount() int { + fake.addMutex.RLock() + defer fake.addMutex.RUnlock() + return len(fake.addArgsForCall) +} + +func (fake *MetricsCounter) AddArgsForCall(i int) float64 { + fake.addMutex.RLock() + defer fake.addMutex.RUnlock() + return fake.addArgsForCall[i].delta +} + +func (fake *MetricsCounter) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + fake.addMutex.RLock() + defer fake.addMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *MetricsCounter) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/orderer/common/broadcast/mock/metrics_histogram.go b/orderer/common/broadcast/mock/metrics_histogram.go new file mode 100644 index 00000000000..1c1884c50d0 --- /dev/null +++ b/orderer/common/broadcast/mock/metrics_histogram.go @@ -0,0 +1,127 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mock + +import ( + "sync" + + "github.com/hyperledger/fabric/common/metrics" +) + +type MetricsHistogram struct { + WithStub func(labelValues ...string) metrics.Histogram + withMutex sync.RWMutex + withArgsForCall []struct { + labelValues []string + } + withReturns struct { + result1 metrics.Histogram + } + withReturnsOnCall map[int]struct { + result1 metrics.Histogram + } + ObserveStub func(value float64) + observeMutex sync.RWMutex + observeArgsForCall []struct { + value float64 + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *MetricsHistogram) With(labelValues ...string) metrics.Histogram { + fake.withMutex.Lock() + ret, specificReturn := fake.withReturnsOnCall[len(fake.withArgsForCall)] + fake.withArgsForCall = append(fake.withArgsForCall, struct { + labelValues []string + }{labelValues}) + fake.recordInvocation("With", []interface{}{labelValues}) + fake.withMutex.Unlock() + if fake.WithStub != nil { + return fake.WithStub(labelValues...) + } + if specificReturn { + return ret.result1 + } + return fake.withReturns.result1 +} + +func (fake *MetricsHistogram) WithCallCount() int { + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + return len(fake.withArgsForCall) +} + +func (fake *MetricsHistogram) WithArgsForCall(i int) []string { + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + return fake.withArgsForCall[i].labelValues +} + +func (fake *MetricsHistogram) WithReturns(result1 metrics.Histogram) { + fake.WithStub = nil + fake.withReturns = struct { + result1 metrics.Histogram + }{result1} +} + +func (fake *MetricsHistogram) WithReturnsOnCall(i int, result1 metrics.Histogram) { + fake.WithStub = nil + if fake.withReturnsOnCall == nil { + fake.withReturnsOnCall = make(map[int]struct { + result1 metrics.Histogram + }) + } + fake.withReturnsOnCall[i] = struct { + result1 metrics.Histogram + }{result1} +} + +func (fake *MetricsHistogram) Observe(value float64) { + fake.observeMutex.Lock() + fake.observeArgsForCall = append(fake.observeArgsForCall, struct { + value float64 + }{value}) + fake.recordInvocation("Observe", []interface{}{value}) + fake.observeMutex.Unlock() + if fake.ObserveStub != nil { + fake.ObserveStub(value) + } +} + +func (fake *MetricsHistogram) ObserveCallCount() int { + fake.observeMutex.RLock() + defer fake.observeMutex.RUnlock() + return len(fake.observeArgsForCall) +} + +func (fake *MetricsHistogram) ObserveArgsForCall(i int) float64 { + fake.observeMutex.RLock() + defer fake.observeMutex.RUnlock() + return fake.observeArgsForCall[i].value +} + +func (fake *MetricsHistogram) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.withMutex.RLock() + defer fake.withMutex.RUnlock() + fake.observeMutex.RLock() + defer fake.observeMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *MetricsHistogram) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/orderer/common/broadcast/mock/metrics_provider.go b/orderer/common/broadcast/mock/metrics_provider.go new file mode 100644 index 00000000000..6c0eafa95b4 --- /dev/null +++ b/orderer/common/broadcast/mock/metrics_provider.go @@ -0,0 +1,218 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mock + +import ( + "sync" + + "github.com/hyperledger/fabric/common/metrics" +) + +type MetricsProvider struct { + NewCounterStub func(metrics.CounterOpts) metrics.Counter + newCounterMutex sync.RWMutex + newCounterArgsForCall []struct { + arg1 metrics.CounterOpts + } + newCounterReturns struct { + result1 metrics.Counter + } + newCounterReturnsOnCall map[int]struct { + result1 metrics.Counter + } + NewGaugeStub func(metrics.GaugeOpts) metrics.Gauge + newGaugeMutex sync.RWMutex + newGaugeArgsForCall []struct { + arg1 metrics.GaugeOpts + } + newGaugeReturns struct { + result1 metrics.Gauge + } + newGaugeReturnsOnCall map[int]struct { + result1 metrics.Gauge + } + NewHistogramStub func(metrics.HistogramOpts) metrics.Histogram + newHistogramMutex sync.RWMutex + newHistogramArgsForCall []struct { + arg1 metrics.HistogramOpts + } + newHistogramReturns struct { + result1 metrics.Histogram + } + newHistogramReturnsOnCall map[int]struct { + result1 metrics.Histogram + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *MetricsProvider) NewCounter(arg1 metrics.CounterOpts) metrics.Counter { + fake.newCounterMutex.Lock() + ret, specificReturn := fake.newCounterReturnsOnCall[len(fake.newCounterArgsForCall)] + fake.newCounterArgsForCall = append(fake.newCounterArgsForCall, struct { + arg1 metrics.CounterOpts + }{arg1}) + fake.recordInvocation("NewCounter", []interface{}{arg1}) + fake.newCounterMutex.Unlock() + if fake.NewCounterStub != nil { + return fake.NewCounterStub(arg1) + } + if specificReturn { + return ret.result1 + } + return fake.newCounterReturns.result1 +} + +func (fake *MetricsProvider) NewCounterCallCount() int { + fake.newCounterMutex.RLock() + defer fake.newCounterMutex.RUnlock() + return len(fake.newCounterArgsForCall) +} + +func (fake *MetricsProvider) NewCounterArgsForCall(i int) metrics.CounterOpts { + fake.newCounterMutex.RLock() + defer fake.newCounterMutex.RUnlock() + return fake.newCounterArgsForCall[i].arg1 +} + +func (fake *MetricsProvider) NewCounterReturns(result1 metrics.Counter) { + fake.NewCounterStub = nil + fake.newCounterReturns = struct { + result1 metrics.Counter + }{result1} +} + +func (fake *MetricsProvider) NewCounterReturnsOnCall(i int, result1 metrics.Counter) { + fake.NewCounterStub = nil + if fake.newCounterReturnsOnCall == nil { + fake.newCounterReturnsOnCall = make(map[int]struct { + result1 metrics.Counter + }) + } + fake.newCounterReturnsOnCall[i] = struct { + result1 metrics.Counter + }{result1} +} + +func (fake *MetricsProvider) NewGauge(arg1 metrics.GaugeOpts) metrics.Gauge { + fake.newGaugeMutex.Lock() + ret, specificReturn := fake.newGaugeReturnsOnCall[len(fake.newGaugeArgsForCall)] + fake.newGaugeArgsForCall = append(fake.newGaugeArgsForCall, struct { + arg1 metrics.GaugeOpts + }{arg1}) + fake.recordInvocation("NewGauge", []interface{}{arg1}) + fake.newGaugeMutex.Unlock() + if fake.NewGaugeStub != nil { + return fake.NewGaugeStub(arg1) + } + if specificReturn { + return ret.result1 + } + return fake.newGaugeReturns.result1 +} + +func (fake *MetricsProvider) NewGaugeCallCount() int { + fake.newGaugeMutex.RLock() + defer fake.newGaugeMutex.RUnlock() + return len(fake.newGaugeArgsForCall) +} + +func (fake *MetricsProvider) NewGaugeArgsForCall(i int) metrics.GaugeOpts { + fake.newGaugeMutex.RLock() + defer fake.newGaugeMutex.RUnlock() + return fake.newGaugeArgsForCall[i].arg1 +} + +func (fake *MetricsProvider) NewGaugeReturns(result1 metrics.Gauge) { + fake.NewGaugeStub = nil + fake.newGaugeReturns = struct { + result1 metrics.Gauge + }{result1} +} + +func (fake *MetricsProvider) NewGaugeReturnsOnCall(i int, result1 metrics.Gauge) { + fake.NewGaugeStub = nil + if fake.newGaugeReturnsOnCall == nil { + fake.newGaugeReturnsOnCall = make(map[int]struct { + result1 metrics.Gauge + }) + } + fake.newGaugeReturnsOnCall[i] = struct { + result1 metrics.Gauge + }{result1} +} + +func (fake *MetricsProvider) NewHistogram(arg1 metrics.HistogramOpts) metrics.Histogram { + fake.newHistogramMutex.Lock() + ret, specificReturn := fake.newHistogramReturnsOnCall[len(fake.newHistogramArgsForCall)] + fake.newHistogramArgsForCall = append(fake.newHistogramArgsForCall, struct { + arg1 metrics.HistogramOpts + }{arg1}) + fake.recordInvocation("NewHistogram", []interface{}{arg1}) + fake.newHistogramMutex.Unlock() + if fake.NewHistogramStub != nil { + return fake.NewHistogramStub(arg1) + } + if specificReturn { + return ret.result1 + } + return fake.newHistogramReturns.result1 +} + +func (fake *MetricsProvider) NewHistogramCallCount() int { + fake.newHistogramMutex.RLock() + defer fake.newHistogramMutex.RUnlock() + return len(fake.newHistogramArgsForCall) +} + +func (fake *MetricsProvider) NewHistogramArgsForCall(i int) metrics.HistogramOpts { + fake.newHistogramMutex.RLock() + defer fake.newHistogramMutex.RUnlock() + return fake.newHistogramArgsForCall[i].arg1 +} + +func (fake *MetricsProvider) NewHistogramReturns(result1 metrics.Histogram) { + fake.NewHistogramStub = nil + fake.newHistogramReturns = struct { + result1 metrics.Histogram + }{result1} +} + +func (fake *MetricsProvider) NewHistogramReturnsOnCall(i int, result1 metrics.Histogram) { + fake.NewHistogramStub = nil + if fake.newHistogramReturnsOnCall == nil { + fake.newHistogramReturnsOnCall = make(map[int]struct { + result1 metrics.Histogram + }) + } + fake.newHistogramReturnsOnCall[i] = struct { + result1 metrics.Histogram + }{result1} +} + +func (fake *MetricsProvider) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.newCounterMutex.RLock() + defer fake.newCounterMutex.RUnlock() + fake.newGaugeMutex.RLock() + defer fake.newGaugeMutex.RUnlock() + fake.newHistogramMutex.RLock() + defer fake.newHistogramMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *MetricsProvider) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/orderer/common/server/main.go b/orderer/common/server/main.go index 18c37aaac11..0e5b6946c05 100644 --- a/orderer/common/server/main.go +++ b/orderer/common/server/main.go @@ -128,7 +128,7 @@ func Start(cmd string, conf *localconfig.TopLevel) { manager := initializeMultichannelRegistrar(clusterType, clusterDialer, serverConfig, grpcServer, conf, signer, tlsCallback) mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert - server := NewServer(manager, signer, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS) + server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS) switch cmd { case start.FullCommand(): // "start" command diff --git a/orderer/common/server/server.go b/orderer/common/server/server.go index 1b1d43cc076..0d6aa95296e 100644 --- a/orderer/common/server/server.go +++ b/orderer/common/server/server.go @@ -14,8 +14,8 @@ import ( "time" "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/common/crypto" "github.com/hyperledger/fabric/common/deliver" + "github.com/hyperledger/fabric/common/metrics" "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/broadcast" localconfig "github.com/hyperledger/fabric/orderer/common/localconfig" @@ -72,10 +72,13 @@ func (rs *responseSender) SendBlockResponse(block *cb.Block) error { } // NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader -func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { +func NewServer(r *multichannel.Registrar, metricsProvider metrics.Provider, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { s := &server{ - dh: deliver.NewHandler(deliverSupport{Registrar: r}, timeWindow, mutualTLS), - bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}), + dh: deliver.NewHandler(deliverSupport{Registrar: r}, timeWindow, mutualTLS), + bh: &broadcast.Handler{ + SupportRegistrar: broadcastSupport{Registrar: r}, + Metrics: broadcast.NewMetrics(metricsProvider), + }, debug: debug, Registrar: r, }