Skip to content

Commit

Permalink
Merge "[FAB-2691] Improve Bcst/Dlvr log serviceability"
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Mar 9, 2017
2 parents 7d900f3 + 88cb6cc commit 450b122
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 25 deletions.
39 changes: 28 additions & 11 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,66 +85,83 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {

payload := &cb.Payload{}
err = proto.Unmarshal(msg.Payload, payload)
if payload.Header == nil /* || payload.Header.ChannelHeader == nil */ {
logger.Debugf("Received malformed message, dropping connection")
if err != nil {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received malformed message, dropping connection: %s", err)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

if payload.Header == nil {
logger.Warningf("Received malformed message, with missing header, dropping connection")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Debugf("Received malformed message (bad channel header), dropping connection")
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
if err != nil {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

err = proto.Unmarshal(msg.Payload, payload)
if payload.Header == nil {
if err != nil || payload.Header == nil {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}

chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Debugf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header): %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}

if chdr.ChannelId == "" {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}
}

support, ok := bh.sm.GetChain(chdr.ChannelId)
if !ok {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
}

if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Broadcast is filtering message for channel %s", chdr.ChannelId)
logger.Debugf("Broadcast is filtering message of type %d for channel %s", chdr.Type, chdr.ChannelId)
}

// Normal transaction for existing chain
_, filterErr := support.Filters().Apply(msg)

if filterErr != nil {
logger.Debugf("Rejecting broadcast message")
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Rejecting broadcast message because of filter error: %s", err)
}
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

if !support.Enqueue(msg) {
logger.Debugf("Consenter instructed us to shut down")
logger.Infof("Consenter instructed us to shut down")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}

if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Broadcast is successfully enqueued message for chain %s", chdr.ChannelId)
logger.Debugf("Broadcast has successfully enqueued message of type %d for chain %s", chdr.Type, chdr.ChannelId)
}

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
Expand Down
52 changes: 38 additions & 14 deletions orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package deliver

import (
"fmt"

"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/sigfilter"
Expand Down Expand Up @@ -69,19 +67,24 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
logger.Debugf("Attempting to read seek info message")
envelope, err := srv.Recv()
if err != nil {
logger.Errorf("Error reading from stream: %s", err)
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Error reading from stream: %s", err)
}
return err
}
payload := &cb.Payload{}
if err = proto.Unmarshal(envelope.Payload, payload); err != nil {
logger.Errorf("Received an envelope with no payload: %s", err)
return err
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received an envelope with no payload: %s", err)
}
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

if payload.Header == nil /* || payload.Header.ChannelHeader == nil */ {
err = fmt.Errorf("Malformed envelope received with bad header")
logger.Error(err)
return err
if payload.Header == nil {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Malformed envelope received with bad header")
}
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
Expand All @@ -92,23 +95,40 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {

chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Client request for channel %s not found", chdr.ChannelId)
}
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}

sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received unauthorized deliver request for channel %s", chdr.ChannelId)
}
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}

seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Errorf("Received a signed deliver request with malformed seekInfo payload: %s", err)
return err
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received a signed deliver request with malformed seekInfo payload: %s", err)
}
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

if seekInfo.Start == nil || seekInfo.Stop == nil {
if logger.IsEnabledFor(logging.WARNING) {
logger.Warningf("Received seekInfo message with missing start or stop %v, %v", seekInfo.Start, seekInfo.Stop)
}
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Received seekInfo %v for chain %s", seekInfo, chdr.ChannelId)
logger.Debugf("Received seekInfo (%p) %v for chain %s", seekInfo, seekInfo, chdr.ChannelId)
}

cursor, number := chain.Reader().Iterator(seekInfo.Start)
Expand Down Expand Up @@ -139,7 +159,9 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
return sendStatusReply(srv, status)
}

logger.Debugf("Delivering block")
if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Delivering block for (%p) channel: %s", seekInfo, chdr.ChannelId)
}
if err := sendBlockReply(srv, block); err != nil {
return err
}
Expand All @@ -152,7 +174,9 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil {
return err
}
logger.Debugf("Done delivering, waiting for new SeekInfo")
if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Done delivering for (%p), waiting for new SeekInfo", seekInfo)
}
}
}

Expand Down

0 comments on commit 450b122

Please sign in to comment.