-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.ml
1220 lines (1094 loc) · 46.3 KB
/
server.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
open Lwt
open Websocket
open Websocket_cohttp_lwt
open Log
open Request_vote_rpc
open Append_entries_rpc
open Yojson.Basic.Util
(* The role a server currently plays in the cluster. *)
type role =
  | Follower
  | Candidate
  | Leader

(* Known peer addresses, stored as (ip * port) pairs. *)
type ip = (string * int) list

(* Per-peer next index, stored as (id * next index) pairs. *)
type next_index = (string * int) list

(* Per-peer match index, stored as (id * match index) pairs. *)
type match_index = (string * int) list
(* All mutable per-server state. A single global instance, [serv_state], is
 * shared by every function in this file. *)
type state = {
  (* identifier of this server; set by [update_neighbors] *)
  mutable id : string;
  (* identifier of the current leader; the empty string when unknown *)
  mutable leader_id : string;
  mutable role : role;
  mutable curr_term : int;
  (* candidate this server voted for, or [None] *)
  mutable voted_for : string option;
  (* (index * entry) pairs, newest entry at the head of the list *)
  mutable log : (int * entry) list;
  mutable commit_index : int;
  mutable last_applied : int;
  (* timer length in seconds, used as the argument of [Lwt_unix.sleep] *)
  mutable heartbeat : float;
  mutable neighboring_ips : ip;
  mutable next_index_lst : next_index;
  mutable match_index_lst : match_index;
  (* cleared by [act_follower] at the end of every follower cycle *)
  mutable received_heartbeat : bool;
  (* NOTE(review): not read or written anywhere in the visible part of the
   * file; confirm its purpose against the rest of the module *)
  mutable started : bool;
  (* when false, [res_append_entries] sends nothing *)
  mutable is_server : bool;
}
(* Pending AppendEntries requests, kept as (output channel, request) pairs.
 * [send_heartbeat] re-sends every pair on each tick; [handle_ae_res] adds and
 * removes pairs as responses arrive. *)
let get_ae_response_from = ref []
(* (log index, count) pairs read by [check_majority] and maintained by
 * [handle_ae_res]; count is the number of responses recorded for that index *)
let index_responses = ref []
(* [generate_heartbeat ()] draws a random timeout in seconds from the
 * 150-300ms window: a fixed 0.150s floor plus [Random.float 0.150]. *)
let generate_heartbeat () =
  let floor_s = 0.150 and spread_s = 0.150 in
  Random.float spread_s +. floor_s
(* The single global server state. It starts as a Follower in term 0 with an
 * empty log, no known peers and a zero heartbeat; [update_neighbors] and
 * [change_heartbeat] fill in the id, peers and timer later. *)
let serv_state = {
  id = "";
  leader_id = "";
  role = Follower;
  curr_term = 0;
  voted_for = None;
  log = [];
  commit_index = 0;
  last_applied = 0;
  heartbeat = 0.;
  neighboring_ips = [];
  next_index_lst = [];
  match_index_lst = [];
  received_heartbeat = false;
  started = false;
  is_server = true;
}
(* [get_my_addr ()] is the first address the resolver lists for this machine's
 * own host name.
 * raises: [Not_found] if the host name cannot be resolved, and
 * [Invalid_argument] if the resolved entry carries no address. *)
let get_my_addr () =
  let host_entry = Unix.gethostbyname (Unix.gethostname ()) in
  host_entry.Unix.h_addr_list.(0)
(* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
NON-STATE SERVER FIELDS
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *)
(* number of votes collected in the current election; [start_election] resets
 * it to 1 (the self-vote) and [handle_vote_res] increments it *)
let vote_counter = ref 0
(* a (string * (input_channel * output_channel)) list
 * mapping each server ID to its ic and oc *)
let channels = ref []
(* resolved once, when this module is loaded *)
let listen_address = get_my_addr ()
let port = 9000
let backlog = 10
(* log everything at Info level and above, for every section *)
let () = Lwt_log.add_rule "*" Lwt_log.Info
(* NOTE(review): this is one promise created at module load, not a reusable
 * delay; it resolves once, one second after start-up. Confirm how it is used. *)
let hb_interval = (Lwt_unix.sleep 1.)
(* (append_entries_req * int) list that maps the specific request to the number
 * of responses to it with success=true *)
let ae_req_to_count = ref []
(* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
WEBSOCKET CLIENT SERVER FIELDS
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *)
(* latest message for the websocket client; [process_heartbeat] overwrites it
 * with the value of the entry carried by a heartbeat *)
let res_client_msg = ref "connected!"
(* leader id as last seen by a non-server (client) instance; updated by
 * [process_heartbeat] *)
let leader_ip = ref ""
(* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
HELPER FUNCTIONS
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *)
(* [id_from_oc cl oc] looks through the (id, (ic, oc)) list [cl] for the first
 * binding whose output channel is physically equal to [oc] and returns its id,
 * or [None] when no binding uses that channel. *)
let id_from_oc cl oc =
  match List.find_opt (fun (_, (_, chan)) -> oc == chan) cl with
  | Some (id, _) -> Some id
  | None -> None
(* [last_entry ()] is [Some e] where [e] is the entry at the head of the
 * server's log, or [None] for an empty log. The log keeps its newest entry at
 * the head, so this is the most recently added entry. *)
let last_entry () =
  match serv_state.log with
  | (_, newest) :: _ -> Some newest
  | [] -> None
(* [get_p_log_idx ()] is the [index] field of the most recently added log
 * entry, or 0 when the log is empty. *)
let get_p_log_idx () =
  match serv_state.log with
  | [] -> 0
  | (_, newest) :: _ -> newest.index
(* [get_p_log_term ()] is the [entry_term] field of the most recently added
 * log entry, or 0 when the log is empty. *)
let get_p_log_term () =
  match serv_state.log with
  | [] -> 0
  | (_, newest) :: _ -> newest.entry_term
(* [full_addr_str port_num] is this host's address followed by a colon and
 * [port_num], i.e. the ip:port form. *)
let full_addr_str port_num =
  Printf.sprintf "%s:%d" (Unix.string_of_inet_addr (get_my_addr ())) port_num
(* [change_heartbeat ()] replaces the server's heartbeat with a freshly
 * randomized value from [generate_heartbeat]. *)
let change_heartbeat () =
  serv_state.heartbeat <- generate_heartbeat ()
(* [update_neighbors ips id] stores [ips] as the server's list of neighboring
 * addresses and [id] as the server's own identifier. *)
let update_neighbors ips id =
  begin
    serv_state.neighboring_ips <- ips;
    serv_state.id <- id
  end
(* [send_msg str oc] logs [str] to stdout, writes it as one line to the output
 * channel [oc] and then flushes [oc].
 * returns: a promise that resolves once the flush has completed. The flush is
 * chained after the write, so a failed write rejects the returned promise
 * instead of being silently discarded. *)
let send_msg str oc =
  print_endline ("sending: " ^ str);
  Lwt.bind (Lwt_io.write_line oc str) (fun () -> Lwt_io.flush oc)
(* [stringify_e e] renders the entry [e] as a JSON object with the integer
 * fields value, entry_term and index, in that order. *)
let stringify_e (e : entry) : string =
  Printf.sprintf "{\"value\":%d,\"entry_term\":%d,\"index\":%d}"
    e.value e.entry_term e.index
(* [nindex_from_id id] is the next index recorded for server [id] in the
 * server's next_index_lst, or -1 when [id] has no binding there. *)
let nindex_from_id id =
  try List.assoc id serv_state.next_index_lst
  with Not_found -> -1
(* [req_append_entries msg ip oc] is a Leader-side function that serializes
 * the AppendEntries request [msg] to JSON and sends it to the Follower at
 * output channel [oc]. [ip] is accepted for interface compatibility and is
 * not used.
 * An empty [msg.entries] is serialized as an empty JSON array; the previous
 * implementation raised [Invalid_argument] in that case while trimming a
 * trailing comma that did not exist.
 * returns: the promise produced by [send_msg]. *)
let req_append_entries (msg : append_entries_req) (ip : string) oc =
  let entries_json =
    "[" ^ String.concat "," (List.map stringify_e msg.entries) ^ "]" in
  (* spacing inside the payload is kept exactly as peers already receive it *)
  let json =
    Printf.sprintf
      "{\"type\": \"appd_req\",\"ap_term\":%d,\"leader_id\":\"%s\",\"prev_log_index\": %d,\"prev_log_term\": %d,\"entries\":%s,\"leader_commit\":%d}"
      msg.ap_term msg.leader_id msg.prev_log_index msg.prev_log_term
      entries_json msg.leader_commit
  in
  send_msg json oc
(* [res_append_entries ae_res oc] is a Follower-side function that sends the
 * AppendEntries response [ae_res], serialized as JSON, to the output channel
 * [oc]. When this instance is not a server nothing is sent and an already
 * resolved promise is returned. *)
let res_append_entries (ae_res : append_entries_res) oc =
  if serv_state.is_server then
    send_msg
      (Printf.sprintf "{\"type\":\"appd_res\",\"success\":%b,\"curr_term\":%d}"
         ae_res.success ae_res.curr_term)
      oc
  else Lwt.return ()
(* [json_es entries] converts the JSON array [entries] into a list of entry
 * records, in array order.
 * - requires: every element is a JSON object with integer fields value,
 *   entry_term and index
 * - raises: [Not_found] when a field is missing, and the Yojson type error
 *   when a value has the wrong JSON type *)
let json_es (entries) : entry list =
  let int_of_field fields name =
    Yojson.Basic.Util.to_int (List.assoc name fields) in
  entries
  |> Yojson.Basic.Util.to_list
  |> List.map Yojson.Basic.Util.to_assoc
  |> List.map (fun fields ->
      let value = int_of_field fields "value" in
      let entry_term = int_of_field fields "entry_term" in
      let index = int_of_field fields "index" in
      { value; entry_term; index })
(* [send_rpcs f] applies [f] to the output channel of every connection in
 * [channels], in list order, discarding each result, and then prints a
 * confirmation line. [f] is expected to be a partially applied sender such as
 *   [req_append_entries msg ip]
 *   [req_request_vote msg] *)
let send_rpcs f =
  List.iter (fun (_, (_, oc)) -> ignore (f oc)) !channels;
  print_endline "sent all rpcs!"
(* [mismatch_log l pli plt] is a Follower-side check that is true when log [l]
 * fails to hold an entry at index [pli] whose term is [plt].
 * A log with at most one entry never counts as a mismatch, and an index more
 * than one past the end of the log always does. *)
let mismatch_log (my_log : (int * entry) list) prev_log_index prev_log_term =
  let len = List.length my_log in
  if len <= 1 then false
  else if prev_log_index > len + 1 then true
  else
    match List.assoc_opt prev_log_index my_log with
    | Some e -> e.entry_term <> prev_log_term
    | None -> true
(* [process_conflicts entries] is a Follower-side function that truncates the
 * server's log where it conflicts with [entries]. A conflict is a logged entry
 * that shares its index with an incoming entry but has a different term.
 * For each incoming entry the log is cut just below the first conflicting
 * position, dropping that entry and everything newer; the shortest log
 * obtained over all incoming entries becomes the new log. *)
let process_conflicts entries =
  let original = serv_state.log in
  (* log remaining after dropping the first conflict with [incoming] and all
   * newer entries; the untouched log when nothing conflicts *)
  let rec truncate_at incoming = function
    | [] -> original
    | (i, logged) :: older ->
      if i = incoming.index && logged.entry_term <> incoming.entry_term
      then older
      else truncate_at incoming older
  in
  let shortest =
    List.fold_left
      (fun best incoming ->
         let candidate = truncate_at incoming original in
         if List.length candidate < List.length best then candidate else best)
      original entries
  in
  serv_state.log <- shortest
(* [append_new_entries entries] is a Follower-side function that adds the
 * entries not yet in the log. [entries] is given newest first, like the log,
 * so it is walked in reverse to add the oldest entry first. An entry that is
 * structurally equal to one already logged is skipped; every other entry is
 * re-indexed to the current log length plus one and pushed onto the head of
 * the log. *)
let append_new_entries (entries : entry list) : unit =
  List.iter
    (fun (incoming : entry) ->
       let log = serv_state.log in
       let already_logged =
         List.exists (fun (_, logged) -> logged = incoming) log in
       if not already_logged then begin
         let idx = List.length log + 1 in
         serv_state.log <- (idx, { incoming with index = idx }) :: log
       end)
    (List.rev entries)
(* [check_majority ()] is a Leader-side function that recomputes commit_index.
 * With no neighbors the whole log is committed. Otherwise the first pair in
 * [index_responses] whose count reaches half the number of neighbors (integer
 * division) supplies the new commit index; when there is no such pair the
 * commit index keeps its current value. *)
let check_majority () =
  let peers = List.length serv_state.neighboring_ips in
  let new_commit =
    if peers = 0 then List.length serv_state.log
    else
      let threshold = peers / 2 in
      match
        List.find_opt (fun (_, count) -> count >= threshold) !index_responses
      with
      | Some (ind, _) -> ind
      | None -> serv_state.commit_index
  in
  serv_state.commit_index <- new_commit
(* [send_heartbeat oc ()] is a Leader-side function that sends one heartbeat
 * to the Follower behind output channel [oc] and then re-arms itself.
 * Each call:
 *  - refreshes the commit index through [check_majority];
 *  - picks the log entry at position (log length - commit_index) from the
 *    head of the log, or a placeholder entry with term and index -1 when that
 *    position is past the end, and ships it in the heartbeat;
 *  - re-sends every pending AppendEntries request in [get_ae_response_from];
 *  - schedules the next heartbeat after [serv_state.heartbeat] seconds. *)
let rec send_heartbeat oc () =
  check_majority ();
  let offset = List.length serv_state.log - serv_state.commit_index in
  let (_, entry) =
    match List.nth_opt serv_state.log offset with
    | Some pair -> pair
    | None -> (1, { value = 0; entry_term = (-1); index = (-1) })
  in
  let payload =
    Printf.sprintf
      "{\"type\":\"heartbeat\",\"leader_id\":\"%s\",\"curr_term\":%d,\"prev_log_index\": %d,\"prev_log_term\": %d,\"entries\": [%s],\"leader_commit\":%d}"
      serv_state.id serv_state.curr_term
      (get_p_log_idx ()) (get_p_log_term ())
      (stringify_e entry) serv_state.commit_index
  in
  ignore (Lwt_io.write_line oc payload);
  ignore (Lwt_io.flush oc);
  List.iter
    (fun (pending_oc, rpc) ->
       (* id registered for the pending channel, or the empty string *)
       let target =
         match
           List.find_opt (fun (_, (_, o)) -> o == pending_oc) !channels
         with
         | Some (peer_id, _) -> peer_id
         | None -> ""
       in
       ignore (req_append_entries rpc target pending_oc))
    !get_ae_response_from;
  Lwt.on_termination (Lwt_unix.sleep serv_state.heartbeat)
    (fun () -> send_heartbeat oc ())
(* [create_rpc msg id p_log_idx p_log_term] is a Leader-side function that
 * builds the AppendEntries request for server [id]. The request carries every
 * log entry whose index is at least the next index recorded for [id], newest
 * first, together with the leader's term, id and commit index and the given
 * previous log index and term. [msg] is not used. *)
let create_rpc msg id p_log_idx p_log_term =
  let next_index = nindex_from_id id in
  (* walk the log oldest first and push matches, leaving the newest first *)
  let pending =
    List.fold_left
      (fun acc (i, e) -> if i >= next_index then e :: acc else acc)
      [] (List.rev serv_state.log)
  in
  {
    ap_term = serv_state.curr_term;
    leader_id = serv_state.id;
    prev_log_index = p_log_idx;
    prev_log_term = p_log_term;
    entries = pending;
    leader_commit = serv_state.commit_index
  }
(* [force_conform id] is a Leader-side function run when a Follower rejects an
 * AppendEntries request: it lowers the next index recorded for server [id] by
 * one, replacing every existing binding for [id] with a single new one. *)
let force_conform id =
  let lowered = nindex_from_id id - 1 in
  let others =
    List.filter (fun (peer, _) -> peer <> id) serv_state.next_index_lst in
  serv_state.next_index_lst <- (id, lowered) :: others
(* [update_match_index oc] is a Leader-side function that finds the server id
 * behind output channel [oc] and sets that server's match index to the
 * current length of the log. The updated binding moves to the head of
 * match_index_lst, followed by the bindings that came after it and then the
 * ones that came before it, in reverse.
 * raises: [Failure] when [oc] has no id in [channels] or the id has no
 * binding in match_index_lst. *)
let update_match_index oc =
  match id_from_oc !channels oc with
  | None -> failwith "uh wtf"
  | Some id ->
    let rec relocate seen = function
      | [] -> failwith "this should literally never happen"
      | ((s, _) as binding) :: rest ->
        if s = id then
          (* note: nextIndex - matchIndex > 1 only happens when a new leader
           * takes over with a much larger log after a network partition
           * heals, which is NOT a supported feature *)
          serv_state.match_index_lst <-
            (s, List.length serv_state.log) :: (rest @ seen)
        else relocate (binding :: seen) rest
    in
    relocate [] serv_state.match_index_lst
(* [update_next_index oc] is a Leader-side function that sets the next index
 * of the Follower connected through [oc] to the current length of the log,
 * replacing any earlier binding for that Follower.
 * raises: [Not_found] when [oc] is not registered in [channels]. *)
let update_next_index oc =
  let (peer, _) =
    List.find (fun (_, (_, chan)) -> oc == chan) !channels in
  let others =
    List.filter (fun (k, _) -> k <> peer) serv_state.next_index_lst in
  serv_state.next_index_lst <- (peer, List.length serv_state.log) :: others
(* [update_commit_index ()] is a Leader-side function that raises commit_index
 * to the highest N such that commit_index < N <= index of the last log entry
 * and strictly more than half of the match_index_lst values are >= N. When no
 * such N exists the commit index is left unchanged.
 * NOTE(review): the term of the Nth entry is not compared with curr_term
 * here; confirm whether that check is expected. *)
let update_commit_index () =
  let upper = get_p_log_idx () in
  let previous = serv_state.commit_index in
  (* true when a strict majority of the match indices has reached [n] *)
  let reached_by_majority n =
    let peers = serv_state.match_index_lst in
    let hits = List.length (List.filter (fun (_, mi) -> mi >= n) peers) in
    hits > List.length peers / 2
  in
  (* highest qualifying n in [n, upper], falling back to [best] *)
  let rec scan n best =
    if n > upper then best
    else scan (n + 1) (if reached_by_majority n then n else best)
  in
  let updated = scan (previous + 1) previous in
  assert (updated >= previous);
  serv_state.commit_index <- updated
(* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
MAIN SERVER FUNCTIONS
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *)
(* [read_neighboring_ips port_num] reads the file ips.txt line by line and
 * adds every (ip, port) it finds to the server's neighboring_ips, skipping
 * the entry equal to this host's own address paired with [port_num].
 * Each line must start with the address, followed by one separator character
 * and the port number.
 * The file is closed on every exit path, including a malformed line, and the
 * read loop is tail-recursive.
 * raises: [Sys_error] when the file cannot be opened, [Not_found] when a line
 * holds no address, and [Failure] when the port is not an integer. *)
let read_neighboring_ips port_num =
  let ip_regex = Str.regexp "[0-9]*.[0-9]*.[0-9]*.[0-9]*" in
  (* resolved on first use only, so an empty file never triggers a lookup *)
  let self =
    lazy (Unix.string_of_inet_addr (get_my_addr ()), port_num) in
  let parse_line line =
    let _ = Str.search_forward ip_regex line 0 in
    let ip_str = Str.matched_string line in
    let port_int =
      int_of_string (Str.string_after line (String.length ip_str + 1)) in
    (ip_str, port_int)
  in
  let rec process_file f_channel =
    match Pervasives.input_line f_channel with
    | exception End_of_file -> ()
    | line ->
      let new_ip = parse_line line in
      if new_ip <> Lazy.force self then
        serv_state.neighboring_ips <- new_ip :: serv_state.neighboring_ips;
      process_file f_channel
  in
  let f_channel = Pervasives.open_in "ips.txt" in
  match process_file f_channel with
  | () -> Pervasives.close_in f_channel
  | exception e ->
    (* release the descriptor before letting the failure propagate *)
    Pervasives.close_in_noerr f_channel;
    raise e
(* [req_request_vote ballot oc] is a Candidate-side function that sends a
 * RequestVote request to the server at output channel [oc]. The JSON payload
 * carries the term, candidate id, last log index and last log term found in
 * [ballot]. *)
let req_request_vote ballot oc =
  send_msg
    (Printf.sprintf
       "{\"type\": \"vote_req\",\"term\": %d,\"candidate_id\": \"%s\",\"last_log_index\": %d,\"last_log_term\": %d}"
       ballot.term ballot.candidate_id
       ballot.last_log_index ballot.last_log_term)
    oc
(* [res_request_vote msg oc] is a Follower-side function that answers the vote
 * request [msg] over output channel [oc]. The vote is granted when this
 * server has not voted yet, or has already voted for the same candidate, and
 * the candidate's term is at least this server's current term. A granted vote
 * is recorded in voted_for before the response is sent. *)
let res_request_vote msg oc =
  let candidate_id = to_string (member "candidate_id" msg) in
  let free_to_vote =
    match serv_state.voted_for with
    | Some id -> id = candidate_id
    | None -> true
  in
  let candidate_term = to_int (member "term" msg) in
  let vote_granted = free_to_vote && candidate_term >= serv_state.curr_term in
  if vote_granted then serv_state.voted_for <- Some candidate_id;
  send_msg
    (Printf.sprintf "{\"type\": \"vote_res\", \"curr_term\": %d,\"vote_granted\": %b}"
       serv_state.curr_term vote_granted)
    oc
(* [send_heartbeats ()] is a Leader-side function that starts the heartbeat
 * cycle for every connected Follower: for each output channel in [channels]
 * it arms a timer of [serv_state.heartbeat] seconds, after which
 * [send_heartbeat] runs and keeps re-arming itself.
 * The timers are registered directly on the calling thread. Lwt's scheduler
 * state is not thread-safe and [Lwt_unix.sleep] does not block, so no system
 * thread is spawned per channel. *)
let send_heartbeats () =
  List.iter
    (fun (_, (_, oc)) ->
       Lwt.on_termination (Lwt_unix.sleep serv_state.heartbeat)
         (fun () -> send_heartbeat oc ()))
    !channels
(* [act_all ()] is the housekeeping step shared by every role and run at the
 * start of each act function: when commit_index is ahead of last_applied,
 * last_applied advances by one. *)
let act_all () =
  if serv_state.commit_index > serv_state.last_applied then
    serv_state.last_applied <- serv_state.last_applied + 1
(* [process_leader_death ()] forgets a leader that is presumed gone. When
 * leader_id is not the empty string it splits the id into its address and
 * port, removes that pair from neighboring_ips so the list reflects the
 * remaining connections, drops the leader's entry from [channels] and resets
 * leader_id to the empty string. It does nothing when no leader is recorded.
 * Called when a new election begins.
 * raises: [Failure] when the port part of the id is not an integer. *)
let process_leader_death () =
  let l_id = serv_state.leader_id in
  if l_id = "" then ()
  else begin
    let ip_regex = "[0-9]*.[0-9]*.[0-9]*.[0-9]*" in
    let port_regex = "[0-9]*" in
    ignore (Str.search_forward (Str.regexp ip_regex) l_id 0);
    let dead_ip = Str.matched_string l_id in
    (* NOTE(review): the result of this second search is never used *)
    ignore (Str.search_forward (Str.regexp port_regex) l_id 0);
    let dead_port =
      int_of_string (Str.string_after l_id (String.length dead_ip + 1)) in
    serv_state.neighboring_ips <-
      List.filter (fun (i, p) -> not (i = dead_ip && p = dead_port))
        serv_state.neighboring_ips;
    channels := List.remove_assoc l_id !channels;
    serv_state.leader_id <- ""
  end
(* [start_election ()] begins an election for this server: it increments the
 * term, votes for itself, resets the vote counter to 1 and sends a
 * RequestVote request to every connected server. *)
let rec start_election () =
  print_endline "election started!";
  serv_state.curr_term <- serv_state.curr_term + 1;
  serv_state.voted_for <- Some serv_state.id;
  vote_counter := 1;
  (* the ballot is the vote request sent to every peer *)
  let ballot = {
    term = serv_state.curr_term;
    candidate_id = serv_state.id;
    last_log_index = get_p_log_idx ();
    last_log_term = get_p_log_term ();
  } in
  print_endline "sending rpcs...";
  send_rpcs (req_request_vote ballot)

(* [act_leader ()] is a Leader-side function that carries out the leader's
 * duties: it refreshes the commit index, runs the shared housekeeping step
 * and starts the periodic heartbeats to every Follower. *)
and act_leader () =
  print_endline "I am a leader----------------------------------------------";
  update_commit_index ();
  act_all ();
  send_heartbeats ()

(* [init_leader ()] is a Leader-side function that prepares the state for
 * leading: every neighbor, identified by its ip:port string, gets match index
 * 0 and a next index one past the last log entry. Both lists end up in the
 * reverse of the neighbor order. It then hands over to [act_leader]. *)
and init_leader () =
  let peer_id (inum, p) = inum ^ ":" ^ string_of_int p in
  print_endline "Initialized a leader!";
  serv_state.match_index_lst <-
    List.rev_map (fun peer -> (peer_id peer, 0)) serv_state.neighboring_ips;
  let n_idx = get_p_log_idx () + 1 in
  serv_state.next_index_lst <-
    List.rev_map (fun peer -> (peer_id peer, n_idx)) serv_state.neighboring_ips;
  act_leader ()

(* [act_candidate ()] is a Candidate-side function that runs one election
 * round. It starts an election, wins immediately when there is at most one
 * neighbor, and arms a timer of one heartbeat. When the timer fires and the
 * server is still a Candidate, its vote is cleared and another round is
 * scheduled one heartbeat later; in any other role nothing more happens. *)
and act_candidate () =
  print_endline "I am a candidate-------------------------------------------";
  act_all ();
  let check_election_complete () =
    match serv_state.role with
    | Candidate ->
      (* unresolved election: clear the vote and try again after a timeout *)
      serv_state.voted_for <- None;
      Lwt.on_termination (Lwt_unix.sleep serv_state.heartbeat) act_candidate
    | Follower | Leader -> ()
  in
  start_election ();
  if List.length serv_state.neighboring_ips <= 1 then win_election ();
  Lwt.on_termination (Lwt_unix.sleep serv_state.heartbeat)
    check_election_complete

(* [init_candidate ()] is a Candidate-side function that marks the transition
 * to candidate behavior; it only delegates to [act_candidate] and exists to
 * keep the role transitions uniform. *)
and init_candidate () =
  act_candidate ()

(* [act_follower ()] is a Follower-side function that runs one follower cycle.
 * When the server has voted for no one and saw no heartbeat during the last
 * cycle, the leader is treated as dead and the server becomes a Candidate.
 * Otherwise the heartbeat flag is cleared and the next cycle is scheduled one
 * heartbeat later. *)
and act_follower () =
  print_endline "I am a follower--------------------------------------------";
  serv_state.role <- Follower;
  act_all ();
  let timed_out =
    serv_state.voted_for = None && not serv_state.received_heartbeat in
  if timed_out then begin
    process_leader_death ();
    serv_state.role <- Candidate;
    init_candidate ()
  end
  else begin
    serv_state.received_heartbeat <- false;
    Lwt.on_termination (Lwt_unix.sleep serv_state.heartbeat) act_follower
  end

(* [init_follower ()] is a Follower-side function that starts the follower
 * timeout cycle: [act_follower] runs after one heartbeat has elapsed. This is
 * meant for servers only; a client should never run it. *)
and init_follower () =
  Lwt.on_termination (Lwt_unix.sleep serv_state.heartbeat) act_follower

(* [win_election ()] is a Candidate-side function that switches the server to
 * the Leader role and initializes the leader state. *)
and win_election () =
  serv_state.role <- Leader;
  init_leader ()

(* [lose_election ()] is a Candidate-side function that switches the server
 * back to the Follower role and resumes follower behavior. *)
and lose_election () =
  serv_state.role <- Follower;
  act_follower ()

(* [terminate_election ()] is a Candidate-side function for an election that
 * timed out without a winner or loser: it simply starts a new election. *)
and terminate_election () =
  start_election ()
(* [id_from_oc cl oc] is [Some id] for the first (id, (ic, oc)) binding in
 * [cl] whose output channel is physically equal to [oc], and [None] when
 * there is none. This redefinition shadows an identical function declared
 * earlier in this file. *)
let rec id_from_oc cl oc =
  match cl with
  | [] -> None
  | (ip, (_, chan)) :: rest ->
    if chan == oc then Some ip else id_from_oc rest oc
(* [handle_precheck t] compares the sender's term [t] with this server's term.
 * When [t] is newer the server adopts it, and a Candidate or Leader also
 * restarts the follower timeout cycle. A term that is not newer changes
 * nothing. *)
let handle_precheck t =
  if t > serv_state.curr_term then begin
    let must_step_down = serv_state.role <> Follower in
    serv_state.curr_term <- t;
    if must_step_down then init_follower ()
  end
(* [handle_ae_req msg oc] is a Follower-side function that processes the
 * AppendEntries request [msg] sent by the Leader and answers over output
 * channel [oc]. The request succeeds when its term is not older than this
 * server's term and the log agrees with the given previous index and term.
 * On success, conflicting entries are removed, new ones are appended and the
 * commit index rises to the smaller of the leader's commit index and the last
 * log index. The numbered notes mark the steps of the receiver rules.
 * returns: the promise produced by [res_append_entries]. *)
let handle_ae_req msg oc =
  let int_field name = msg |> member name |> to_int in
  let ap_term = int_field "ap_term" in
  let prev_log_index = int_field "prev_log_index" in
  let prev_log_term = int_field "prev_log_term" in
  let entries = json_es (member "entries" msg) in
  let leader_commit = int_field "leader_commit" in
  handle_precheck ap_term;
  let success_bool =
    ap_term >= serv_state.curr_term (* 1 *)
    && not (mismatch_log serv_state.log prev_log_index prev_log_term)
  in
  let ae_res = {
    success = success_bool;
    curr_term = serv_state.curr_term;
  } in
  if success_bool then begin
    process_conflicts entries; (* 3 *)
    append_new_entries entries; (* 4 *)
    if leader_commit > serv_state.commit_index then
      serv_state.commit_index <- min leader_commit (get_p_log_idx ()) (* 5 *)
  end;
  res_append_entries ae_res oc
(* [handle_ae_res msg oc] is a Leader-side function that handles an
 * AppendEntries Response [msg] from a Follower at output channel [oc].
 * On success, and only when a request is still pending for [oc] in
 * [get_ae_response_from], it updates the per-index response counts, refreshes
 * the commit index, drops the pending request and advances the Follower's
 * match index and next index.
 * On failure it lowers the Follower's next index by one and queues a rebuilt
 * request for [oc].
 * raises: [Failure] on the internal inconsistencies named in the messages
 * below. *)
let handle_ae_res msg oc =
let curr_term = msg |> member "curr_term" |> to_int in
let success = msg |> member "success" |> to_bool in
(* adopt a newer term, stepping down if needed, before acting on the response *)
handle_precheck curr_term;
let servid = match (id_from_oc !channels oc) with
| None -> "should be impossible"
| Some s -> s in
if (success) then
begin
(* everything up to the matching [end] belongs to the [Some] arm, so a
 * response with no pending request for [oc] is ignored entirely *)
match List.find_opt (fun (oc_l, rpc_l) -> oc == oc_l) !get_ae_response_from with
| None -> ()
| Some (o,r) ->
(* index of the first entry of the acknowledged request *)
let last_entry_serv_committed =
begin try (List.hd (r.entries)).index with
| _ -> failwith "Impossible. Leader should always have at least one entry to send."
end in
(* match index recorded for this Follower before this response *)
let latest_ind_for_server =
match List.assoc_opt servid serv_state.match_index_lst with
| None -> (*serv_state.matchIndexList <-
(servid, if success then last_entry_serv_committed else 0)::serv_state.matchIndexList; 0*)
failwith "should not occur because we already update match index?"
| Some i -> i in
(*index responses is in decreasing log index, and it is (ind of entry, num servers
that contain that entry)*)
(* first index that has no pair in [index_responses] yet *)
let num_to_add = match !index_responses with
| (indd, co)::t -> indd + 1
| [] -> 1 in
(* add a pair with count 1 for every index from [ind_to_add] up to and
 * including [ind_to_stop] *)
let rec add_to_index_responses ind_to_add ind_to_stop =
if (ind_to_add > ind_to_stop) then ()
else
(index_responses := (ind_to_add, 1)::!index_responses;
add_to_index_responses (ind_to_add + 1) ind_to_stop) in
add_to_index_responses num_to_add (last_entry_serv_committed);
assert (List.length !index_responses > 0);
(* count this Follower for every index it has newly acknowledged *)
index_responses := ((List.map (fun (ind, count) ->
if (ind > latest_ind_for_server && ind <= last_entry_serv_committed)
then (ind, (count + 1)) else (ind, count)) !index_responses));
check_majority ();
get_ae_response_from := (List.remove_assq oc !get_ae_response_from);
update_match_index oc;
update_next_index oc
end
else begin
(* rejected: back the Follower's next index off by one and rebuild the request *)
force_conform servid;
let serv_id = match id_from_oc !channels oc with
| Some id -> id
| None -> failwith "oc should have corresponding id" in
let pli = get_p_log_idx () in
let plt = get_p_log_term () in
let tuple_to_add = (oc, create_rpc msg serv_id pli plt) in
get_ae_response_from := !get_ae_response_from @ (tuple_to_add::[]);
check_majority ();
(* [remove_assq] drops only the first pair for [oc]: the older pending
 * request if one exists, otherwise the pair appended just above *)
get_ae_response_from := (List.remove_assq oc !get_ae_response_from);
end
(* [handle_vote_req msg oc] is a Follower-side function that handles a vote
 * request [msg] received from a candidate on channel [oc].
 * Steps, in order:
 *  1. [process_leader_death ()] -- receiving a vote request is taken to mean
 *     the current leader has died, so the recorded leader is dropped;
 *  2. [handle_precheck] is run on the candidate's "term";
 *  3. the request is answered through [res_request_vote]; its result is
 *     discarded.
 * requires:
 *  -[msg] is a valid vote_req json with an int "term" field
 *)
let handle_vote_req msg oc =
  process_leader_death ();
  let candidate_term = to_int (member "term" msg) in
  handle_precheck candidate_term;
  ignore (res_request_vote msg oc)
(* [handle_vote_res msg] is a Candidate-side function that handles receiving a
 * vote response message [msg].
 * It runs [handle_precheck] on the responder's term, counts the vote in
 * [vote_counter] when it was granted, and calls [win_election ()] once this
 * server is not already Leader and the counter is strictly greater than half
 * the number of neighboring servers (integer division).
 * requires:
 *  -[msg] is a valid vote_res json with an int "curr_term" field and a bool
 *   "vote_granted" field
 *)
let handle_vote_res msg =
  let responder_term = to_int (member "curr_term" msg) in
  let vote_granted = to_bool (member "vote_granted" msg) in
  handle_precheck responder_term;
  if vote_granted then incr vote_counter;
  (* Votes needed must strictly exceed this value. *)
  let threshold = List.length serv_state.neighboring_ips / 2 in
  if serv_state.role <> Leader && !vote_counter > threshold
  then win_election ()
(* [process_heartbeat msg] is a Follower-side function that handles receiving
 * a heartbeat [msg] from the Leader.
 * Effects, in order:
 *  -if the heartbeat carries entries, [res_client_msg] is set to the value of
 *   the first one;
 *  -on a non-server (client) process, [leader_ip] is refreshed when it differs
 *   from the heartbeat's leader id;
 *  -if the leader's commit index is ahead of ours, [res_client_msg] is set to
 *   the value logged at our current commit index (when present) and our
 *   commit index advances to min(leader_commit, previous log index);
 *  -[serv_state.leader_id] is set and [serv_state.voted_for] is cleared on
 *   every heartbeat, whether or not the commit index moved.
 * requires:
 *  -[msg] is a valid json with "leader_id", "leader_commit" and "entries"
 *   fields
 *)
let process_heartbeat msg =
  let l_id = to_string (member "leader_id" msg) in
  let leader_commit = to_int (member "leader_commit" msg) in
  let entries = json_es (member "entries" msg) in
  (match entries with
   | first :: _ -> res_client_msg := string_of_int first.value
   | [] -> ());
  (* If the leader ip held by a client process does not match the heartbeat's
   * leader id, update it. *)
  if (not serv_state.is_server) && !leader_ip <> l_id then leader_ip := l_id;
  let commit_advanced = leader_commit > serv_state.commit_index in
  (* Report the value stored at the commit index we had before advancing. *)
  if commit_advanced then begin
    match List.assoc_opt serv_state.commit_index serv_state.log with
    | Some { value; _ } -> res_client_msg := string_of_int value
    | None -> ()
  end;
  serv_state.leader_id <- l_id;
  if commit_advanced then
    serv_state.commit_index <- min leader_commit (get_p_log_idx ());
  serv_state.voted_for <- None
(* [update_output_channels oc msg] is for all servers.
 * Re-labels the entry of [channels] whose output channel is physically equal
 * to [oc] with the ip found in the "ip" field of [msg]. The re-labelled entry
 * is moved to the front of [channels]; every other entry with the same output
 * channel is dropped and the remaining entries keep their relative order.
 * requires:
 *  -[msg] is a json with a string "ip" field
 * raises:
 *  -[Not_found] if no entry of [channels] uses [oc]
 *)
let update_output_channels oc msg =
  let ip = to_string (member "ip" msg) in
  let uses_oc (_, (_, known_oc)) = known_oc == oc in
  let matching, others = List.partition uses_oc !channels in
  match matching with
  | [] -> raise Not_found
  | (_, ic_oc) :: _ -> channels := (ip, ic_oc) :: others
(* [handle_message msg oc] is the function that handles receiving messages from
 * other servers/the client. All servers use this function.
 * The raw line [msg] is echoed to stdout, [serv_state.received_heartbeat] is
 * set for every incoming message regardless of its type, and the message is
 * then parsed and dispatched on its "type" field. Unknown types are ignored.
 * requires:
 *  -[msg] is a valid json string with a "type" field whose value is one of
 *   the cases in the pattern match below
 *)
let handle_message msg oc =
  print_endline ("received: " ^ msg);
  serv_state.received_heartbeat <- true;
  let json = Yojson.Basic.from_string msg in
  let string_field name = to_string (member name json) in
  (* Answers a "find_leader" query on [oc] with our own id when we are the
   * Leader, otherwise with the leader id we currently know. *)
  let reply_with_leader () =
    let res_id =
      if serv_state.role = Leader then serv_state.id
      else serv_state.leader_id
    in
    let res =
      "{\"type\": \"find_leader_res\", \"leader\": \"" ^ res_id ^ "\"}"
    in
    ignore (send_msg res oc)
  in
  (* Handles a "client" request: appends the new entry to the local log and
   * queues one append-entries rpc per named channel. *)
  let enqueue_client_entry () =
    let new_entry = {
      value = to_int (member "value" json);
      entry_term = serv_state.curr_term;
      index = get_p_log_idx () + 1;
    } in
    (* Previous index/term are captured before the log is extended. *)
    let pli = get_p_log_idx () in
    let plt = get_p_log_term () in
    let previous_log = serv_state.log in
    serv_state.log <-
      (List.length previous_log + 1, new_entry) :: previous_log;
    (* Channels whose id is still "" have not announced an ip and are skipped
     * (the original code calls these clients). *)
    let pending =
      !channels
      |> List.filter (fun (peer_id, _) -> peer_id <> "")
      |> List.map (fun (peer_id, (_, peer_oc)) ->
          (peer_oc, create_rpc json peer_id pli plt))
    in
    get_ae_response_from := !get_ae_response_from @ pending
  in
  match string_field "type" with
  | "oc" -> update_output_channels oc json
  | "heartbeat" ->
    print_endline "this is a heart";
    process_heartbeat json
  | "sendall" -> send_heartbeats (); ()
  | "vote_req" -> handle_vote_req json oc
  | "vote_res" -> handle_vote_res json
  | "appd_req" ->
    (* A candidate that receives an append-entries request gives up its
     * election before processing the request. *)
    if serv_state.role = Candidate then ignore (lose_election ());
    ignore (handle_ae_req json oc)
  | "appd_res" -> handle_ae_res json oc
  | "find_leader" -> reply_with_leader ()
  | "find_leader_res" -> leader_ip := string_field "leader"
  | "client" -> enqueue_client_entry ()
  | _ -> ()
(* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* *
* *
* EVERYTHING AFTER THIS POINT IS NETWORKING/SERVER STUFF WHICH IS GENERALLY *
* SEPARATE FROM RAFT IMPLEMENTATION DETAILS *
* *
* *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *)
(* [handle_connection ic oc ()] is the read loop for one connection: it reads
 * a line from [ic], hands it to [handle_message] together with [oc], and
 * repeats. When [ic] reaches end of input it logs "Connection closed." and
 * the loop ends.
 *)
let rec handle_connection ic oc () =
  Lwt_io.read_line_opt ic >>= function
  | None -> Lwt_log.info "Connection closed." >>= return
  | Some line ->
    handle_message line oc;
    handle_connection ic oc ()
(* [send_ip oc] sends the current server's id ([serv_state.id]) to [oc] as a
 * json message of type "oc", carried in its "ip" field. The result is whatever
 * [send_msg] returns.
 *)
let send_ip oc =
  let type_field = "\"type\": \"oc\"," in
  let ip_field = "\"ip\": \"" ^ serv_state.id ^ "\"" in
  send_msg ("{" ^ type_field ^ ip_field ^ "}") oc
(* [setup_connections ()] sets up communication with every peer currently in
 * [channels]: it first announces this server's id on each output channel via
 * [send_ip] (results discarded), then starts a [handle_connection] read loop
 * on each [(ic, oc)] pair. A read loop that fails has its exception logged as
 * an error.
 *)
let setup_connections () =
  let announce (_, (_, oc)) = ignore (send_ip oc) in
  List.iter announce !channels;
  let listen (_, (ic, oc)) =
    Lwt.on_failure
      (handle_connection ic oc ())
      (fun exn -> Lwt_log.ign_error (Printexc.to_string exn))
  in
  List.iter listen !channels
(* [init_server ()] starts up this server as a follower that anticipates an
 * election. It should ONLY be called once, right when the server begins
 * running and after connections to all other servers have been accepted.
 * Steps, in order: [change_heartbeat ()], [setup_connections ()], mark the
 * server as started, then [init_follower ()].
 *)
let init_server () =
  change_heartbeat ();
  (* Announce our id to every peer and start their read loops. *)
  setup_connections ();
  serv_state.started <- true;
  (* The result of [init_follower ()] is the result of [init_server ()]. *)
  init_follower ()
(* [accept_connection conn] accepts an incoming connection and retrieves the
* [(ic, oc)] pair that results from this connection. This also will add the
* pair to the [channels] ref, and also will decide whether to start the server
* depending on the number of connections
*)
let accept_connection conn =
let fd, _ = conn in
let ic = Lwt_io.of_fd Lwt_io.Input fd in
let oc = Lwt_io.of_fd Lwt_io.Output fd in
let otherl = !channels in
let ip = "" in
channels := ((ip, (ic, oc))::otherl);
let iplistlen = List.length (serv_state.neighboring_ips) in