Skip to content

Commit

Permalink
[FAB-2759] DeliveryService peer<->OS high availability
Browse files Browse the repository at this point in the history
In previous commits, I added a delivery client:
https://gerrit.hyperledger.org/r/#/c/7343
and a connection producer:
https://gerrit.hyperledger.org/r/#/c/7337

In this commit, I integrate the two into the existing delivery service,
and:
- Decouple the blocks request action from the blocks provider
  and instead- put it into requester.go, which is used by
  the delivery client (client.go) in the following way-
  the client has a function that is invoked upon each successful (re)connection
  to the ordering service.
  This function utilizes the BlockRequester (requester.go) and makes it
  send a seekInfo message to the ordering service.
- Instead of the BlocksDeliverer to be created once at startup-
  we have the broadcastClient (client.go) from a previous change set
  that implements the BlocksDeliverer, and it does reconnection
  logic upon demand and the delivery service is oblivious of this.

This change set makes the delivery service automatically failover/reconnect to
backup ordering service endpoints once it disconnects from
the ordering service.

I added the following test cases:
	- Peer reconnect upon restart of the ordering service
	- Peer failover to other ordering service node
	- Peer is disconnected from ordering service upon close
          of the delivery service

Total code coverage of deliveryclient.go went up to 97%

Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
Change-Id: I2b4687cc3b5fc767150fa0de607890a68fd38449
  • Loading branch information
yacovm committed Apr 7, 2017
1 parent c0f8d75 commit 08b456e
Show file tree
Hide file tree
Showing 9 changed files with 530 additions and 200 deletions.
89 changes: 18 additions & 71 deletions core/deliverservice/blocksprovider/blocksprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@ limitations under the License.
package blocksprovider

import (
"math"
"sync/atomic"

"github.com/golang/protobuf/proto"
gossipcommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"

"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/protos/common"
gossip_proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
)

Expand All @@ -56,10 +53,6 @@ type GossipServiceAdapter interface {
// BlocksProvider used to read blocks from the ordering service
// for specified chain it subscribed to
type BlocksProvider interface {
// RequestBlock acquire new blocks from ordering service based on
// information provided by ledger info instance
RequestBlocks(ledgerInfoProvider LedgerInfo) error

// DeliverBlocks starts delivering and disseminating blocks
DeliverBlocks()

Expand All @@ -69,21 +62,29 @@ type BlocksProvider interface {

// BlocksDeliverer defines interface which actually helps
// to abstract the AtomicBroadcast_DeliverClient with only
// required method for blocks provider. This also help to
// build up mocking facilities for testing purposes
// required method for blocks provider.
// This also decouples the production implementation of the gRPC stream
// from the code in order for the code to be more modular and testable.
type BlocksDeliverer interface {
// Recv capable to bring new blocks from the ordering service
// Recv retrieves a response from the ordering service
Recv() (*orderer.DeliverResponse, error)

// Send used to send request to the ordering service to obtain new blocks
// Send sends an envelope to the ordering service
Send(*common.Envelope) error
}

type streamClient interface {
BlocksDeliverer

// Close closes the stream and its underlying connection
Close()
}

// blocksProviderImpl the actual implementation for BlocksProvider interface
type blocksProviderImpl struct {
chainID string

client BlocksDeliverer
client streamClient

gossip GossipServiceAdapter

Expand All @@ -98,8 +99,8 @@ func init() {
logger = logging.MustGetLogger("blocksProvider")
}

// NewBlocksProvider constructor function to creare blocks deliverer instance
func NewBlocksProvider(chainID string, client BlocksDeliverer, gossip GossipServiceAdapter, mcs api.MessageCryptoService) BlocksProvider {
// NewBlocksProvider constructor function to create blocks deliverer instance
func NewBlocksProvider(chainID string, client streamClient, gossip GossipServiceAdapter, mcs api.MessageCryptoService) BlocksProvider {
return &blocksProviderImpl{
chainID: chainID,
client: client,
Expand All @@ -111,6 +112,7 @@ func NewBlocksProvider(chainID string, client BlocksDeliverer, gossip GossipServ
// DeliverBlocks used to pull out blocks from the ordering service to
// distributed them across peers
func (b *blocksProviderImpl) DeliverBlocks() {
defer b.client.Close()
for !b.isDone() {
msg, err := b.client.Recv()
if err != nil {
Expand Down Expand Up @@ -157,72 +159,17 @@ func (b *blocksProviderImpl) DeliverBlocks() {
}
}

// Stops blocks delivery provider
// Stop stops blocks delivery provider
func (b *blocksProviderImpl) Stop() {
atomic.StoreInt32(&b.done, 1)
b.client.Close()
}

// Check whenever provider is stopped
func (b *blocksProviderImpl) isDone() bool {
return atomic.LoadInt32(&b.done) == 1
}

func (b *blocksProviderImpl) RequestBlocks(ledgerInfoProvider LedgerInfo) error {
height, err := ledgerInfoProvider.LedgerHeight()
if err != nil {
logger.Errorf("Can't get legder height from committer [%s]", err)
return err
}

if height > 0 {
logger.Debugf("Starting deliver with block [%d]", height)
if err := b.seekLatestFromCommitter(height); err != nil {
return err
}
} else {
logger.Debug("Starting deliver with olders block")
if err := b.seekOldest(); err != nil {
return err
}
}

return nil
}

func (b *blocksProviderImpl) seekOldest() error {
seekInfo := &orderer.SeekInfo{
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}

//TODO- epoch and msgVersion may need to be obtained for nowfollowing usage in orderer/configupdate/configupdate.go
msgVersion := int32(0)
epoch := uint64(0)
env, err := utils.CreateSignedEnvelope(common.HeaderType_CONFIG_UPDATE, b.chainID, localmsp.NewSigner(), seekInfo, msgVersion, epoch)
if err != nil {
return err
}
return b.client.Send(env)
}

func (b *blocksProviderImpl) seekLatestFromCommitter(height uint64) error {
seekInfo := &orderer.SeekInfo{
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}

//TODO- epoch and msgVersion may need to be obtained for nowfollowing usage in orderer/configupdate/configupdate.go
msgVersion := int32(0)
epoch := uint64(0)
env, err := utils.CreateSignedEnvelope(common.HeaderType_CONFIG_UPDATE, b.chainID, localmsp.NewSigner(), seekInfo, msgVersion, epoch)
if err != nil {
return err
}
return b.client.Send(env)
}

func createGossipMsg(chainID string, payload *gossip_proto.Payload) *gossip_proto.GossipMessage {
gossipMsg := &gossip_proto.GossipMessage{
Nonce: 0,
Expand Down
24 changes: 12 additions & 12 deletions core/deliverservice/blocksprovider/blocksprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (*mockMCS) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
// from given block sequence number.
func makeTestCase(ledgerHeight uint64) func(*testing.T) {
return func(t *testing.T) {
gossipServiceAdapter := &mocks.MockGossipServiceAdapter{}
gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64)}
deliverer := &mocks.MockBlocksDeliverer{Pos: ledgerHeight}
deliverer.MockRecv = mocks.MockRecv

Expand All @@ -70,15 +70,9 @@ func makeTestCase(ledgerHeight uint64) func(*testing.T) {
mcs: &mockMCS{},
}

provider.RequestBlocks(&mocks.MockLedgerInfo{ledgerHeight})

var wg sync.WaitGroup
wg.Add(1)

ready := make(chan struct{})
go func() {
provider.DeliverBlocks()
wg.Done()
go provider.DeliverBlocks()
// Send notification
ready <- struct{}{}
}()
Expand All @@ -91,7 +85,11 @@ func makeTestCase(ledgerHeight uint64) func(*testing.T) {
{
// Check that all blocks received eventually get gossiped and locally committed
assert.True(t, deliverer.RecvCnt == gossipServiceAdapter.AddPayloadsCnt)
assert.True(t, deliverer.RecvCnt == gossipServiceAdapter.GossipCallsCnt)
select {
case <-gossipServiceAdapter.GossipBlockDisseminations:
case <-time.After(time.Second):
assert.Fail(t, "Didn't gossip a block within a timely manner")
}
return
}
case <-time.After(time.Duration(1) * time.Second):
Expand Down Expand Up @@ -140,8 +138,6 @@ func TestBlocksProvider_CheckTerminationDeliveryResponseStatus(t *testing.T) {
client: &tmp,
}

provider.RequestBlocks(&mocks.MockLedgerInfo{0})

var wg sync.WaitGroup
wg.Add(1)

Expand All @@ -163,7 +159,11 @@ func TestBlocksProvider_CheckTerminationDeliveryResponseStatus(t *testing.T) {
// No payload should commit locally
assert.Equal(t, int32(0), gossipServiceAdapter.AddPayloadsCnt)
// No payload should be transfered to other peers
assert.Equal(t, int32(0), gossipServiceAdapter.GossipCallsCnt)
select {
case <-gossipServiceAdapter.GossipBlockDisseminations:
assert.Fail(t, "Gossiped block but shouldn't have")
case <-time.After(time.Second):
}
return
}
case <-time.After(time.Duration(1) * time.Second):
Expand Down
5 changes: 4 additions & 1 deletion core/deliverservice/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,12 @@ func (s *signerMock) Sign(message []byte) ([]byte, error) {
}

func TestProductionUsage(t *testing.T) {
defer ensureNoGoroutineLeak(t)()
// This test configures the client in a similar fashion as will be
// in production, and tests against a live gRPC server.
os := mocks.NewOrderer(5612, t)
os.SetNextExpectedSeek(5)
defer os.Shutdown()

connFact := func(endpoint string) (*grpc.ClientConn, error) {
return grpc.Dial(endpoint, grpc.WithInsecure(), grpc.WithBlock())
}
Expand All @@ -593,6 +594,8 @@ func TestProductionUsage(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, uint64(5), resp.GetBlock().Header.Number)
os.Shutdown()
cl.Close()
}

func newTestSeekInfo() *orderer.SeekInfo {
Expand Down
Loading

0 comments on commit 08b456e

Please sign in to comment.