Skip to content

Commit

Permalink
UDPServer handles PacketPeerUDP-client association
Browse files Browse the repository at this point in the history
UDPServer now uses a single socket which is shared with the
PacketPeerUDP it creates and has a new `poll` function to read incoming
packets on that socket and delivers them to the appropriate peer.
PacketPeerUDP created this way never reads from the socket, but are
allowed to write on it using sendto.

This is needed because Windows (unlike Linux/BSD) does not support
packet routing when multiple sockets are bound on the same address/port.
  • Loading branch information
Faless committed Jul 14, 2020
1 parent c2ed367 commit 147bbe2
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 35 deletions.
65 changes: 40 additions & 25 deletions core/io/packet_peer_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,22 @@
#include "packet_peer_udp.h"

#include "core/io/ip.h"
#include "core/io/udp_server.h"

void PacketPeerUDP::set_blocking_mode(bool p_enable) {
blocking = p_enable;
}

void PacketPeerUDP::set_broadcast_enabled(bool p_enabled) {
ERR_FAIL_COND(udp_server);
broadcast = p_enabled;
if (_sock.is_valid() && _sock->is_open()) {
_sock->set_broadcasting_enabled(p_enabled);
}
}

Error PacketPeerUDP::join_multicast_group(IP_Address p_multi_address, String p_if_name) {
ERR_FAIL_COND_V(udp_server, ERR_LOCKED);
ERR_FAIL_COND_V(!_sock.is_valid(), ERR_UNAVAILABLE);
ERR_FAIL_COND_V(!p_multi_address.is_valid(), ERR_INVALID_PARAMETER);

Expand All @@ -58,6 +61,7 @@ Error PacketPeerUDP::join_multicast_group(IP_Address p_multi_address, String p_i
}

Error PacketPeerUDP::leave_multicast_group(IP_Address p_multi_address, String p_if_name) {
ERR_FAIL_COND_V(udp_server, ERR_LOCKED);
ERR_FAIL_COND_V(!_sock.is_valid(), ERR_UNAVAILABLE);
ERR_FAIL_COND_V(!_sock->is_open(), ERR_UNCONFIGURED);
return _sock->leave_multicast_group(p_multi_address, p_if_name);
Expand Down Expand Up @@ -130,7 +134,7 @@ Error PacketPeerUDP::put_packet(const uint8_t *p_buffer, int p_buffer_size) {
}

do {
if (connected) {
if (connected && !udp_server) {
err = _sock->send(p_buffer, p_buffer_size, sent);
} else {
err = _sock->sendto(p_buffer, p_buffer_size, sent, peer_addr, peer_port);
Expand Down Expand Up @@ -186,26 +190,25 @@ Error PacketPeerUDP::listen(int p_port, const IP_Address &p_bind_address, int p_
return OK;
}

Error PacketPeerUDP::connect_socket(Ref<NetSocket> p_sock) {
Error err;
int read = 0;
uint16_t r_port;
IP_Address r_ip;

err = p_sock->recvfrom(recv_buffer, sizeof(recv_buffer), read, r_ip, r_port, true);
ERR_FAIL_COND_V(err != OK, err);
err = p_sock->connect_to_host(r_ip, r_port);
ERR_FAIL_COND_V(err != OK, err);
Error PacketPeerUDP::connect_shared_socket(Ref<NetSocket> p_sock, IP_Address p_ip, uint16_t p_port, UDPServer *p_server) {
udp_server = p_server;
connected = true;
_sock = p_sock;
peer_addr = r_ip;
peer_port = r_port;
peer_addr = p_ip;
peer_port = p_port;
packet_ip = peer_addr;
packet_port = peer_port;
connected = true;
return OK;
}

void PacketPeerUDP::disconnect_shared_socket() {
udp_server = nullptr;
_sock = Ref<NetSocket>(NetSocket::create());
close();
}

Error PacketPeerUDP::connect_to_host(const IP_Address &p_host, int p_port) {
ERR_FAIL_COND_V(udp_server, ERR_LOCKED);
ERR_FAIL_COND_V(!_sock.is_valid(), ERR_UNAVAILABLE);
ERR_FAIL_COND_V(!p_host.is_valid(), ERR_INVALID_PARAMETER);

Expand Down Expand Up @@ -243,7 +246,11 @@ bool PacketPeerUDP::is_connected_to_host() const {
}

void PacketPeerUDP::close() {
if (_sock.is_valid()) {
if (udp_server) {
udp_server->remove_peer(peer_addr, peer_port);
udp_server = nullptr;
_sock = Ref<NetSocket>(NetSocket::create());
} else if (_sock.is_valid()) {
_sock->close();
}
rb.resize(16);
Expand All @@ -262,6 +269,9 @@ Error PacketPeerUDP::_poll() {
if (!_sock->is_open()) {
return FAILED;
}
if (udp_server) {
return OK; // Handled by UDPServer.
}

Error err;
int read;
Expand All @@ -284,24 +294,29 @@ Error PacketPeerUDP::_poll() {
return FAILED;
}

if (rb.space_left() < read + 24) {
err = store_packet(ip, port, recv_buffer, read);
#ifdef TOOLS_ENABLED
if (err != OK) {
WARN_PRINT("Buffer full, dropping packets!");
#endif
continue;
}

uint32_t port32 = port;
rb.write(ip.get_ipv6(), 16);
rb.write((uint8_t *)&port32, 4);
rb.write((uint8_t *)&read, 4);
rb.write(recv_buffer, read);
++queue_count;
#endif
}

return OK;
}

Error PacketPeerUDP::store_packet(IP_Address p_ip, uint32_t p_port, uint8_t *p_buf, int p_buf_size) {
if (rb.space_left() < p_buf_size + 24) {
return ERR_OUT_OF_MEMORY;
}
rb.write(p_ip.get_ipv6(), 16);
rb.write((uint8_t *)&p_port, 4);
rb.write((uint8_t *)&p_buf_size, 4);
rb.write(p_buf, p_buf_size);
++queue_count;
return OK;
}

bool PacketPeerUDP::is_listening() const {
return _sock.is_valid() && _sock->is_open();
}
Expand Down
7 changes: 6 additions & 1 deletion core/io/packet_peer_udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "core/io/net_socket.h"
#include "core/io/packet_peer.h"

class UDPServer;

class PacketPeerUDP : public PacketPeer {
GDCLASS(PacketPeerUDP, PacketPeer);

Expand All @@ -55,6 +57,7 @@ class PacketPeerUDP : public PacketPeer {
bool connected = false;
bool blocking = true;
bool broadcast = false;
UDPServer *udp_server = nullptr;
Ref<NetSocket> _sock;

static void _bind_methods();
Expand All @@ -72,7 +75,9 @@ class PacketPeerUDP : public PacketPeer {
Error wait();
bool is_listening() const;

Error connect_socket(Ref<NetSocket> p_sock); // Used by UDPServer
Error connect_shared_socket(Ref<NetSocket> p_sock, IP_Address p_ip, uint16_t p_port, UDPServer *ref); // Used by UDPServer
void disconnect_shared_socket(); // Used by UDPServer
Error store_packet(IP_Address p_ip, uint32_t p_port, uint8_t *p_buf, int p_buf_size); // Used internally and by UDPServer
Error connect_to_host(const IP_Address &p_host, int p_port);
bool is_connected_to_host() const;

Expand Down
100 changes: 93 additions & 7 deletions core/io/udp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,58 @@

void UDPServer::_bind_methods() {
ClassDB::bind_method(D_METHOD("listen", "port", "bind_address"), &UDPServer::listen, DEFVAL("*"));
ClassDB::bind_method(D_METHOD("poll"), &UDPServer::poll);
ClassDB::bind_method(D_METHOD("is_connection_available"), &UDPServer::is_connection_available);
ClassDB::bind_method(D_METHOD("is_listening"), &UDPServer::is_listening);
ClassDB::bind_method(D_METHOD("take_connection"), &UDPServer::take_connection);
ClassDB::bind_method(D_METHOD("stop"), &UDPServer::stop);
ClassDB::bind_method(D_METHOD("set_max_pending_connections", "max_pending_connections"), &UDPServer::set_max_pending_connections);
ClassDB::bind_method(D_METHOD("get_max_pending_connections"), &UDPServer::get_max_pending_connections);
ADD_PROPERTY(PropertyInfo(Variant::INT, "max_pending_connections", PROPERTY_HINT_RANGE, "0,256,1"), "set_max_pending_connections", "get_max_pending_connections");
}

Error UDPServer::poll() {
ERR_FAIL_COND_V(!_sock.is_valid(), ERR_UNAVAILABLE);
if (!_sock->is_open()) {
return ERR_UNCONFIGURED;
}
Error err;
int read;
IP_Address ip;
uint16_t port;
while (true) {
err = _sock->recvfrom(recv_buffer, sizeof(recv_buffer), read, ip, port);
if (err != OK) {
if (err == ERR_BUSY) {
break;
}
return FAILED;
}
Peer p;
p.ip = ip;
p.port = port;
List<Peer>::Element *E = peers.find(p);
if (!E) {
E = pending.find(p);
}
if (E) {
E->get().peer->store_packet(ip, port, recv_buffer, read);
} else {
if (pending.size() >= max_pending_connections) {
// Drop connection.
continue;
}
// It's a new peer, add it to the pending list.
Peer peer;
peer.ip = ip;
peer.port = port;
peer.peer = memnew(PacketPeerUDP);
peer.peer->connect_shared_socket(_sock, ip, port, this);
peer.peer->store_packet(ip, port, recv_buffer, read);
pending.push_back(peer);
}
}
return OK;
}

Error UDPServer::listen(uint16_t p_port, const IP_Address &p_bind_address) {
Expand Down Expand Up @@ -82,8 +130,24 @@ bool UDPServer::is_connection_available() const {
return false;
}

Error err = _sock->poll(NetSocket::POLL_TYPE_IN, 0);
return (err == OK);
return pending.size() > 0;
}

void UDPServer::set_max_pending_connections(int p_max) {
ERR_FAIL_COND_MSG(p_max < 0, "Max pending connections value must be a positive number (0 means refuse new connections).");
max_pending_connections = p_max;
while (p_max > pending.size()) {
List<Peer>::Element *E = pending.back();
if (!E) {
break;
}
memdelete(E->get().peer);
pending.erase(E);
}
}

int UDPServer::get_max_pending_connections() const {
return max_pending_connections;
}

Ref<PacketPeerUDP> UDPServer::take_connection() {
Expand All @@ -92,11 +156,20 @@ Ref<PacketPeerUDP> UDPServer::take_connection() {
return conn;
}

conn = Ref<PacketPeerUDP>(memnew(PacketPeerUDP));
conn->connect_socket(_sock);
_sock = Ref<NetSocket>(NetSocket::create());
listen(bind_port, bind_address);
return conn;
Peer peer = pending[0];
pending.pop_front();
peers.push_back(peer);
return peer.peer;
}

void UDPServer::remove_peer(IP_Address p_ip, int p_port) {
Peer peer;
peer.ip = p_ip;
peer.port = p_port;
List<Peer>::Element *E = peers.find(peer);
if (E) {
peers.erase(E);
}
}

void UDPServer::stop() {
Expand All @@ -105,6 +178,19 @@ void UDPServer::stop() {
}
bind_port = 0;
bind_address = IP_Address();
List<Peer>::Element *E = peers.front();
while (E) {
E->get().peer->disconnect_shared_socket();
E = E->next();
}
E = pending.front();
while (E) {
E->get().peer->disconnect_shared_socket();
memdelete(E->get().peer);
E = E->next();
}
peers.clear();
pending.clear();
}

UDPServer::UDPServer() :
Expand Down
29 changes: 27 additions & 2 deletions core/io/udp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,40 @@ class UDPServer : public Reference {
GDCLASS(UDPServer, Reference);

protected:
static void _bind_methods();
int bind_port;
enum {
PACKET_BUFFER_SIZE = 65536
};

struct Peer {
PacketPeerUDP *peer;
IP_Address ip;
uint16_t port = 0;

bool operator==(const Peer &p_other) const {
return (ip == p_other.ip && port == p_other.port);
}
};
uint8_t recv_buffer[PACKET_BUFFER_SIZE];

int bind_port = 0;
IP_Address bind_address;

List<Peer> peers;
List<Peer> pending;
int max_pending_connections = 16;

Ref<NetSocket> _sock;

static void _bind_methods();

public:
void remove_peer(IP_Address p_ip, int p_port);
Error listen(uint16_t p_port, const IP_Address &p_bind_address = IP_Address("*"));
Error poll();
bool is_listening() const;
bool is_connection_available() const;
void set_max_pending_connections(int p_max);
int get_max_pending_connections() const;
Ref<PacketPeerUDP> take_connection();

void stop();
Expand Down

0 comments on commit 147bbe2

Please sign in to comment.