-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.rb
191 lines (178 loc) · 6.54 KB
/
main.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
require 'yaml'
require './paxos'
require './app/app'
# Cluster configuration: peer addresses ('addrs'), optional
# 'heartbeat_timeout' (seconds) and optional 'perftools' profiling flag.
CONFIG = YAML.load(File.read('./config.yaml'))
# This node's own address ("host:port") is the first command-line argument.
# Fail fast with a usage message rather than a NoMethodError on nil when the
# argument is missing or carries no numeric port.
arg_addr = ARGV[0]
port_match = arg_addr && arg_addr.match(/:(\d+)\z/)
abort "usage: #{$PROGRAM_NAME} HOST:PORT (got #{arg_addr.inspect})" unless port_match
Paxos::LocalData['local_addr'] = arg_addr
Paxos::LocalData['local_port'] = port_match[1].to_i
Paxos::H['addrs'] = CONFIG['addrs']
# The first configured address is the initial leader.
Paxos::H['leader'] = Paxos::H['addrs'][0]
Paxos::HeartbeatTimeout = (CONFIG['heartbeat_timeout'] || 1) # seconds
puts "pid: #{Process.pid}, ADDR: #{Paxos::LocalData['local_addr']}, PORT: #{Paxos::LocalData['local_port']}, H: #{Paxos::H}"
if CONFIG['perftools']
  require 'perftools'
  PerfTools::CpuProfiler.start("tmp/#{ARGV[0]}perftool")
end
# Heartbeat thread: after every (timeout + random jitter) interval, a node
# that is not the current leader asks Paxos.should_become_leader? and, if told
# to, proposes a "SET leader <own addr>" command for the next executable id.
heartbeat_thread = Thread.new do
  Thread.current.abort_on_exception = true
  me = Paxos::LocalData['local_addr']
  loop do
    # Up to 0.5s of random jitter on top of the configured timeout.
    sleep(Paxos::HeartbeatTimeout + rand * 0.5)
    # The current leader never campaigns.
    next if me == Paxos::H['leader']
    next unless Paxos.should_become_leader?

    puts "TRYTOBELEADER!!!!!!!!! #{Process.pid} #{Time.now}"
    set_leader = "#{Paxos::App::Command::SET} leader #{me}"
    proposal = Paxos.propose(Paxos.smallest_executable_id, set_leader,
                             timeout: 1, wait_for_all_acceptors: true)
    won = proposal.is_a?(Paxos::SuccessfulProposal) && me == Paxos::H['leader']
    next unless won

    # Tell the client handler to re-read its next log id before serving.
    Paxos::ClientHandlerQueue << Paxos::ClientHandlerRefreshId
    puts "WE BECAME THE LEADER!!!!! #{Process.pid} #{Time.now}"
  end
end
# Client handler thread: takes [command, client] pairs off
# Paxos::ClientHandlerQueue. A non-leader redirects the client to the leader;
# the leader replicates the command into log slot `id` and replies with the
# execution result.
clienthandler_thread = Thread.new do
  Thread.current.abort_on_exception = true
  local_addr = Paxos::LocalData['local_addr']
  db = Paxos.disk_conn
  # Next log slot this node will try to fill while it is leader.
  id = Paxos.smallest_executable_id(db)
  n = Paxos::SmallestProposeN - 1

  # Write one reply line and close the client. The thread aborts the whole
  # process on any unhandled exception, so a client that already hung up
  # (EPIPE / ECONNRESET) must not be allowed to propagate an error.
  reply = lambda do |sock, msg|
    begin
      sock.puts msg
    rescue Errno::EPIPE, Errno::ECONNRESET
      # The client is gone; there is nobody left to tell.
    ensure
      sock.close
    end
  end

  loop do
    command, client = Paxos::ClientHandlerQueue.pop
    # Sentinel pushed by the heartbeat thread after winning an election.
    if command == Paxos::ClientHandlerRefreshId
      id = Paxos.smallest_executable_id(db)
      next
    end
    unless local_addr == Paxos::H['leader']
      reply.call(client, "Please contact the leader: #{Paxos::H['leader']}")
      next
    end
    # Run multi-paxos (omit prepare messages).
    # We always choose n as `Paxos::SmallestProposeN - 1` so that it is
    # impossible for us to overwrite an established consensus for this id,
    # and so that there are only three possible outcomes:
    # 1. A majority of replicas successfully executes our command
    # 2. We got ignored by some replicas because they held promises
    #    with larger n's
    # 3. Network glitch
    acc_msgs, ign_msgs = Paxos.send_accept_msgs id, n, command
    if acc_msgs.size > (Paxos::H['addrs'].size.to_f / 2)
      reply.call(client, acc_msgs[0].exec_result)
      id += 1
    else
      # Outcomes 2 and 3 both redirect the client; only outcome 2 (some
      # replica already holds this slot) moves on to the next slot.
      reply.call(client, "Please contact the leader: #{Paxos::H['leader']}")
      id += 1 unless ign_msgs.empty?
    end
  end
end
# Catchup thread: the acceptor thread enqueues a request id once it notices
# this node is behind. For every slot from our smallest executable id up to
# that id, propose a NoOp until our own acceptor reports having accepted the
# slot, then clear the 'catching_up' flag.
# NOTE(review): presumably Paxos.propose adopts any value already accepted for
# the slot, so the NoOp only lands in genuinely empty slots — confirm.
catchup_thread = Thread.new do
  Thread.current.abort_on_exception = true
  local_addr = Paxos::LocalData['local_addr']
  db = Paxos.disk_conn
  loop do
    end_id = Paxos::CatchupQueue.pop
    # Read the start once, on this thread's own connection, so the log line
    # reports exactly the range that is iterated below.
    start_id = Paxos.smallest_executable_id(db)
    puts "!#{end_id}CATCHING UP#{start_id}!!!!!!"
    (start_id..end_id).each do |id|
      # Propose until our acceptor has accepted for this id.
      # NOTE(review): retries without any delay if propose keeps failing.
      loop do
        proposal = Paxos.propose id, Paxos::App::Command::NoOp, disk_conn: db
        break if proposal.accepted_msgs.any? { |m| m.addr == local_addr }
      end
    end
    Paxos::LocalData['catching_up'] = false
  end
end
# Acceptor thread: handles [request, client] pairs from Paxos::AcceptorQueue,
# answering PREPARE and ACCEPT messages from the per-id state stored through
# Paxos::Disk, then writes a one-line response and closes the connection.
acceptor_thread = Thread.new do
  Thread.current.abort_on_exception = true
  local_addr = Paxos::LocalData['local_addr']
  db = Paxos.disk_conn
  Paxos::LocalData['catching_up'] = false
  loop do
    request, client = Paxos::AcceptorQueue.pop
    # Become a non-voting member and start catching up if we're lagging behind:
    # requests for ids beyond our next executable id are dropped unanswered.
    if Paxos.smallest_executable_id(db) < request.id
      unless Paxos::LocalData['catching_up']
        Paxos::LocalData['catching_up'] = true
        Paxos::CatchupQueue << request.id
      end
      client.close
      next
    end
    last_promise = Paxos::Disk.find_by_id(request.id, db)
    # NOTE(review): any other request.type leaves response nil.
    case request.type
    when Paxos::Msg::PREPARE
      if last_promise.nil? || request.n > last_promise.n
        # Higher n than anything seen for this id: persist it and promise,
        # reporting the previously accepted (n, v) pair if there is one.
        Paxos::Disk.new(request.id, request.n, request.v).save_n_to_disk(db)
        response = "#{request.id} #{Paxos::Msg::PROMISE}"
        if last_promise && last_promise.v
          response << " #{last_promise.n} #{last_promise.v}"
        end
      elsif request.n <= last_promise.n
        response = "#{request.id} #{Paxos::Msg::IGNORED} #{last_promise.n}"
      end
    when Paxos::Msg::ACCEPT
      if last_promise && last_promise.n > request.n
        response = "#{request.id} #{Paxos::Msg::IGNORED} #{last_promise.n}"
      elsif last_promise && request.v == last_promise.v
        # Stored value already equals the requested one: acknowledge without
        # executing it again (no result is appended).
        response = "#{request.id} #{Paxos::Msg::ACCEPTED} #{local_addr} "
      else
        exec_res = if (paxos_command = Paxos::App::Command.create(request.v))
                     Paxos::App.execute paxos_command
                   else
                     App.execute request.v
                   end
        if exec_res.is_successful
          # Store at least SmallestProposeN, so a later leader accept for this
          # id (sent with SmallestProposeN - 1) hits the IGNORED branch above.
          n = [request.n, Paxos::SmallestProposeN].max
          Paxos::Disk.new(request.id, n, request.v).save_n_v_to_disk(db)
          response = "#{request.id} #{Paxos::Msg::ACCEPTED} #{local_addr} #{exec_res.value}"
        else
          response = "#{request.id} execution_failed"
        end
      end
    end
    begin
      Paxos::Sock.puts_with_timeout client, response, 0.1
    rescue Errno::EPIPE, Errno::ECONNRESET
      # The proposer already disconnected; with abort_on_exception set, letting
      # this escape would take the whole node down.
    ensure
      client.close
    end
  end
end
# Main thread: accept TCP connections and dispatch each one, based on its
# first (newline-terminated) line, to the heartbeat responder, the acceptor
# queue, or the client handler queue.
server = TCPServer.new Paxos::LocalData['local_port']
loop do
  Thread.new(server.accept) do |client|
    # Any unhandled exception here aborts the whole process, so the expected
    # "peer went away" socket errors are rescued below.
    Thread.current.abort_on_exception = true
    begin
      str_msg = client.gets
      if str_msg.nil?
        client.close
        next
      end
      unless str_msg.end_with?("\n")
        client.puts "newline at the end needed"
        client.close
        next
      end
      str_msg = str_msg[0...-1]
      # NOTE(review): any peer able to reach this port can stop the node.
      if str_msg == 'exit'
        PerfTools::CpuProfiler.stop if CONFIG['perftools']
        Process.exit
      end
      if str_msg == Paxos::HeartbeatPing
        Paxos::Sock.puts_and_close client, Paxos::HeartbeatPong
        next
      end
      # From here on the socket is handed to another thread, which replies
      # and closes it.
      if (paxos_msg = Paxos::Msg.create(str_msg))
        Paxos::AcceptorQueue << [paxos_msg, client]
      elsif Paxos::App::Command.create(str_msg)
        Paxos::ClientHandlerQueue << [str_msg, client]
      else
        command = ::App::Command.new(str_msg)
        if command.err_msg
          Paxos::Sock.puts_and_close client, command.err_msg
          next
        end
        Paxos::ClientHandlerQueue << [str_msg, client]
      end
    rescue Errno::ECONNRESET, Errno::EPIPE
      # The peer disconnected mid-request; drop it instead of crashing the node.
      client.close unless client.closed?
    end
  end # Thread.new do
end