Skip to content

Commit

Permalink
[FAB-7273] Update deliver to facilitate usage on peer
Browse files Browse the repository at this point in the history
This CR updates the deliver functionality to facilitate its usage on
a peer as well as an orderer.

This required:
- modifying the signal logic for when a new block is available due to
 the difference in addition of blocks to the ledger between the orderer
 and the peer. The signal logic is now handled using the iterator
 itself, which signals when it finds a new block
- adding a policy variable to the deliver handler to ensure the peer
and orderer each can control access to deliver

Change-Id: Iebb6c25a8c5ac32d65f909eb0519f26bfde0dc31
Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
  • Loading branch information
wlahti committed Dec 7, 2017
1 parent 0dfe4f3 commit c39d69b
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 95 deletions.
54 changes: 33 additions & 21 deletions common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ type Support interface {
}

type deliverServer struct {
sm SupportManager
sm SupportManager
policyName string
}

// NewHandlerImpl creates an implementation of the Handler interface
func NewHandlerImpl(sm SupportManager) Handler {
func NewHandlerImpl(sm SupportManager, policyName string) Handler {
return &deliverServer{
sm: sm,
sm: sm,
policyName: policyName,
}
}

Expand Down Expand Up @@ -137,7 +139,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env

lastConfigSequence := chain.Sequence()

sf := NewSigFilter(policies.ChannelReaders, chain)
sf := NewSigFilter(ds.policyName, chain)
if err := sf.Apply(envelope); err != nil {
logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
Expand Down Expand Up @@ -173,21 +175,21 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
}

for {
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Aborting deliver for request because of consenter error", chdr.ChannelId, addr)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
case <-cursor.ReadyChan():
}
} else {
select {
case <-cursor.ReadyChan():
default:
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > chain.Reader().Height()-1 {
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
}

block, status := nextBlock(cursor, erroredChan)
if status != cb.Status_SUCCESS {
cursor.Close()
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return sendStatusReply(srv, status)
}
// increment block number to support FAIL_IF_NOT_READY deliver behavior
number++

currentConfigSequence := chain.Sequence()
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
Expand All @@ -197,12 +199,6 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
}
}

block, status := cursor.Next()
if status != cb.Status_SUCCESS {
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return sendStatusReply(srv, status)
}

logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)

if err := sendBlockReply(srv, block); err != nil {
Expand All @@ -226,6 +222,22 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env

}

func nextBlock(cursor blockledger.Iterator, cancel <-chan struct{}) (block *cb.Block, status cb.Status) {
done := make(chan struct{})
go func() {
defer close(done)
block, status = cursor.Next()
}()

select {
case <-done:
return
case <-cancel:
logger.Warningf("Aborting deliver for request because of background error")
return nil, cb.Status_SERVICE_UNAVAILABLE
}
}

func sendStatusReply(srv ab.AtomicBroadcast_DeliverServer, status cb.Status) error {
return srv.Send(&ab.DeliverResponse{
Type: &ab.DeliverResponse_Status{Status: status},
Expand Down
20 changes: 11 additions & 9 deletions common/deliver/deliver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var genesisBlock = cb.NewBlock(0, nil)

var systemChainID = "systemChain"

var policyName = policies.ChannelReaders

const ledgerSize = 10

func init() {
Expand Down Expand Up @@ -156,7 +158,7 @@ func initializeDeliverHandler() Handler {
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
}

return NewHandlerImpl(mm)
return NewHandlerImpl(mm, policyName)
}

func newMockMultichainManager() *mockSupportManager {
Expand Down Expand Up @@ -288,7 +290,7 @@ func TestUnauthorizedSeek(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)

go ds.Handle(m)

Expand All @@ -313,7 +315,7 @@ func TestRevokedAuthorizationSeek(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)

go ds.Handle(m)

Expand Down Expand Up @@ -396,7 +398,7 @@ func TestBlockingSeek(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)

go ds.Handle(m)

Expand Down Expand Up @@ -450,7 +452,7 @@ func TestErroredSeek(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)

go ds.Handle(m)

Expand All @@ -474,7 +476,7 @@ func TestErroredBlockingSeek(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)

go ds.Handle(m)

Expand All @@ -499,7 +501,7 @@ func TestErroredBlockingSeek(t *testing.T) {

func TestSGracefulShutdown(t *testing.T) {
m := newMockD()
ds := NewHandlerImpl(nil)
ds := NewHandlerImpl(nil, policyName)

close(m.recvChan)
assert.NoError(t, ds.Handle(m), "Expected no error for hangup")
Expand Down Expand Up @@ -527,7 +529,7 @@ func TestReversedSeqSeek(t *testing.T) {
}

func TestBadStreamRecv(t *testing.T) {
bh := NewHandlerImpl(nil)
bh := NewHandlerImpl(nil, policyName)
assert.Error(t, bh.Handle(&erroneousRecvMockD{}), "Should catch unexpected stream error")
}

Expand Down Expand Up @@ -616,7 +618,7 @@ func TestChainNotFound(t *testing.T) {
m := newMockD()
defer close(m.recvChan)

ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)
go ds.Handle(m)

m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})
Expand Down
2 changes: 1 addition & 1 deletion common/ledger/blockledger/file/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (flf *fileLedgerFactory) GetOrCreate(chainID string) (blockledger.ReadWrite
if err != nil {
return nil, err
}
ledger = &fileLedger{blockStore: blockStore, signal: make(chan struct{})}
ledger = NewFileLedger(blockStore)
flf.ledgers[key] = ledger
return ledger, nil
}
Expand Down
40 changes: 23 additions & 17 deletions common/ledger/blockledger/file/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package fileledger
import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blockledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
Expand All @@ -40,31 +39,38 @@ func init() {
}

// FileLedger is a struct used to interact with a node's ledger
type fileLedger struct {
blockStore blkstorage.BlockStore
type FileLedger struct {
blockStore FileLedgerBlockStore
signal chan struct{}
}

// FileLedgerBlockStore defines the interface to interact with deliver when using a
// file ledger
type FileLedgerBlockStore interface {
AddBlock(block *cb.Block) error
GetBlockchainInfo() (*cb.BlockchainInfo, error)
RetrieveBlocks(startBlockNumber uint64) (ledger.ResultsIterator, error)
}

// NewFileLedger creates a new FileLedger for interaction with the ledger
func NewFileLedger(blockStore FileLedgerBlockStore) *FileLedger {
return &FileLedger{blockStore: blockStore, signal: make(chan struct{})}
}

type fileLedgerIterator struct {
ledger *fileLedger
ledger *FileLedger
blockNumber uint64
commonIterator ledger.ResultsIterator
}

// Next blocks until there is a new block available, or returns an error if the
// next block is no longer retrievable
func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
for {
if i.blockNumber < i.ledger.Height() {
result, err := i.commonIterator.Next()
if err != nil {
return nil, cb.Status_SERVICE_UNAVAILABLE
}
i.blockNumber++
return result.(*cb.Block), cb.Status_SUCCESS
}
<-i.ledger.signal
result, err := i.commonIterator.Next()
if err != nil {
return nil, cb.Status_SERVICE_UNAVAILABLE
}
return result.(*cb.Block), cb.Status_SUCCESS
}

// ReadyChan supplies a channel which will block until Next will not block
Expand All @@ -83,7 +89,7 @@ func (i *fileLedgerIterator) Close() {

// Iterator returns an Iterator, as specified by an ab.SeekInfo message, and its
// starting block number
func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
func (fl *FileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
var startingBlockNumber uint64
switch start := startPosition.Type.(type) {
case *ab.SeekPosition_Oldest:
Expand Down Expand Up @@ -114,7 +120,7 @@ func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iter
}

// Height returns the number of blocks on the ledger
func (fl *fileLedger) Height() uint64 {
func (fl *FileLedger) Height() uint64 {
info, err := fl.blockStore.GetBlockchainInfo()
if err != nil {
logger.Panic(err)
Expand All @@ -123,7 +129,7 @@ func (fl *fileLedger) Height() uint64 {
}

// Append a new block to the ledger
func (fl *fileLedger) Append(block *cb.Block) error {
func (fl *FileLedger) Append(block *cb.Block) error {
err := fl.blockStore.AddBlock(block)
if err == nil {
close(fl.signal)
Expand Down
Loading

0 comments on commit c39d69b

Please sign in to comment.