-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprudp.py
159 lines (120 loc) · 5.38 KB
/
prudp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import inspect
import multiprocessing
import socket
import threading
from typing import Callable, Dict, List, Union

import numpy as np

from packets import PacketInterface, PacketV0, PacketV1
class Counter:
    """Integer counter whose increments are serialised by a lock.

    Attributes:
        value: The current count.
        lock: Lock held while ``value`` is updated in ``increment``.
    """

    def __init__(self, start: int = 0):
        """Create a counter.

        Args:
            start: Initial value of the counter. Defaults to 0, which is the
                value the counter always started from before this parameter
                existed.
        """
        self.value = start
        self.lock = threading.Lock()

    def increment(self) -> int:
        """Add one to the counter and return the new value.

        The update and the read of the result happen under the same lock
        acquisition, so concurrent callers each receive a distinct value.
        """
        with self.lock:
            self.value += 1
            return self.value
class PRUDPClient:
    """A remote peer known to a PRUDP server.

    Attributes:
        address: The address value the client was created with.
        server: The server object associated with this client.
        pid: Numeric identifier for the client; 0 until ``set_pid`` is called.
    """

    def __init__(self, address, server=None):
        """Create a client record.

        Args:
            address: Address of the remote peer; stored as given.
            server: The server that owns this client. When omitted, a fresh
                ``PRUDPServer`` is created, which is what this constructor
                always did before the parameter existed.
        """
        self.address = address
        # Accepting the owning server avoids building a throwaway server per
        # client and lets the client see the real server's settings.
        self.server = PRUDPServer() if server is None else server
        self.pid = 0

    def get_pid(self):
        """Return the client's PID."""
        return self.pid

    def set_pid(self, pid):
        """Set the client's PID."""
        self.pid = pid
class PRUDPServer:
    """UDP server that tracks PRUDP clients and dispatches events to handlers.

    Handlers are registered with ``on`` and invoked by ``emit``. They are kept
    in three tables: generic handlers (run for every emitted event of that
    name), handlers run only for ``PacketV0`` packets, and handlers run only
    for ``PacketV1`` packets.
    """

    def __init__(self):
        """Create an unbound server with zero-valued settings.

        The settings start at the zero value of their type (``""``, ``0``)
        rather than at the type object itself, so they can be read and
        compared before any setter has been called.
        """
        # Keyed by the host part of the sender address (see
        # handle_socket_message).
        self.clients: Dict[str, "PRUDPClient"] = {}
        # Created and bound by listen(); None until then.
        self.socket = None
        self.access_key = ""
        self.prudp_version = 0
        self.nex_version = 0
        self.fragment_size = np.int16(0)
        self.connection_id_counter = Counter()
        self.generic_event_handles: Dict[str, List[Callable[[PacketInterface], None]]] = {}
        self.prudp_v0_event_handles: Dict[str, List[Callable[[PacketV0], None]]] = {}
        self.prudp_v1_event_handles: Dict[str, List[Callable[[PacketV1], None]]] = {}

    def listen(self, address: str):
        """Bind a UDP socket to ``address`` and serve until a listener fails.

        Starts one daemon listener thread per CPU reported by
        ``multiprocessing.cpu_count()``, emits the ``"Listening"`` event with
        a ``None`` packet, then blocks until a listener thread sets the quit
        event.

        Args:
            address: ``"host:port"`` string.

        Raises:
            RuntimeError: If the address cannot be parsed or resolved, or the
                socket cannot be created or bound. The original exception is
                attached as ``__cause__``.
        """
        try:
            parts = address.split(":")
            host, port = parts[0], int(parts[1])
            udp_address = socket.getaddrinfo(
                host, port, socket.AF_INET, socket.SOCK_DGRAM
            )[0][4]
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                sock.bind(udp_address)
            except Exception:
                # Do not leak the descriptor when binding fails.
                sock.close()
                raise
        except Exception as e:
            # Callers rely on every setup failure surfacing as RuntimeError.
            raise RuntimeError(f"Failed to bind to address {address}: {e}") from e
        # Only publish the socket once it is actually bound.
        self.socket = sock

        quit_event = threading.Event()
        for _ in range(multiprocessing.cpu_count()):
            threading.Thread(
                target=self.listen_datagram, args=(quit_event,), daemon=True
            ).start()
        print(f"PRUDP server listening on address - {address}")
        self.emit("Listening", None)
        quit_event.wait()

    def listen_datagram(self, quit_event: threading.Event):
        """Process incoming datagrams until ``quit_event`` is set.

        Any exception from message handling sets ``quit_event`` (which
        unblocks ``listen`` and stops the other listener threads) and is
        re-raised as ``RuntimeError``.
        """
        try:
            while not quit_event.is_set():
                self.handle_socket_message()
        except Exception as e:
            quit_event.set()
            raise RuntimeError(f"Error in datagram listener: {e}") from e

    def handle_socket_message(self):
        """Receive one datagram and decode it into a packet.

        Looks up (or creates) the client for the sender, then builds a
        ``PacketV0`` when ``prudp_version`` is 0 and a ``PacketV1`` otherwise.

        Returns:
            The decoded packet.
        """
        buffer = bytearray(64000)
        length, addr = self.socket.recvfrom_into(buffer)

        # NOTE(review): clients are keyed by host only, so peers sharing a
        # host (different ports) share one client record - confirm intended.
        discriminator = addr[0]
        client = self.clients.get(discriminator)
        if client is None:
            candidate = PRUDPClient(addr)
            # Point the client at the server that actually received it, not
            # at the blank server its constructor creates by default.
            candidate.server = self
            # setdefault keeps exactly one client per key even when several
            # listener threads see the same new sender at once.
            client = self.clients.setdefault(discriminator, candidate)

        data = buffer[:length]
        # prudp_version is a plain value; it must be compared, not called.
        if self.prudp_version == 0:
            packet = PacketV0(client, data)
        else:
            packet = PacketV1(client, data)
        # NOTE(review): the decoded packet used to be dropped here. It is
        # returned so a caller can act on it; dispatching inbound packets to
        # registered handlers still appears to be missing.
        return packet

    def on(self, event: str, handler: Callable) -> None:
        """Register ``handler`` for ``event``.

        The handler's first parameter annotation selects the table it goes
        into: unannotated handlers and handlers accepting ``PacketInterface``
        are generic; handlers annotated with ``PacketV0`` or ``PacketV1`` are
        version specific. The handler is not invoked during registration.

        Raises:
            TypeError: If the handler is not callable, takes no positional
                argument, or is annotated with an unrelated type.
        """
        if self._is_handler_for_packet_interface(handler):
            self.generic_event_handles.setdefault(event, []).append(handler)
        elif self._is_handler_for_packet_v0(handler):
            self.prudp_v0_event_handles.setdefault(event, []).append(handler)
        elif self._is_handler_for_packet_v1(handler):
            self.prudp_v1_event_handles.setdefault(event, []).append(handler)
        else:
            raise TypeError("Handler type does not match any expected packet type")

    def _is_handler_for_packet_interface(self, handler: Callable) -> bool:
        """Return True if ``handler`` accepts a ``PacketInterface``."""
        return self._is_callable_with_type(handler, PacketInterface)

    def _is_handler_for_packet_v0(self, handler: Callable) -> bool:
        """Return True if ``handler`` accepts a ``PacketV0``."""
        return self._is_callable_with_type(handler, PacketV0)

    def _is_handler_for_packet_v1(self, handler: Callable) -> bool:
        """Return True if ``handler`` accepts a ``PacketV1``."""
        return self._is_callable_with_type(handler, PacketV1)

    def _is_callable_with_type(self, handler: Callable, packet_type: type) -> bool:
        """Return True if ``handler`` can take a ``packet_type`` argument.

        The decision is made from the handler's signature only; the handler
        is never called, so registration has no side effects.

        A handler qualifies when its first parameter is positional and is
        either unannotated or annotated with ``packet_type`` or one of its
        base classes. String annotations are matched by class name.
        """
        if not callable(handler):
            return False
        try:
            parameters = list(inspect.signature(handler).parameters.values())
        except (TypeError, ValueError):
            # No introspectable signature (e.g. some builtins): nothing
            # contradicts the handler accepting this packet type.
            return True

        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.VAR_POSITIONAL,
        )
        if not parameters or parameters[0].kind not in positional_kinds:
            # emit() passes the packet positionally.
            return False

        annotation = parameters[0].annotation
        if annotation is inspect.Parameter.empty:
            return True
        if isinstance(annotation, str):
            short_name = annotation.rsplit(".", 1)[-1]
            return any(cls.__name__ == short_name for cls in packet_type.__mro__)
        if isinstance(annotation, type):
            return issubclass(packet_type, annotation)
        return False

    def emit(self, event: str, packet: 'PacketInterface'):
        """Run every handler registered for ``event``, each in a new thread.

        Generic handlers always run; version-specific handlers run only when
        ``packet`` is an instance of the matching packet class. Each handler
        receives ``packet`` as its single argument.
        """
        handlers = list(self.generic_event_handles.get(event, ()))
        if isinstance(packet, PacketV0):
            handlers.extend(self.prudp_v0_event_handles.get(event, ()))
        elif isinstance(packet, PacketV1):
            handlers.extend(self.prudp_v1_event_handles.get(event, ()))
        for handler in handlers:
            threading.Thread(target=handler, args=(packet,)).start()

    def get_socket(self) -> socket.socket:
        """Return the server socket (``None`` until one is bound or set)."""
        return self.socket

    def set_socket(self, socket: socket.socket):
        """Replace the server socket."""
        self.socket = socket

    def get_access_key(self) -> str:
        """Return the access key."""
        return self.access_key

    def set_access_key(self, access_key: str):
        """Set the access key."""
        self.access_key = access_key

    def get_prudp_version(self) -> int:
        """Return the PRUDP version used to pick the packet class."""
        return self.prudp_version

    def set_prudp_version(self, version: int):
        """Set the PRUDP version (0 selects ``PacketV0``, else ``PacketV1``)."""
        self.prudp_version = version

    def get_nex_version(self) -> int:
        """Return the NEX version."""
        return self.nex_version

    def set_nex_version(self, version: int):
        """Set the NEX version."""
        self.nex_version = version