-
Notifications
You must be signed in to change notification settings - Fork 27
/
UdpEndpoint.kt
214 lines (182 loc) · 6.18 KB
/
UdpEndpoint.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package nl.tudelft.ipv8.messaging.udp
import kotlinx.coroutines.*
import mu.KotlinLogging
import nl.tudelft.ipv8.Community
import nl.tudelft.ipv8.IPv4Address
import nl.tudelft.ipv8.Peer
import nl.tudelft.ipv8.messaging.Endpoint
import nl.tudelft.ipv8.messaging.EndpointListener
import nl.tudelft.ipv8.messaging.Packet
import nl.tudelft.ipv8.messaging.tftp.TFTPEndpoint
import java.io.IOException
import java.net.*
// File-private logger used by UdpEndpoint for diagnostic and error output.
private val logger = KotlinLogging.logger {}
/**
 * UDP transport endpoint.
 *
 * Owns a single [DatagramSocket] that is shared with [tftpEndpoint]. Incoming datagrams are
 * dispatched on their first byte: [Community.PREFIX_IPV8] packets are forwarded to the
 * registered listeners, [TFTPEndpoint.PREFIX_TFTP] packets are handed to [tftpEndpoint].
 * Outgoing payloads larger than [UDP_PAYLOAD_LIMIT] are sent through [tftpEndpoint] when the
 * peer supports it and dropped (with a warning) otherwise.
 *
 * @param port preferred local port; [open] probes upwards from this port.
 * @param ip local address the socket is bound to.
 * @param tftpEndpoint endpoint used for payloads that do not fit into one datagram.
 */
open class UdpEndpoint(
    private val port: Int,
    private val ip: InetAddress,
    private val tftpEndpoint: TFTPEndpoint = TFTPEndpoint(),
) : Endpoint<Peer>() {
    private var socket: DatagramSocket? = null

    private val job = SupervisorJob()
    private val scope = CoroutineScope(Dispatchers.IO + job)

    private var bindJob: Job? = null
    private var lanEstimationJob: Job? = null

    init {
        // Packets reassembled by the TFTP endpoint are surfaced through this endpoint's listeners.
        tftpEndpoint.addListener(
            object : EndpointListener {
                override fun onPacket(packet: Packet) {
                    logger.debug(
                        "Received TFTP packet (${packet.data.size} B) from ${packet.source}",
                    )
                    notifyListeners(packet)
                }

                override fun onEstimatedLanChanged(address: IPv4Address) {
                    // Ignored: this endpoint computes its own LAN estimate in estimateLan().
                }
            },
        )
    }

    /** Returns `true` while a bound socket is held by this endpoint. */
    override fun isOpen(): Boolean = socket?.isBound == true

    /**
     * Asynchronously sends [data] to [peer].
     *
     * Payloads of at most [UDP_PAYLOAD_LIMIT] bytes go out as a single datagram. Larger payloads
     * are delegated to the TFTP endpoint if the peer supports TFTP, otherwise they are dropped
     * with a warning. Failures while sending are logged, not propagated.
     *
     * @throws IllegalStateException if the endpoint is not open.
     */
    override fun send(
        peer: Peer,
        data: ByteArray,
    ) {
        check(isOpen()) { "UDP socket is closed" }

        val address = peer.address
        scope.launch {
            logger.debug("Send packet (${data.size} B) to $address ($peer)")
            try {
                when {
                    data.size <= UDP_PAYLOAD_LIMIT -> send(address, data)
                    peer.supportsTFTP -> tftpEndpoint.send(address, data)
                    else ->
                        logger.warn {
                            "The packet is larger then UDP_PAYLOAD_LIMIT and the peer " +
                                "does not support TFTP"
                        }
                }
            } catch (e: CancellationException) {
                // Never swallow cancellation; let the coroutine machinery handle it.
                throw e
            } catch (e: Exception) {
                logger.error("Sending packet to $address failed", e)
            }
        }
    }

    /**
     * Asynchronously sends [data] as a single datagram to [address].
     *
     * If no socket is currently held the datagram is silently dropped. Failures while sending
     * are logged, not propagated.
     *
     * @return the [Job] of the launched send operation.
     */
    fun send(
        address: IPv4Address,
        data: ByteArray,
    ) = scope.launch(Dispatchers.IO) {
        try {
            val datagramPacket = DatagramPacket(data, data.size, address.toSocketAddress())
            socket?.send(datagramPacket)
        } catch (e: Exception) {
            logger.error("Sending DatagramPacket failed", e)
        }
    }

    /**
     * Creates the socket, shares it with the TFTP endpoint, and starts LAN estimation and the
     * receive loop.
     */
    override fun open() {
        val socket = getDatagramSocket()
        this.socket = socket

        tftpEndpoint.socket = socket
        tftpEndpoint.open()

        logger.info { "Opened UDP socket on port ${socket.localPort}" }

        startLanEstimation()

        bindJob = bindSocket(socket)
    }

    /**
     * Finds the nearest unused port.
     *
     * Tries [port] and the 99 ports following it on [ip]; if none of them can be bound, falls
     * back to a socket on any port chosen by the system.
     */
    private fun getDatagramSocket(): DatagramSocket {
        for (i in 0 until 100) {
            try {
                return DatagramSocket(port + i, ip)
            } catch (e: Exception) {
                // Try another port
            }
        }
        // Use any available port
        return DatagramSocket()
    }

    /**
     * Stops LAN estimation and the receive loop, closes the TFTP endpoint and releases the
     * socket.
     *
     * @throws IllegalStateException if the endpoint is not open.
     */
    override fun close() {
        check(isOpen()) { "UDP socket is already closed" }

        stopLanEstimation()

        bindJob?.cancel()
        bindJob = null

        tftpEndpoint.close()

        // Closing the socket also unblocks a receive() pending in the receive loop.
        socket?.close()
        socket = null
    }

    /** Starts a background loop that re-estimates the LAN address every 60 seconds. */
    open fun startLanEstimation() {
        // Make sure a previously started loop does not keep running alongside the new one.
        lanEstimationJob?.cancel()
        lanEstimationJob =
            scope.launch {
                while (isActive) {
                    estimateLan()
                    delay(60_000)
                }
            }
    }

    /**
     * Reports every non-loopback IPv4 address of the local network interfaces, combined with
     * the current socket port, via [setEstimatedLan].
     *
     * A failure to enumerate the interfaces is logged and skipped so that the periodic
     * estimation loop keeps running and can retry on its next cycle.
     */
    private fun estimateLan() {
        val interfaces =
            try {
                NetworkInterface.getNetworkInterfaces()
            } catch (e: SocketException) {
                logger.error("Listing network interfaces failed", e)
                null
            }
        if (interfaces == null) {
            // Enumeration failed or the platform reported no interfaces at all.
            return
        }

        for (intf in interfaces) {
            for (intfAddr in intf.interfaceAddresses) {
                if (intfAddr.address is Inet4Address && !intfAddr.address.isLoopbackAddress) {
                    val estimatedAddress =
                        IPv4Address(intfAddr.address.hostAddress, getSocketPort())
                    setEstimatedLan(estimatedAddress)
                }
            }
        }
    }

    /** Stops the periodic LAN estimation loop, if one is running. */
    open fun stopLanEstimation() {
        lanEstimationJob?.cancel()
        lanEstimationJob = null
    }

    /** Returns the local port of the open socket, or the configured [port] if there is none. */
    fun getSocketPort(): Int = socket?.localPort ?: port

    /**
     * Launches the receive loop for [socket].
     *
     * The loop ends when the coroutine is cancelled or when receiving fails. An exception
     * thrown while handling a single packet is logged and does not stop the loop.
     */
    private fun bindSocket(socket: DatagramSocket) =
        scope.launch {
            // One buffer is reused for all datagrams; handlers must copy what they keep.
            val receiveData = ByteArray(UDP_PAYLOAD_LIMIT)
            try {
                while (isActive) {
                    val receivePacket = DatagramPacket(receiveData, receiveData.size)
                    withContext(Dispatchers.IO) {
                        socket.receive(receivePacket)
                    }
                    try {
                        handleReceivedPacket(receivePacket)
                    } catch (e: CancellationException) {
                        throw e
                    } catch (e: Exception) {
                        // A single bad packet must not take down the whole receiver.
                        logger.error("Handling received packet failed", e)
                    }
                }
            } catch (e: IOException) {
                if (socket.isClosed) {
                    // Expected on close(): closing the socket aborts the blocked receive().
                    logger.debug("UDP socket closed, stopping receive loop")
                } else {
                    logger.error("Receiving DatagramPacket failed", e)
                }
            }
        }

    /**
     * Dispatches a received datagram according to its first payload byte.
     *
     * IPv8 packets are copied out of the datagram buffer and passed to the listeners; TFTP
     * packets are forwarded to the TFTP endpoint. Empty datagrams and datagrams with an unknown
     * prefix are ignored with a warning.
     */
    internal fun handleReceivedPacket(receivePacket: DatagramPacket) {
        logger.debug(
            "Received packet (${receivePacket.length} B) from " +
                "${receivePacket.address.hostAddress}:${receivePacket.port}",
        )

        val length = receivePacket.length
        if (length == 0) {
            // The receive loop reuses its buffer, so for a zero-length datagram the first byte
            // would be left over from an earlier packet. There is no prefix to inspect.
            logger.warn { "Ignoring empty packet" }
            return
        }

        val offset = receivePacket.offset

        // Check whether prefix is IPv8 or TFTP
        when (receivePacket.data[offset]) {
            Community.PREFIX_IPV8 -> {
                val sourceAddress =
                    IPv4Address(receivePacket.address.hostAddress, receivePacket.port)
                val packet =
                    Packet(sourceAddress, receivePacket.data.copyOfRange(offset, offset + length))
                logger.debug(
                    "Received UDP packet (${receivePacket.length} B) from $sourceAddress",
                )
                notifyListeners(packet)
            }
            TFTPEndpoint.PREFIX_TFTP -> {
                tftpEndpoint.onPacket(receivePacket)
            }
            else -> {
                logger.warn { "Invalid packet prefix" }
            }
        }
    }

    companion object {
        // 1500 - 20 (IPv4 header) - 8 (UDP header)
        const val UDP_PAYLOAD_LIMIT = 1472
    }
}