diff --git a/orderer/common/deliver/deliver.go b/orderer/common/deliver/deliver.go index fd12c6791b0..8a332c9f14a 100644 --- a/orderer/common/deliver/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -17,6 +17,8 @@ limitations under the License. package deliver import ( + "io" + "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/filter" "github.com/hyperledger/fabric/orderer/common/sigfilter" @@ -66,70 +68,60 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { for { logger.Debugf("Attempting to read seek info message") envelope, err := srv.Recv() + if err == io.EOF { + logger.Debugf("Received EOF, hangup") + return nil + } + if err != nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Error reading from stream: %s", err) - } + logger.Warningf("Error reading from stream: %s", err) return err } - payload := &cb.Payload{} - if err = proto.Unmarshal(envelope.Payload, payload); err != nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Received an envelope with no payload: %s", err) - } + + payload, err := utils.UnmarshalPayload(envelope.Payload) + if err != nil { + logger.Warningf("Received an envelope with no payload: %s", err) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } if payload.Header == nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Malformed envelope received with bad header") - } + logger.Warningf("Malformed envelope received with bad header") return sendStatusReply(srv, cb.Status_BAD_REQUEST) } chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { - logger.Error(err) - return err + logger.Warningf("Failed to unmarshal channel header: %s", err) + return sendStatusReply(srv, cb.Status_BAD_REQUEST) } chain, ok := ds.sm.GetChain(chdr.ChannelId) if !ok { // Note, we log this at DEBUG because SDKs will poll waiting for channels to be created // So we would expect our log to be somewhat flooded with these - if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Client request for channel %s not found", chdr.ChannelId) - } + logger.Debugf("Client request for channel %s not found", chdr.ChannelId) return sendStatusReply(srv, cb.Status_NOT_FOUND) } sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager()) result, _ := sf.Apply(envelope) if result != filter.Forward { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Received unauthorized deliver request for channel %s", chdr.ChannelId) - } + logger.Warningf("Received unauthorized deliver request for channel %s", chdr.ChannelId) return sendStatusReply(srv, cb.Status_FORBIDDEN) } seekInfo := &ab.SeekInfo{} if err = proto.Unmarshal(payload.Data, seekInfo); err != nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Received a signed deliver request with malformed seekInfo payload: %s", err) - } + logger.Warningf("Received a signed deliver request with malformed seekInfo payload: %s", err) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } if seekInfo.Start == nil || seekInfo.Stop == nil { - if logger.IsEnabledFor(logging.WARNING) { - logger.Warningf("Received seekInfo message with missing start or stop %v, %v", seekInfo.Start, seekInfo.Stop) - } + logger.Warningf("Received seekInfo message with missing start or stop %v, %v", seekInfo.Start, seekInfo.Stop) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } - if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Received seekInfo (%p) %v for chain %s", seekInfo, seekInfo, chdr.ChannelId) - } + logger.Debugf("Received seekInfo (%p) %v for chain %s", seekInfo, seekInfo, chdr.ChannelId) cursor, number := chain.Reader().Iterator(seekInfo.Start) var stopNum uint64 @@ -140,6 +132,10 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { stopNum = chain.Reader().Height() - 1 case *ab.SeekPosition_Specified: stopNum = stop.Specified.Number + if stopNum < number { + logger.Warningf("Received invalid seekInfo message where start number %d is greater than stop number %d", number, stopNum) + return sendStatusReply(srv, cb.Status_BAD_REQUEST) + } } for { @@ -159,10 +155,10 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { return sendStatusReply(srv, status) } - if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Delivering block for (%p) channel: %s", seekInfo, chdr.ChannelId) - } + logger.Debugf("Delivering block for (%p) channel: %s", seekInfo, chdr.ChannelId) + if err := sendBlockReply(srv, block); err != nil { + logger.Warningf("Error sending to stream: %s", err) return err } @@ -172,11 +168,11 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { } if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil { + logger.Warningf("Error sending to stream: %s", err) return err } - if logger.IsEnabledFor(logging.DEBUG) { - logger.Debugf("Done delivering for (%p), waiting for new SeekInfo", seekInfo) - } + + logger.Debugf("Done delivering for (%p), waiting for new SeekInfo", seekInfo) } } diff --git a/orderer/common/deliver/deliver_test.go b/orderer/common/deliver/deliver_test.go index 03e6684ce8d..0d993ba5da7 100644 --- a/orderer/common/deliver/deliver_test.go +++ b/orderer/common/deliver/deliver_test.go @@ -18,6 +18,7 @@ package deliver import ( "fmt" + "io" "testing" "time" @@ -30,6 +31,7 @@ import ( ab "github.com/hyperledger/fabric/protos/orderer" "github.com/hyperledger/fabric/protos/utils" logging "github.com/op/go-logging" + "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) @@ -64,7 +66,7 @@ func (m *mockD) Send(br *ab.DeliverResponse) error { func (m *mockD) Recv() (*cb.Envelope, error) { msg, ok := <-m.recvChan if !ok { - return msg, fmt.Errorf("Channel closed") + return msg, io.EOF } return msg, nil } @@ -98,6 +100,16 @@ func NewRAMLedger() ledger.ReadWriter { return rl } +func initializeDeliverHandler() Handler { + mm := newMockMultichainManager() + for i := 1; i < ledgerSize; i++ { + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + } + + return NewHandlerImpl(mm) +} + func newMockMultichainManager() *mockSupportManager { rl := NewRAMLedger() mm := &mockSupportManager{ @@ -132,16 +144,10 @@ func makeSeek(chainID string, seekInfo *ab.SeekInfo) *cb.Envelope { } func TestOldestSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) + ds := initializeDeliverHandler() go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekOldest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -170,16 +176,10 @@ func TestOldestSeek(t *testing.T) { } func TestNewestSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) + ds := initializeDeliverHandler() go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -187,10 +187,7 @@ func TestNewestSeek(t *testing.T) { select { case deliverReply := <-m.sendChan: if deliverReply.GetBlock() == nil { - if deliverReply.GetStatus() != cb.Status_SUCCESS { - t.Fatalf("Received an error on the reply channel") - } - return + t.Fatalf("Received an error on the reply channel") } if deliverReply.GetBlock().Header.Number != uint64(ledgerSize-1) { t.Fatalf("Expected only the most recent block") @@ -201,20 +198,14 @@ func TestNewestSeek(t *testing.T) { } func TestSpecificSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) - specifiedStart := uint64(3) - specifiedStop := uint64(7) + ds := initializeDeliverHandler() go ds.Handle(m) + specifiedStart := uint64(3) + specifiedStop := uint64(7) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(specifiedStart), Stop: seekSpecified(specifiedStop), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) count := uint64(0) @@ -264,16 +255,10 @@ func TestUnauthorizedSeek(t *testing.T) { } func TestBadSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) + ds := initializeDeliverHandler() go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(3 * ledgerSize)), Stop: seekSpecified(uint64(3 * ledgerSize)), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -289,16 +274,10 @@ func TestBadSeek(t *testing.T) { } func TestFailFastSeek(t *testing.T) { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm) + ds := initializeDeliverHandler() go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_FAIL_IF_NOT_READY}) @@ -373,3 +352,32 @@ func TestBlockingSeek(t *testing.T) { t.Fatalf("Timed out waiting to get all blocks") } } + +func TestSGracefulShutdown(t *testing.T) { + m := newMockD() + ds := NewHandlerImpl(nil) + + close(m.recvChan) + assert.NoError(t, ds.Handle(m), "Expected no error for hangup") +} + +func TestReversedSeqSeek(t *testing.T) { + m := newMockD() + defer close(m.recvChan) + + ds := initializeDeliverHandler() + go ds.Handle(m) + + specifiedStart := uint64(7) + specifiedStop := uint64(3) + m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(specifiedStart), Stop: seekSpecified(specifiedStop), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) + + select { + case deliverReply := <-m.sendChan: + if deliverReply.GetStatus() != cb.Status_BAD_REQUEST { + t.Fatalf("Received wrong error on the reply channel") + } + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } +}