Skip to content

Commit

Permalink
[FAB-2691] Improve Bcst/Dlvr log serviceability
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-2691

The orderer replies with HTTP status codes with no error text.  Although
in some cases this is sufficient, especially in development scenarios,
the log does not provide enough information for the rejection in order
to diagnose the root cause of the bad status.

This CR enhances the logging of both Deliver and Broadcast to always log
a verbose error if logging is enabled at debug for why a client request
is being rejected.

Change-Id: I4150a5c43a101d0a3d727cee1bde1aeeb8b8ada2
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Mar 8, 2017
1 parent f27a039 commit 88cb6cc
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 25 deletions.
39 changes: 28 additions & 11 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,66 +85,83 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {

payload := &cb.Payload{}
err = proto.Unmarshal(msg.Payload, payload)
if payload.Header == nil /* || payload.Header.ChannelHeader == nil */ {
logger.Debugf("Received malformed message, dropping connection")
if err != nil {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received malformed message, dropping connection: %s", err)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

if payload.Header == nil {
logger.Warningf("Received malformed message, with missing header, dropping connection")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Debugf("Received malformed message (bad channel header), dropping connection")
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
if err != nil {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

err = proto.Unmarshal(msg.Payload, payload)
if payload.Header == nil {
if err != nil || payload.Header == nil {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}

chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Debugf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header): %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}

if chdr.ChannelId == "" {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}
}

support, ok := bh.sm.GetChain(chdr.ChannelId)
if !ok {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
}

if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Broadcast is filtering message for channel %s", chdr.ChannelId)
logger.Debugf("Broadcast is filtering message of type %d for channel %s", chdr.Type, chdr.ChannelId)
}

// Normal transaction for existing chain
_, filterErr := support.Filters().Apply(msg)

if filterErr != nil {
logger.Debugf("Rejecting broadcast message")
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Rejecting broadcast message because of filter error: %s", err)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

if !support.Enqueue(msg) {
logger.Debugf("Consenter instructed us to shut down")
logger.Infof("Consenter instructed us to shut down")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}

if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Broadcast is successfully enqueued message for chain %s", chdr.ChannelId)
logger.Debugf("Broadcast has successfully enqueued message of type %d for chain %s", chdr.Type, chdr.ChannelId)
}

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
Expand Down
52 changes: 38 additions & 14 deletions orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package deliver

import (
"fmt"

"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/sigfilter"
Expand Down Expand Up @@ -69,19 +67,24 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
logger.Debugf("Attempting to read seek info message")
envelope, err := srv.Recv()
if err != nil {
logger.Errorf("Error reading from stream: %s", err)
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Error reading from stream: %s", err)
}
return err
}
payload := &cb.Payload{}
if err = proto.Unmarshal(envelope.Payload, payload); err != nil {
logger.Errorf("Received an envelope with no payload: %s", err)
return err
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received an envelope with no payload: %s", err)
}
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

if payload.Header == nil /* || payload.Header.ChannelHeader == nil */ {
err = fmt.Errorf("Malformed envelope received with bad header")
logger.Error(err)
return err
if payload.Header == nil {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Malformed envelope received with bad header")
}
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
Expand All @@ -92,23 +95,40 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {

chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Client request for channel %s not found", chdr.ChannelId)
}
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}

sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received unauthorized deliver request for channel %s", chdr.ChannelId)
}
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}

seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Errorf("Received a signed deliver request with malformed seekInfo payload: %s", err)
return err
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received a signed deliver request with malformed seekInfo payload: %s", err)
}
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

if seekInfo.Start == nil || seekInfo.Stop == nil {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received seekInfo message with missing start or stop %v, %v", seekInfo.Start, seekInfo.Stop)
}
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Received seekInfo %v for chain %s", seekInfo, chdr.ChannelId)
logger.Debugf("Received seekInfo (%p) %v for chain %s", seekInfo, seekInfo, chdr.ChannelId)
}

cursor, number := chain.Reader().Iterator(seekInfo.Start)
Expand Down Expand Up @@ -139,7 +159,9 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
return sendStatusReply(srv, status)
}

logger.Debugf("Delivering block")
if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Delivering block for (%p) channel: %s", seekInfo, chdr.ChannelId)
}
if err := sendBlockReply(srv, block); err != nil {
return err
}
Expand All @@ -152,7 +174,9 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil {
return err
}
logger.Debugf("Done delivering, waiting for new SeekInfo")
if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Done delivering for (%p), waiting for new SeekInfo", seekInfo)
}
}
}

Expand Down

0 comments on commit 88cb6cc

Please sign in to comment.