Skip to content

Commit 7f12d1b

Browse files
guogeryacovm
authored andcommitted
[FAB-11162] Implement bare minimum Raft-based chain
This CR implements a bare minimum raft-based chain, which only supports single node. It does not support following features: - process config type message - raft snapshot - raft WAL - reconfigure raft This CR all pulls in a fake clock library for testing purpose. Change-Id: I5e93ff9eeb524ea1afd0348e1180fb9f4535fb5c Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
1 parent de6c840 commit 7f12d1b

File tree

16 files changed

+1952
-0
lines changed

16 files changed

+1952
-0
lines changed

Gopkg.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

orderer/consensus/consensus.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ type Chain interface {
6767
Halt()
6868
}
6969

70+
//go:generate counterfeiter -o mocks/mock_consenter_support.go . ConsenterSupport
71+
7072
// ConsenterSupport provides the resources available to a Consenter implementation.
7173
type ConsenterSupport interface {
7274
crypto.LocalSigner

orderer/consensus/etcdraft/chain.go

Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package etcdraft
8+
9+
import (
10+
"context"
11+
"sync"
12+
"sync/atomic"
13+
"time"
14+
15+
"github.com/hyperledger/fabric/common/flogging"
16+
"github.com/hyperledger/fabric/orderer/consensus"
17+
"github.com/hyperledger/fabric/protos/common"
18+
"github.com/hyperledger/fabric/protos/orderer"
19+
"github.com/hyperledger/fabric/protos/utils"
20+
21+
"code.cloudfoundry.org/clock"
22+
"github.com/coreos/etcd/raft"
23+
"github.com/coreos/etcd/raft/raftpb"
24+
"github.com/pkg/errors"
25+
)
26+
27+
// Storage essentially represents etcd/raft.MemoryStorage.
28+
//
29+
// This interface is defined to expose dependencies of fsm
30+
// so that it may be swapped in the future.
31+
//
32+
// TODO(jay) add other necessary methods to this interface
33+
// once we need them in implementation, e.g. ApplySnapshot
34+
type Storage interface {
35+
raft.Storage
36+
Append(entries []raftpb.Entry) error
37+
}
38+
39+
// Options contains necessary artifacts to start raft-based chain
40+
type Options struct {
41+
RaftID uint64
42+
43+
Clock clock.Clock
44+
45+
Storage Storage
46+
Logger *flogging.FabricLogger
47+
48+
TickInterval time.Duration
49+
ElectionTick int
50+
HeartbeatTick int
51+
MaxSizePerMsg uint64
52+
MaxInflightMsgs int
53+
Peers []raft.Peer
54+
}
55+
56+
// Chain implements consensus.Chain interface with raft-based consensus
57+
type Chain struct {
58+
raftID uint64
59+
60+
submitC chan *orderer.SubmitRequest
61+
commitC chan *common.Block
62+
observeC chan<- uint64 // notify external observer on leader change
63+
64+
haltC chan struct{}
65+
doneC chan struct{}
66+
67+
clock clock.Clock // test could inject a fake clock
68+
69+
support consensus.ConsenterSupport
70+
71+
leaderLock sync.RWMutex
72+
leader uint64
73+
appliedIndex uint64
74+
75+
node raft.Node
76+
storage Storage
77+
opts Options
78+
79+
logger *flogging.FabricLogger
80+
}
81+
82+
// NewChain constructs a chain object
83+
func NewChain(support consensus.ConsenterSupport, opts Options, observe chan<- uint64) (*Chain, error) {
84+
return &Chain{
85+
raftID: opts.RaftID,
86+
submitC: make(chan *orderer.SubmitRequest),
87+
commitC: make(chan *common.Block),
88+
haltC: make(chan struct{}),
89+
doneC: make(chan struct{}),
90+
observeC: observe,
91+
support: support,
92+
clock: opts.Clock,
93+
logger: opts.Logger.With("channel", support.ChainID()),
94+
storage: opts.Storage,
95+
opts: opts,
96+
}, nil
97+
}
98+
99+
// Start starts the chain
100+
func (c *Chain) Start() {
101+
config := &raft.Config{
102+
ID: c.raftID,
103+
ElectionTick: c.opts.ElectionTick,
104+
HeartbeatTick: c.opts.HeartbeatTick,
105+
MaxSizePerMsg: c.opts.MaxSizePerMsg,
106+
MaxInflightMsgs: c.opts.MaxInflightMsgs,
107+
Logger: c.logger,
108+
Storage: c.opts.Storage,
109+
}
110+
111+
c.node = raft.StartNode(config, c.opts.Peers)
112+
113+
go c.serveRaft()
114+
go c.serveRequest()
115+
}
116+
117+
// Order submits normal type transactions
118+
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
119+
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Content: env}, 0)
120+
}
121+
122+
// Configure submits config type transactins
123+
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
124+
c.logger.Panicf("Configure not implemented yet")
125+
return nil
126+
}
127+
128+
// WaitReady is currently no-op
129+
func (c *Chain) WaitReady() error {
130+
return nil
131+
}
132+
133+
// Errored indicates if chain is still running
134+
func (c *Chain) Errored() <-chan struct{} {
135+
return c.doneC
136+
}
137+
138+
// Halt stops chain
139+
func (c *Chain) Halt() {
140+
select {
141+
case c.haltC <- struct{}{}:
142+
case <-c.doneC:
143+
return
144+
}
145+
<-c.doneC
146+
}
147+
148+
// Submit submits requests to
149+
// - local serveRequest go routine if this is leader
150+
// - actual leader via transport
151+
// - fails if there's no leader elected yet
152+
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
153+
c.leaderLock.RLock()
154+
defer c.leaderLock.RUnlock()
155+
156+
if c.leader == raft.None {
157+
return errors.Errorf("no raft leader")
158+
}
159+
160+
if c.leader == c.raftID {
161+
select {
162+
case c.submitC <- req:
163+
return nil
164+
case <-c.doneC:
165+
return errors.Errorf("chain is stopped")
166+
}
167+
}
168+
169+
// TODO forward request to actual leader when we implement multi-node raft
170+
return errors.Errorf("only single raft node is currently supported")
171+
}
172+
173+
func (c *Chain) serveRequest() {
174+
clocking := false
175+
timer := c.clock.NewTimer(c.support.SharedConfig().BatchTimeout())
176+
if !timer.Stop() {
177+
// drain the channel. see godoc of time#Timer.Stop
178+
<-timer.C()
179+
}
180+
181+
for {
182+
seq := c.support.Sequence()
183+
184+
select {
185+
case msg := <-c.submitC:
186+
if c.isConfig(msg.Content) {
187+
c.logger.Panicf("Processing config envelope is not implemented yet")
188+
}
189+
190+
if msg.LastValidationSeq < seq {
191+
if _, err := c.support.ProcessNormalMsg(msg.Content); err != nil {
192+
c.logger.Warningf("Discarding bad normal message: %s", err)
193+
continue
194+
}
195+
}
196+
197+
batches, _ := c.support.BlockCutter().Ordered(msg.Content)
198+
if len(batches) == 0 {
199+
if !clocking {
200+
clocking = true
201+
timer.Reset(c.support.SharedConfig().BatchTimeout())
202+
}
203+
continue
204+
}
205+
206+
if !timer.Stop() && clocking {
207+
<-timer.C()
208+
}
209+
clocking = false
210+
211+
if err := c.commitBatches(batches...); err != nil {
212+
c.logger.Errorf("Failed to commit block: %s", err)
213+
}
214+
215+
case <-timer.C():
216+
clocking = false
217+
218+
batch := c.support.BlockCutter().Cut()
219+
if len(batch) == 0 {
220+
c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
221+
continue
222+
}
223+
224+
c.logger.Debugf("Batch timer expired, creating block")
225+
if err := c.commitBatches(batch); err != nil {
226+
c.logger.Errorf("Failed to commit block: %s", err)
227+
}
228+
229+
case <-c.doneC:
230+
c.logger.Infof("Stop serving requests")
231+
return
232+
}
233+
}
234+
}
235+
236+
func (c *Chain) commitBatches(batches ...[]*common.Envelope) error {
237+
for _, batch := range batches {
238+
b := c.support.CreateNextBlock(batch)
239+
data := utils.MarshalOrPanic(b)
240+
if err := c.node.Propose(context.TODO(), data); err != nil {
241+
return errors.Errorf("failed to propose data to raft: %s", err)
242+
}
243+
244+
select {
245+
case block := <-c.commitC:
246+
if utils.IsConfigBlock(block) {
247+
c.logger.Panicf("Config block is not supported yet")
248+
}
249+
c.support.WriteBlock(block, nil)
250+
251+
case <-c.doneC:
252+
return nil
253+
}
254+
}
255+
256+
return nil
257+
}
258+
259+
func (c *Chain) serveRaft() {
260+
ticker := c.clock.NewTicker(c.opts.TickInterval)
261+
262+
for {
263+
select {
264+
case <-ticker.C():
265+
c.node.Tick()
266+
267+
case rd := <-c.node.Ready():
268+
c.storage.Append(rd.Entries)
269+
// TODO send messages to other peers when we implement multi-node raft
270+
c.apply(c.entriesToApply(rd.CommittedEntries))
271+
c.node.Advance()
272+
273+
if rd.SoftState != nil {
274+
c.leaderLock.Lock()
275+
newLead := atomic.LoadUint64(&rd.SoftState.Lead)
276+
if newLead != c.leader {
277+
c.logger.Infof("Raft leader changed on node %x: %x -> %x", c.raftID, c.leader, newLead)
278+
c.leader = newLead
279+
280+
// notify external observer
281+
select {
282+
case c.observeC <- newLead:
283+
default:
284+
}
285+
}
286+
c.leaderLock.Unlock()
287+
}
288+
289+
case <-c.haltC:
290+
close(c.doneC)
291+
ticker.Stop()
292+
c.node.Stop()
293+
c.logger.Infof("Raft node %x stopped", c.raftID)
294+
return
295+
}
296+
}
297+
}
298+
299+
func (c *Chain) apply(ents []raftpb.Entry) {
300+
for i := range ents {
301+
switch ents[i].Type {
302+
case raftpb.EntryNormal:
303+
if len(ents[i].Data) == 0 {
304+
break
305+
}
306+
307+
c.commitC <- utils.UnmarshalBlockOrPanic(ents[i].Data)
308+
309+
case raftpb.EntryConfChange:
310+
var cc raftpb.ConfChange
311+
if err := cc.Unmarshal(ents[i].Data); err != nil {
312+
c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
313+
continue
314+
}
315+
316+
c.node.ApplyConfChange(cc)
317+
}
318+
319+
c.appliedIndex = ents[i].Index
320+
}
321+
}
322+
323+
// this is taken from coreos/contrib/raftexample/raft.go
324+
func (c *Chain) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
325+
if len(ents) == 0 {
326+
return
327+
}
328+
329+
firstIdx := ents[0].Index
330+
if firstIdx > c.appliedIndex+1 {
331+
c.logger.Panicf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, c.appliedIndex)
332+
}
333+
334+
// If we do have unapplied entries in nents.
335+
// | applied | unapplied |
336+
// |----------------|----------------------|
337+
// firstIdx appliedIndex last
338+
if c.appliedIndex-firstIdx+1 < uint64(len(ents)) {
339+
nents = ents[c.appliedIndex-firstIdx+1:]
340+
}
341+
return nents
342+
}
343+
344+
func (c *Chain) isConfig(env *common.Envelope) bool {
345+
h, err := utils.ChannelHeader(env)
346+
if err != nil {
347+
c.logger.Panicf("programming error: failed to extract channel header from envelope")
348+
}
349+
350+
return h.Type == int32(common.HeaderType_CONFIG) || h.Type == int32(common.HeaderType_ORDERER_TRANSACTION)
351+
}

0 commit comments

Comments
 (0)