Skip to content

Commit

Permalink
Add support for UDP sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
mogemimi committed Aug 9, 2019
1 parent 7864279 commit 44359c7
Show file tree
Hide file tree
Showing 9 changed files with 954 additions and 0 deletions.
6 changes: 6 additions & 0 deletions build/pomdog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ set(POMDOG_SOURCES_CORE
${POMDOG_DIR}/include/Pomdog/Network/ArrayView.hpp
${POMDOG_DIR}/include/Pomdog/Network/IOService.hpp
${POMDOG_DIR}/include/Pomdog/Network/TCPStream.hpp
${POMDOG_DIR}/include/Pomdog/Network/UDPStream.hpp
${POMDOG_DIR}/include/Pomdog/Network/detail/ForwardDeclarations.hpp
${POMDOG_DIR}/include/Pomdog/Reactive/Observable.hpp
${POMDOG_DIR}/include/Pomdog/Reactive/ObservableBase.hpp
Expand Down Expand Up @@ -334,6 +335,7 @@ set(POMDOG_SOURCES_CORE
${POMDOG_DIR}/src/Network/IOService.cpp
${POMDOG_DIR}/src/Network/SocketProtocol.hpp
${POMDOG_DIR}/src/Network/TCPStream.cpp
${POMDOG_DIR}/src/Network/UDPStream.cpp
${POMDOG_DIR}/src/RenderSystem/BufferBindMode.hpp
${POMDOG_DIR}/src/RenderSystem/BufferHelper.cpp
${POMDOG_DIR}/src/RenderSystem/BufferHelper.hpp
Expand Down Expand Up @@ -686,6 +688,8 @@ set(POMDOG_SOURCES_NETWORK_POSIX
${POMDOG_DIR}/src/Network.POSIX/SocketHelperPOSIX.hpp
${POMDOG_DIR}/src/Network.POSIX/TCPStreamPOSIX.cpp
${POMDOG_DIR}/src/Network.POSIX/TCPStreamPOSIX.hpp
${POMDOG_DIR}/src/Network.POSIX/UDPStreamPOSIX.cpp
${POMDOG_DIR}/src/Network.POSIX/UDPStreamPOSIX.hpp
)

set(POMDOG_SOURCES_NETWORK_WIN32
Expand All @@ -695,6 +699,8 @@ set(POMDOG_SOURCES_NETWORK_WIN32
${POMDOG_DIR}/src/Network.Win32/SocketHelperWin32.hpp
${POMDOG_DIR}/src/Network.Win32/TCPStreamWin32.cpp
${POMDOG_DIR}/src/Network.Win32/TCPStreamWin32.hpp
${POMDOG_DIR}/src/Network.Win32/UDPStreamWin32.cpp
${POMDOG_DIR}/src/Network.Win32/UDPStreamWin32.hpp
)

set(SOURCE_FILES ${POMDOG_SOURCES_CORE} ${POMDOG_SOURCES_EXPERIMENTAL})
Expand Down
64 changes: 64 additions & 0 deletions include/Pomdog/Network/UDPStream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2013-2019 mogemimi. Distributed under the MIT license.

#pragma once

#include "Pomdog/Application/Duration.hpp"
#include "Pomdog/Basic/Export.hpp"
#include "Pomdog/Network/detail/ForwardDeclarations.hpp"
#include "Pomdog/Signals/detail/ForwardDeclarations.hpp"
#include "Pomdog/Utility/Errors.hpp"
#include <functional>
#include <memory>
#include <string_view>
#include <tuple>

namespace Pomdog {

class POMDOG_EXPORT UDPStream final {
public:
UDPStream();
explicit UDPStream(IOService* service);

UDPStream(const UDPStream&) = delete;
UDPStream& operator=(const UDPStream&) = delete;

UDPStream(UDPStream&& other);
UDPStream& operator=(UDPStream&& other);

~UDPStream();

/// Opens a UDP connection to a remote host.
static std::tuple<UDPStream, std::shared_ptr<Error>>
Connect(IOService* service, const std::string_view& address);

/// Starts listening for incoming datagrams.
static std::tuple<UDPStream, std::shared_ptr<Error>>
Listen(IOService* service, const std::string_view& address);

/// Closes the connection.
void Disconnect();

/// Writes data to the connection.
std::shared_ptr<Error> Write(const ArrayView<uint8_t const>& data);

/// Writes data to address.
std::shared_ptr<Error>
WriteTo(const ArrayView<uint8_t const>& data, const std::string_view& address);

/// Sets a callback function that is called when a connection is successfully established.
[[nodiscard]] Connection
OnConnected(std::function<void(const std::shared_ptr<Error>&)>&& callback);

/// Sets a callback function that is called when a data packet is received.
[[nodiscard]] Connection
OnRead(std::function<void(const ArrayView<uint8_t>&, const std::shared_ptr<Error>&)>&& callback);

/// Sets a callback function that is called when a data packet is received from the connection.
[[nodiscard]] Connection
OnReadFrom(std::function<void(const ArrayView<std::uint8_t>&, const std::string_view& address, const std::shared_ptr<Error>&)>&& callback);

private:
std::unique_ptr<Detail::NativeUDPStream> nativeStream;
};

} // namespace Pomdog
267 changes: 267 additions & 0 deletions src/Network.POSIX/UDPStreamPOSIX.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// Copyright (c) 2013-2019 mogemimi. Distributed under the MIT license.

#include "UDPStreamPOSIX.hpp"
#include "SocketHelperPOSIX.hpp"
#include "../Network/AddressParser.hpp"
#include "../Network/EndPoint.hpp"
#include "../Network/ErrorHelper.hpp"
#include "Pomdog/Network/ArrayView.hpp"
#include "Pomdog/Network/IOService.hpp"
#include "Pomdog/Utility/Assert.hpp"
#include <array>
#include <cstring>
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

namespace Pomdog::Detail {
namespace {

constexpr int InvalidSocket = -1;

bool isSocketValid(int descriptor) noexcept
{
return descriptor >= 0;
}

#if defined(POMDOG_PLATFORM_LINUX)
constexpr int flags = MSG_NOSIGNAL;
#else
constexpr int flags = 0;
#endif

} // namespace

UDPStreamPOSIX::UDPStreamPOSIX(IOService* serviceIn)
: service(serviceIn)
{
}

UDPStreamPOSIX::~UDPStreamPOSIX()
{
if (blockingThread.joinable()) {
blockingThread.join();
}

if (isSocketValid(this->descriptor)) {
::close(this->descriptor);
this->descriptor = InvalidSocket;
}
}

std::shared_ptr<Error>
UDPStreamPOSIX::Connect(const std::string_view& host, const std::string_view& port, const Duration& connectTimeout)
{
POMDOG_ASSERT(service != nullptr);

// NOTE: A std::string_view doesn't provide a conversion to a const char* because it doesn't store a null-terminated string.
const auto hostBuf = std::string{host};
const auto portBuf = std::string{port};

std::thread connectThread([this, hostBuf = std::move(hostBuf), portBuf = std::move(portBuf), connectTimeout = connectTimeout] {
auto[fd, err] = Detail::ConnectSocketPOSIX(hostBuf, portBuf, SocketProtocol::UDP, connectTimeout);

if (err != nullptr) {
auto wrapped = Errors::Wrap(std::move(err), "couldn't connect to UDP socket on " + hostBuf + ":" + portBuf);
errorConn = service->ScheduleTask([this, err = std::move(wrapped)] {
this->OnConnected(std::move(err));
this->errorConn.Disconnect();
});
return;
}
this->descriptor = fd;

eventLoopConn = service->ScheduleTask([this] {
this->OnConnected(nullptr);
eventLoopConn.Disconnect();
eventLoopConn = service->ScheduleTask([this] { this->ReadEventLoop(); });
});
});

this->blockingThread = std::move(connectThread);

return nullptr;
}

std::shared_ptr<Error>
UDPStreamPOSIX::Listen(const std::string_view& host, const std::string_view& port)
{
POMDOG_ASSERT(service != nullptr);

// NOTE: A std::string_view doesn't provide a conversion to a const char* because it doesn't store a null-terminated string.
const auto hostBuf = std::string{host};
const auto portBuf = std::string{port};

auto[fd, err] = Detail::BindSocketPOSIX(hostBuf, portBuf, SocketProtocol::UDP);

if (err != nullptr) {
errorConn = service->ScheduleTask([this, err = std::move(err)] {
this->OnConnected(err);
this->errorConn.Disconnect();
});
return err;
}
this->descriptor = fd;

eventLoopConn = service->ScheduleTask([this] {
this->OnConnected(nullptr);
eventLoopConn.Disconnect();
eventLoopConn = service->ScheduleTask([this] { this->ReadFromEventLoop(); });
});

return nullptr;
}

void UDPStreamPOSIX::Close()
{
this->eventLoopConn.Disconnect();
this->errorConn.Disconnect();

if (isSocketValid(this->descriptor)) {
::close(this->descriptor);
this->descriptor = InvalidSocket;
}
}

std::shared_ptr<Error>
UDPStreamPOSIX::Write(const ArrayView<std::uint8_t const>& data)
{
POMDOG_ASSERT(isSocketValid(descriptor));
POMDOG_ASSERT(data.GetData() != nullptr);
POMDOG_ASSERT(data.GetSize() > 0);

auto result = ::send(this->descriptor, data.GetData(), data.GetSize(), flags);

if (result == -1) {
auto errorCode = Detail::ToErrc(errno);
return Errors::New(errorCode, "send failed with error");
}

return nullptr;
}

std::shared_ptr<Error>
UDPStreamPOSIX::WriteTo(const ArrayView<std::uint8_t const>& data, const std::string_view& address)
{
POMDOG_ASSERT(isSocketValid(this->descriptor));
POMDOG_ASSERT(data.GetData() != nullptr);
POMDOG_ASSERT(data.GetSize() > 0);

const auto[family, hostView, portView] = Detail::AddressParser::TransformAddress(address);
auto host = std::string{hostView};
auto port = std::string{portView};

struct ::addrinfo hints;
std::memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;

struct ::addrinfo* addrListRaw = nullptr;

auto res = ::getaddrinfo(host.data(), port.data(), &hints, &addrListRaw);
if (res != 0) {
return Errors::New("getaddrinfo failed with error " + std::to_string(res));
}

auto addrList = std::unique_ptr<struct ::addrinfo, void(*)(struct ::addrinfo*)>{addrListRaw, ::freeaddrinfo};
addrListRaw = nullptr;

std::optional<std::errc> lastError;

for (auto info = addrList.get(); info != nullptr; info = info->ai_next) {
auto result = ::sendto(
this->descriptor, data.GetData(), data.GetSize(), flags,
info->ai_addr, static_cast<int>(info->ai_addrlen));

if (result == -1) {
lastError = Detail::ToErrc(errno);
continue;
}
lastError = std::nullopt;
break;
}

if (lastError != std::nullopt) {
return Errors::New(*lastError, "sendto failed with error");
}

return nullptr;
}

int UDPStreamPOSIX::GetHandle() const noexcept
{
return descriptor;
}

void UDPStreamPOSIX::ReadEventLoop()
{
POMDOG_ASSERT(isSocketValid(descriptor));

// NOTE: Read per 1 frame (= 1/60 seconds) for a packet up to 1024 bytes.
std::array<std::uint8_t, 1024> buffer;

const auto readSize = ::recv(this->descriptor, buffer.data(), buffer.size(), flags);
if (readSize == -1) {
const auto errorCode = Detail::ToErrc(errno);
if (errorCode == std::errc::resource_unavailable_try_again || errorCode == std::errc::operation_would_block) {
// NOTE: There is no data to be read yet
return;
}

this->OnRead({}, Errors::New(errorCode, "read failed with error"));
return;
}

if (readSize < 0) {
return;
}

// NOTE: When the peer socket has performed orderly shutdown,
// the read size will be 0 (meaning the "end-of-file").
POMDOG_ASSERT(readSize >= 0);
auto view = ArrayView<std::uint8_t>{buffer.data(), static_cast<std::size_t>(readSize)};
this->OnRead(std::move(view), nullptr);
}

void UDPStreamPOSIX::ReadFromEventLoop()
{
POMDOG_ASSERT(isSocketValid(descriptor));

// NOTE: Read per 1 frame (= 1/60 seconds) for a packet up to 1024 bytes.
std::array<std::uint8_t, 1024> buffer;

struct ::sockaddr_storage addrInfo;
socklen_t addrLen = sizeof(addrInfo);

const auto readSize = ::recvfrom(
this->descriptor, buffer.data(), buffer.size(), flags,
reinterpret_cast<struct sockaddr*>(&addrInfo), &addrLen);

if (readSize == -1) {
const auto errorCode = Detail::ToErrc(errno);
if (errorCode == std::errc::resource_unavailable_try_again || errorCode == std::errc::operation_would_block) {
// NOTE: There is no data to be read yet
return;
}

this->OnReadFrom({}, "", Errors::New(errorCode, "read failed with error"));
return;
}

if (readSize < 0) {
return;
}

// NOTE: When the peer socket has performed orderly shutdown,
// the read size will be 0 (meaning the "end-of-file").
POMDOG_ASSERT(readSize >= 0);
auto addr = EndPoint::CreateFromAddressStorage(addrInfo);
auto view = ArrayView<std::uint8_t>{buffer.data(), static_cast<std::size_t>(readSize)};
this->OnReadFrom(std::move(view), addr.ToString(), nullptr);
}

} // namespace Pomdog::Detail
Loading

0 comments on commit 44359c7

Please sign in to comment.