From a14912f2c2aa10bbda228dfaae5e0a074fc38ffa Mon Sep 17 00:00:00 2001 From: Jay Guo Date: Wed, 31 May 2017 17:01:32 +0800 Subject: [PATCH] [FAB-4201] Fix error validation in deliver This patch fixes several issues in error validation of deliver: 1. EOF is now caught as normal hangup of gRPC connection, hence exit without returning error. 2. Error of inverse seek position was not caught, which results in undesired behavior. Now it returns BAD_REQUEST status. 3. Some errors were not logged. 4. Explicit logging level checks are removed, i.e. `isEnabledFor` This was intended to improve the performance by reducing memory allocations caused by passing in string arguments. However, the performance gain is not significant (~ns) and we prefer cleaner code. On the other hand, common logic in tests are extracted into a reusable function. Change-Id: Ib2ff52cd1d9ef767f0918728084ec31c075cc38a Signed-off-by: Jay Guo --- orderer/common/deliver/deliver.go | 64 +++++++++--------- orderer/common/deliver/deliver_test.go | 92 ++++++++++++++------------ 2 files changed, 80 insertions(+), 76 deletions(-) diff --git a/orderer/common/deliver/deliver.go b/orderer/common/deliver/deliver.go index fd12c6791b0..8a332c9f14a 100644 --- a/orderer/common/deliver/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -17,6 +17,8 @@ limitations under the License. package deliver import ( + "io" + "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/filter" "github.com/hyperledger/fabric/orderer/common/sigfilter" @@ -66,70 +68,60 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { for { logger.Debugf("Attempting to read seek info message") envelope, err := srv.Recv() + if err == io.EOF { + logger.Debugf("Received EOF, hangup") + return nil + } + if err != nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Error reading from stream: %s", err) - } + logger.Warningf("Error reading from stream: %s", err) return err } - payload := &cb.Payload{} - if err = proto.Unmarshal(envelope.Payload, payload); err != nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Received an envelope with no payload: %s", err) - } + + payload, err := utils.UnmarshalPayload(envelope.Payload) + if err != nil { + logger.Warningf("Received an envelope with no payload: %s", err) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } if payload.Header == nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Malformed envelope received with bad header") - } + logger.Warningf("Malformed envelope received with bad header") return sendStatusReply(srv, cb.Status_BAD_REQUEST) } chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { - logger.Error(err) - return err + logger.Warningf("Failed to unmarshal channel header: %s", err) + return sendStatusReply(srv, cb.Status_BAD_REQUEST) } chain, ok := ds.sm.GetChain(chdr.ChannelId) if !ok { // Note, we log this at DEBUG because SDKs will poll waiting for channels to be created // So we would expect our log to be somewhat flooded with these - if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Client request for channel %s not found", chdr.ChannelId) - } + logger.Debugf("Client request for channel %s not found", chdr.ChannelId) return sendStatusReply(srv, cb.Status_NOT_FOUND) } sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager()) result, _ := sf.Apply(envelope) if result != filter.Forward { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Received unauthorized deliver request for channel %s", chdr.ChannelId) - } + logger.Warningf("Received unauthorized deliver request for channel %s", chdr.ChannelId) return sendStatusReply(srv, cb.Status_FORBIDDEN) } seekInfo := &ab.SeekInfo{} if err = proto.Unmarshal(payload.Data, seekInfo); err != nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Received a signed deliver request with malformed seekInfo payload: %s", err) - } + logger.Warningf("Received a signed deliver request with malformed seekInfo payload: %s", err) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } if seekInfo.Start == nil || seekInfo.Stop == nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Received seekInfo message with missing start or stop %v, %v", seekInfo.Start, seekInfo.Stop) - } + logger.Warningf("Received seekInfo message with missing start or stop %v, %v", seekInfo.Start, seekInfo.Stop) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } - if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Received seekInfo (%p) %v for chain %s", seekInfo, seekInfo, chdr.ChannelId) - } + logger.Debugf("Received seekInfo (%p) %v for chain %s", seekInfo, seekInfo, chdr.ChannelId) cursor, number := chain.Reader().Iterator(seekInfo.Start) var stopNum uint64 @@ -140,6 +132,10 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { stopNum = chain.Reader().Height() - 1 case *ab.SeekPosition_Specified: stopNum = stop.Specified.Number + if stopNum < number { + logger.Warningf("Received invalid seekInfo message where start number %d is greater than stop number %d", number, stopNum) + return sendStatusReply(srv, cb.Status_BAD_REQUEST) + } } for { @@ -159,10 +155,10 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { return sendStatusReply(srv, status) } - if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Delivering block for (%p) channel: %s", seekInfo, chdr.ChannelId) - } + logger.Debugf("Delivering block for (%p) channel: %s", seekInfo, chdr.ChannelId) + if err := sendBlockReply(srv, block); err != nil { + logger.Warningf("Error sending to stream: %s", err) return err } @@ -172,11 +168,11 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { } if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil { + logger.Warningf("Error sending to stream: %s", err) return err } - if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Done delivering for (%p), waiting for new SeekInfo", seekInfo) - } + + logger.Debugf("Done delivering for (%p), waiting for new SeekInfo", seekInfo) } } diff --git a/orderer/common/deliver/deliver_test.go b/orderer/common/deliver/deliver_test.go index 03e6684ce8d..0d993ba5da7 100644 --- a/orderer/common/deliver/deliver_test.go +++ b/orderer/common/deliver/deliver_test.go @@ -18,6 +18,7 @@ package deliver import ( "fmt" + "io" "testing" "time" @@ -30,6 +31,7 @@ import ( ab "github.com/hyperledger/fabric/protos/orderer" "github.com/hyperledger/fabric/protos/utils" logging "github.com/op/go-logging" + "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) @@ -64,7 +66,7 @@ func (m *mockD) Send(br *ab.DeliverResponse) error { func (m *mockD) Recv() (*cb.Envelope, error) { msg, ok := <-m.recvChan if !ok { - return msg, fmt.Errorf("Channel closed") + return msg, io.EOF } return msg, nil } @@ -98,6 +100,16 @@ func NewRAMLedger() ledger.ReadWriter { return rl } +func initializeDeliverHandler() Handler { + mm := newMockMultichainManager() + for i := 1; i < ledgerSize; i++ { + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + } + + return NewHandlerImpl(mm) +} + func newMockMultichainManager() *mockSupportManager { rl := NewRAMLedger() mm := &mockSupportManager{ @@ -132,16 +144,10 @@ func makeSeek(chainID string, seekInfo *ab.SeekInfo) *cb.Envelope { } func TestOldestSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) + ds := initializeDeliverHandler() go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekOldest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -170,16 +176,10 @@ func TestOldestSeek(t *testing.T) { } func TestNewestSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) + ds := initializeDeliverHandler() go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -187,10 +187,7 @@ func TestNewestSeek(t *testing.T) { select { case deliverReply := <-m.sendChan: if deliverReply.GetBlock() == nil { - if deliverReply.GetStatus() != cb.Status_SUCCESS { - t.Fatalf("Received an error on the reply channel") - } - return + t.Fatalf("Received an error on the reply channel") } if deliverReply.GetBlock().Header.Number != uint64(ledgerSize-1) { t.Fatalf("Expected only the most recent block") @@ -201,20 +198,14 @@ func TestNewestSeek(t *testing.T) { } func TestSpecificSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) - specifiedStart := uint64(3) - specifiedStop := uint64(7) + ds := initializeDeliverHandler() go ds.Handle(m) + specifiedStart := uint64(3) + specifiedStop := uint64(7) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(specifiedStart), Stop: seekSpecified(specifiedStop), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) count := uint64(0) @@ -264,16 +255,10 @@ func TestUnauthorizedSeek(t *testing.T) { } func TestBadSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) + ds := initializeDeliverHandler() go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(3 * ledgerSize)), Stop: seekSpecified(uint64(3 * ledgerSize)), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -289,16 +274,10 @@ func TestBadSeek(t *testing.T) { } func TestFailFastSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) + ds := initializeDeliverHandler() go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_FAIL_IF_NOT_READY}) @@ -373,3 +352,32 @@ func TestBlockingSeek(t *testing.T) { t.Fatalf("Timed out waiting to get all blocks") } } + +func TestSGracefulShutdown(t *testing.T) { + m := newMockD() + ds := NewHandlerImpl(nil) + + close(m.recvChan) + assert.NoError(t, ds.Handle(m), "Expected no error for hangup") +} + +func TestReversedSeqSeek(t *testing.T) { + m := newMockD() + defer close(m.recvChan) + + ds := initializeDeliverHandler() + go ds.Handle(m) + + specifiedStart := uint64(7) + specifiedStop := uint64(3) + m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(specifiedStart), Stop: seekSpecified(specifiedStop), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) + + select { + case deliverReply := <-m.sendChan: + if deliverReply.GetStatus() != cb.Status_BAD_REQUEST { + t.Fatalf("Received wrong error on the reply channel") + } + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } +}