Skip to content

Commit

Permalink
[FAB-5802] Add logging of client ip to orderer
Browse files Browse the repository at this point in the history
This commit adds logging of client ip address to the orderer,
for enhanced user experience when reading logs.

Change-Id: I5704f3af008f7c493ca27e92d87a838bc3c8b1e2
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Aug 16, 2017
1 parent 140edf7 commit 13724f7
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 43 deletions.
26 changes: 14 additions & 12 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"

"github.com/hyperledger/fabric/orderer/common/util"
"github.com/op/go-logging"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -63,61 +64,62 @@ func NewHandlerImpl(sm ChannelSupportRegistrar) Handler {

// Handle starts a service thread for a given gRPC connection and services the broadcast connection
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
logger.Debugf("Starting new broadcast loop")
addr := util.ExtractRemoteAddress(srv.Context())
logger.Debugf("Starting new broadcast loop for %s", addr)
for {
msg, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF, hangup")
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
logger.Warningf("Error reading from stream: %s", err)
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}

chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)
if err != nil {
logger.Warningf("[channel: %s] Could not get message processor: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR, Info: err.Error()})
}

if !isConfig {
logger.Debugf("[channel: %s] Broadcast is processing normal message with txid '%s' of type %s", chdr.ChannelId, chdr.TxId, cb.HeaderType_name[chdr.Type])
logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])

configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message because of error: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()})
}

err = processor.Order(msg, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
}
} else { // isConfig
logger.Debugf("[channel: %s] Broadcast is processing config update message", chdr.ChannelId)
logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)

config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message because of error: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()})
}

err = processor.Configure(msg, config, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
}
}

if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])
logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)
}

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
if err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return err
}
}
Expand Down
19 changes: 16 additions & 3 deletions orderer/common/broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,28 @@ import (
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"

logging "github.com/op/go-logging"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)

func init() {
logging.SetLevel(logging.DEBUG, "")
}

type mockB struct {
type mockStream struct {
grpc.ServerStream
}

func (mockStream) Context() context.Context {
return peer.NewContext(context.Background(), &peer.Peer{})
}

type mockB struct {
mockStream
recvChan chan *cb.Envelope
sendChan chan *ab.BroadcastResponse
}
Expand Down Expand Up @@ -56,6 +65,10 @@ type erroneousRecvMockB struct {
grpc.ServerStream
}

func (m *erroneousRecvMockB) Context() context.Context {
return peer.NewContext(context.Background(), &peer.Peer{})
}

func (m *erroneousRecvMockB) Send(br *ab.BroadcastResponse) error {
return nil
}
Expand All @@ -67,7 +80,7 @@ func (m *erroneousRecvMockB) Recv() (*cb.Envelope, error) {
}

type erroneousSendMockB struct {
grpc.ServerStream
mockStream
recvVal *cb.Envelope
}

Expand Down
46 changes: 24 additions & 22 deletions orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/op/go-logging"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/orderer/common/util"
"github.com/hyperledger/fabric/protos/utils"
)

Expand Down Expand Up @@ -69,59 +70,60 @@ func NewHandlerImpl(sm SupportManager) Handler {
}

func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
logger.Debugf("Starting new deliver loop")
addr := util.ExtractRemoteAddress(srv.Context())
logger.Debugf("Starting new deliver loop for %s", addr)
for {
logger.Debugf("Attempting to read seek info message")
logger.Debugf("Attempting to read seek info message from %s", addr)
envelope, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF, hangup")
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}

if err != nil {
logger.Warningf("Error reading from stream: %s", err)
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}

if err := ds.deliverBlocks(srv, envelope); err != nil {
return err
}

logger.Debugf("Waiting for new SeekInfo")
logger.Debugf("Waiting for new SeekInfo from %s", addr)
}
}

func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, envelope *cb.Envelope) error {

addr := util.ExtractRemoteAddress(srv.Context())
payload, err := utils.UnmarshalPayload(envelope.Payload)
if err != nil {
logger.Warningf("Received an envelope with no payload: %s", err)
logger.Warningf("Received an envelope from %s with no payload: %s", addr, err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

if payload.Header == nil {
logger.Warningf("Malformed envelope received with bad header")
logger.Warningf("Malformed envelope received from %s with bad header", addr)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Failed to unmarshal channel header: %s", err)
logger.Warningf("Failed to unmarshal channel header from %s: %s", addr, err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
logger.Debugf("Rejecting deliver because channel %s not found", chdr.ChannelId)
logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId)
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}

erroredChan := chain.Errored()
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Rejecting deliver request because of consenter error", chdr.ChannelId)
logger.Warningf("[channel: %s] Rejecting deliver request for %s because of consenter error", chdr.ChannelId, addr)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
default:

Expand All @@ -131,22 +133,22 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env

sf := msgprocessor.NewSigFilter(policies.ChannelReaders, chain.PolicyManager())
if err := sf.Apply(envelope); err != nil {
logger.Warningf("[channel: %s] Received unauthorized deliver request: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}

seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Warningf("[channel: %s] Received a signed deliver request with malformed seekInfo payload: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s", chdr.ChannelId, addr, err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

if seekInfo.Start == nil || seekInfo.Stop == nil {
logger.Warningf("[channel: %s] Received seekInfo message with missing start or stop %v, %v", chdr.ChannelId, seekInfo.Start, seekInfo.Stop)
logger.Warningf("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v", chdr.ChannelId, addr, seekInfo.Start, seekInfo.Stop)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

logger.Debugf("[channel: %s] Received seekInfo (%p) %v", chdr.ChannelId, seekInfo, seekInfo)
logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)

cursor, number := chain.Reader().Iterator(seekInfo.Start)
defer cursor.Close()
Expand All @@ -159,7 +161,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message: start number %d greater than stop number %d", chdr.ChannelId, number, stopNum)
logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}
Expand All @@ -168,7 +170,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Aborting deliver request because of consenter error", chdr.ChannelId)
logger.Warningf("[channel: %s] Aborting deliver for request because of consenter error", chdr.ChannelId, addr)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
case <-cursor.ReadyChan():
}
Expand All @@ -184,7 +186,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
if err := sf.Apply(envelope); err != nil {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
}
Expand All @@ -195,10 +197,10 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
return sendStatusReply(srv, status)
}

logger.Debugf("[channel: %s] Delivering block for (%p)", chdr.ChannelId, seekInfo)
logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)

if err := sendBlockReply(srv, block); err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return err
}

Expand All @@ -208,11 +210,11 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
}

if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return err
}

logger.Debugf("[channel: %s] Done delivering for (%p)", chdr.ChannelId, seekInfo)
logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)

return nil

Expand Down
16 changes: 13 additions & 3 deletions orderer/common/deliver/deliver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
"github.com/hyperledger/fabric/protos/utils"
logging "github.com/op/go-logging"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)

var genesisBlock = cb.NewBlock(0, nil)
Expand All @@ -45,8 +47,16 @@ func init() {
logging.SetLevel(logging.DEBUG, "")
}

type mockD struct {
type mockStream struct {
grpc.ServerStream
}

func (mockStream) Context() context.Context {
return peer.NewContext(context.Background(), &peer.Peer{})
}

type mockD struct {
mockStream
recvChan chan *cb.Envelope
sendChan chan *ab.DeliverResponse
}
Expand All @@ -72,7 +82,7 @@ func (m *mockD) Recv() (*cb.Envelope, error) {
}

type erroneousRecvMockD struct {
grpc.ServerStream
mockStream
}

func (m *erroneousRecvMockD) Send(br *ab.DeliverResponse) error {
Expand All @@ -86,7 +96,7 @@ func (m *erroneousRecvMockD) Recv() (*cb.Envelope, error) {
}

type erroneousSendMockD struct {
grpc.ServerStream
mockStream
recvVal *cb.Envelope
}

Expand Down
10 changes: 10 additions & 0 deletions orderer/common/performance/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)

var logger = logging.MustGetLogger("orderer/performance")
Expand Down Expand Up @@ -119,6 +121,10 @@ type BroadcastClient struct {
errChan chan error
}

func (BroadcastClient) Context() context.Context {
return peer.NewContext(context.Background(), &peer.Peer{})
}

// SendRequest sends an envelope to `broadcast` API synchronously
func (bc *BroadcastClient) SendRequest(request *cb.Envelope) {
// TODO make this async
Expand Down Expand Up @@ -177,6 +183,10 @@ type DeliverClient struct {
ResultChan chan error
}

func (DeliverClient) Context() context.Context {
return peer.NewContext(context.Background(), &peer.Peer{})
}

// SendRequest sends an envelope to `deliver` API synchronously
func (bc *DeliverClient) SendRequest(request *cb.Envelope) {
// TODO make this async
Expand Down
5 changes: 2 additions & 3 deletions orderer/common/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ import (
"runtime/debug"
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/orderer/common/broadcast"
"github.com/hyperledger/fabric/orderer/common/deliver"
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/multichannel"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"

"github.com/golang/protobuf/proto"
)

type broadcastSupport struct {
Expand All @@ -47,7 +46,7 @@ type server struct {
}

// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
func NewServer(r *multichannel.Registrar, signer crypto.LocalSigner, debug *localconfig.Debug) ab.AtomicBroadcastServer {
func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug) ab.AtomicBroadcastServer {
s := &server{
dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}),
bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}),
Expand Down
Loading

0 comments on commit 13724f7

Please sign in to comment.