Skip to content

Commit

Permalink
[FAB-1938]: Read orderers endpoints from CB.
Browse files Browse the repository at this point in the history
Currently there is a paramter in core.yaml file
which defines the ordering service endpoint,
CORE_PEER_COMMITTER_LEDGER_ORDERER. This commit
takes care to read configuration transaction to
extract list of orderers endpoints and use it for
delivery client.

There is no more need in CORE_PEER_COMMITTER_LEDGER_ORDERER,
hence relevant section in core.yaml files cleaned as well.

Change-Id: I07a5bf19a1725194510afb90dae23c68af6ab95f
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Mar 2, 2017
1 parent 01cc491 commit 7dc370a
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 48 deletions.
38 changes: 19 additions & 19 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ package deliverclient

import (
"errors"
"fmt"
"math/rand"
"sync"
"time"

"fmt"

"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
"github.com/spf13/viper"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -102,26 +101,27 @@ type deliverServiceImpl struct {
// delivery service instance. It tries to establish connection to
// the specified in the configuration ordering service, in case it
// fails to dial to it, return nil
func NewDeliverService(gossip blocksprovider.GossipServiceAdapter) (DeliverService, error) {
// TODO: Has to be fixed as ordering service configuration is part of the part of configuration block
endpoint := viper.GetString("peer.committer.ledger.orderer")
logger.Infof("Creating delivery service to get blocks from the ordering service, %s", endpoint)
func NewDeliverService(gossip blocksprovider.GossipServiceAdapter, endpoints []string) (DeliverService, error) {
indices := rand.Perm(len(endpoints))
for _, idx := range indices {
logger.Infof("Creating delivery service to get blocks from the ordering service, %s", endpoints[idx])

dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(3 * time.Second), grpc.WithBlock()}
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(3 * time.Second), grpc.WithBlock()}

if comm.TLSEnabled() {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.InitTLSForPeer()))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
if comm.TLSEnabled() {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.InitTLSForPeer()))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}

conn, err := grpc.Dial(endpoint, dialOpts...)
if err != nil {
logger.Errorf("Cannot dial to %s, because of %s", endpoint, err)
return nil, err
conn, err := grpc.Dial(endpoints[idx], dialOpts...)
if err != nil {
logger.Errorf("Cannot dial to %s, because of %s", endpoints[idx], err)
continue
}
return NewFactoryDeliverService(gossip, &blocksDelivererFactoryImpl{conn}, conn), nil
}

return NewFactoryDeliverService(gossip, &blocksDelivererFactoryImpl{conn}, conn), nil
return nil, fmt.Errorf("Wasn't able to connect to any of ordering service endpoints %s", endpoints)
}

// NewFactoryDeliverService construction function to create and initialize
Expand Down
7 changes: 6 additions & 1 deletion core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package peer

import (
"errors"
"fmt"
"math"
"net"
Expand Down Expand Up @@ -199,7 +200,11 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
}

c := committer.NewLedgerCommitter(ledger, txvalidator.NewTxValidator(cs))
service.GetGossipService().InitializeChannel(cs.ChainID(), c)
ordererAddresses := configtxManager.ChannelConfig().OrdererAddresses()
if len(ordererAddresses) == 0 {
return errors.New("No orderering service endpoint provided in configuration block")
}
service.GetGossipService().InitializeChannel(cs.ChainID(), c, ordererAddresses)

chains.Lock()
defer chains.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion core/peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (*mockDeliveryClient) Stop() {
type mockDeliveryClientFactory struct {
}

func (*mockDeliveryClientFactory) Service(g service.GossipService) (deliverclient.DeliverService, error) {
func (*mockDeliveryClientFactory) Service(g service.GossipService, endpoints []string) (deliverclient.DeliverService, error) {
return &mockDeliveryClient{}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/scc/cscc/configure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (*mockDeliveryClient) Stop() {
type mockDeliveryClientFactory struct {
}

func (*mockDeliveryClientFactory) Service(g service.GossipService) (deliverclient.DeliverService, error) {
func (*mockDeliveryClientFactory) Service(g service.GossipService, endpoints []string) (deliverclient.DeliverService, error) {
return &mockDeliveryClient{}, nil
}

Expand Down
12 changes: 6 additions & 6 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type GossipService interface {
// NewConfigEventer creates a ConfigProcessor which the configtx.Manager can ultimately route config updates to
NewConfigEventer() ConfigProcessor
// InitializeChannel allocates the state provider and should be invoked once per channel per execution
InitializeChannel(chainID string, committer committer.Committer)
InitializeChannel(chainID string, committer committer.Committer, endpoints []string)
// GetBlock returns block for given chain
GetBlock(chainID string, index uint64) *common.Block
// AddPayload appends message payload to for given chain
Expand All @@ -61,15 +61,15 @@ type GossipService interface {
// DeliveryServiceFactory factory to create and initialize delivery service instance
type DeliveryServiceFactory interface {
// Returns an instance of delivery client
Service(g GossipService) (deliverclient.DeliverService, error)
Service(g GossipService, endpoints []string) (deliverclient.DeliverService, error)
}

type deliveryFactoryImpl struct {
}

// Returns an instance of delivery client
func (*deliveryFactoryImpl) Service(g GossipService) (deliverclient.DeliverService, error) {
return deliverclient.NewDeliverService(g)
func (*deliveryFactoryImpl) Service(g GossipService, endpoints []string) (deliverclient.DeliverService, error) {
return deliverclient.NewDeliverService(g, endpoints)
}

type gossipServiceImpl struct {
Expand Down Expand Up @@ -159,15 +159,15 @@ func (g *gossipServiceImpl) NewConfigEventer() ConfigProcessor {
}

// InitializeChannel allocates the state provider and should be invoked once per channel per execution
func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committer.Committer) {
func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committer.Committer, endpoints []string) {
g.lock.Lock()
defer g.lock.Unlock()
// Initialize new state provider for given committer
logger.Debug("Creating state provider for chainID", chainID)
g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs)
if g.deliveryService == nil {
var err error
g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance)
g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance, endpoints)
if err != nil {
logger.Warning("Cannot create delivery client, due to", err)
}
Expand Down
12 changes: 6 additions & 6 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestLeaderElectionWithDeliverClient(t *testing.T) {
gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory
deliverServiceFactory.service.running[channelName] = false

gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0})
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}, []string{"localhost:5005"})
service, exist := gossips[i].(*gossipServiceImpl).leaderElection[channelName]
assert.True(t, exist, "Leader election service should be created for peer %d and channel %s", i, channelName)
services[i] = &electionService{nil, false, 0}
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) {
for i := 0; i < n; i++ {
gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory
deliverServiceFactory.service.running[channelName] = false
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0})
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}, []string{"localhost:5005"})
}

for i := 0; i < n; i++ {
Expand All @@ -180,7 +180,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) {
channelName = "chanB"
for i := 0; i < n; i++ {
deliverServiceFactory.service.running[channelName] = false
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0})
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}, []string{"localhost:5005"})
}

for i := 0; i < n; i++ {
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestWithStaticDeliverClientNotLeader(t *testing.T) {
for i := 0; i < n; i++ {
gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory
deliverServiceFactory.service.running[channelName] = false
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0})
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}, []string{"localhost:5005"})
}

for i := 0; i < n; i++ {
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestWithStaticDeliverClientBothStaticAndLeaderElection(t *testing.T) {
for i := 0; i < n; i++ {
gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory
assert.Panics(t, func() {
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0})
gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}, []string{"localhost:5005"})
}, "Dynamic leader lection based and static connection to ordering service can't exist simultaniosly")
}

Expand All @@ -265,7 +265,7 @@ type mockDeliverServiceFactory struct {
service *mockDeliverService
}

func (mf *mockDeliverServiceFactory) Service(g GossipService) (deliverclient.DeliverService, error) {
func (mf *mockDeliverServiceFactory) Service(g GossipService, endpoints []string) (deliverclient.DeliverService, error) {
return mf.service, nil
}

Expand Down
14 changes: 0 additions & 14 deletions peer/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,6 @@ peer:
# if > 0, if buffer full, blocks till timeout
timeout: 10

# ----!!!!IMPORTANT!!!-!!!IMPORTANT!!!-!!!IMPORTANT!!!!----
# THIS HAS TO BE DONE IN THE CONTEXT OF BOOTSTRAP. TILL THAT
# IS DESIGNED AND FINALIZED, THE FOLLOWING COMMITTER/ORDERER
# DEFINITIONS HAVE TO SERVE AS THE MEANS TO DRIVE A SIMPLE
# SKELETON.
#
# All "chaincode" commands from CLI (except "query") will
# send response from the endorser to the Committer defined below.
committer:
enabled: true
ledger:
# orderer to talk to
orderer: 0.0.0.0:7050

# TLS Settings for p2p communications
tls:
enabled: false
Expand Down

0 comments on commit 7dc370a

Please sign in to comment.