diff --git a/CMakeLists.txt b/CMakeLists.txt index 3bfa42ee3cb..f47c199eaeb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -102,6 +102,7 @@ find_package(OpenSSL) # Check for IO faculties check_symbol_exists(epoll_create "sys/epoll.h" TS_USE_EPOLL) check_symbol_exists(kqueue "sys/event.h" TS_USE_KQUEUE) +set(CMAKE_REQUIRED_LIBRARIES uring) check_symbol_exists(io_uring_queue_init "liburing.h" HAVE_IOURING) check_symbol_exists(getresuid unistd.h HAVE_GETRESUID) check_symbol_exists(getresgid unistd.h HAVE_GETRESGID) @@ -111,9 +112,10 @@ check_symbol_exists(eventfd sys/eventfd.h HAVE_EVENTFD) check_symbol_exists(SSL_CTX_set_tlsext_ticket_key_cb openssl/ssl.h HAVE_SSL_CTX_SET_TLSEXT_TICKET_KEY_CB) option(USE_IOURING "Use experimental io_uring (linux only)" 0) -if (HAVE_IOURING AND USE_IOUIRNG) - set(TS_USE_LINUX_IO_URING) -endif(HAVE_IOURING AND USE_IOUIRNG) +if (HAVE_IOURING AND USE_IOURING) + message(Using io_uring) + set(TS_USE_LINUX_IO_URING 1) +endif(HAVE_IOURING AND USE_IOURING) # Check ssl functionality list(APPEND CMAKE_REQUIRED_INCLUDES ${OPENSSL_INCLUDE_DIR}) diff --git a/configure.ac b/configure.ac index a9c3c93f50b..1e820b8d8b1 100644 --- a/configure.ac +++ b/configure.ac @@ -1686,6 +1686,7 @@ AC_ARG_ENABLE([experimental-linux-io-uring], [enable_linux_io_uring="${enableval}"], [enable_linux_io_uring=no] ) +AM_CONDITIONAL([ENABLE_IO_URING], [ test "x${enable_linux_io_uring}" = "xyes" ]) AS_IF([test "x$enable_linux_io_uring" = "xyes"], [ URING_LIBS="-luring" @@ -2316,6 +2317,7 @@ AC_SUBST([default_stack_size], [$with_default_stack_size]) # iocore_include_dirs="\ -I\$(abs_top_srcdir)/iocore/eventsystem \ +-I\$(abs_top_srcdir)/iocore/io_uring \ -I\$(abs_top_srcdir)/iocore/net \ -I\$(abs_top_srcdir)/iocore/net/quic \ -I\$(abs_top_srcdir)/iocore/aio \ @@ -2367,6 +2369,7 @@ AC_CONFIG_FILES([ include/tscore/ink_config.h iocore/Makefile iocore/aio/Makefile + iocore/io_uring/Makefile iocore/cache/Makefile iocore/dns/Makefile iocore/eventsystem/Makefile diff --git a/iocore/CMakeLists.txt b/iocore/CMakeLists.txt index da34ad8f263..21a360528ae 100644 --- a/iocore/CMakeLists.txt +++ b/iocore/CMakeLists.txt @@ -17,6 +17,7 @@ add_subdirectory(eventsystem) +add_subdirectory(io_uring) add_subdirectory(net) add_subdirectory(aio) add_subdirectory(dns) @@ -27,6 +28,7 @@ add_subdirectory(cache) set(IOCORE_INCLUDE_DIRS ${CMAKE_SOURCE_DIR}/iocore/eventsystem ${CMAKE_SOURCE_DIR}/iocore/dns + ${CMAKE_SOURCE_DIR}/iocore/io_uring ${CMAKE_SOURCE_DIR}/iocore/aio ${CMAKE_SOURCE_DIR}/iocore/net ${CMAKE_SOURCE_DIR}/iocore/cache diff --git a/iocore/Makefile.am b/iocore/Makefile.am index 5aae15ea552..fb00430bb88 100644 --- a/iocore/Makefile.am +++ b/iocore/Makefile.am @@ -17,3 +17,6 @@ # limitations under the License. SUBDIRS = eventsystem net aio dns hostdb utils cache +if ENABLE_IO_URING +SUBDIRS += io_uring +endif diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc index 26d281194f1..415cdd37b84 100644 --- a/iocore/aio/AIO.cc +++ b/iocore/aio/AIO.cc @@ -73,12 +73,10 @@ RecInt aio_io_uring_wq_unbounded = 0; RecRawStatBlock *aio_rsb = nullptr; Continuation *aio_err_callbck = nullptr; // AIO Stats -std::atomic aio_num_read = 0; -std::atomic aio_bytes_read = 0; -std::atomic aio_num_write = 0; -std::atomic aio_bytes_written = 0; -std::atomic io_uring_submissions = 0; -std::atomic io_uring_completions = 0; +std::atomic aio_num_read = 0; +std::atomic aio_bytes_read = 0; +std::atomic aio_num_write = 0; +std::atomic aio_bytes_written = 0; /* * Stats @@ -534,87 +532,12 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info) #elif AIO_MODE == AIO_MODE_IO_URING -std::atomic aio_main_wq_fd; - -DiskHandler::DiskHandler() -{ - io_uring_params p{}; - - if (aio_io_uring_attach_wq > 0) { - int wq_fd = get_main_queue_fd(); - if (wq_fd > 0) { - p.flags = IORING_SETUP_ATTACH_WQ; - p.wq_fd = wq_fd; - } - } - - if (aio_io_uring_sq_poll_ms > 0) { - p.flags |= IORING_SETUP_SQPOLL; - p.sq_thread_idle = aio_io_uring_sq_poll_ms; - } - - int ret = io_uring_queue_init_params(aio_io_uring_queue_entries, &ring, &p); - if (ret < 0) { - throw std::runtime_error(strerror(-ret)); - } - - /* no sharing for non-fixed either */ - if (aio_io_uring_sq_poll_ms && !(p.features & IORING_FEAT_SQPOLL_NONFIXED)) { - throw std::runtime_error("No SQPOLL sharing with nonfixed"); - } - - // assign this handler to the thread - // TODO(cmcfarlen): maybe a bad place for this! - this_ethread()->diskHandler = this; -} - -DiskHandler::~DiskHandler() -{ - io_uring_queue_exit(&ring); -} - -void -DiskHandler::set_main_queue(DiskHandler *dh) -{ - dh->set_wq_max_workers(aio_io_uring_wq_bounded, aio_io_uring_wq_unbounded); - aio_main_wq_fd.store(dh->ring.ring_fd); -} - -int -DiskHandler::get_main_queue_fd() -{ - return aio_main_wq_fd.load(); -} - -int -DiskHandler::set_wq_max_workers(unsigned int bounded, unsigned int unbounded) -{ - if (bounded == 0 && unbounded == 0) { - return 0; - } - unsigned int args[2] = {bounded, unbounded}; - int result = io_uring_register_iowq_max_workers(&ring, args); - return result; -} - -std::pair -DiskHandler::get_wq_max_workers() -{ - unsigned int args[2] = {0, 0}; - io_uring_register_iowq_max_workers(&ring, args); - return std::make_pair(args[0], args[1]); -} - -void -DiskHandler::submit() -{ - io_uring_submissions.fetch_add(io_uring_submit(&ring)); -} +#include "I_IO_URING.h" void -DiskHandler::handle_cqe(io_uring_cqe *cqe) +ink_aiocb::handle_complete(io_uring_cqe *cqe) { - AIOCallback *op = static_cast(io_uring_cqe_get_data(cqe)); + AIOCallback *op = this_op; op->aio_result = static_cast(cqe->res); op->link.prev = nullptr; @@ -643,68 +566,15 @@ DiskHandler::handle_cqe(io_uring_cqe *cqe) } } -void -DiskHandler::service() -{ - io_uring_cqe *cqe = nullptr; - io_uring_peek_cqe(&ring, &cqe); - while (cqe) { - handle_cqe(cqe); - io_uring_completions.fetch_add(1); - io_uring_cqe_seen(&ring, cqe); - - cqe = nullptr; - io_uring_peek_cqe(&ring, &cqe); - } -} - -void -DiskHandler::submit_and_wait(int ms) -{ - ink_hrtime t = ink_hrtime_from_msec(ms); - timespec ts = ink_hrtime_to_timespec(t); - __kernel_timespec timeout = {ts.tv_sec, ts.tv_nsec}; - io_uring_cqe *cqe = nullptr; - - io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, nullptr); - while (cqe) { - handle_cqe(cqe); - io_uring_completions.fetch_add(1); - io_uring_cqe_seen(&ring, cqe); - - cqe = nullptr; - io_uring_peek_cqe(&ring, &cqe); - } -} - -int -DiskHandler::register_eventfd() -{ - int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); - - io_uring_register_eventfd(&ring, fd); - - return fd; -} - -DiskHandler * -DiskHandler::local_context() -{ - // TODO(cmcfarlen): load config - thread_local DiskHandler threadContext; - - return &threadContext; -} - int ink_aio_read(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */) { - EThread *t = this_ethread(); - AIOCallback *op = op_in; + IOUringContext *ur = IOUringContext::local_context(); + AIOCallback *op = op_in; while (op) { - io_uring_sqe *sqe = t->diskHandler->next_sqe(); + op->aiocb.this_op = op; + io_uring_sqe *sqe = ur->next_sqe(&op->aiocb); io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset); - io_uring_sqe_set_data(sqe, op); op->aiocb.aio_lio_opcode = LIO_READ; if (op->then) { sqe->flags |= IOSQE_IO_LINK; @@ -720,12 +590,12 @@ ink_aio_read(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */) int ink_aio_write(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */) { - EThread *t = this_ethread(); - AIOCallback *op = op_in; + IOUringContext *ur = IOUringContext::local_context(); + AIOCallback *op = op_in; while (op) { - io_uring_sqe *sqe = t->diskHandler->next_sqe(); + op->aiocb.this_op = op; + io_uring_sqe *sqe = ur->next_sqe(&op->aiocb); io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset); - io_uring_sqe_set_data(sqe, op); op->aiocb.aio_lio_opcode = LIO_WRITE; if (op->then) { sqe->flags |= IOSQE_IO_LINK; diff --git a/iocore/aio/CMakeLists.txt b/iocore/aio/CMakeLists.txt index 944c072b2c6..02565097149 100644 --- a/iocore/aio/CMakeLists.txt +++ b/iocore/aio/CMakeLists.txt @@ -18,4 +18,4 @@ add_library(aio) target_sources(aio PRIVATE AIO.cc Inline.cc) -target_include_directories(aio PRIVATE ${CMAKE_SOURCE_DIR}/iocore/eventsystem) +target_include_directories(aio PRIVATE ${CMAKE_SOURCE_DIR}/iocore/eventsystem ${CMAKE_SOURCE_DIR}/iocore/io_uring) diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h index eb3cc58eacc..263c3c8d838 100644 --- a/iocore/aio/I_AIO.h +++ b/iocore/aio/I_AIO.h @@ -68,18 +68,21 @@ typedef struct io_event ink_io_event_t; #define aio_buf u.c.buf #elif AIO_MODE == AIO_MODE_IO_URING -#include +#include "I_IO_URING.h" struct AIOCallback; -struct ink_aiocb { +struct ink_aiocb : public IOUringCompletionHandler { int aio_fildes = -1; /* file descriptor or status: AIO_NOT_IN_PROGRESS */ void *aio_buf = nullptr; /* buffer location */ size_t aio_nbytes = 0; /* length of transfer */ off_t aio_offset = 0; /* file offset */ - int aio_lio_opcode = 0; /* listio operation */ - int aio_state = 0; /* state flag for List I/O */ - AIOCallback *aio_op = nullptr; + int aio_lio_opcode = 0; /* listio operation */ + int aio_state = 0; /* state flag for List I/O */ + AIOCallback *this_op = nullptr; + AIOCallback *aio_op = nullptr; + + void handle_complete(io_uring_cqe *) override; }; #else @@ -153,41 +156,6 @@ struct DiskHandler : public Continuation { }; #endif -#if AIO_MODE == AIO_MODE_IO_URING - -class DiskHandler -{ -public: - DiskHandler(); - ~DiskHandler(); - - io_uring_sqe * - next_sqe() - { - return io_uring_get_sqe(&ring); - } - - int set_wq_max_workers(unsigned int bounded, unsigned int unbounded); - std::pair get_wq_max_workers(); - - void submit(); - void service(); - void submit_and_wait(int ms); - - int register_eventfd(); - - static DiskHandler *local_context(); - static void set_main_queue(DiskHandler *); - static int get_main_queue_fd(); - -private: - io_uring ring; - - void handle_cqe(io_uring_cqe *); -}; - -#endif - void ink_aio_init(ts::ModuleVersion version); int ink_aio_start(); void ink_aio_set_callback(Continuation *error_callback); diff --git a/iocore/aio/Makefile.am b/iocore/aio/Makefile.am index 9e54824442a..4650e23ff79 100644 --- a/iocore/aio/Makefile.am +++ b/iocore/aio/Makefile.am @@ -18,6 +18,7 @@ AM_CPPFLAGS += \ -I$(abs_top_srcdir)/iocore/eventsystem \ + -I$(abs_top_srcdir)/iocore/io_uring \ -I$(abs_top_srcdir)/include \ -I$(abs_top_srcdir)/lib \ @SWOC_INCLUDES@ diff --git a/iocore/cache/CMakeLists.txt b/iocore/cache/CMakeLists.txt index f518c40ea85..369516189cb 100644 --- a/iocore/cache/CMakeLists.txt +++ b/iocore/cache/CMakeLists.txt @@ -34,6 +34,7 @@ add_library(inkcache STATIC ) target_include_directories(inkcache PRIVATE ${CMAKE_SOURCE_DIR}/iocore/eventsystem + ${CMAKE_SOURCE_DIR}/iocore/io_uring ${CMAKE_SOURCE_DIR}/iocore/dns ${CMAKE_SOURCE_DIR}/iocore/aio ${CMAKE_SOURCE_DIR}/iocore/net diff --git a/iocore/dns/CMakeLists.txt b/iocore/dns/CMakeLists.txt index dcb7fbf1259..beae3f1fba5 100644 --- a/iocore/dns/CMakeLists.txt +++ b/iocore/dns/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(inkdns STATIC target_include_directories(inkdns PRIVATE ${CMAKE_SOURCE_DIR}/iocore/eventsystem ${CMAKE_SOURCE_DIR}/iocore/dns + ${CMAKE_SOURCE_DIR}/iocore/io_uring ${CMAKE_SOURCE_DIR}/iocore/aio ${CMAKE_SOURCE_DIR}/iocore/net ${CMAKE_SOURCE_DIR}/iocore/cache diff --git a/iocore/hostdb/CMakeLists.txt b/iocore/hostdb/CMakeLists.txt index c41ce698d0f..5158ca62ffa 100644 --- a/iocore/hostdb/CMakeLists.txt +++ b/iocore/hostdb/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(inkhostdb STATIC ) target_include_directories(inkhostdb PRIVATE ${CMAKE_SOURCE_DIR}/iocore/eventsystem + ${CMAKE_SOURCE_DIR}/iocore/io_uring ${CMAKE_SOURCE_DIR}/iocore/dns ${CMAKE_SOURCE_DIR}/iocore/aio ${CMAKE_SOURCE_DIR}/iocore/net diff --git a/iocore/io_uring/CMakeLists.txt b/iocore/io_uring/CMakeLists.txt new file mode 100644 index 00000000000..ed35138cc3b --- /dev/null +++ b/iocore/io_uring/CMakeLists.txt @@ -0,0 +1,34 @@ +####################### +# +# Licensed to the Apache Software Foundation (ASF) under one or more contributor license +# agreements. See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# +####################### + +add_library(inkuring STATIC + io_uring.cc +) +include_directories( + ${CMAKE_SOURCE_DIR}/iocore/eventsystem + ${CMAKE_SOURCE_DIR}/iocore/dns + ${CMAKE_SOURCE_DIR}/iocore/aio + ${CMAKE_SOURCE_DIR}/iocore/io_uring + ${CMAKE_SOURCE_DIR}/iocore/net + ${CMAKE_SOURCE_DIR}/iocore/cache + ${CMAKE_SOURCE_DIR}/iocore/hostdb +) + +add_executable(test_iouring + unit_tests/test_diskIO.cc) +target_link_libraries(test_iouring PRIVATE tscore inkuring tscpputil libswoc uring) +target_include_directories(test_iouring PRIVATE ${CMAKE_SOURCE_DIR}/include ${CATCH_INCLUDE_DIR}) diff --git a/iocore/io_uring/I_IO_URING.h b/iocore/io_uring/I_IO_URING.h new file mode 100644 index 00000000000..fe8c8ca7a0b --- /dev/null +++ b/iocore/io_uring/I_IO_URING.h @@ -0,0 +1,85 @@ +/** @file + +Linux io_uring helper library + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include +#include + +struct IOUringConfig { + int queue_entries = 1024; + int sq_poll_ms = 0; + int attach_wq = 0; + int wq_bounded = 0; + int wq_unbounded = 0; +}; + +class IOUringCompletionHandler +{ +public: + virtual void handle_complete(io_uring_cqe *) = 0; +}; + +class IOUringContext +{ +public: + IOUringContext(); + ~IOUringContext(); + + IOUringContext(const IOUringContext &) = delete; + + io_uring_sqe * + next_sqe(IOUringCompletionHandler *handler) + { + io_uring_sqe *result = io_uring_get_sqe(&ring); + if (result != nullptr) { + io_uring_sqe_set_data(result, handler); + } + return result; + } + + int set_wq_max_workers(unsigned int bounded, unsigned int unbounded); + std::pair get_wq_max_workers(); + + void submit(); + void service(); + void submit_and_wait(int ms); + + int register_eventfd(); + + // assigns the global iouring config + static void set_config(const IOUringConfig &); + static IOUringContext *local_context(); + static void set_main_queue(IOUringContext *); + static int get_main_queue_fd(); + +private: + io_uring ring = {}; + int evfd = -1; + + void handle_cqe(io_uring_cqe *); + static IOUringConfig config; +}; + +extern std::atomic io_uring_submissions; +extern std::atomic io_uring_completions; diff --git a/iocore/io_uring/Makefile.am b/iocore/io_uring/Makefile.am new file mode 100644 index 00000000000..1434fd6ee45 --- /dev/null +++ b/iocore/io_uring/Makefile.am @@ -0,0 +1,53 @@ +# Makefile.am for traffic/iocore/io_uring +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +AM_CPPFLAGS += \ + -I$(abs_top_srcdir)/iocore/eventsystem \ + -I$(abs_top_srcdir)/include \ + -I$(abs_top_srcdir)/lib \ + $(TS_INCLUDES) + +noinst_LIBRARIES = libinkuring.a + +libinkuring_a_SOURCES = \ + io_uring.cc \ + I_IO_URING.h \ + P_IO_URING.h + + +check_PROGRAMS = test_diskIO + +test_LD_FLAGS = \ + @AM_LDFLAGS@ \ + @OPENSSL_LDFLAGS@ + +test_CPP_FLAGS = \ + $(AM_CPPFLAGS) \ + $(iocore_include_dirs) \ + -I$(abs_top_srcdir)/tests/include + +test_LD_ADD = \ + $(top_builddir)/src/tscore/libtscore.la \ + $(top_builddir)/iocore/io_uring/libinkuring.a \ + @HWLOC_LIBS@ + + +test_diskIO_SOURCES = unit_tests/test_diskIO.cc +test_diskIO_CPPFLAGS = $(test_CPP_FLAGS) +test_diskIO_LDFLAGS = $(test_LD_FLAGS) +test_diskIO_LDADD = $(test_LD_ADD) diff --git a/iocore/io_uring/P_IO_URING.h b/iocore/io_uring/P_IO_URING.h new file mode 100644 index 00000000000..78dff9854f7 --- /dev/null +++ b/iocore/io_uring/P_IO_URING.h @@ -0,0 +1,24 @@ +/** @file + +Linux io_uring helper library + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once diff --git a/iocore/io_uring/io_uring.cc b/iocore/io_uring/io_uring.cc new file mode 100644 index 00000000000..b4cab17319b --- /dev/null +++ b/iocore/io_uring/io_uring.cc @@ -0,0 +1,182 @@ +/** @file + +Linux io_uring helper library + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include +#include +#include +#include + +#include "I_IO_URING.h" +#include "tscore/ink_hrtime.h" + +std::atomic main_wq_fd; +std::atomic io_uring_submissions = 0; +std::atomic io_uring_completions = 0; + +IOUringConfig IOUringContext::config; + +void +IOUringContext::set_config(const IOUringConfig &cfg) +{ + config = cfg; +} + +IOUringContext::IOUringContext() +{ + io_uring_params p{}; + + if (config.attach_wq > 0) { + int wq_fd = get_main_queue_fd(); + if (wq_fd > 0) { + p.flags = IORING_SETUP_ATTACH_WQ; + p.wq_fd = wq_fd; + } + } + + if (config.sq_poll_ms > 0) { + p.flags |= IORING_SETUP_SQPOLL; + p.sq_thread_idle = config.sq_poll_ms; + } + + int ret = io_uring_queue_init_params(config.queue_entries, &ring, &p); + if (ret < 0) { + throw std::runtime_error(strerror(-ret)); + } + + /* no sharing for non-fixed either */ + if (config.sq_poll_ms && !(p.features & IORING_FEAT_SQPOLL_NONFIXED)) { + throw std::runtime_error("No SQPOLL sharing with nonfixed"); + } + + // assign this handler to the thread + // TODO(cmcfarlen): Assign in thread somewhere else + // this_ethread()->diskHandler = this; +} + +IOUringContext::~IOUringContext() +{ + if (evfd != -1) { + close(evfd); + evfd = -1; + } + io_uring_queue_exit(&ring); +} + +void +IOUringContext::set_main_queue(IOUringContext *dh) +{ + dh->set_wq_max_workers(config.wq_bounded, config.wq_unbounded); + main_wq_fd.store(dh->ring.ring_fd); +} + +int +IOUringContext::get_main_queue_fd() +{ + return main_wq_fd.load(); +} + +int +IOUringContext::set_wq_max_workers(unsigned int bounded, unsigned int unbounded) +{ + if (bounded == 0 && unbounded == 0) { + return 0; + } + unsigned int args[2] = {bounded, unbounded}; + int result = io_uring_register_iowq_max_workers(&ring, args); + return result; +} + +std::pair +IOUringContext::get_wq_max_workers() +{ + unsigned int args[2] = {0, 0}; + io_uring_register_iowq_max_workers(&ring, args); + return std::make_pair(args[0], args[1]); +} + +void +IOUringContext::submit() +{ + io_uring_submissions.fetch_add(io_uring_submit(&ring)); +} + +void +IOUringContext::handle_cqe(io_uring_cqe *cqe) +{ + auto *op = reinterpret_cast(io_uring_cqe_get_data(cqe)); + + op->handle_complete(cqe); +} + +void +IOUringContext::service() +{ + io_uring_cqe *cqe = nullptr; + io_uring_peek_cqe(&ring, &cqe); + while (cqe) { + handle_cqe(cqe); + io_uring_completions++; + io_uring_cqe_seen(&ring, cqe); + + cqe = nullptr; + io_uring_peek_cqe(&ring, &cqe); + } +} + +void +IOUringContext::submit_and_wait(int ms) +{ + ink_hrtime t = ink_hrtime_from_msec(ms); + timespec ts = ink_hrtime_to_timespec(t); + __kernel_timespec timeout = {ts.tv_sec, ts.tv_nsec}; + io_uring_cqe *cqe = nullptr; + + io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, nullptr); + while (cqe) { + handle_cqe(cqe); + io_uring_completions++; + io_uring_cqe_seen(&ring, cqe); + + cqe = nullptr; + io_uring_peek_cqe(&ring, &cqe); + } +} + +int +IOUringContext::register_eventfd() +{ + if (evfd == -1) { + evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + + io_uring_register_eventfd(&ring, evfd); + } + return evfd; +} + +IOUringContext * +IOUringContext::local_context() +{ + thread_local IOUringContext threadContext; + + return &threadContext; +} diff --git a/iocore/io_uring/unit_tests/test_diskIO.cc b/iocore/io_uring/unit_tests/test_diskIO.cc new file mode 100644 index 00000000000..ab7a3906661 --- /dev/null +++ b/iocore/io_uring/unit_tests/test_diskIO.cc @@ -0,0 +1,271 @@ +/** @file + + Catch based unit tests for EventSystem + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#define CATCH_CONFIG_MAIN +#include +#include "catch.hpp" + +#include "I_IO_URING.h" +#include "tscore/ts_file.h" + +#include + +#include +#include +#include + +ts::file::path +temp_prefix(const char *basename) +{ + char buffer[PATH_MAX]; + std::error_code err; + auto tmpdir = ts::file::temp_directory_path(); + snprintf(buffer, sizeof(buffer), "%s/%s.XXXXXX", tmpdir.c_str(), basename); + auto prefix = ts::file::path(mkdtemp(buffer)); + bool result = ts::file::create_directories(prefix, err, 0755); + if (!result) { + throw std::runtime_error("Failed to create directory"); + } + ink_assert(result); + + return prefix; +} + +int +open_path(const ts::file::path &path, int oflags = O_CREAT | O_RDWR, int mode = 0644) +{ + return open(path.c_str(), oflags, mode); +} + +template class FunctionHolderHandler : public IOUringCompletionHandler +{ +public: + FunctionHolderHandler(F &&f) : f(std::move(f)) {} + + void + handle_complete(io_uring_cqe *c) override + { + f(c->res); + } + +private: + F f; +}; + +IOUringCompletionHandler * +handle(const std::function &f) +{ + return new FunctionHolderHandler(std::move(f)); +} + +void +io_uring_write(IOUringContext &ur, int fd, const char *data, size_t len, const std::function &f) +{ + io_uring_sqe *s = ur.next_sqe(handle(f)); + + io_uring_prep_write(s, fd, data, len, 0); +} +void +io_uring_read(IOUringContext &ur, int fd, char *data, size_t len, const std::function &f) +{ + io_uring_sqe *s = ur.next_sqe(handle(f)); + + io_uring_prep_read(s, fd, data, len, 0); +} + +void +io_uring_close(IOUringContext &ur, int fd, const std::function &f) +{ + io_uring_sqe *s = ur.next_sqe(handle(f)); + + io_uring_prep_close(s, fd); +} + +void +io_uring_accept(IOUringContext &ur, int sock, sockaddr *addr, socklen_t *addrlen, const std::function &f) +{ + io_uring_sqe *s = ur.next_sqe(handle(f)); + + io_uring_prep_accept(s, sock, addr, addrlen, 0); +} + +void +io_uring_connect(IOUringContext &ur, int sock, sockaddr *addr, socklen_t addrlen, const std::function &f) +{ + io_uring_sqe *s = ur.next_sqe(handle(f)); + + io_uring_prep_connect(s, sock, addr, addrlen); +} + +TEST_CASE("disk_io", "[io_uring]") +{ + IOUringConfig cfg = { + .queue_entries = 32, + }; + IOUringContext::set_config(cfg); + IOUringContext ctx; + + auto tmp = temp_prefix("disk_io"); + + REQUIRE(ts::file::exists(tmp)); + + auto apath = tmp / "a"; + + int fd = open_path(apath); + + std::printf("%s\n", apath.c_str()); + + REQUIRE(fd != -1); + + io_uring_write(ctx, fd, "hello", 5, [](int result) { REQUIRE(result == 5); }); + ctx.submit_and_wait(100); + io_uring_close(ctx, fd, [&fd](int result) { + REQUIRE(result == 0); + fd = -1; + }); + + ctx.submit_and_wait(100); + + REQUIRE(fd == -1); + + fd = open_path(apath, O_RDONLY); + char buffer[6] = {0}; + io_uring_read(ctx, fd, buffer, sizeof(buffer), [&](int result) { + using namespace std::literals; + + REQUIRE(result == 5); + REQUIRE("hello"sv == std::string_view(buffer, result)); + }); + + ctx.submit_and_wait(100); +} + +void +set_reuseport(int s) +{ + int optval = 1; + setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); +} + +void +set_reuseaddr(int s) +{ + int optval = 1; + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); +} + +sockaddr_in +any_addr(int port) +{ + return {.sin_family = AF_INET, .sin_port = htons(port), .sin_addr = {.s_addr = htonl(INADDR_ANY)}, .sin_zero = {}}; +} + +sockaddr_in +make_addr(const std::string &ip, int port) +{ + sockaddr_in addr = {}; + + ::inet_aton(ip.c_str(), &addr.sin_addr); + addr.sin_port = htons(port); + addr.sin_family = AF_INET; + + return addr; +} + +int +make_listen_socket(int port) +{ + int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + set_reuseaddr(s); + set_reuseport(s); + + const sockaddr_in addr = any_addr(port); + int rc = ::bind(s, reinterpret_cast(&addr), sizeof(addr)); + if (rc == -1) { + throw std::runtime_error("failed to bind"); + } + rc = ::listen(s, 10000); + if (rc == -1) { + throw std::runtime_error("failed to listen"); + } + + return s; +} + +int +make_client_socket() +{ + int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + return s; +} + +class SimpleTestServer +{ +public: + SimpleTestServer(int port) : s(make_listen_socket(port)), port(port), clients(0) {} + + void + start(IOUringContext &ctx) + { + client_len = sizeof(client); + io_uring_accept(ctx, s, reinterpret_cast(&client), &client_len, [&](int result) { + REQUIRE(result > 0); + clients++; + }); + } + + int s; + int port; + int clients; + sockaddr_in client; + socklen_t client_len; +}; + +TEST_CASE("net_io", "[io_uring]") +{ + IOUringConfig cfg = { + .queue_entries = 32, + }; + IOUringContext::set_config(cfg); + IOUringContext ctx; + + SimpleTestServer server(4321); + + server.start(ctx); + + auto client_addr = make_addr("127.0.0.1", 4321); + int s = make_client_socket(); + std::atomic connected{false}; + io_uring_connect(ctx, s, reinterpret_cast(&client_addr), sizeof(client_addr), [&](int result) { + REQUIRE(result == 0); + connected = true; + }); + + uint64_t completions_before = io_uring_completions; + uint64_t needed = 2; + while ((io_uring_completions - completions_before) < needed) { + ctx.submit_and_wait(1000); + } + + REQUIRE(server.clients == 1); + REQUIRE(connected.load()); +} diff --git a/iocore/net/CMakeLists.txt b/iocore/net/CMakeLists.txt index d046c7c0003..27b09830979 100644 --- a/iocore/net/CMakeLists.txt +++ b/iocore/net/CMakeLists.txt @@ -67,6 +67,7 @@ target_compile_options(inknet PUBLIC -Wno-deprecated-declarations) target_include_directories(inknet PRIVATE ${CMAKE_SOURCE_DIR}/iocore/eventsystem ${CMAKE_SOURCE_DIR}/iocore/dns + ${CMAKE_SOURCE_DIR}/iocore/io_uring ${CMAKE_SOURCE_DIR}/iocore/aio ${CMAKE_SOURCE_DIR}/iocore/net ${CMAKE_SOURCE_DIR}/iocore/cache diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index dc58c09e8af..f0f58f1c736 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -36,7 +36,7 @@ #define EVENTIO_DNS_CONNECTION 3 #define EVENTIO_UDP_CONNECTION 4 #define EVENTIO_ASYNC_SIGNAL 5 -#define EVENTIO_DISK 6 +#define EVENTIO_IO_URING 6 #if TS_USE_EPOLL #ifndef EPOLLEXCLUSIVE @@ -114,7 +114,6 @@ struct EventIO { int start(EventLoop l, NetEvent *ne, int events); int start(EventLoop l, UnixUDPConnection *vc, int events); int start(EventLoop l, int fd, NetEvent *ne, int events); - int start(EventLoop l, DiskHandler *dh); int start_common(EventLoop l, int fd, int events); /** Alter the events that will trigger the continuation, for level triggered I/O. @@ -277,6 +276,10 @@ class NetHandler : public Continuation, public EThread::LoopTailHandler Que(NetEvent, active_queue_link) active_queue; uint32_t active_queue_size = 0; +#ifdef TS_USE_LINUX_IO_URING + EventIO uring_evio; +#endif + /// configuration settings for managing the active and keep-alive queues struct Config { uint32_t max_connections_in = 0; diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc index 721ac80b28b..4adcb9c8144 100644 --- a/iocore/net/UnixNet.cc +++ b/iocore/net/UnixNet.cc @@ -25,6 +25,10 @@ #include "I_AIO.h" #include "tscore/ink_hrtime.h" +#if TS_USE_LINUX_IO_URING +#include "I_IO_URING.h" +#endif + using namespace std::literals; ink_hrtime last_throttle_warning; @@ -270,12 +274,14 @@ initialize_thread_for_net(EThread *thread) thread->ep->type = EVENTIO_ASYNC_SIGNAL; #if HAVE_EVENTFD thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ); -#if TS_USE_LINUX_IO_URING - thread->ep->start(pd, DiskHandler::local_context()); -#endif #else thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ); #endif + +#if TS_USE_LINUX_IO_URING + nh->uring_evio.type = EVENTIO_IO_URING; + nh->uring_evio.start(pd, IOUringContext::local_context()->register_eventfd(), nullptr, EVENTIO_READ); +#endif } // NetHandler method definitions @@ -488,8 +494,8 @@ NetHandler::waitForActivity(ink_hrtime timeout) { EventIO *epd = nullptr; #if AIO_MODE == AIO_MODE_IO_URING - DiskHandler *dh = DiskHandler::local_context(); - bool servicedh = false; + IOUringContext *ur = IOUringContext::local_context(); + bool servicedh = false; #endif NET_INCREMENT_DYN_STAT(net_handler_run_stat); @@ -498,7 +504,7 @@ NetHandler::waitForActivity(ink_hrtime timeout) process_enabled_list(); #if AIO_MODE == AIO_MODE_IO_URING - dh->submit(); + ur->submit(); #endif // Polling event by PollCont @@ -553,7 +559,7 @@ NetHandler::waitForActivity(ink_hrtime timeout) } else if (epd->type == EVENTIO_NETACCEPT) { this->thread->schedule_imm(epd->data.na); #if AIO_MODE == AIO_MODE_IO_URING - } else if (epd->type == EVENTIO_DISK) { + } else if (epd->type == EVENTIO_IO_URING) { servicedh = true; #endif } @@ -566,7 +572,7 @@ NetHandler::waitForActivity(ink_hrtime timeout) #if AIO_MODE == AIO_MODE_IO_URING if (servicedh) { - dh->service(); + ur->service(); } #endif @@ -801,16 +807,3 @@ NetHandler::remove_from_active_queue(NetEvent *ne) --active_queue_size; } } - -int -EventIO::start(EventLoop l, DiskHandler *dh) -{ -#if AIO_MODE == AIO_MODE_IO_URING - data.dh = dh; - int fd = dh->register_eventfd(); - type = EVENTIO_DISK; - return start_common(l, fd, EVENTIO_READ); -#else - return 1; -#endif -} diff --git a/mgmt/config/CMakeLists.txt b/mgmt/config/CMakeLists.txt index 7f8c298ca7b..ddc08687d66 100644 --- a/mgmt/config/CMakeLists.txt +++ b/mgmt/config/CMakeLists.txt @@ -24,9 +24,5 @@ include_directories( ${CMAKE_SOURCE_DIR}/proxy ${CMAKE_SOURCE_DIR}/proxy/hdrs ${CMAKE_SOURCE_DIR}/proxy/http - ${CMAKE_SOURCE_DIR}/iocore/eventsystem - ${CMAKE_SOURCE_DIR}/iocore/net - ${CMAKE_SOURCE_DIR}/iocore/cache - ${CMAKE_SOURCE_DIR}/iocore/aio - ${CMAKE_SOURCE_DIR}/iocore/dns + ${IOCORE_INCLUDE_DIRS} ) \ No newline at end of file diff --git a/mgmt/rpc/CMakeLists.txt b/mgmt/rpc/CMakeLists.txt index feb8cb9ecc9..1e162a3087a 100644 --- a/mgmt/rpc/CMakeLists.txt +++ b/mgmt/rpc/CMakeLists.txt @@ -18,12 +18,7 @@ include_directories( ${CMAKE_SOURCE_DIR}/mgmt ${CMAKE_SOURCE_DIR}/mgmt/rpc - ${CMAKE_SOURCE_DIR}/iocore/eventsystem - ${CMAKE_SOURCE_DIR}/iocore/utils - ${CMAKE_SOURCE_DIR}/iocore/cache - ${CMAKE_SOURCE_DIR}/iocore/aio - ${CMAKE_SOURCE_DIR}/iocore/net - ${CMAKE_SOURCE_DIR}/iocore/dns + ${IOCORE_INCLUDE_DIRS} ${CMAKE_SOURCE_DIR}/lib ${CMAKE_SOURCE_DIR}/proxy ${CMAKE_SOURCE_DIR}/proxy/hdrs diff --git a/src/traffic_server/CMakeLists.txt b/src/traffic_server/CMakeLists.txt index 97955731cbf..9e47abf1034 100644 --- a/src/traffic_server/CMakeLists.txt +++ b/src/traffic_server/CMakeLists.txt @@ -59,4 +59,8 @@ target_link_libraries(traffic_server jsonrpc_protocol jsonrpc_server rpcpublichandlers - ) \ No newline at end of file + ) + +if (TS_USE_LINUX_IO_URING) + target_link_libraries(traffic_server inkuring uring) +endif (TS_USE_LINUX_IO_URING) \ No newline at end of file diff --git a/src/traffic_server/Makefile.inc b/src/traffic_server/Makefile.inc index fd44088da9d..9121f91a66f 100644 --- a/src/traffic_server/Makefile.inc +++ b/src/traffic_server/Makefile.inc @@ -121,3 +121,8 @@ traffic_server_traffic_server_LDADD += \ $(QUICHE_LIB) endif endif + +if ENABLE_IO_URING +traffic_server_traffic_server_LDADD += \ + $(top_builddir)/iocore/io_uring/libinkuring.a +endif diff --git a/src/traffic_server/traffic_server.cc b/src/traffic_server/traffic_server.cc index 7328eb65076..fd63ba31711 100644 --- a/src/traffic_server/traffic_server.cc +++ b/src/traffic_server/traffic_server.cc @@ -2041,9 +2041,9 @@ main(int /* argc ATS_UNUSED */, const char **argv) #if TS_USE_LINUX_IO_URING == 1 Note("Using io_uring for AIO"); - DiskHandler *main_aio = DiskHandler::local_context(); - DiskHandler::set_main_queue(main_aio); - auto [bounded, unbounded] = main_aio->get_wq_max_workers(); + IOUringContext *ur = IOUringContext::local_context(); + IOUringContext::set_main_queue(ur); + auto [bounded, unbounded] = ur->get_wq_max_workers(); Note("io_uring: WQ workers - bounded = %d, unbounded = %d", bounded, unbounded); #endif @@ -2248,7 +2248,7 @@ main(int /* argc ATS_UNUSED */, const char **argv) while (!TSSystemState::is_event_system_shut_down()) { #if TS_USE_LINUX_IO_URING == 1 - main_aio->submit_and_wait(1000); + ur->submit_and_wait(1000); #else sleep(1); #endif