Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an ntci::Proactor using io_uring #24

Merged
merged 17 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmake/templates/ntccfg_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace ntccfg {
// Build with support for the local (a.k.a. Unix) address family.
#define NTCCFG_BUILD_WITH_ADDRESS_FAMILY_LOCAL @NTF_BUILD_WITH_ADDRESS_FAMILY_LOCAL@

// Build with support for the Transmission Control Protocol (TCP) transport
// Build with support for the Transmission Control Protocol (TCP) transport
// protocol.
#define NTCCFG_BUILD_WITH_TRANSPORT_PROTOCOL_TCP @NTF_BUILD_WITH_TRANSPORT_PROTOCOL_TCP@

Expand Down Expand Up @@ -93,6 +93,10 @@ namespace ntccfg {
// driver for a proactor. This driver is available on Windows.
#define NTC_BUILD_WITH_IOCP @NTF_BUILD_WITH_IOCP@

// Build with support for being able to configure 'io_uring' as the
// driver for a proactor. This driver is available on Linux.
#define NTC_BUILD_WITH_IORING @NTF_BUILD_WITH_IORING@

// Build with support for being able to configure a processing model where
// any thread can process I/O for a socket, rather than just a particular
// thread chosen at the time the socket was created.
Expand Down
14 changes: 14 additions & 0 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ else
NTF_CONFIGURE_WITH_IOCP=0
fi

if [[ "${NTF_CONFIGURE_UNAME}" == "Linux" ]]; then
if [[ -z "${NTF_CONFIGURE_WITH_IORING}" ]]; then
NTF_CONFIGURE_WITH_IORING=0
fi
else
NTF_CONFIGURE_WITH_IORING=0
fi

if [[ -z "${NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING}" ]]; then
NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING=1
fi
Expand Down Expand Up @@ -323,6 +331,7 @@ usage()
echo " --with-pollset Enable the reactor driver implemented with 'pollset' (AIX only) [${NTF_CONFIGURE_WITH_POLLSET}]"
echo " --with-kqueue Enable the reactor driver implemented with 'kqueue' on (Darwin and FreeBSD only) [${NTF_CONFIGURE_WITH_KQUEUE}]"
echo " --with-iocp Enable the proactor driver implemented with I/O completion ports (Windows only) [${NTF_CONFIGURE_WITH_IOCP}]"
echo " --with-ioring Enable the proactor driver implemented with 'io_uring' (Linux only) [${NTF_CONFIGURE_WITH_IORING}]"

echo " --with-dynamic-load-balancing Enable processing I/O on any thread, rather than a single thread [${NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING}]"
echo " --with-thread-scaling Enable automatic scaling of thread pools [${NTF_CONFIGURE_WITH_THREAD_SCALING}]"
Expand Down Expand Up @@ -472,6 +481,8 @@ while true ; do
NTF_CONFIGURE_WITH_KQUEUE=1 ; shift ;;
--with-iocp)
NTF_CONFIGURE_WITH_IOCP=1 ; shift ;;
--with-ioring)
NTF_CONFIGURE_WITH_IORING=1 ; shift ;;

--with-dynamic-load-balancing)
NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING=1 ; shift ;;
Expand Down Expand Up @@ -565,6 +576,8 @@ while true ; do
NTF_CONFIGURE_WITH_KQUEUE=0 ; shift ;;
--without-iocp)
NTF_CONFIGURE_WITH_IOCP=0 ; shift ;;
--without-ioring)
NTF_CONFIGURE_WITH_IORING=0 ; shift ;;

--without-dynamic-load-balancing)
NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING=0 ; shift ;;
Expand Down Expand Up @@ -666,6 +679,7 @@ export NTF_CONFIGURE_WITH_EVENTPORT
export NTF_CONFIGURE_WITH_POLLSET
export NTF_CONFIGURE_WITH_KQUEUE
export NTF_CONFIGURE_WITH_IOCP
export NTF_CONFIGURE_WITH_IORING

export NTF_CONFIGURE_WITH_DYNAMIC_LOAD_BALANCING
export NTF_CONFIGURE_WITH_THREAD_SCALING
Expand Down
11 changes: 11 additions & 0 deletions groups/ntc/ntcf/ntcf_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ BSLS_IDENT_RCSID(ntcf_system_cpp, "$Id$ $CSID$")
#include <ntco_epoll.h>
#include <ntco_eventport.h>
#include <ntco_iocp.h>
#include <ntco_ioring.h>
#include <ntco_kqueue.h>
#include <ntco_poll.h>
#include <ntco_pollset.h>
Expand Down Expand Up @@ -237,6 +238,16 @@ ntsa::Error System::initialize()
ntcs::Plugin::registerProactorFactory("IOCP", iocpFactory);
}
#endif
#endif

#if NTC_BUILD_WITH_IORING
#if defined(BSLS_PLATFORM_OS_LINUX)
if (ntco::IoRingFactory::isSupported()) {
bsl::shared_ptr<ntco::IoRingFactory> iocpFactory;
iocpFactory.createInplace(allocator, allocator);
ntcs::Plugin::registerProactorFactory("IORING", iocpFactory);
}
#endif
#endif

bsl::atexit(&System::exit);
Expand Down
7 changes: 4 additions & 3 deletions groups/ntc/ntcf/ntcf_system.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ using namespace BloombergLP;
// "POLLSET" Implementation using the pollset API.
// "KQUEUE" Implementation using kqueue/kevent
// "IOCP" Implementation using I/O completion ports
// #define NTCF_SYSTEM_TEST_DRIVER_TYPE "EPOLL"
// "IOCP" Implementation using I/O rings
// #define NTCF_SYSTEM_TEST_DRIVER_TYPE "IORING"

// Uncomment to test a specific address family, instead of all address
// families.
Expand Down Expand Up @@ -5474,13 +5475,13 @@ void concern(const ConcernCallback& concernCallback,

const bsl::size_t MIN_THREADS = 4;
const bsl::size_t MAX_THREADS = 4;
const bsl::size_t LOAD_FACTOR = 10;
const bsl::size_t LOAD_FACTOR = 10000;

#else

const bsl::size_t MIN_THREADS = 2;
const bsl::size_t MAX_THREADS = 2;
const bsl::size_t LOAD_FACTOR = 10;
const bsl::size_t LOAD_FACTOR = 10000;

#endif

Expand Down
48 changes: 24 additions & 24 deletions groups/ntc/ntco/ntco_iocp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -920,19 +920,19 @@ void Iocp::wait(ntci::Waiter waiter)
return;
}

const bool wantedEndpoint = event->d_operationArenaSize > 0;
const bool wantedEndpoint = event->d_numBytesIndicated > 0;

if (wantedEndpoint) {
const bool haveEndpoint =
event->d_operationArenaSize <= sizeof(sockaddr_storage);
event->d_numBytesIndicated <= sizeof(sockaddr_storage);

if (haveEndpoint) {
ntsa::Endpoint endpoint;
error = ntsu::SocketUtil::decodeEndpoint(
&endpoint,
reinterpret_cast<const sockaddr_storage*>(
event->d_operationArena),
static_cast<bsl::size_t>(event->d_operationArenaSize));
event->d_address),
static_cast<bsl::size_t>(event->d_numBytesIndicated));
if (error) {
ntcs::Dispatch::announceReceived(
event->d_socket,
Expand Down Expand Up @@ -1411,7 +1411,7 @@ ntsa::Error Iocp::accept(const bsl::shared_ptr<ntci::ProactorSocket>& socket)
if (sourceEndpoint.ip().host().isV4()) {
rc = acceptEx(event->d_socket->handle(),
event->d_target,
event->d_operationArena,
event->d_address,
0,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
Expand All @@ -1421,7 +1421,7 @@ ntsa::Error Iocp::accept(const bsl::shared_ptr<ntci::ProactorSocket>& socket)
else if (sourceEndpoint.ip().host().isV6()) {
rc = acceptEx(event->d_socket->handle(),
event->d_target,
event->d_operationArena,
event->d_address,
0,
sizeof(sockaddr_in6) + 16,
sizeof(sockaddr_in6) + 16,
Expand All @@ -1435,7 +1435,7 @@ ntsa::Error Iocp::accept(const bsl::shared_ptr<ntci::ProactorSocket>& socket)
else if (sourceEndpoint.isLocal()) {
rc = acceptEx(event->d_socket->handle(),
event->d_target,
event->d_operationArena,
event->d_address,
0,
sizeof(sockaddr_un) + 16,
sizeof(sockaddr_un) + 16,
Expand Down Expand Up @@ -1605,7 +1605,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
{
bsl::size_t socketAddressSizeT;
ntsa::Error error = ntsu::SocketUtil::encodeEndpoint(
reinterpret_cast<sockaddr_storage*>(event->d_operationArena),
reinterpret_cast<sockaddr_storage*>(event->d_address),
&socketAddressSizeT,
options.endpoint().value());
if (error) {
Expand All @@ -1623,7 +1623,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
static_cast<DWORD>(numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1695,7 +1695,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
if (specifyEndpoint) {
bsl::size_t socketAddressSizeT;
ntsa::Error error = ntsu::SocketUtil::encodeEndpoint(
reinterpret_cast<sockaddr_storage*>(event->d_operationArena),
reinterpret_cast<sockaddr_storage*>(event->d_address),
&socketAddressSizeT,
options.endpoint().value());
if (error) {
Expand Down Expand Up @@ -1745,7 +1745,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1811,7 +1811,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1853,7 +1853,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
1,
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1893,7 +1893,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
1,
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1939,7 +1939,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -1985,7 +1985,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2025,7 +2025,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
1,
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2071,7 +2071,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2118,7 +2118,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2160,7 +2160,7 @@ ntsa::Error Iocp::send(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
1,
0,
0,
reinterpret_cast<sockaddr*>(event->d_operationArena),
reinterpret_cast<sockaddr*>(event->d_address),
socketAddressSize,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);
Expand Down Expand Up @@ -2255,9 +2255,9 @@ ntsa::Error Iocp::receive(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
event->d_receiveData_p = data;

if (wantEndpoint) {
BSLMF_ASSERT(sizeof(event->d_operationArena) >=
BSLMF_ASSERT(sizeof(event->d_address) >=
sizeof(sockaddr_storage));
event->d_operationArenaSize = sizeof(sockaddr_storage) + 1;
event->d_numBytesIndicated = sizeof(sockaddr_storage) + 1;
}

NTCP_IOCP_LOG_EVENT_STARTING(event);
Expand Down Expand Up @@ -2302,8 +2302,8 @@ ntsa::Error Iocp::receive(const bsl::shared_ptr<ntci::ProactorSocket>& socket,
NTCCFG_WARNING_NARROW(DWORD, numBuffersTotal),
0,
&wsaFlags,
reinterpret_cast<sockaddr*>(event->d_operationArena),
&event->d_operationArenaSize,
reinterpret_cast<sockaddr*>(event->d_address),
&event->d_numBytesIndicated,
reinterpret_cast<OVERLAPPED*>(event.get()),
0);

Expand Down
Loading