Skip to content

Commit

Permalink
FAB-9568 Add metrics to Broadcast
Browse files Browse the repository at this point in the history
This CR introduces three metrics for the broadcast service.  The
validation duration, enqueue duration, and processed count.

The validation duration is the amount of time it takes to conclude a
message is invalid or is valid and should be ordered.

The enqueue duration is the amount of time it takes to pass a valid
message to the consenter for ordering.

The processed count is the total number of transactions processed by the
broadcast interface.

Change-Id: If328931ed492933981d22d23d9b39e058fa7e907
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Nov 28, 2018
1 parent b63ba8d commit a063c8a
Show file tree
Hide file tree
Showing 11 changed files with 785 additions and 62 deletions.
4 changes: 2 additions & 2 deletions common/grpcmetrics/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func UnaryServerInterceptor(um *UnaryMetrics) grpc.UnaryServerInterceptor {

um.RequestDuration.With(
"service", service, "method", method, "code", grpc.Code(err).String(),
).Observe(float64(duration) / float64(time.Second))
).Observe(duration.Seconds())
um.RequestsCompleted.With("service", service, "method", method, "code", grpc.Code(err).String()).Add(1)

return resp, err
Expand Down Expand Up @@ -65,7 +65,7 @@ func StreamServerInterceptor(sm *StreamMetrics) grpc.StreamServerInterceptor {

sm.RequestDuration.With(
"service", service, "method", method, "code", grpc.Code(err).String(),
).Observe(float64(duration) / float64(time.Second))
).Observe(duration.Seconds())
sm.RequestsCompleted.With("service", service, "method", method, "code", grpc.Code(err).String()).Add(1)

return err
Expand Down
174 changes: 123 additions & 51 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package broadcast

import (
"io"
"time"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
Expand All @@ -19,17 +20,19 @@ import (

var logger = flogging.MustGetLogger("orderer.common.broadcast")

// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a channel
//go:generate counterfeiter -o mock/channel_support_registrar.go --fake-name ChannelSupportRegistrar . ChannelSupportRegistrar

// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a channel
type ChannelSupportRegistrar interface {
// BroadcastChannelSupport returns the message channel header, whether the message is a config update
// and the channel resources for a message or an error if the message is not a message which can
// be processed directly (like CONFIG and ORDERER_TRANSACTION messages)
BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, ChannelSupport, error)
}

// ChannelSupport provides the backing resources needed to support broadcast on a channel
//go:generate counterfeiter -o mock/channel_support.go --fake-name ChannelSupport . ChannelSupport

// ChannelSupport provides the backing resources needed to support broadcast on a channel
type ChannelSupport interface {
msgprocessor.Processor
Consenter
Expand All @@ -53,18 +56,13 @@ type Consenter interface {
WaitReady() error
}

// Handler is designed to handle connections from Broadcast AB gRPC service
type Handler struct {
sm ChannelSupportRegistrar
SupportRegistrar ChannelSupportRegistrar
Metrics *Metrics
}

// NewHandlerImpl constructs a new implementation of the Handler interface
func NewHandlerImpl(sm ChannelSupportRegistrar) *Handler {
return &Handler{
sm: sm,
}
}

// Handle starts a service thread for a given gRPC connection and services the broadcast connection
// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
addr := util.ExtractRemoteAddress(srv.Context())
logger.Debugf("Starting new broadcast loop for %s", addr)
Expand All @@ -79,59 +77,133 @@ func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return err
}

chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)
resp := bh.ProcessMessage(msg, addr)
err = srv.Send(resp)
if resp.Status != cb.Status_SUCCESS {
return err
}

if err != nil {
logger.Warningf("Error sending to %s: %s", addr, err)
return err
}
}

}

type MetricsTracker struct {
ValidateStartTime time.Time
EnqueueStartTime time.Time
ValidateDuration time.Duration
ChannelID string
TxType string
Metrics *Metrics
}

func (mt *MetricsTracker) Record(resp *ab.BroadcastResponse) {
labels := []string{
"status", resp.Status.String(),
"channel", mt.ChannelID,
"type", mt.TxType,
}

if mt.ValidateDuration == 0 {
mt.EndValidate()
}
mt.Metrics.ValidateDuration.With(labels...).Observe(mt.ValidateDuration.Seconds())

if mt.EnqueueStartTime != (time.Time{}) {
enqueueDuration := time.Since(mt.EnqueueStartTime)
mt.Metrics.EnqueueDuration.With(labels...).Observe(enqueueDuration.Seconds())
}

mt.Metrics.ProcessedCount.With(labels...).Add(1)
}

func (mt *MetricsTracker) BeginValidate() {
mt.ValidateStartTime = time.Now()
}

func (mt *MetricsTracker) EndValidate() {
mt.ValidateDuration = time.Since(mt.ValidateStartTime)
}

func (mt *MetricsTracker) BeginEnqueue() {
mt.EnqueueStartTime = time.Now()
}

// ProcessMessage validates and enqueues a single message
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
tracker := &MetricsTracker{
ChannelID: "unknown",
TxType: "unknown",
Metrics: bh.Metrics,
}
defer func() {
// This looks a little unnecessary, but if done directly as
// a defer, resp gets the (always nil) current state of resp
// and not the return value
tracker.Record(resp)
}()
tracker.BeginValidate()

chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)
if chdr != nil {
tracker.ChannelID = chdr.ChannelId
tracker.TxType = cb.HeaderType(chdr.Type).String()
}
if err != nil {
logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}
}

if !isConfig {
logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])

configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
channelID := "<malformed_header>"
if chdr != nil {
channelID = chdr.ChannelId
}
logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", channelID, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()})
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
}
tracker.EndValidate()

tracker.BeginEnqueue()
if err = processor.WaitReady(); err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}

err = processor.Order(msg, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
} else { // isConfig
logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)

if !isConfig {
logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])

configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()})
}

err = processor.Order(msg, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
}
} else { // isConfig
logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)

config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()})
}

err = processor.Configure(config, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
}
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
}
tracker.EndValidate()

logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)
tracker.BeginEnqueue()
if err = processor.WaitReady(); err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
err = processor.Configure(config, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return err
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
}

logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)

return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}

// ClassifyError converts an error type into a status code.
Expand Down
16 changes: 16 additions & 0 deletions orderer/common/broadcast/broadcast_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package broadcast_test
import (
"testing"

"github.com/hyperledger/fabric/common/metrics"
ab "github.com/hyperledger/fabric/protos/orderer"

. "github.com/onsi/ginkgo"
Expand All @@ -20,6 +21,21 @@ type abServer interface {
ab.AtomicBroadcast_BroadcastServer
}

//go:generate counterfeiter -o mock/metrics_histogram.go --fake-name MetricsHistogram . metricsHistogram
type metricsHistogram interface {
metrics.Histogram
}

//go:generate counterfeiter -o mock/metrics_counter.go --fake-name MetricsCounter . metricsCounter
type metricsCounter interface {
metrics.Counter
}

//go:generate counterfeiter -o mock/metrics_provider.go --fake-name MetricsProvider . metricsProvider
type metricsProvider interface {
metrics.Provider
}

func TestBroadcast(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Broadcast Suite")
Expand Down
83 changes: 79 additions & 4 deletions orderer/common/broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,33 @@ import (

var _ = Describe("Broadcast", func() {
var (
fakeSupportRegistrar *mock.ChannelSupportRegistrar
handler *broadcast.Handler
fakeSupportRegistrar *mock.ChannelSupportRegistrar
handler *broadcast.Handler
fakeValidateHistogram *mock.MetricsHistogram
fakeEnqueueHistogram *mock.MetricsHistogram
fakeProcessedCounter *mock.MetricsCounter
)

BeforeEach(func() {
fakeSupportRegistrar = &mock.ChannelSupportRegistrar{}
handler = broadcast.NewHandlerImpl(fakeSupportRegistrar)

fakeValidateHistogram = &mock.MetricsHistogram{}
fakeValidateHistogram.WithReturns(fakeValidateHistogram)

fakeEnqueueHistogram = &mock.MetricsHistogram{}
fakeEnqueueHistogram.WithReturns(fakeEnqueueHistogram)

fakeProcessedCounter = &mock.MetricsCounter{}
fakeProcessedCounter.WithReturns(fakeProcessedCounter)

handler = &broadcast.Handler{
SupportRegistrar: fakeSupportRegistrar,
Metrics: &broadcast.Metrics{
ValidateDuration: fakeValidateHistogram,
EnqueueDuration: fakeEnqueueHistogram,
ProcessedCount: fakeProcessedCounter,
},
}
})

Describe("Handle", func() {
Expand All @@ -52,7 +72,7 @@ var _ = Describe("Broadcast", func() {
fakeSupport.ProcessNormalMsgReturns(5, nil)

fakeSupportRegistrar.BroadcastChannelSupportReturns(&cb.ChannelHeader{
Type: 1,
Type: 3,
ChannelId: "fake-channel",
}, false, fakeSupport, nil)
})
Expand All @@ -75,6 +95,35 @@ var _ = Describe("Broadcast", func() {
Expect(orderedMsg).To(Equal(fakeMsg))
Expect(seq).To(Equal(uint64(5)))

Expect(fakeValidateHistogram.WithCallCount()).To(Equal(1))
Expect(fakeValidateHistogram.WithArgsForCall(0)).To(Equal([]string{
"status", "SUCCESS",
"channel", "fake-channel",
"type", "ENDORSER_TRANSACTION",
}))
Expect(fakeValidateHistogram.ObserveCallCount()).To(Equal(1))
Expect(fakeValidateHistogram.ObserveArgsForCall(0)).To(BeNumerically(">", 0))
Expect(fakeValidateHistogram.ObserveArgsForCall(0)).To(BeNumerically("<", 1))

Expect(fakeEnqueueHistogram.WithCallCount()).To(Equal(1))
Expect(fakeEnqueueHistogram.WithArgsForCall(0)).To(Equal([]string{
"status", "SUCCESS",
"channel", "fake-channel",
"type", "ENDORSER_TRANSACTION",
}))
Expect(fakeEnqueueHistogram.ObserveCallCount()).To(Equal(1))
Expect(fakeEnqueueHistogram.ObserveArgsForCall(0)).To(BeNumerically(">", 0))
Expect(fakeEnqueueHistogram.ObserveArgsForCall(0)).To(BeNumerically("<", 1))

Expect(fakeProcessedCounter.WithCallCount()).To(Equal(1))
Expect(fakeProcessedCounter.WithArgsForCall(0)).To(Equal([]string{
"status", "SUCCESS",
"channel", "fake-channel",
"type", "ENDORSER_TRANSACTION",
}))
Expect(fakeProcessedCounter.AddCallCount()).To(Equal(1))
Expect(fakeProcessedCounter.AddArgsForCall(0)).To(Equal(float64(1)))

Expect(fakeABServer.SendCallCount()).To(Equal(1))
Expect(proto.Equal(fakeABServer.SendArgsForCall(0), &ab.BroadcastResponse{Status: cb.Status_SUCCESS})).To(BeTrue())
})
Expand Down Expand Up @@ -106,6 +155,15 @@ var _ = Describe("Broadcast", func() {
It("does not crash", func() {
err := handler.Handle(fakeABServer)
Expect(err).NotTo(HaveOccurred())

Expect(fakeValidateHistogram.WithCallCount()).To(Equal(1))
Expect(fakeValidateHistogram.WithArgsForCall(0)).To(Equal([]string{
"status", "BAD_REQUEST",
"channel", "unknown",
"type", "unknown",
}))
Expect(fakeEnqueueHistogram.WithCallCount()).To(Equal(0))
Expect(fakeProcessedCounter.WithCallCount()).To(Equal(1))
})
})

Expand Down Expand Up @@ -254,6 +312,23 @@ var _ = Describe("Broadcast", func() {
Expect(proto.Equal(fakeABServer.SendArgsForCall(0), &ab.BroadcastResponse{Status: cb.Status_SUCCESS})).To(BeTrue())
})

Context("when the consenter is not ready for the request", func() {
BeforeEach(func() {
fakeSupport.WaitReadyReturns(fmt.Errorf("not-ready"))
})

It("returns the error to the client with a service unavailable status", func() {
err := handler.Handle(fakeABServer)
Expect(err).NotTo(HaveOccurred())

Expect(fakeABServer.SendCallCount()).To(Equal(1))
Expect(proto.Equal(
fakeABServer.SendArgsForCall(0),
&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: "not-ready"}),
).To(BeTrue())
})
})

Context("when the consenter cannot enqueue the message", func() {
BeforeEach(func() {
fakeSupport.ConfigureReturns(fmt.Errorf("consenter-error"))
Expand Down
Loading

0 comments on commit a063c8a

Please sign in to comment.