Skip to content

Commit c1ff9e6

Browse files
guogeryacovm
authored andcommitted
[FAB-12552] Add support of type B tx for raft
This commit adds support for Raft cluster reconfiguration, takes care to extract cluster config changes from the config block and apply those changes to the replic set. Change-Id: I564fafa418757e565ec2c0d872174d5d4f1ad719 Signed-off-by: Artem Barger <bartem@il.ibm.com> Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
1 parent 99a7f5e commit c1ff9e6

File tree

7 files changed

+773
-152
lines changed

7 files changed

+773
-152
lines changed

common/configtx/util.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package configtx
99
import (
1010
"github.com/golang/protobuf/proto"
1111
cb "github.com/hyperledger/fabric/protos/common"
12+
"github.com/hyperledger/fabric/protos/utils"
1213
)
1314

1415
// UnmarshalConfig attempts to unmarshal bytes to a *cb.Config
@@ -86,3 +87,17 @@ func UnmarshalConfigEnvelopeOrPanic(data []byte) *cb.ConfigEnvelope {
8687
}
8788
return result
8889
}
90+
91+
// UnmarshalConfigUpdateFromPayload unmarshals configuration update from given payload
92+
func UnmarshalConfigUpdateFromPayload(payload *cb.Payload) (*cb.ConfigUpdate, error) {
93+
configEnv, err := UnmarshalConfigEnvelope(payload.Data)
94+
if err != nil {
95+
return nil, err
96+
}
97+
configUpdateEnv, err := utils.EnvelopeToConfigUpdate(configEnv.LastUpdate)
98+
if err != nil {
99+
return nil, err
100+
}
101+
102+
return UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
103+
}

orderer/consensus/etcdraft/chain.go

Lines changed: 138 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"context"
1111
"encoding/pem"
1212
"fmt"
13-
"reflect"
1413
"sync"
1514
"sync/atomic"
1615
"time"
@@ -112,6 +111,10 @@ type Chain struct {
112111
startC chan struct{} // Closes when the node is started
113112
snapC chan *raftpb.Snapshot // Signal to catch up with snapshot
114113

114+
configChangeApplyC chan struct{} // Notifies that a Raft configuration change has been applied
115+
configChangeInProgress uint32 // Flag to indicate node waiting for Raft config change to be applied
116+
raftMetadataLock sync.RWMutex
117+
115118
clock clock.Clock // Tests can inject a fake clock
116119

117120
support consensus.ConsenterSupport
@@ -168,28 +171,29 @@ func NewChain(
168171
}
169172

170173
return &Chain{
171-
configurator: conf,
172-
rpc: rpc,
173-
channelID: support.ChainID(),
174-
raftID: opts.RaftID,
175-
submitC: make(chan *orderer.SubmitRequest),
176-
commitC: make(chan block),
177-
haltC: make(chan struct{}),
178-
doneC: make(chan struct{}),
179-
resignC: make(chan struct{}),
180-
startC: make(chan struct{}),
181-
syncC: make(chan struct{}),
182-
snapC: make(chan *raftpb.Snapshot),
183-
observeC: observeC,
184-
support: support,
185-
fresh: fresh,
186-
appliedIndex: appliedi,
187-
lastSnapBlockNum: snapBlkNum,
188-
puller: puller,
189-
clock: opts.Clock,
190-
logger: lg,
191-
storage: storage,
192-
opts: opts,
174+
configurator: conf,
175+
rpc: rpc,
176+
channelID: support.ChainID(),
177+
raftID: opts.RaftID,
178+
submitC: make(chan *orderer.SubmitRequest),
179+
commitC: make(chan block),
180+
haltC: make(chan struct{}),
181+
doneC: make(chan struct{}),
182+
resignC: make(chan struct{}),
183+
startC: make(chan struct{}),
184+
syncC: make(chan struct{}),
185+
snapC: make(chan *raftpb.Snapshot),
186+
configChangeApplyC: make(chan struct{}),
187+
observeC: observeC,
188+
support: support,
189+
fresh: fresh,
190+
appliedIndex: appliedi,
191+
lastSnapBlockNum: snapBlkNum,
192+
puller: puller,
193+
clock: opts.Clock,
194+
logger: lg,
195+
storage: storage,
196+
opts: opts,
193197
}, nil
194198
}
195199

@@ -219,10 +223,10 @@ func (c *Chain) Start() {
219223
raftPeers := RaftPeers(c.opts.RaftMetadata.Consenters)
220224

221225
if c.fresh {
222-
c.logger.Infof("starting new raft node %d", c.raftID)
226+
c.logger.Info("starting new raft node")
223227
c.node = raft.StartNode(config, raftPeers)
224228
} else {
225-
c.logger.Infof("restarting raft node %d", c.raftID)
229+
c.logger.Info("restarting raft node")
226230
c.node = raft.RestartNode(config)
227231
}
228232

@@ -261,15 +265,7 @@ func (c *Chain) checkConfigUpdateValidity(ctx *common.Envelope) error {
261265
case int32(common.HeaderType_ORDERER_TRANSACTION):
262266
return nil
263267
case int32(common.HeaderType_CONFIG):
264-
configEnv, err := configtx.UnmarshalConfigEnvelope(payload.Data)
265-
if err != nil {
266-
return err
267-
}
268-
configUpdateEnv, err := utils.EnvelopeToConfigUpdate(configEnv.LastUpdate)
269-
if err != nil {
270-
return err
271-
}
272-
configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
268+
configUpdate, err := configtx.UnmarshalConfigUpdateFromPayload(payload)
273269
if err != nil {
274270
return err
275271
}
@@ -479,14 +475,18 @@ func (c *Chain) serveRequest() {
479475
}
480476

481477
func (c *Chain) writeBlock(b block) {
482-
c.opts.RaftMetadata.RaftIndex = b.i
483-
m := utils.MarshalOrPanic(c.opts.RaftMetadata)
484-
485478
if utils.IsConfigBlock(b.b) {
486-
c.support.WriteConfigBlock(b.b, m)
479+
if err := c.writeConfigBlock(b); err != nil {
480+
c.logger.Panicf("failed to write configuration block, %+v", err)
481+
}
487482
return
488483
}
489484

485+
c.raftMetadataLock.Lock()
486+
c.opts.RaftMetadata.RaftIndex = b.i
487+
m := utils.MarshalOrPanic(c.opts.RaftMetadata)
488+
c.raftMetadataLock.Unlock()
489+
490490
c.support.WriteBlock(b.b, m)
491491
}
492492

@@ -668,9 +668,29 @@ func (c *Chain) apply(ents []raftpb.Entry) {
668668
break
669669
}
670670

671-
b := block{utils.UnmarshalBlockOrPanic(ents[i].Data), ents[i].Index}
672-
c.commitC <- b
673-
appliedb = b.b.Header.Number
671+
b := utils.UnmarshalBlockOrPanic(ents[i].Data)
672+
// need to check whenever given block carries updates
673+
// which will lead to membership change and eventually
674+
// to the cluster reconfiguration
675+
c.raftMetadataLock.RLock()
676+
m := c.opts.RaftMetadata
677+
c.raftMetadataLock.RUnlock()
678+
679+
isConfigMembershipUpdate, err := IsMembershipUpdate(b, m)
680+
if err != nil {
681+
c.logger.Warnf("Error while attempting to determine membership update, due to %s", err)
682+
}
683+
// if error occurred isConfigMembershipUpdate will be false, hence will skip setting config change in
684+
// progress
685+
if isConfigMembershipUpdate {
686+
// set flag config change is progress only if config block
687+
// and has updates for raft replica set
688+
atomic.StoreUint32(&c.configChangeInProgress, uint32(1))
689+
}
690+
691+
c.commitC <- block{b, ents[i].Index}
692+
693+
appliedb = b.Header.Number
674694
position = i
675695

676696
case raftpb.EntryConfChange:
@@ -681,6 +701,14 @@ func (c *Chain) apply(ents []raftpb.Entry) {
681701
}
682702

683703
c.confState = *c.node.ApplyConfChange(cc)
704+
705+
// assert that configuration changes result of the processing
706+
// of configuration block of type B
707+
isConfChangeInProgress := atomic.LoadUint32(&c.configChangeInProgress)
708+
if isConfChangeInProgress == 1 {
709+
// signal that config changes has been applied
710+
c.configChangeApplyC <- struct{}{}
711+
}
684712
}
685713

686714
if ents[i].Index > c.appliedIndex {
@@ -780,56 +808,93 @@ func (c *Chain) pemToDER(pemBytes []byte, id uint64, certType string) ([]byte, e
780808
return bl.Bytes, nil
781809
}
782810

811+
// checkConsentersSet validates correctness of the consenters set provided within configuration value
783812
func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
784-
consensusTypeValue := &orderer.ConsensusType{}
785-
if err := proto.Unmarshal(configValue.Value, consensusTypeValue); err != nil {
786-
return errors.Wrap(err, "failed to unmarshal consensusType config update")
813+
// read metadata update from configuration
814+
updatedMetadata, err := MetadataFromConfigValue(configValue)
815+
if err != nil {
816+
return err
787817
}
788818

789-
updatedMetadata := &etcdraft.Metadata{}
790-
if err := proto.Unmarshal(consensusTypeValue.Metadata, updatedMetadata); err != nil {
791-
return errors.Wrap(err, "failed to unmarshal updated (new) etcdraft metadata configuration")
792-
}
819+
c.raftMetadataLock.RLock()
820+
changes := ComputeMembershipChanges(c.opts.RaftMetadata.Consenters, updatedMetadata.Consenters)
821+
c.raftMetadataLock.RUnlock()
793822

794-
if !ConsentersChanged(c.opts.RaftMetadata.Consenters, updatedMetadata.Consenters) {
795-
return errors.New("update of consenters set is not supported yet")
823+
if changes.TotalChanges > 1 {
824+
return errors.New("update of more than one consenters at a time is not supported")
796825
}
797826

798827
return nil
799828
}
800829

801-
func (c *Chain) consentersChanged(newConsenters []*etcdraft.Consenter) bool {
802-
if len(c.opts.RaftMetadata.Consenters) != len(newConsenters) {
803-
return false
830+
// updateMembership updates raft metadata with new membership changes, apply raft changes to replica set
831+
// by proposing config change and blocking until it get applied
832+
func (c *Chain) updateMembership(metadata *etcdraft.RaftMetadata, change *raftpb.ConfChange) error {
833+
lead := atomic.LoadUint64(&c.leader)
834+
// leader to propose configuration change
835+
if lead == c.raftID {
836+
if err := c.node.ProposeConfChange(context.TODO(), *change); err != nil {
837+
return errors.Errorf("failed to propose configuration update to Raft node: %s", err)
838+
}
804839
}
805840

806-
consentersSet1 := c.membershipByCert()
807-
consentersSet2 := c.consentersToMap(newConsenters)
841+
var err error
842+
843+
select {
844+
case <-c.configChangeApplyC:
845+
// update metadata once we have block committed
846+
c.raftMetadataLock.Lock()
847+
c.opts.RaftMetadata = metadata
848+
c.raftMetadataLock.Unlock()
849+
850+
// new we need to reconfigure the communication layer with new updates
851+
err = c.configureComm()
852+
case <-c.resignC:
853+
// leadership has changed, new leader will have to take care
854+
// of reading last config block re-propose config update
855+
c.logger.Debug("Raft cluster leader has changed, new leader should re-propose config change based on last config block")
856+
case <-c.doneC:
857+
c.logger.Debug("shutting down node, aborting config change update")
858+
}
808859

809-
return reflect.DeepEqual(consentersSet1, consentersSet2)
860+
// set flag back
861+
atomic.StoreUint32(&c.configChangeInProgress, uint32(0))
862+
return err
810863
}
811864

812-
func (c *Chain) membershipByCert() map[string]struct{} {
813-
set := map[string]struct{}{}
814-
for _, c := range c.opts.RaftMetadata.Consenters {
815-
set[string(c.ClientTlsCert)] = struct{}{}
865+
// writeConfigBlock writes configuration blocks into the ledger in
866+
// addition extracts updates about raft replica set and if there
867+
// are changes updates cluster membership as well
868+
func (c *Chain) writeConfigBlock(b block) error {
869+
metadata, err := ConsensusMetadataFromConfigBlock(b.b)
870+
if err != nil {
871+
c.logger.Panicf("error reading consensus metadata, because of %s", err)
816872
}
817-
return set
818-
}
819873

820-
func (c *Chain) consentersToMap(consenters []*etcdraft.Consenter) map[string]struct{} {
821-
set := map[string]struct{}{}
822-
for _, c := range consenters {
823-
set[string(c.ClientTlsCert)] = struct{}{}
874+
c.raftMetadataLock.RLock()
875+
raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata)
876+
// proto.Clone doesn't copy an empty map, hence need to initialize it after
877+
// cloning
878+
if raftMetadata.Consenters == nil {
879+
raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
880+
}
881+
c.raftMetadataLock.RUnlock()
882+
883+
var changes *MembershipChanges
884+
if metadata != nil {
885+
changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters)
824886
}
825-
return set
826-
}
827887

828-
func (c *Chain) membershipToRaftPeers() []raft.Peer {
829-
var peers []raft.Peer
888+
confChange := changes.UpdateRaftMetadataAndConfChange(raftMetadata)
889+
raftMetadata.RaftIndex = b.i
830890

831-
for raftID := range c.opts.RaftMetadata.Consenters {
832-
peers = append(peers, raft.Peer{ID: raftID})
891+
raftMetadataBytes := utils.MarshalOrPanic(raftMetadata)
892+
// write block with metadata
893+
c.support.WriteConfigBlock(b.b, raftMetadataBytes)
894+
if confChange != nil {
895+
if err := c.updateMembership(raftMetadata, confChange); err != nil {
896+
return errors.Wrap(err, "failed to update Raft with consenters membership changes")
897+
}
833898
}
834-
return peers
899+
return nil
835900
}

0 commit comments

Comments
 (0)