Skip to content

Commit

Permalink
Implement an ntci::Proactor using io_uring
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrm456 authored Aug 18, 2023
1 parent 9a5e12a commit b3a640b
Show file tree
Hide file tree
Showing 14 changed files with 9,013 additions and 78 deletions.
6 changes: 5 additions & 1 deletion cmake/templates/ntccfg_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace ntccfg {
// Build with support for the local (a.k.a. Unix) address family.
#define NTCCFG_BUILD_WITH_ADDRESS_FAMILY_LOCAL @NTF_BUILD_WITH_ADDRESS_FAMILY_LOCAL@

// Build with support for the Transmission Control Protocol (TCP) transport
// Build with support for the Transmission Control Protocol (TCP) transport
// protocol.
#define NTCCFG_BUILD_WITH_TRANSPORT_PROTOCOL_TCP @NTF_BUILD_WITH_TRANSPORT_PROTOCOL_TCP@

Expand Down Expand Up @@ -93,6 +93,10 @@ namespace ntccfg {
// driver for a proactor. This driver is available on Windows.
#define NTC_BUILD_WITH_IOCP @NTF_BUILD_WITH_IOCP@

// Build with support for being able to configure 'io_uring' as the
// driver for a proactor. This driver is available on Linux.
#define NTC_BUILD_WITH_IORING @NTF_BUILD_WITH_IORING@

// Build with support for being able to configure a processing model where
// any thread can process I/O for a socket, rather than just a particular
// thread chosen at the time the socket was created.
Expand Down
14 changes: 14 additions & 0 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ else
NTF_CONFIGURE_WITH_IOCP=0
fi

if [[ "${NTF_CONFIGURE_UNAME}" == "Linux" ]]; then
if [[ -z "${NTF_CONFIGURE_WITH_IORING}" ]]; then
NTF_CONFIGURE_WITH_IORING=0
fi
else
NTF_CONFIGURE_WITH_IORING=0
fi

if [[ -z "${NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING}" ]]; then
NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING=1
fi
Expand Down Expand Up @@ -323,6 +331,7 @@ usage()
echo " --with-pollset Enable the reactor driver implemented with 'pollset' (AIX only) [${NTF_CONFIGURE_WITH_POLLSET}]"
echo " --with-kqueue Enable the reactor driver implemented with 'kqueue' on (Darwin and FreeBSD only) [${NTF_CONFIGURE_WITH_KQUEUE}]"
echo " --with-iocp Enable the proactor driver implemented with I/O completion ports (Windows only) [${NTF_CONFIGURE_WITH_IOCP}]"
echo " --with-ioring Enable the proactor driver implemented with 'io_uring' (Linux only) [${NTF_CONFIGURE_WITH_IORING}]"

echo " --with-dynamic-load-balancing Enable processing I/O on any thread, rather than a single thread [${NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING}]"
echo " --with-thread-scaling Enable automatic scaling of thread pools [${NTF_CONFIGURE_WITH_THREAD_SCALING}]"
Expand Down Expand Up @@ -472,6 +481,8 @@ while true ; do
NTF_CONFIGURE_WITH_KQUEUE=1 ; shift ;;
--with-iocp)
NTF_CONFIGURE_WITH_IOCP=1 ; shift ;;
--with-ioring)
NTF_CONFIGURE_WITH_IORING=1 ; shift ;;

--with-dynamic-load-balancing)
NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING=1 ; shift ;;
Expand Down Expand Up @@ -565,6 +576,8 @@ while true ; do
NTF_CONFIGURE_WITH_KQUEUE=0 ; shift ;;
--without-iocp)
NTF_CONFIGURE_WITH_IOCP=0 ; shift ;;
--without-ioring)
NTF_CONFIGURE_WITH_IORING=0 ; shift ;;

--without-dynamic-load-balancing)
NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING=0 ; shift ;;
Expand Down Expand Up @@ -666,6 +679,7 @@ export NTF_CONFIGURE_WITH_EVENTPORT
export NTF_CONFIGURE_WITH_POLLSET
export NTF_CONFIGURE_WITH_KQUEUE
export NTF_CONFIGURE_WITH_IOCP
export NTF_CONFIGURE_WITH_IORING

export NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING
export NTF_CONFIGURE_WITH_THREAD_SCALING
Expand Down
11 changes: 11 additions & 0 deletions groups/ntc/ntcf/ntcf_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ BSLS_IDENT_RCSID(ntcf_system_cpp, "$Id$ $CSID$")
#include <ntco_epoll.h>
#include <ntco_eventport.h>
#include <ntco_iocp.h>
#include <ntco_ioring.h>
#include <ntco_kqueue.h>
#include <ntco_poll.h>
#include <ntco_pollset.h>
Expand Down Expand Up @@ -237,6 +238,16 @@ ntsa::Error System::initialize()
ntcs::Plugin::registerProactorFactory("IOCP", iocpFactory);
}
#endif
#endif

#if NTC_BUILD_WITH_IORING
#if defined(BSLS_PLATFORM_OS_LINUX)
if (ntco::IoRingFactory::isSupported()) {
bsl::shared_ptr<ntco::IoRingFactory> iocpFactory;
iocpFactory.createInplace(allocator, allocator);
ntcs::Plugin::registerProactorFactory("IORING", iocpFactory);
}
#endif
#endif

bsl::atexit(&System::exit);
Expand Down
7 changes: 4 additions & 3 deletions groups/ntc/ntcf/ntcf_system.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ using namespace BloombergLP;
// "POLLSET" Implementation using the pollset API.
// "KQUEUE" Implementation using kqueue/kevent
// "IOCP" Implementation using I/O completion ports
// #define NTCF_SYSTEM_TEST_DRIVER_TYPE "EPOLL"
// "IOCP" Implementation using I/O rings
// #define NTCF_SYSTEM_TEST_DRIVER_TYPE "IORING"

// Uncomment to test a specific address family, instead of all address
// families.
Expand Down Expand Up @@ -5474,13 +5475,13 @@ void concern(const ConcernCallback& concernCallback,

const bsl::size_t MIN_THREADS = 4;
const bsl::size_t MAX_THREADS = 4;
const bsl::size_t LOAD_FACTOR = 10;
const bsl::size_t LOAD_FACTOR = 10000;

#else

const bsl::size_t MIN_THREADS = 2;
const bsl::size_t MAX_THREADS = 2;
const bsl::size_t LOAD_FACTOR = 10;
const bsl::size_t LOAD_FACTOR = 10000;

#endif

Expand Down
48 changes: 24 additions & 24 deletions groups/ntc/ntco/ntco_iocp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -920,19 +920,19 @@ void Iocp::wait(ntci::Waiter waiter)
return;
}

const bool wantedEndpoint = event->d_operationArenaSize > 0;
const bool wantedEndpoint = event->d_numBytesIndicated > 0;

if (wantedEndpoint) {
const bool haveEndpoint =
event->d_operationArenaSize <= sizeof(sockaddr_storage);
event->d_numBytesIndicated <= sizeof(sockaddr_storage);

if (haveEndpoint) {
ntsa::Endpoint endpoint;
error = ntsu::SocketUtil::decodeEndpoint(
&endpoint,
reinterpret_cast<const sockaddr_storage*>(
event->d_operationArena),
static_cast<bsl::size_t>(event->d_operationArenaSize));
event->d_address),
static_cast<bsl::size_t>(event->d_numBytesIndicated));
if (error) {
ntcs::Dispatch::announceReceived(
event->d_socket,
Expand Down Expand Up @@ -1411,7 +1411,7 @@ ntsa::Error Iocp::accept(const bsl::shared_ptr<ntci::ProactorSocket>& socket)
if (sourceEndpoint.ip().host().isV4()) {
rc = acceptEx(event->d_socket->handle(),
event->d_target,
event->d_operationArena,
event->d_address,
0,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
Expand All @@ -1421,7 +1421,7 @@ ntsa::Error Iocp::accept(const bsl::shared_ptr<ntci::ProactorSocket>& socket)
else if (sourceEndpoint.ip().host().isV6()) {
rc = acceptEx(event->d_socket->handle(),
event->d_target,
event->d_operationArena,
event->d_address,
0,
sizeof(sockaddr_in6) + 16,
sizeof(sockaddr_in6) + 16,
Expand All @@ -1435,7 +1435,7 @@ ntsa::Error Iocp::accept(const bsl::shared_ptr<ntci::ProactorSocket>& socket)
else if (sourceEndpoint.isLocal()) {
rc = acceptEx(event->d_socket->handle(),
event->d_target,
event->d_operationArena,
event->d_address,
0,
sizeof(sockaddr_un) + 16,
sizeof(sockaddr_un) + 16,
Expand Down Expand Up @@ -1605,7 +1605,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
{
bsl::size_t socketAddressSizeT;
ntsa::Error error = ntsu::SocketUtil::encodeEndpoint(
reinterpret_cast<sockaddr_storage*>(event->d_operationArena),
reinterpret_cast<sockaddr_storage*>(event->d_address),
&socketAddressSizeT,
options.endpoint().value());
if (error) {
Expand All @@ -1623,7 +1623,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
static_cast<DWORD>(numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1695,7 +1695,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
if (specifyEndpoint) {
bsl::size_t socketAddressSizeT;
ntsa::Error error = ntsu::SocketUtil::encodeEndpoint(
reinterpret_cast<sockaddr_storage*>(event->d_operationArena),
reinterpret_cast<sockaddr_storage*>(event->d_address),
&socketAddressSizeT,
options.endpoint().value());
if (error) {
Expand Down Expand Up @@ -1745,7 +1745,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1811,7 +1811,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1853,7 +1853,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
1,
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1893,7 +1893,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
1,
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1939,7 +1939,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1985,7 +1985,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2025,7 +2025,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
1,
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2071,7 +2071,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2118,7 +2118,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2160,7 +2160,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
1,
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2255,9 +2255,9 @@ ntsa::Error Iocp::receive(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
event->d_receiveData_p = data;

if (wantEndpoint) {
BSLMF_ASSERT(sizeof(event->d_operationArena) >=
BSLMF_ASSERT(sizeof(event->d_address) >=
sizeof(sockaddr_storage));
event->d_operationArenaSize = sizeof(sockaddr_storage) + 1;
event->d_numBytesIndicated = sizeof(sockaddr_storage) + 1;
}

NTCP_IOCP_LOG_EVENT_STARTING(event);
Expand Down Expand Up @@ -2302,8 +2302,8 @@ ntsa::Error Iocp::receive(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
&wsaFlags,
reinterpret_cast<sockaddr*>(event->d_operationArena),
&event->d_operationArenaSize,
reinterpret_cast<sockaddr*>(event->d_address),
&event->d_numBytesIndicated,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);

Expand Down
Loading

0 comments on commit b3a640b

Please sign in to comment.