Skip to content

Commit

Permalink
[FAB-1924]: Rework delivery client
Browse files Browse the repository at this point in the history
Move ordering service delivery client management into
gossip service, this to be able to maintain single
connection to the ordering service. Moreover this to
be widely used after gossip based leader election
integrated.

Change-Id: Iea9a70a1d6ba82caa55716444c54f3ddbc19673b
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Jan 31, 2017
1 parent 8762744 commit 945c4f7
Show file tree
Hide file tree
Showing 13 changed files with 789 additions and 200 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -14,150 +14,172 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package deliverclient
package blocksprovider

import (
"math"
"time"
"sync/atomic"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/committer"
gossipcommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"

gossip_proto "github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
"github.com/spf13/viper"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

var logger *logging.Logger // package-level logger
// LedgerInfo an adapter to provide the interface to query
// the ledger committer for current ledger height
type LedgerInfo interface {
// LedgerHeight returns current local ledger height
LedgerHeight() (uint64, error)
}

func init() {
logger = logging.MustGetLogger("deliveryService")
// GossipServiceAdapter serves to provide basic functionality
// required from gossip service by delivery service
type GossipServiceAdapter interface {
// PeersOfChannel returns slice with members of specified channel
PeersOfChannel(gossipcommon.ChainID) []discovery.NetworkMember

// AddPayload adds payload to the local state sync buffer
AddPayload(chainID string, payload *gossip_proto.Payload) error

// Gossip the message across the peers
Gossip(msg *gossip_proto.GossipMessage)
}

// BlocksProvider used to read blocks from the ordering service
// for specified chain it subscribed to
type BlocksProvider interface {
// RequestBlock acquire new blocks from ordering service based on
// information provided by ledger info instance
RequestBlocks(ledgerInfoProvider LedgerInfo) error

// DeliverBlocks starts delivering and disseminating blocks
DeliverBlocks()

// Stop shutdowns blocks provider and stops delivering new blocks
Stop()
}

// DeliverService used to communicate with orderers to obtain
// new block and send the to the committer service
type DeliverService struct {
client orderer.AtomicBroadcast_DeliverClient
// BlocksDeliverer defines interface which actually helps
// to abstract the AtomicBroadcast_DeliverClient with only
// required method for blocks provider. This also help to
// build up mocking facilities for testing purposes
type BlocksDeliverer interface {
// Recv capable to bring new blocks from the ordering service
Recv() (*orderer.DeliverResponse, error)

// Send used to send request to the ordering service to obtain new blocks
Send(*common.Envelope) error
}

// blocksProviderImpl the actual implementation for BlocksProvider interface
type blocksProviderImpl struct {
chainID string
conn *grpc.ClientConn

client BlocksDeliverer

gossip GossipServiceAdapter

done int32
}

// StopDeliveryService sends stop to the delivery service reference
func StopDeliveryService(service *DeliverService) {
if service != nil {
service.Stop()
var logger *logging.Logger // package-level logger

func init() {
logger = logging.MustGetLogger("blocksProvider")
}

// NewBlocksProvider constructor function to creare blocks deliverer instance
func NewBlocksProvider(chainID string, client BlocksDeliverer, gossip GossipServiceAdapter) BlocksProvider {
return &blocksProviderImpl{
chainID: chainID,
client: client,
gossip: gossip,
}
}

// NewDeliverService construction function to create and initilize
// delivery service instance
func NewDeliverService(chainID string) *DeliverService {
if viper.GetBool("peer.committer.enabled") {
logger.Infof("Creating committer for single noops endorser")
deliverService := &DeliverService{
// Instance of RawLedger
chainID: chainID,
// DeliverBlocks used to pull out blocks from the ordering service to
// distributed them across peers
func (b *blocksProviderImpl) DeliverBlocks() {
for !b.isDone() {
msg, err := b.client.Recv()
if err != nil {
logger.Warningf("Receive error: %s", err.Error())
return
}
switch t := msg.Type.(type) {
case *orderer.DeliverResponse_Status:
if t.Status == common.Status_SUCCESS {
logger.Warning("ERROR! Received success for a seek that should never complete")
return
}
logger.Warning("Got error ", t)
case *orderer.DeliverResponse_Block:
seqNum := t.Block.Header.Number

return deliverService
numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
// Create payload with a block received
payload := createPayload(seqNum, t.Block)
// Use payload to create gossip message
gossipMsg := createGossipMsg(b.chainID, payload)

logger.Debugf("Adding payload locally, buffer seqNum = [%d], peers number [%d]", seqNum, numberOfPeers)
// Add payload to local state payloads buffer
b.gossip.AddPayload(b.chainID, payload)

// Gossip messages with other nodes
logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers)
b.gossip.Gossip(gossipMsg)
default:
logger.Warning("Received unknown: ", t)
return
}
}
logger.Infof("Committer disabled")
return nil
}

func (d *DeliverService) startDeliver(committer committer.Committer) error {
logger.Info("Starting deliver service client")
err := d.initDeliver()
// Stops blocks delivery provider
func (b *blocksProviderImpl) Stop() {
atomic.StoreInt32(&b.done, 1)
}

if err != nil {
logger.Errorf("Can't initiate deliver protocol [%s]", err)
return err
}
// Check whenever provider is stopped
func (b *blocksProviderImpl) isDone() bool {
return atomic.LoadInt32(&b.done) == 1
}

height, err := committer.LedgerHeight()
func (b *blocksProviderImpl) RequestBlocks(ledgerInfoProvider LedgerInfo) error {
height, err := ledgerInfoProvider.LedgerHeight()
if err != nil {
logger.Errorf("Can't get legder height from committer [%s]", err)
return err
}

if height > 0 {
logger.Debugf("Starting deliver with block [%d]", height)
if err := d.seekLatestFromCommitter(height); err != nil {
if err := b.seekLatestFromCommitter(height); err != nil {
return err
}

} else {
logger.Debug("Starting deliver with olders block")
if err := d.seekOldest(); err != nil {
if err := b.seekOldest(); err != nil {
return err
}

}

d.readUntilClose()

return nil
}

func (d *DeliverService) initDeliver() error {
opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(3 * time.Second), grpc.WithBlock()}
endpoint := viper.GetString("peer.committer.ledger.orderer")
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
logger.Errorf("Cannot dial to %s, because of %s", endpoint, err)
return err
}
var abc orderer.AtomicBroadcast_DeliverClient
abc, err = orderer.NewAtomicBroadcastClient(conn).Deliver(context.TODO())
if err != nil {
logger.Errorf("Unable to initialize atomic broadcast, due to %s", err)
return err
}

// Atomic Broadcast Deliver Client
d.client = abc
d.conn = conn
return nil

}

func (d *DeliverService) stopDeliver() {
if d.conn != nil {
d.conn.Close()
}
}

// Stop all service and release resources
func (d *DeliverService) Stop() {
d.stopDeliver()
}

// Start delivery service
func (d *DeliverService) Start(committer committer.Committer) {
go d.checkLeaderAndRunDeliver(committer)
}

func (d *DeliverService) checkLeaderAndRunDeliver(committer committer.Committer) {
isLeader := viper.GetBool("peer.gossip.orgLeader")

if isLeader {
d.startDeliver(committer)
}
}

func (d *DeliverService) seekOldest() error {
return d.client.Send(&common.Envelope{
func (b *blocksProviderImpl) seekOldest() error {
return b.client.Send(&common.Envelope{
Payload: utils.MarshalOrPanic(&common.Payload{
Header: &common.Header{
ChainHeader: &common.ChainHeader{
ChainID: d.chainID,
ChainID: b.chainID,
},
SignatureHeader: &common.SignatureHeader{},
},
Expand All @@ -170,12 +192,12 @@ func (d *DeliverService) seekOldest() error {
})
}

func (d *DeliverService) seekLatestFromCommitter(height uint64) error {
return d.client.Send(&common.Envelope{
func (b *blocksProviderImpl) seekLatestFromCommitter(height uint64) error {
return b.client.Send(&common.Envelope{
Payload: utils.MarshalOrPanic(&common.Payload{
Header: &common.Header{
ChainHeader: &common.ChainHeader{
ChainID: d.chainID,
ChainID: b.chainID,
},
SignatureHeader: &common.SignatureHeader{},
},
Expand All @@ -188,43 +210,6 @@ func (d *DeliverService) seekLatestFromCommitter(height uint64) error {
})
}

func (d *DeliverService) readUntilClose() {
for {
msg, err := d.client.Recv()
if err != nil {
logger.Warningf("Receive error: %s", err.Error())
return
}
switch t := msg.Type.(type) {
case *orderer.DeliverResponse_Status:
if t.Status == common.Status_SUCCESS {
logger.Warning("ERROR! Received success for a seek that should never complete")
return
}
logger.Warning("Got error ", t)
case *orderer.DeliverResponse_Block:
seqNum := t.Block.Header.Number

numberOfPeers := len(service.GetGossipService().PeersOfChannel(gossipcommon.ChainID(d.chainID)))
// Create payload with a block received
payload := createPayload(seqNum, t.Block)
// Use payload to create gossip message
gossipMsg := createGossipMsg(d.chainID, payload)

logger.Debugf("Adding payload locally, buffer seqNum = [%d], peers number [%d]", seqNum, numberOfPeers)
// Add payload to local state payloads buffer
service.GetGossipService().AddPayload(d.chainID, payload)

// Gossip messages with other nodes
logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers)
service.GetGossipService().Gossip(gossipMsg)
default:
logger.Warning("Received unknown: ", t)
return
}
}
}

func createGossipMsg(chainID string, payload *gossip_proto.Payload) *gossip_proto.GossipMessage {
gossipMsg := &gossip_proto.GossipMessage{
Nonce: 0,
Expand Down
Loading

0 comments on commit 945c4f7

Please sign in to comment.