Skip to content

Commit

Permalink
[FAB-12477]: add etcd/raft membership message
Browse files Browse the repository at this point in the history
This commit adds new message to hold information about mapping from
consenters to theirs ids and to keep track on the next id to be assigned
for new coming replica. Moreover update write block path to actually
persist information about raft cluster membership mapping.

Change-Id: I638b30fc5729a0d02f21a40989deb93e42b09836
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin authored and guoger committed Oct 29, 2018
1 parent d92a41d commit fc0c4e9
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 114 deletions.
58 changes: 13 additions & 45 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"context"
"encoding/pem"
"fmt"
"reflect"
"sync/atomic"
"time"

Expand Down Expand Up @@ -68,7 +67,7 @@ type Options struct {
HeartbeatTick int
MaxSizePerMsg uint64
MaxInflightMsgs int
Peers []raft.Peer
RaftMetadata *etcdraft.RaftMetadata
}

// Chain implements consensus.Chain interface.
Expand Down Expand Up @@ -148,7 +147,10 @@ func (c *Chain) Start() {
return
}

c.node = raft.StartNode(config, c.opts.Peers)
raftPeers := RaftPeers(c.opts.RaftMetadata.Consenters)

c.node = raft.StartNode(config, raftPeers)

close(c.startC)

go c.serveRaft()
Expand Down Expand Up @@ -371,12 +373,13 @@ func (c *Chain) serveRequest() {
}

func (c *Chain) writeBlock(b *common.Block) {
metadata := utils.MarshalOrPanic(c.opts.RaftMetadata)
if utils.IsConfigBlock(b) {
c.support.WriteConfigBlock(b, nil)
c.support.WriteConfigBlock(b, metadata)
return
}

c.support.WriteBlock(b, nil)
c.support.WriteBlock(b, metadata)
}

// Orders the envelope in the `msg` content. SubmitRequest.
Expand Down Expand Up @@ -425,12 +428,7 @@ func (c *Chain) commitBatches(batches ...[]*common.Envelope) error {

select {
case block := <-c.commitC:
if utils.IsConfigBlock(block) {
c.support.WriteConfigBlock(block, nil)
} else {
c.support.WriteBlock(block, nil)
}

c.writeBlock(block)
case <-c.resignC:
return errors.Errorf("aborted block committing: lost leadership")

Expand Down Expand Up @@ -555,7 +553,7 @@ func (c *Chain) isConfig(env *common.Envelope) bool {
}

func (c *Chain) configureComm() error {
nodes, err := c.nodeConfigFromMetadata()
nodes, err := c.remotePeers()
if err != nil {
return err
}
Expand All @@ -564,15 +562,9 @@ func (c *Chain) configureComm() error {
return nil
}

func (c *Chain) nodeConfigFromMetadata() ([]cluster.RemoteNode, error) {
func (c *Chain) remotePeers() ([]cluster.RemoteNode, error) {
var nodes []cluster.RemoteNode
m := &etcdraft.Metadata{}
if err := proto.Unmarshal(c.support.SharedConfig().ConsensusMetadata(), m); err != nil {
return nil, errors.Wrap(err, "failed to extract consensus metadata")
}

for id, consenter := range m.Consenters {
raftID := uint64(id + 1)
for raftID, consenter := range c.opts.RaftMetadata.Consenters {
// No need to know yourself
if raftID == c.raftID {
continue
Expand Down Expand Up @@ -615,33 +607,9 @@ func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
return errors.Wrap(err, "failed to unmarshal updated (new) etcdraft metadata configuration")
}

currentMetadata := &etcdraft.Metadata{}
if err := proto.Unmarshal(c.support.SharedConfig().ConsensusMetadata(), currentMetadata); err != nil {
return errors.Wrap(err, "failed to unmarshal current etcdraft metadata configuration")
}

if !c.consentersSetEqual(currentMetadata.Consenters, updatedMetadata.Consenters) {
if !ConsentersChanged(c.opts.RaftMetadata.Consenters, updatedMetadata.Consenters) {
return errors.New("update of consenters set is not supported yet")
}

return nil
}

func (c *Chain) consentersSetEqual(c1 []*etcdraft.Consenter, c2 []*etcdraft.Consenter) bool {
if len(c1) != len(c2) {
return false
}

consentersSet1 := c.consentersToMap(c1)
consentersSet2 := c.consentersToMap(c2)

return reflect.DeepEqual(consentersSet1, consentersSet2)
}

func (c *Chain) consentersToMap(c1 []*etcdraft.Consenter) map[string]*etcdraft.Consenter {
set := map[string]*etcdraft.Consenter{}
for _, c := range c1 {
set[string(c.ClientTlsCert)] = c
}
return set
}
57 changes: 42 additions & 15 deletions orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,27 @@ var _ = Describe("Chain", func() {
clock = fakeclock.NewFakeClock(time.Now())
storage = raft.NewMemoryStorage()
observeC = make(chan uint64, 1)

support = &consensusmocks.FakeConsenterSupport{}
support.ChainIDReturns(channelID)
consenterMetadata = createMetadata(1, tlsCA)
support.SharedConfigReturns(&mockconfig.Orderer{
BatchTimeoutVal: time.Hour,
ConsensusMetadataVal: marshalOrPanic(consenterMetadata),
})
cutter = mockblockcutter.NewReceiver()
support.BlockCutterReturns(cutter)

membership := &raftprotos.RaftMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
NextConsenterID: 1,
}

for _, c := range consenterMetadata.Consenters {
membership.Consenters[membership.NextConsenterID] = c
membership.NextConsenterID++
}

opts = etcdraft.Options{
RaftID: 1,
Clock: clock,
Expand All @@ -87,19 +108,10 @@ var _ = Describe("Chain", func() {
HeartbeatTick: HEARTBEAT_TICK,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
Peers: []raft.Peer{{ID: 1}},
RaftMetadata: membership,
Logger: logger,
Storage: storage,
}
support = &consensusmocks.FakeConsenterSupport{}
support.ChainIDReturns(channelID)
consenterMetadata = createMetadata(3, tlsCA)
support.SharedConfigReturns(&mockconfig.Orderer{
BatchTimeoutVal: time.Hour,
ConsensusMetadataVal: marshalOrPanic(consenterMetadata),
})
cutter = mockblockcutter.NewReceiver()
support.BlockCutterReturns(cutter)

var err error
chain, err = etcdraft.NewChain(support, opts, configurator, nil, observeC)
Expand Down Expand Up @@ -487,7 +499,7 @@ var _ = Describe("Chain", func() {

Context("when a type B config update comes", func() {

Context("for existing channel", func() {
Context("updating protocol values", func() {
// use to prepare the Orderer Values
BeforeEach(func() {
values := map[string]*common.ConfigValue{
Expand Down Expand Up @@ -842,11 +854,25 @@ func newChain(timeout time.Duration, channel string, id uint64, all []uint64) *c
rpc := &mocks.FakeRPC{}
clock := fakeclock.NewFakeClock(time.Now())
storage := raft.NewMemoryStorage()
tlsCA, _ := tlsgen.NewCA()

peers := []raft.Peer{}
for _, i := range all {
peers = append(peers, raft.Peer{ID: i})
membership := &raftprotos.RaftMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
NextConsenterID: 1,
}

for _, raftID := range all {
membership.Consenters[uint64(raftID)] = &raftprotos.Consenter{
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
}
if uint64(raftID) > membership.NextConsenterID {
membership.NextConsenterID = uint64(raftID)
}
}
membership.NextConsenterID++

opts := etcdraft.Options{
RaftID: uint64(id),
Expand All @@ -856,7 +882,7 @@ func newChain(timeout time.Duration, channel string, id uint64, all []uint64) *c
HeartbeatTick: HEARTBEAT_TICK,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
Peers: peers,
RaftMetadata: membership,
Logger: flogging.NewFabricLogger(zap.NewNop()),
Storage: storage,
}
Expand Down Expand Up @@ -965,6 +991,7 @@ func (n *network) start(ids ...uint64) {
wg.Add(len(nodes))
for _, i := range nodes {
go func(id uint64) {
defer GinkgoRecover()
n.chains[id].Start()
n.chains[id].unstarted = nil

Expand Down
44 changes: 34 additions & 10 deletions orderer/consensus/etcdraft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ func (c *Consenter) ReceiverByChain(channelID string) MessageReceiver {
return nil
}

func (c *Consenter) detectSelfID(m *etcdraft.Metadata) (uint64, error) {
func (c *Consenter) detectSelfID(consenters map[uint64]*etcdraft.Consenter) (uint64, error) {
var serverCertificates []string
for i, cst := range m.Consenters {
for nodeID, cst := range consenters {
serverCertificates = append(serverCertificates, string(cst.ServerTlsCert))
if bytes.Equal(c.Cert, cst.ServerTlsCert) {
return uint64(i + 1), nil
return nodeID, nil
}
}

Expand All @@ -99,16 +99,19 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
return nil, errors.New("etcdraft options have not been provided")
}

id, err := c.detectSelfID(m)
// determine raft replica set mapping for each node to its id
// for newly started chain we need to read and initialize raft
// metadata by creating mapping between conseter and its id.
// In case chain has been restarted we restore raft metadata
// information from the recently committed block meta data
// field.
raftMetadata, err := raftMetadata(metadata, m)

id, err := c.detectSelfID(raftMetadata.Consenters)
if err != nil {
return nil, errors.WithStack(err)
}

peers := make([]raft.Peer, len(m.Consenters))
for i := range peers {
peers[i].ID = uint64(i + 1)
}

opts := Options{
RaftID: id,
Clock: clock.NewClock(),
Expand All @@ -121,13 +124,34 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
MaxInflightMsgs: int(m.Options.MaxInflightMsgs),
MaxSizePerMsg: m.Options.MaxSizePerMsg,

Peers: peers,
RaftMetadata: raftMetadata,
}

rpc := &cluster.RPC{Channel: support.ChainID(), Comm: c.Communication}
return NewChain(support, opts, c.Communication, rpc, nil)
}

func raftMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.Metadata) (*etcdraft.RaftMetadata, error) {
membership := &etcdraft.RaftMetadata{
Consenters: map[uint64]*etcdraft.Consenter{},
NextConsenterID: 1,
}
if blockMetadata != nil && len(blockMetadata.Value) != 0 { // we have consenters mapping from block
if err := proto.Unmarshal(blockMetadata.Value, membership); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal block's metadata")
}
return membership, nil
}

// need to read consenters from the configuration
for _, consenter := range configMetadata.Consenters {
membership.Consenters[membership.NextConsenterID] = consenter
membership.NextConsenterID++
}

return membership, nil
}

func New(clusterDialer *cluster.PredicateDialer, conf *localconfig.TopLevel,
srvConf comm.ServerConfig, srv *comm.GRPCServer, r *multichannel.Registrar) *Consenter {
logger := flogging.MustGetLogger("orderer/consensus/etcdraft")
Expand Down
44 changes: 44 additions & 0 deletions orderer/consensus/etcdraft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ package etcdraft

import (
"encoding/pem"
"reflect"

"github.com/coreos/etcd/raft"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer/etcdraft"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -111,3 +114,44 @@ func newBlockPuller(support consensus.ConsenterSupport,
Dialer: stdDialer,
}, nil
}

// RaftPeers maps consenters to slice of raft.Peer
func RaftPeers(consenters map[uint64]*etcdraft.Consenter) []raft.Peer {
var peers []raft.Peer

for raftID := range consenters {
peers = append(peers, raft.Peer{ID: raftID})
}
return peers
}

// ConsentersToMap maps consenters into set where key is client TLS certificate
func ConsentersToMap(consenters []*etcdraft.Consenter) map[string]struct{} {
set := map[string]struct{}{}
for _, c := range consenters {
set[string(c.ClientTlsCert)] = struct{}{}
}
return set
}

// MembershipByCert convert consenters map into set encapsulated by map
// where key is client TLS certificate
func MembershipByCert(consenters map[uint64]*etcdraft.Consenter) map[string]struct{} {
set := map[string]struct{}{}
for _, c := range consenters {
set[string(c.ClientTlsCert)] = struct{}{}
}
return set
}

// ConsentersChanged cheks whenever slice of new consenters contains changes for consenters mapping
func ConsentersChanged(oldConsenters map[uint64]*etcdraft.Consenter, newConsenters []*etcdraft.Consenter) bool {
if len(oldConsenters) != len(newConsenters) {
return false
}

consentersSet1 := MembershipByCert(oldConsenters)
consentersSet2 := ConsentersToMap(newConsenters)

return reflect.DeepEqual(consentersSet1, consentersSet2)
}
Loading

0 comments on commit fc0c4e9

Please sign in to comment.