From 88cb6ccc1c355815df1133728d3e244ef16f41f2 Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Wed, 8 Mar 2017 12:12:21 -0500 Subject: [PATCH] [FAB-2691] Improve Bcst/Dlvr log serviceability https://jira.hyperledger.org/browse/FAB-2691 The orderer replies with HTTP status codes with no error text. Although in some cases this is sufficient, especially in development scenarios, the log does not provide enough information for the rejection in order to diagnose the root cause of the bad status. This CR enhances the logging of both Deliver and Broadcast to always log a verbose error if logging is enabled at debug for why a client request is being rejected. Change-Id: I4150a5c43a101d0a3d727cee1bde1aeeb8b8ada2 Signed-off-by: Jason Yellick --- orderer/common/broadcast/broadcast.go | 39 ++++++++++++++------ orderer/common/deliver/deliver.go | 52 +++++++++++++++++++-------- 2 files changed, 66 insertions(+), 25 deletions(-) diff --git a/orderer/common/broadcast/broadcast.go b/orderer/common/broadcast/broadcast.go index 72716077487..c9e5e0d0ddc 100644 --- a/orderer/common/broadcast/broadcast.go +++ b/orderer/common/broadcast/broadcast.go @@ -85,14 +85,23 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { payload := &cb.Payload{} err = proto.Unmarshal(msg.Payload, payload) - if payload.Header == nil /* || payload.Header.ChannelHeader == nil */ { - logger.Debugf("Received malformed message, dropping connection") + if err != nil { + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Received malformed message, dropping connection: %s", err) + } + return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) + } + + if payload.Header == nil { + logger.Warningf("Received malformed message, with missing header, dropping connection") return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { - logger.Debugf("Received malformed message (bad channel header), dropping connection") + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err) + } return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } @@ -100,51 +109,59 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { logger.Debugf("Preprocessing CONFIG_UPDATE") msg, err = bh.sm.Process(msg) if err != nil { + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err) + } return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } err = proto.Unmarshal(msg.Payload, payload) - if payload.Header == nil { + if err != nil || payload.Header == nil { logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing") return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR}) } chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { - logger.Debugf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header)") - return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) + logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header): %s", err) + return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR}) } if chdr.ChannelId == "" { - logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing") + logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)") return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR}) } } support, ok := bh.sm.GetChain(chdr.ChannelId) if !ok { + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId) + } return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND}) } if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Broadcast is filtering message for channel %s", chdr.ChannelId) + logger.Debugf("Broadcast is filtering message of type %d for channel %s", chdr.Type, chdr.ChannelId) } // Normal transaction for existing chain _, filterErr := support.Filters().Apply(msg) if filterErr != nil { - logger.Debugf("Rejecting broadcast message") + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Rejecting broadcast message because of filter error: %s", err) + } return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } if !support.Enqueue(msg) { - logger.Debugf("Consenter instructed us to shut down") + logger.Infof("Consenter instructed us to shut down") return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) } if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Broadcast is successfully enqueued message for chain %s", chdr.ChannelId) + logger.Debugf("Broadcast has successfully enqueued message of type %d for chain %s", chdr.Type, chdr.ChannelId) } err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) diff --git a/orderer/common/deliver/deliver.go b/orderer/common/deliver/deliver.go index 5d538d17c74..d3e3f82456d 100644 --- a/orderer/common/deliver/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -17,8 +17,6 @@ limitations under the License. package deliver import ( - "fmt" - "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/filter" "github.com/hyperledger/fabric/orderer/common/sigfilter" @@ -69,19 +67,24 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { logger.Debugf("Attempting to read seek info message") envelope, err := srv.Recv() if err != nil { - logger.Errorf("Error reading from stream: %s", err) + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Error reading from stream: %s", err) + } return err } payload := &cb.Payload{} if err = proto.Unmarshal(envelope.Payload, payload); err != nil { - logger.Errorf("Received an envelope with no payload: %s", err) - return err + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Received an envelope with no payload: %s", err) + } + return sendStatusReply(srv, cb.Status_BAD_REQUEST) } - if payload.Header == nil /* || payload.Header.ChannelHeader == nil */ { - err = fmt.Errorf("Malformed envelope received with bad header") - logger.Error(err) - return err + if payload.Header == nil { + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Malformed envelope received with bad header") + } + return sendStatusReply(srv, cb.Status_BAD_REQUEST) } chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) @@ -92,23 +95,40 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { chain, ok := ds.sm.GetChain(chdr.ChannelId) if !ok { + // Note, we log this at DEBUG because SDKs will poll waiting for channels to be created + // So we would expect our log to be somewhat flooded with these + if logger.IsEnabledFor(logging.DEBUG) { + logger.Debugf("Client request for channel %s not found", chdr.ChannelId) + } return sendStatusReply(srv, cb.Status_NOT_FOUND) } sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager()) result, _ := sf.Apply(envelope) if result != filter.Forward { + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Received unauthorized deliver request for channel %s", chdr.ChannelId) + } return sendStatusReply(srv, cb.Status_FORBIDDEN) } seekInfo := &ab.SeekInfo{} if err = proto.Unmarshal(payload.Data, seekInfo); err != nil { - logger.Errorf("Received a signed deliver request with malformed seekInfo payload: %s", err) - return err + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Received a signed deliver request with malformed seekInfo payload: %s", err) + } + return sendStatusReply(srv, cb.Status_BAD_REQUEST) + } + + if seekInfo.Start == nil || seekInfo.Stop == nil { + if logger.IsEnabledFor(logging.WARNING) { + logger.Warningf("Received seekInfo message with missing start or stop %v, %v", seekInfo.Start, seekInfo.Stop) + } + return sendStatusReply(srv, cb.Status_BAD_REQUEST) } if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Received seekInfo %v for chain %s", seekInfo, chdr.ChannelId) + logger.Debugf("Received seekInfo (%p) %v for chain %s", seekInfo, seekInfo, chdr.ChannelId) } cursor, number := chain.Reader().Iterator(seekInfo.Start) @@ -139,7 +159,9 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { return sendStatusReply(srv, status) } - logger.Debugf("Delivering block") + if logger.IsEnabledFor(logging.DEBUG) { + logger.Debugf("Delivering block for (%p) channel: %s", seekInfo, chdr.ChannelId) + } if err := sendBlockReply(srv, block); err != nil { return err } @@ -152,7 +174,9 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil { return err } - logger.Debugf("Done delivering, waiting for new SeekInfo") + if logger.IsEnabledFor(logging.DEBUG) { + logger.Debugf("Done delivering for (%p), waiting for new SeekInfo", seekInfo) + } } }