-
Notifications
You must be signed in to change notification settings - Fork 22
/
acs.go
345 lines (321 loc) · 9.26 KB
/
acs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
package hbbft
import (
"fmt"
)
// ACSMessage represents a message sent between nodes in the ACS protocol.
type ACSMessage struct {
// Unique identifier of the "proposing" node.
ProposerID uint64
// Actual payload beeing sent.
Payload interface{}
}
// ACS implements the Asynchronous Common Subset protocol.
// ACS assumes a network of N nodes that send signed messages to each other.
// There can be f faulty nodes where (3 * f < N).
// Each participating node proposes an element for inlcusion. The protocol
// guarantees that all of the good nodes output the same set, consisting of
// at least (N -f) of the proposed values.
//
// Algorithm:
// ACS creates a Broadcast algorithm for each of the participating nodes.
// At least (N -f) of these will eventually output the element proposed by that
// node. ACS will also create and BBA instance for each participating node, to
// decide whether that node's proposed element should be inlcuded in common set.
// Whenever an element is received via broadcast, we imput "true" into the
// corresponding BBA instance. When (N-f) BBA instances have decided true we
// input false into the remaining ones, where we haven't provided input yet.
// Once all BBA instances have decided, ACS returns the set of all proposed
// values for which the decision was truthy.
type ACS struct {
// Config holds the ACS configuration.
Config
// Mapping of node ids and their rbc instance.
rbcInstances map[uint64]*RBC
// Mapping of node ids and their bba instance.
bbaInstances map[uint64]*BBA
// Results of the Reliable Broadcast.
rbcResults map[uint64][]byte
// Results of the Binary Byzantine Agreement.
bbaResults map[uint64]bool
// Final output of the ACS.
output map[uint64][]byte
// Que of ACSMessages that need to be broadcasted after each received
// and processed a message.
messageQue *messageQue
// Whether this ACS instance has already has decided output or not.
decided bool
// control flow tuples for internal channel communication.
closeCh chan struct{}
inputCh chan acsInputTuple
messageCh chan acsMessageTuple
}
// Control flow structure for internal channel communication. Allowing us to
// avoid the use of mutexes and eliminates race conditions.
type (
acsMessageTuple struct {
senderID uint64
msg *ACSMessage
err chan error
}
acsInputResponse struct {
rbcMessages []*BroadcastMessage
acsMessages []*ACSMessage
err error
}
acsInputTuple struct {
value []byte
response chan acsInputResponse
}
)
// NewACS returns a new ACS instance configured with the given Config and node
// ids.
func NewACS(cfg Config) *ACS {
if cfg.F == 0 {
cfg.F = (cfg.N - 1) / 3
}
acs := &ACS{
Config: cfg,
rbcInstances: make(map[uint64]*RBC),
bbaInstances: make(map[uint64]*BBA),
rbcResults: make(map[uint64][]byte),
bbaResults: make(map[uint64]bool),
messageQue: newMessageQue(),
closeCh: make(chan struct{}),
inputCh: make(chan acsInputTuple),
messageCh: make(chan acsMessageTuple),
}
// Create all the instances for the participating nodes
for _, id := range cfg.Nodes {
acs.rbcInstances[id] = NewRBC(cfg, id)
acs.bbaInstances[id] = NewBBA(cfg)
}
go acs.run()
return acs
}
// InputValue sets the input value for broadcast and returns an initial set of
// Broadcast and ACS Messages to be broadcasted in the network.
func (a *ACS) InputValue(val []byte) error {
t := acsInputTuple{
value: val,
response: make(chan acsInputResponse),
}
a.inputCh <- t
resp := <-t.response
return resp.err
}
// HandleMessage handles incoming messages to ACS and redirects them to the
// appropriate sub(protocol) instance.
func (a *ACS) HandleMessage(senderID uint64, msg *ACSMessage) error {
t := acsMessageTuple{
senderID: senderID,
msg: msg,
err: make(chan error),
}
a.messageCh <- t
return <-t.err
}
// handleMessage handles incoming messages to ACS and redirects them to the
// appropriate sub(protocol) instance.
func (a *ACS) handleMessage(senderID uint64, msg *ACSMessage) error {
switch t := msg.Payload.(type) {
case *AgreementMessage:
return a.handleAgreement(senderID, msg.ProposerID, t)
case *BroadcastMessage:
return a.handleBroadcast(senderID, msg.ProposerID, t)
default:
return fmt.Errorf("received unknown message (%v)", t)
}
}
// Output will return the output of the ACS instance. If the output was not nil
// then it will return the output else nil. Note that after consuming the output
// its will be set to nil forever.
func (a *ACS) Output() map[uint64][]byte {
if a.output != nil {
out := a.output
a.output = nil
return out
}
return nil
}
// Done returns true whether ACS has completed its agreements and cleared its
// messageQue.
func (a *ACS) Done() bool {
agreementsDone := true
for _, bba := range a.bbaInstances {
if !bba.done {
agreementsDone = false
}
}
return agreementsDone && a.messageQue.len() == 0
}
// inputValue sets the input value for broadcast and returns an initial set of
// Broadcast and ACS Messages to be broadcasted in the network.
func (a *ACS) inputValue(data []byte) error {
rbc, ok := a.rbcInstances[a.ID]
if !ok {
return fmt.Errorf("could not find rbc instance (%d)", a.ID)
}
reqs, err := rbc.InputValue(data)
if err != nil {
return err
}
if len(reqs) != a.N-1 {
return fmt.Errorf("expecting (%d) proof messages got (%d)", a.N, len(reqs))
}
for i, id := range uint64sWithout(a.Nodes, a.ID) {
a.messageQue.addMessage(&ACSMessage{a.ID, reqs[i]}, id)
}
for _, msg := range rbc.Messages() {
a.addMessage(a.ID, msg)
}
if output := rbc.Output(); output != nil {
a.rbcResults[a.ID] = output
a.processAgreement(a.ID, func(bba *BBA) error {
if bba.AcceptInput() {
return bba.InputValue(true)
}
return nil
})
}
return nil
}
func (a *ACS) stop() {
close(a.closeCh)
}
func (a *ACS) run() {
for {
select {
case <-a.closeCh:
return
case t := <-a.inputCh:
err := a.inputValue(t.value)
t.response <- acsInputResponse{err: err}
case t := <-a.messageCh:
t.err <- a.handleMessage(t.senderID, t.msg)
}
}
}
// handleAgreement processes the received AgreementMessage from sender (sid)
// for a value proposed by the proposing node (pid).
func (a *ACS) handleAgreement(sid, pid uint64, msg *AgreementMessage) error {
return a.processAgreement(pid, func(bba *BBA) error {
return bba.HandleMessage(sid, msg)
})
}
// handleBroadcast processes the received BroadcastMessage.
func (a *ACS) handleBroadcast(sid, pid uint64, msg *BroadcastMessage) error {
return a.processBroadcast(pid, func(rbc *RBC) error {
return rbc.HandleMessage(sid, msg)
})
}
func (a *ACS) processBroadcast(pid uint64, fun func(rbc *RBC) error) error {
rbc, ok := a.rbcInstances[pid]
if !ok {
return fmt.Errorf("could not find rbc instance for (%d)", pid)
}
if err := fun(rbc); err != nil {
return err
}
for _, msg := range rbc.Messages() {
a.addMessage(pid, msg)
}
if output := rbc.Output(); output != nil {
a.rbcResults[pid] = output
return a.processAgreement(pid, func(bba *BBA) error {
if bba.AcceptInput() {
return bba.InputValue(true)
}
return nil
})
}
return nil
}
func (a *ACS) processAgreement(pid uint64, fun func(bba *BBA) error) error {
bba, ok := a.bbaInstances[pid]
if !ok {
return fmt.Errorf("could not find bba instance for (%d)", pid)
}
if bba.done {
return nil
}
if err := fun(bba); err != nil {
return err
}
for _, msg := range bba.Messages() {
a.addMessage(pid, msg)
}
// Check if we got an output.
if output := bba.Output(); output != nil {
if _, ok := a.bbaResults[pid]; ok {
return fmt.Errorf("multiple bba results for (%d)", pid)
}
a.bbaResults[pid] = output.(bool)
// When received 1 from at least (N - f) instances of BA, provide input 0.
// to each other instance of BBA that has not provided his input yet.
if output.(bool) && a.countTruthyAgreements() == a.N-a.F {
for id, bba := range a.bbaInstances {
if bba.AcceptInput() {
if err := bba.InputValue(false); err != nil {
return err
}
for _, msg := range bba.Messages() {
a.addMessage(id, msg)
}
if output := bba.Output(); output != nil {
a.bbaResults[id] = output.(bool)
}
}
}
}
a.tryCompleteAgreement()
}
return nil
}
func (a *ACS) tryCompleteAgreement() {
if a.decided || a.countTruthyAgreements() < a.N-a.F {
return
}
if len(a.bbaResults) < a.N {
return
}
// At this point all bba instances have provided their output.
nodesThatProvidedTrue := []uint64{}
for id, ok := range a.bbaResults {
if ok {
nodesThatProvidedTrue = append(nodesThatProvidedTrue, id)
}
}
bcResults := make(map[uint64][]byte)
for _, id := range nodesThatProvidedTrue {
val, _ := a.rbcResults[id]
bcResults[id] = val
}
if len(nodesThatProvidedTrue) == len(bcResults) {
a.output = bcResults
a.decided = true
}
}
func (a *ACS) addMessage(from uint64, msg interface{}) {
for _, id := range uint64sWithout(a.Nodes, a.ID) {
a.messageQue.addMessage(&ACSMessage{from, msg}, id)
}
}
// countTruthyAgreements returns the number of truthy received agreement messages.
func (a *ACS) countTruthyAgreements() int {
n := 0
for _, ok := range a.bbaResults {
if ok {
n++
}
}
return n
}
func uint64sWithout(s []uint64, val uint64) []uint64 {
dest := []uint64{}
for i := 0; i < len(s); i++ {
if s[i] != val {
dest = append(dest, s[i])
}
}
return dest
}