Skip to content

Experimental io_uring support #194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ set(TRANTOR_SOURCES
trantor/net/inner/TimerQueue.cc
trantor/net/inner/poller/EpollPoller.cc
trantor/net/inner/poller/KQueue.cc
trantor/net/inner/poller/PollPoller.cc)
trantor/net/inner/poller/IoUringPoller.cc)
set(private_headers
trantor/net/inner/Acceptor.h
trantor/net/inner/Connector.h
Expand All @@ -110,7 +110,8 @@ set(private_headers
trantor/net/inner/TimerQueue.h
trantor/net/inner/poller/EpollPoller.h
trantor/net/inner/poller/KQueue.h
trantor/net/inner/poller/PollPoller.h)
trantor/net/inner/poller/PollPoller.h
trantor/net/inner/poller/IoUringPoller.h)

if(WIN32)
set(TRANTOR_SOURCES
Expand Down Expand Up @@ -142,6 +143,18 @@ if (BUILD_C-ARES)
endif()
endif ()

set(HAVE_LIBURING NO)
if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
find_package(LibUring)
if (LibUring_FOUND)
set(HAVE_LIBURING TRUE)
message(STATUS "LibUring found!")
target_link_libraries(${PROJECT_NAME} PRIVATE ${LibUring_LIBRARIES})
target_include_directories(${PROJECT_NAME} PRIVATE ${LibUring_INCLUDE_DIR})
target_compile_definitions(${PROJECT_NAME} PRIVATE ${USE_LIBURING})
endif ()
endif ()

if(HAVE_C-ARES)
target_link_libraries(${PROJECT_NAME} PRIVATE c-ares_lib)
set(TRANTOR_SOURCES
Expand Down
3 changes: 3 additions & 0 deletions cmake/templates/TrantorConfig.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ endif()
if(@c-ares_FOUND@)
find_dependency(c-ares)
endif()
if(@LibUring_FOUND@)
find_dependency(LibUring)
endif()
find_dependency(Threads)
# Compute paths

Expand Down
30 changes: 30 additions & 0 deletions cmake_modules/FindLibUring.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

find_path(LIBURING_INCLUDE_DIR NAMES liburing.h)
mark_as_advanced(LIBURING_INCLUDE_DIR)

find_library(LIBURING_LIBRARY NAMES uring)
mark_as_advanced(LIBURING_LIBRARY)

include(FindPackageHandleStandardArgs)
FIND_PACKAGE_HANDLE_STANDARD_ARGS(
LibUring
REQUIRED_VARS LIBURING_LIBRARY LIBURING_INCLUDE_DIR
)

if(LIBURING_FOUND)
set(LibUring_LIBRARIES ${LIBURING_LIBRARY})
set(LibUring_INCLUDE_DIRS ${LIBURING_INCLUDE_DIR})
endif()
1 change: 1 addition & 0 deletions trantor/net/Channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ class TRANTOR_EXPORT Channel : NonCopyable
friend class EpollPoller;
friend class KQueue;
friend class PollPoller;
friend class IoUringPoller;
void update();
void handleEvent();
void handleEventSafely();
Expand Down
31 changes: 30 additions & 1 deletion trantor/net/inner/Poller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
*/

#include "Poller.h"
#include <trantor/utils/Logger.h>
#ifdef __linux__
#include "poller/IoUringPoller.h"
#include "poller/EpollPoller.h"
#include <sys/utsname.h>
#elif defined _WIN32
#include "Wepoll.h"
#include "poller/EpollPoller.h"
Expand All @@ -26,7 +29,33 @@
using namespace trantor;
Poller *Poller::newPoller(EventLoop *loop)
{
#if defined __linux__ || defined _WIN32
#if defined __linux__
#if USE_LIBURING
// Our io_uring poller requires kerenel >= 5.6 for polling file descriptors
utsname buffer;
uname(&buffer);
std::string release(buffer.release);
auto dot1 = release.find('.');
if (dot1 == std::string::npos)
{
LOG_FATAL << "Unexpected version string format";
abort();
}
auto dot2 = release.substr(dot1).find('.');
if (dot2 == std::string::npos)
{
LOG_FATAL << "Unexpected version string format";
abort();
}
int majorVersion = stoi(release.substr(0, dot1));
int minorVersion = stoi(release.substr(dot1 + 1, dot2 - dot1));
bool supportIoUring =
majorVersion > 5 || (majorVersion == 5 && minorVersion >= 6);
if (supportIoUring)
return new IoUringPoller(loop);
#endif
return new EpollPoller(loop);
#elif defined _WIN32
return new EpollPoller(loop);
#elif defined __FreeBSD__ || defined __OpenBSD__ || defined __APPLE__
return new KQueue(loop);
Expand Down
256 changes: 256 additions & 0 deletions trantor/net/inner/poller/IoUringPoller.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/**
*
* IoUringPoller.cc
* Martin Chang
*
* Public header file in trantor lib.
*
* Copyright 2021, An Tao. All rights reserved.
* Use of this source code is governed by a BSD-style license
* that can be found in the License file.
*
*
*/

#include <trantor/utils/Logger.h>
#include "Channel.h"
#include "IoUringPoller.h"
#include <assert.h>
#include <iostream>

#if defined(__linux__) && USE_LIBURING
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <strings.h>
namespace trantor
{
static_assert(EPOLLIN == POLLIN, "EPOLLIN != POLLIN");
static_assert(EPOLLPRI == POLLPRI, "EPOLLPRI != POLLPRI");
static_assert(EPOLLOUT == POLLOUT, "EPOLLOUT != POLLOUT");
static_assert(EPOLLRDHUP == POLLRDHUP, "EPOLLRDHUP != POLLRDHUP");
static_assert(EPOLLERR == POLLERR, "EPOLLERR != POLLERR");
static_assert(EPOLLHUP == POLLHUP, "EPOLLHUP != POLLHUP");

namespace
{
const int kNew = -1;
const int kAdded = 1;
const int kDeleted = 2;
} // namespace
IoUringPoller::IoUringPoller(EventLoop *loop)
: Poller(loop),
epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
events_(kInitEventListSize)
{
if (io_uring_queue_init(kInitEventListSize, &ring_, 0) < 0)
{
LOG_FATAL << "Failed to initialize io_uring: " << strerror(-errno);
abort();
}
enqueueEpollPoll();
io_uring_submit(&ring_);
}
IoUringPoller::~IoUringPoller()
{
close(epollfd_);
io_uring_queue_exit(&ring_);
}
void IoUringPoller::enqueueEpollPoll()
{
auto sqe = io_uring_get_sqe(&ring_);
io_uring_prep_poll_add(sqe, epollfd_, POLLIN);
auto data = new IoUringData;
data->type = IoUringData::Type::Epoll;
io_uring_sqe_set_data(sqe, data);
}
void IoUringPoller::poll(int timeoutMs, ChannelList *activeChannels)
{
(void)timeoutMs;
// Submit IO that might have been submitted since last time
io_uring_submit(&ring_);
bool epollHasEvent = false;
while (true)
{
struct io_uring_cqe *cqe;
int status = io_uring_peek_cqe(&ring_, &cqe);
if (status < 0)
LOG_SYSERR << "io_uring_peek_cqe";
if (status == 0)
break;

auto data = (IoUringData *)io_uring_cqe_get_data(cqe);
if (data->type == IoUringData::Type::Epoll)
{
epollHasEvent = true;
enqueueEpollPoll();
}

io_uring_cqe_seen(&ring_, cqe);
delete data;
}
if (!epollHasEvent)
{
io_uring_submit(&ring_);
return;
}

int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
0);
int savedErrno = errno;
// Timestamp now(Timestamp::now());
if (numEvents > 0)
{
// LOG_TRACE << numEvents << " events happended";
fillActiveChannels(numEvents, activeChannels);
if (static_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size() * 2);
}
}
else if (numEvents == 0)
{
// std::cout << "nothing happended" << std::endl;
}
else
{
// error happens, log uncommon ones
if (savedErrno != EINTR)
{
errno = savedErrno;
LOG_SYSERR << "EPollIoUringPoller::poll()";
}
}
io_uring_submit(&ring_);
}
void IoUringPoller::fillActiveChannels(int numEvents,
ChannelList *activeChannels) const
{
assert(static_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i)
{
Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
#ifndef NDEBUG
int fd = channel->fd();
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
#endif
channel->setRevents(events_[i].events);
activeChannels->push_back(channel);
}
// LOG_TRACE<<"active Channels num:"<<activeChannels->size();
}
void IoUringPoller::updateChannel(Channel *channel)
{
assertInLoopThread();
assert(channel->fd() >= 0);

const int index = channel->index();
// LOG_TRACE << "fd = " << channel->fd()
// << " events = " << channel->events() << " index = " << index;
if (index == kNew || index == kDeleted)
{
// a new one, add with EPOLL_CTL_ADD
#ifndef NDEBUG
int fd = channel->fd();
if (index == kNew)
{
assert(channels_.find(fd) == channels_.end());
channels_[fd] = channel;
}
else
{ // index == kDeleted
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
}
#endif
channel->setIndex(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else
{
// update existing one with EPOLL_CTL_MOD/DEL
#ifndef NDEBUG
int fd = channel->fd();
(void)fd;
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
#endif
assert(index == kAdded);
if (channel->isNoneEvent())
{
update(EPOLL_CTL_DEL, channel);
channel->setIndex(kDeleted);
}
else
{
update(EPOLL_CTL_MOD, channel);
}
}
}
void IoUringPoller::removeChannel(Channel *channel)
{
IoUringPoller::assertInLoopThread();
#ifndef NDEBUG
int fd = channel->fd();
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
size_t n = channels_.erase(fd);
(void)n;
assert(n == 1);
#endif
assert(channel->isNoneEvent());
int index = channel->index();
assert(index == kAdded || index == kDeleted);
if (index == kAdded)
{
update(EPOLL_CTL_DEL, channel);
}
channel->setIndex(kNew);
}
void IoUringPoller::update(int operation, Channel *channel)
{
struct epoll_event event;
memset(&event, 0, sizeof(event));
event.events = channel->events();
event.data.ptr = channel;
int fd = channel->fd();
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
{
if (operation == EPOLL_CTL_DEL)
{
// LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) <<
// " fd =" << fd;
}
else
{
// LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation)
// << " fd =" << fd;
}
}
}
} // namespace trantor
#else
namespace trantor
{
IoUringPoller::IoUringPoller(EventLoop *loop) : Poller(loop)
{
assert(false);
}
IoUringPoller::~IoUringPoller()
{
}
void IoUringPoller::poll(int, ChannelList *)
{
}
void IoUringPoller::updateChannel(Channel *)
{
}
void IoUringPoller::removeChannel(Channel *)
{
}
} // namespace trantor
#endif
Loading