package main

import (
	"context"
	"log"
	"net"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	pb "github.com/devMYC/raft/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

const (
	port = ":8080"
)
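
// Note: every node listens on the same port, and peers are dialed at "node<id>" + port
// in main below. This assumes each node is reachable under a hostname such as
// "node1", "node2", ... (for example, container/service names in a compose setup).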

// Server handles RPC requests between nodes in a cluster to update the
// state of the consensus module. Requests from clients are processed
// only if the current server is the leader.
type Server struct {
	cm    *ConsensusModule
	peers map[int]pb.RpcClient
	pb.UnimplementedRpcServer
}

// RequestVote is received when a node becomes a candidate and tries to
// collect votes from the other servers in order to be elected as the new leader.
func (s *Server) RequestVote(ctx context.Context, in *pb.RequestVoteArgs) (*pb.RequestVoteResult, error) {
	s.cm.mu.Lock()
	defer s.cm.mu.Unlock()
	resp := &pb.RequestVoteResult{
		Term:        int32(s.cm.state.currentTerm),
		VoteGranted: false,
	}
	term := int(in.Term)
	candidateID := int(in.CandidateId)
	if term < s.cm.state.currentTerm {
		// Reject requests from stale terms.
		return resp, nil
	}
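	// Grab our last log entry so we can enforce Raft's election restriction
	// below: a vote is granted only if the candidate's log is at least as
	// up-to-date as ours.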
	entry, i := s.cm.state.getLastLogEntry()
	if term > s.cm.state.currentTerm {
		// Higher term discovered, fall back to follower.
		s.cm.becomeFollower(term, s.peers)
		resp.Term = in.Term
	}
	if term == s.cm.state.currentTerm &&
		(s.cm.state.votedFor == -1 || s.cm.state.votedFor == candidateID) &&
		(entry == nil || in.LastLogTerm > entry.Term || in.LastLogTerm == entry.Term && int(in.LastLogIndex) >= i) {
		// Grant the vote only if we haven't voted in this term (or already voted
		// for this candidate) and the candidate's log is at least as up-to-date
		// as our own.
		s.cm.latestUpdateAt = time.Now() // reset election timeout
		resp.VoteGranted = true
		s.cm.state.votedFor = candidateID
	}
	return resp, nil
}

// AppendEntries replicates log entries from the leader to the other nodes
// in the cluster. It also serves as the heartbeat when the entry list in the
// request is empty, which prevents followers from becoming candidates and
// starting new elections.
func (s *Server) AppendEntries(ctx context.Context, in *pb.AppendEntriesArgs) (*pb.AppendEntriesResult, error) {
	s.cm.mu.Lock()
	defer s.cm.mu.Unlock()
	term := int(in.Term)
	resp := &pb.AppendEntriesResult{
		Term:    int32(s.cm.state.currentTerm),
		Success: false,
	}
	if term < s.cm.state.currentTerm {
		// Reject requests from stale leaders.
		return resp, nil
	}
	if term > s.cm.state.currentTerm || s.cm.role == Candidate {
		// A higher term was discovered, or we are still collecting votes:
		// fall back to follower.
		s.cm.becomeFollower(term, s.peers)
	} else {
		s.cm.latestUpdateAt = time.Now() // reset election timeout
	}
	_, idx := s.cm.state.getLastLogEntry()
	if in.PrevLogIndex >= 0 && (in.PrevLogIndex > int32(idx) || s.cm.state.log[in.PrevLogIndex].Term != in.PrevLogTerm) {
		// Our log does not contain an entry at PrevLogIndex, or the entry at
		// PrevLogIndex has a different term than PrevLogTerm.
		return resp, nil
	}
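	// The consistency check above passed, so by Raft's Log Matching Property our
	// log agrees with the leader's up to PrevLogIndex. Scan forward to find the
	// first position (if any) where the two diverge.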
	resp.Success = true
	i := int(in.PrevLogIndex + 1) // index into our log
	j := 0                        // index into the entries in the request
	for ; i < len(s.cm.state.log) && j < len(in.Entries); i, j = i+1, j+1 {
		if s.cm.state.log[i].Term != in.Entries[j].Term {
			// Found the first entry that does not match.
			break
		}
	}
	leaderCommit := int(in.LeaderCommit)
	if j < len(in.Entries) {
		// Delete the conflicting entries from our log, starting at the first
		// unmatched one, and append the remaining entries from the request.
		s.cm.state.log = append(s.cm.state.log[:i], in.Entries[j:]...)
	}
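	// The leader's commit index tells us how far it is safe to commit. Cap it at
	// our own last index, since our log may still be shorter than the leader's.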
	if leaderCommit > s.cm.state.commitIndex {
		s.cm.state.commitIndex = Min(leaderCommit, len(s.cm.state.log)-1)
		// Apply committed log entries, starting after the latest applied one.
		for _, entry := range s.cm.state.log[s.cm.state.lastApplied+1 : s.cm.state.commitIndex+1] {
			log.Printf("[AppendEntries] applying log entry=%+v to state machine\n", *entry)
			s.cm.state.lastApplied = int(entry.Idx)
		}
	}
	return resp, nil
}

// ClientRequest appends a new log entry containing the client's command to be
// applied to the state machine. The request is only accepted if this server is
// the leader.
func (s *Server) ClientRequest(ctx context.Context, in *pb.ClientRequestArgs) (*pb.ClientRequestResult, error) {
	s.cm.mu.Lock()
	defer s.cm.mu.Unlock()
	resp := &pb.ClientRequestResult{
		IsLeader: false,
	}
	if s.cm.role != Leader {
		// Followers and candidates reject client requests.
		return resp, nil
	}
	resp.IsLeader = true
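	// Append the command at the tail of the leader's log. Replication to the
	// followers is assumed to be driven elsewhere in the consensus module
	// (e.g. by the leader's periodic AppendEntries/heartbeat loop); a complete
	// implementation would also wait for the entry to be committed before
	// acknowledging the client.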
	s.cm.state.log = append(s.cm.state.log, &pb.LogEntry{
		Idx:  int32(len(s.cm.state.log)),
		Term: int32(s.cm.state.currentTerm),
		Cmd:  in.Cmd,
	})
	log.Printf("[ClientRequest] new command=%s appended\n", in.Cmd)
	return resp, nil
}

func main() {
	// Expected usage: <binary> <node ID> <comma-separated peer IDs>
	args := os.Args[1:]
	if len(args) < 2 {
		log.Fatalln("Usage: <node ID> <comma-separated peer IDs>")
	}
	id, err := strconv.Atoi(args[0])
	if err != nil {
		log.Fatalf("Invalid node ID '%s'.\n", args[0])
	}
	peerIDStrs := strings.Split(args[1], ",")
	peers := make(map[int]pb.RpcClient)
	peerIds := make([]int, 0, len(peerIDStrs))
	var cm *ConsensusModule
	var wg sync.WaitGroup
	wg.Add(len(peerIDStrs))
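	// Dial every peer in the background. Each goroutine waits until its
	// connection becomes Ready before registering the RPC client and marking
	// the WaitGroup done; the first election is only scheduled once all peers
	// are reachable (see the wg.Wait goroutine near the end of main).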
	for _, s := range peerIDStrs {
		peerID, err := strconv.Atoi(s)
		if err != nil {
			log.Fatalf("Invalid peer ID '%s'.\n", s)
		}
		peerIds = append(peerIds, peerID)
		peerAddr := "node" + s + port
		go func() {
			// Sleep briefly before dialing, giving the peer processes time to start.
			time.Sleep(3 * time.Second)
			log.Printf("Peer Address: %s\n", peerAddr)
			conn, err := grpc.Dial(peerAddr, grpc.WithInsecure())
			if err != nil {
				log.Fatalf("Failed to dial peer '%s': %v\n", peerAddr, err)
			}
			// Poll until the connection is ready.
			for {
				time.Sleep(time.Second)
				if conn.GetState() == connectivity.Ready {
					log.Printf("Connection to node %d %s\n", peerID, conn.GetState().String())
					wg.Done()
					break
				}
			}
			cm.mu.Lock()
			defer cm.mu.Unlock()
			peers[peerID] = pb.NewRpcClient(conn)
		}()
	}
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("Failed to listen: %v\n", err)
	}
	cm = NewConsensusModule(id, peerIds)
	s := grpc.NewServer()
	pb.RegisterRpcServer(s, &Server{
		cm:    cm,
		peers: peers,
	})
	log.Printf("server listening at: %v\n", lis.Addr())
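	// Once every peer connection is ready, let the consensus module kick off its
	// first election cycle (prepareElection is assumed, in the rest of this
	// package, to start the election timer for the given term).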
	go func() {
		wg.Wait()
		cm.prepareElection(0, peers)
	}()
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v\n", err)
	}
}