-
Notifications
You must be signed in to change notification settings - Fork 0
/
inflight.go
93 lines (78 loc) · 2.11 KB
/
inflight.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package yaft
import (
"sync"
)
// inflight tracks log operations that have been started but have not yet
// been acknowledged by a quorum. Every method takes the embedded mutex, so
// the operations map is only touched while the lock is held.
type inflight struct {
	sync.Mutex
	// commitCh receives the future of an operation once its commit count
	// reaches the operation's quorum (see Commit).
	commitCh chan *DeferLog
	// operations holds the tracked entries, keyed by log index.
	operations map[uint64]*inflightLog
}
// inflightLog is the bookkeeping record for a single log entry that is
// currently in flight.
type inflightLog struct {
	// future is the deferred log handed to Start; it is what gets sent on
	// the commit channel or failed by Cancel.
	future *DeferLog
	// commitCount is the number of Commit calls seen for this entry so far.
	commitCount int
	// quorum is the commitCount at which the entry is considered committed.
	quorum int
}
// NewInflight builds an empty in-flight tracker. Whenever a tracked log
// reaches its quorum, the log's future is sent on commitCh.
func NewInflight(commitCh chan *DeferLog) *inflight {
	tracker := new(inflight)
	tracker.commitCh = commitCh
	tracker.operations = map[uint64]*inflightLog{}
	return tracker
}
// Start begins tracking l as an in-flight operation, stored under the index
// of its log. The entry starts with zero commits and is considered committed
// once Commit has been called quorum times for that index. Any entry already
// tracked under the same index is replaced.
func (i *inflight) Start(l *DeferLog, quorum int) {
	i.Lock()
	defer i.Unlock()

	// commitCount is left at its zero value.
	i.operations[l.log.Index] = &inflightLog{
		future: l,
		quorum: quorum,
	}
}
// Cancel fails every tracked operation with err and then forgets all of
// them. It is meant to be called when the leader steps down, so that each
// pending future is handed the given error.
func (i *inflight) Cancel(err error) {
	i.Lock()
	defer i.Unlock()

	for _, pending := range i.operations {
		fut := pending.future
		fut.response = err

		// cap() of a nil channel is 0, so this one test covers both a
		// missing channel and a channel without buffer space.
		// NOTE(review): init() presumably allocates a buffered errCh;
		// confirm, because the send below runs while the lock is held.
		if cap(fut.DeferError.errCh) == 0 {
			fut.DeferError.init()
		}
		fut.DeferError.errCh <- err

		// The response is reset before Response() is invoked; any result
		// of that call is not used here.
		fut.response = nil
		fut.Response()
	}

	// Forget everything by swapping in a fresh, empty map.
	i.operations = make(map[uint64]*inflightLog)
}
// Commit records one more acknowledgement for the log at index. Leader
// replication routines call it when a follower has finished committing that
// log to disk. Indexes that are not being tracked are ignored, since the log
// may already have been committed. When the acknowledgement count reaches the
// entry's quorum, the entry is dropped from tracking and its future is sent
// on the commit channel; that send happens while the lock is held.
func (i *inflight) Commit(index uint64) {
	i.Lock()
	defer i.Unlock()

	entry, tracked := i.operations[index]
	if !tracked {
		return
	}

	entry.commitCount++
	if entry.commitCount >= entry.quorum {
		// Quorum reached: stop tracking, then notify.
		delete(i.operations, index)
		i.commitCh <- entry.future
	}
}