-
Notifications
You must be signed in to change notification settings - Fork 0
/
UDPHandlers.py
132 lines (115 loc) · 5.21 KB
/
UDPHandlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import socket as s
import threading
import time
import Globals as g
from Helpers import Message, Peer, to_id
from Globals import RETRY_SECONDS, IP_ADDRESS
def init_udp(_udp_socket, _ping_socket, _server_port, _ping_interval):
global udp_socket
global ping_socket
global SERVER_PORT
global PING_INTERVAL
udp_socket = _udp_socket
ping_socket = _ping_socket
SERVER_PORT = _server_port
PING_INTERVAL = _ping_interval
ping_thread=threading.Thread(name="PingEmitter",target=run_udp_server)
ping_thread.daemon=True
ping_thread.start()
udp_server_thread=threading.Thread(name="UDPServer",target=ping_emitter)
udp_server_thread.daemon=True
udp_server_thread.start()
return [udp_server_thread, ping_thread]
def run_udp_server():
while True:
data, addr = udp_socket.recvfrom(1024) # buffer size is 1024 bytes
message = Message.fromMessage(data)
if (message.mType() == Message.PING):
socket = s.socket(s.AF_INET, s.SOCK_DGRAM)
socket.sendto(Message(Message.PONG, SERVER_PORT).content(), (addr[0], message.data()))
print("Ping request message received from Peer {}".format(to_id(message.data())))
elif (message.mType() == Message.PONG):
with g.peer_lock:
for p in g.peers:
if p.port() == message.data():
p.connected()
break
g.peer_lock.notify()
print("Ping response received from Peer {}".format(to_id(message.data())))
elif (message.mType() == Message.DIE):
print("UDP recieved DIE. R.I.P")
return
# Start a new thread and return its identifier
# TODO: Change to ping handler
# recv_thread=threading.Thread(name=addr, target=recv_handler, args=(data,addr))
# recv_thread.daemon=True
# recv_thread.start()
def ping_emitter():
# go through the list of the subscribed peers and send them the current time after every 1 second
while(1):
#get lock
with g.peer_lock:
for p in g.peers:
ping_socket.sendto(Message(Message.PING, SERVER_PORT).content(), (IP_ADDRESS, p.port()))
p.send_ping()
print('Ping request sent to Peer {}'.format(to_id(p.port())))
# notify other thread
g.peer_lock.notify()
# sleep for PING_INTERVAL
retry_thread = threading.Thread(name="RetryThread",target=retry_pings)
retry_thread.daemon=True
retry_thread.start()
time.sleep(PING_INTERVAL)
def retry_pings():
for _ in range(0, 4):
time.sleep(RETRY_SECONDS)
peers_to_connect = list(filter(lambda p: not p.response_recieved(), g.peers))
if (len(peers_to_connect) == 0):
return
with g.peer_lock:
for p in g.peers:
if not p.response_recieved():
p.attempt_failed()
ping_socket.sendto(Message(Message.PING, SERVER_PORT).content(), (IP_ADDRESS, p.port()))
else:
peers_to_connect = list(filter(lambda p: not p.response_recieved(), g.peers))
if len(peers_to_connect) == 0:
break
g.peer_lock.notify()
peers_to_connect = list(filter(lambda p: not p.response_recieved(), g.peers))
if (len(peers_to_connect) == 0):
return
socket = s.socket(s.AF_INET, s.SOCK_STREAM)
if (g.peers[0].is_lost()):
print("Peer {} is no longer alive".format(to_id(p.port())))
for i in range(0,3):
try:
socket.connect((IP_ADDRESS, g.peers[1].port()))
socket.sendall(Message(Message.YOUR_SUCC, g.server_port).content())
succs = Message.fromMessage(socket.recv(1024)).data()
with g.peer_lock:
g.peers = [g.peers[1], Peer(succs[0])]
print("Successors updated.")
print("My first successor is Peer {}".format(g.peers[0].ID()))
print("My second successor is Peer {}".format(g.peers[1].ID()))
break
except:
pass
if (i == 2):
print("Lost first and second successors. Everything is broken.")
elif(g.peers[1].is_lost()):
print("Peer {} is no longer alive".format(to_id(p.port())))
for i in range(0,3):
try:
socket.connect((IP_ADDRESS, g.peers[0].port()))
socket.sendall(Message(Message.YOUR_SUCC, g.server_port).content())
succs = Message.fromMessage(socket.recv(1024)).data()
with g.peer_lock:
g.peers[1] = Peer(succs[0])
print("Successors updated.")
print("My first successor is Peer {}".format(g.peers[0].ID()))
print("My second successor is Peer {}".format(g.peers[1].ID()))
except:
if i ==2:
print("Lost first and second successors. Everything is broken.")
socket.close()