diff --git a/CMakeLists.txt b/CMakeLists.txt index a1a3654d08b..0efbb62b038 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -105,6 +105,9 @@ check_symbol_exists(kqueue "sys/event.h" TS_USE_KQUEUE) set(CMAKE_REQUIRED_LIBRARIES uring) check_symbol_exists(io_uring_queue_init "liburing.h" HAVE_IOURING) unset(CMAKE_REQUIRED_LIBRARIES) +set(CMAKE_REQUIRED_LIBRARIES aio) +check_symbol_exists(io_submit "libaio.h" HAVE_LIBAIO) +unset(CMAKE_REQUIRED_LIBRARIES) check_symbol_exists(getresuid unistd.h HAVE_GETRESUID) check_symbol_exists(getresgid unistd.h HAVE_GETRESGID) check_symbol_exists(accept4 sys/socket.h HAVE_ACCEPT4) @@ -118,6 +121,12 @@ if (HAVE_IOURING AND USE_IOURING) set(TS_USE_LINUX_IO_URING 1) endif(HAVE_IOURING AND USE_IOURING) +option(USE_LIBAIO "Use experimental libaio (Linux only)" 0) +if (HAVE_LIBAIO AND USE_LIBAIO) + message(STATUS "Using libaio") + set(TS_USE_LINUX_NATIVE_AIO 1) +endif(HAVE_LIBAIO AND USE_LIBAIO) + # Check ssl functionality list(APPEND CMAKE_REQUIRED_INCLUDES ${OPENSSL_INCLUDE_DIR}) list(APPEND CMAKE_REQUIRED_LIBRARIES ${OPENSSL_CRYPTO_LIBRARY}) diff --git a/doc/admin-guide/files/records.yaml.en.rst b/doc/admin-guide/files/records.yaml.en.rst index c92caca1057..7d8b7a87ce6 100644 --- a/doc/admin-guide/files/records.yaml.en.rst +++ b/doc/admin-guide/files/records.yaml.en.rst @@ -5196,3 +5196,45 @@ Sockets .. _Traffic Shaping: https://cwiki.apache.org/confluence/display/TS/Traffic+Shaping + +IO_URING +======== + +.. ts:cv:: CONFIG proxy.config.io_uring.entries INT 32 + + Specify the number of entries in each io_uring. There will be one io_uring instance per thread that uses io_uring + for IO. This parameter is passed to io_uring_queue_init. + +.. ts:cv:: CONFIG proxy.config.io_uring.sq_poll_ms INT 0 + + If this value is >0, submit queue polling mode is used. The value will be used to specify the sq_thread_idle parameter + to io_uring setup. More information about submit queue polling mode can be found here: https://unixism.net/loti/tutorial/sq_poll.html + +.. ts:cv:: CONFIG proxy.config.io_uring.attach_wq INT 0 + + Set this to 1 if you want io_uring to re-use the same worker queue backend for each thread. + +.. ts:cv:: CONFIG proxy.config.io_uring.wq_workers_bounded INT 0 +.. ts:cv:: CONFIG proxy.config.io_uring.wq_workers_unbounded INT 0 + + These settings configure the number of threads for the io_uring worker queue backend. See the manpage for + io_uring_register_iowq_max_workers for more information. + +AIO +=== + +.. ts:cv:: CONFIG proxy.config.aio.mode STRING auto + + (Only if io_uring is enabled in the build) + Normally, ATS will detect whether io_uring can be used for async disk IO. Using this config item, the AIO mode + can instead be specified.
The value can be one of: + + ============ ====================================================================== + Value Description + ============ ====================================================================== + ``auto`` Use the default detection logic + ``thread`` Use the AIO thread pool for disk IO + ``io_uring`` Use io_uring for disk IO + ============ ====================================================================== + + Note: If you force the backend to use io_uring, you might experience failures with some (older, pre 5.4) kernel versions diff --git a/include/tscore/ink_config.h.cmake.in b/include/tscore/ink_config.h.cmake.in index f77f2574e7f..9f93b0ef043 100644 --- a/include/tscore/ink_config.h.cmake.in +++ b/include/tscore/ink_config.h.cmake.in @@ -107,6 +107,7 @@ const int DEFAULT_STACKSIZE = @DEFAULT_STACK_SIZE@; #cmakedefine01 TS_USE_EPOLL #cmakedefine01 TS_USE_KQUEUE #cmakedefine01 TS_USE_LINUX_IO_URING +#cmakedefine01 TS_USE_LINUX_NATIVE_AIO #cmakedefine01 TS_USE_SET_RBIO #cmakedefine01 TS_USE_DIAGS #cmakedefine01 TS_USE_GET_DH_2048_256 @@ -114,4 +115,3 @@ const int DEFAULT_STACKSIZE = @DEFAULT_STACK_SIZE@; #define TS_BUILD_CANONICAL_HOST "@CMAKE_HOST@" #cmakedefine YAMLCPP_LIB_VERSION "@YAMLCPP_LIB_VERSION@" - diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc index 95e0209b890..2fac40b8a5d 100644 --- a/iocore/aio/AIO.cc +++ b/iocore/aio/AIO.cc @@ -32,7 +32,7 @@ #include "P_AIO.h" -#if AIO_MODE == AIO_MODE_IO_URING +#if defined(HAVE_EVENTFD) && AIO_MODE == AIO_MODE_IO_URING #include #endif @@ -43,33 +43,25 @@ #define MAX_DISKS_POSSIBLE 100 // globals +#if TS_USE_LINUX_IO_URING +static bool use_io_uring = false; +#endif -int ts_config_with_inkdiskio = 0; /* structure to hold information about each file descriptor */ AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE]; /* number of unique file descriptors in the aio_reqs array */ int num_filedes = 1; -#if AIO_MODE == AIO_MODE_THREAD // acquire this mutex before inserting a new entry in the aio_reqs array. 
// Don't need to acquire this for searching the array static ink_mutex insert_mutex; -#endif int thread_is_created = 0; #endif // AIO_MODE == AIO_MODE_NATIVE + RecInt cache_config_threads_per_disk = 12; RecInt api_config_threads_per_disk = 12; -// config for io_uring mode -#if AIO_MODE == AIO_MODE_IO_URING -RecInt aio_io_uring_queue_entries = 1024; -RecInt aio_io_uring_sq_poll_ms = 0; -RecInt aio_io_uring_attach_wq = 0; -RecInt aio_io_uring_wq_bounded = 0; -RecInt aio_io_uring_wq_unbounded = 0; -#endif - RecRawStatBlock *aio_rsb = nullptr; Continuation *aio_err_callbck = nullptr; // AIO Stats @@ -116,7 +108,7 @@ aio_stats_cb(const char * /* name ATS_UNUSED */, RecDataT data_type, RecData *da case AIO_STAT_KB_WRITE_PER_SEC: new_val = aio_bytes_written.load() >> 10; break; -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING case AIO_STAT_IO_URING_SUBMITTED: new_val = io_uring_submissions.load(); break; @@ -171,7 +163,7 @@ ink_aio_set_callback(Continuation *callback) } void -ink_aio_init(ts::ModuleVersion v) +ink_aio_init(ts::ModuleVersion v, AIOBackend backend) { ink_release_assert(v.check(AIO_MODULE_INTERNAL_VERSION)); @@ -190,35 +182,69 @@ ink_aio_init(ts::ModuleVersion v) RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.io_uring.completed", RECD_FLOAT, RECP_PERSISTENT, (int)AIO_STAT_IO_URING_COMPLETED, aio_stats_cb); #endif -#if AIO_MODE == AIO_MODE_THREAD +#if AIO_MODE == AIO_MODE_DEFAULT memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *)); ink_mutex_init(&insert_mutex); #endif REC_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk"); #if TS_USE_LINUX_NATIVE_AIO - Warning("Running with Linux AIO, there are known issues with this feature"); + Warning( + "Running with Linux libaio is deprecated. There are known issues with this feature and it is being replaced with io_uring"); #endif +#if AIO_MODE == AIO_MODE_DEFAULT #if TS_USE_LINUX_IO_URING - REC_ReadConfigInteger(aio_io_uring_queue_entries, "proxy.config.aio.io_uring.entries"); - REC_ReadConfigInteger(aio_io_uring_sq_poll_ms, "proxy.config.aio.io_uring.sq_poll_ms"); - REC_ReadConfigInteger(aio_io_uring_attach_wq, "proxy.config.aio.io_uring.attach_wq"); - REC_ReadConfigInteger(aio_io_uring_wq_bounded, "proxy.config.aio.io_uring.wq_workers_bounded"); - REC_ReadConfigInteger(aio_io_uring_wq_unbounded, "proxy.config.aio.io_uring.wq_workers_unbounded"); -#endif -} + // If the caller specified auto backend, check for config to force a backend + if (backend == AIOBackend::AIO_BACKEND_AUTO) { + RecString aio_mode = nullptr; + REC_ReadConfigStringAlloc(aio_mode, "proxy.config.aio.mode"); + if (aio_mode) { + if (strcasecmp(aio_mode, "auto") == 0) { + backend = AIOBackend::AIO_BACKEND_AUTO; + } else if (strcasecmp(aio_mode, "thread") == 0) { + // force thread mode + backend = AIOBackend::AIO_BACKEND_THREAD; + } else if (strcasecmp(aio_mode, "io_uring") == 0) { + // force io_uring mode + backend = AIOBackend::AIO_BACKEND_IO_URING; + } else { + Warning("Invalid value '%s' for proxy.config.aio.mode. 
autodetecting", aio_mode); + } -int -ink_aio_start() -{ -#ifdef AIO_STATS - data = new AIOTestData(); - eventProcessor.schedule_in(data, HRTIME_MSECONDS(100), ET_CALL); + ats_free(aio_mode); + } + } + + switch (backend) { + case AIOBackend::AIO_BACKEND_AUTO: { + // detect if io_uring is available and can support the required features + auto *ctx = IOUringContext::local_context(); + if (ctx && ctx->supports_op(IORING_OP_WRITE) && ctx->supports_op(IORING_OP_READ)) { + use_io_uring = true; + } else { + Note("AIO using thread backend as required io_uring ops are not supported"); + use_io_uring = false; + } + break; + } + case AIOBackend::AIO_BACKEND_IO_URING: + use_io_uring = true; + break; + case AIOBackend::AIO_BACKEND_THREAD: + use_io_uring = false; + break; + } + + if (use_io_uring) { + Note("Using io_uring for AIO"); + } else { + Note("Using thread for AIO"); + } +#endif #endif - return 0; } -#if AIO_MODE == AIO_MODE_THREAD +#if AIO_MODE == AIO_MODE_DEFAULT struct AIOThreadInfo : public Continuation { AIO_Reqs *req; @@ -444,24 +470,6 @@ cache_op(AIOCallbackInternal *op) return 1; } -int -ink_aio_read(AIOCallback *op, int fromAPI) -{ - op->aiocb.aio_lio_opcode = LIO_READ; - aio_queue_req((AIOCallbackInternal *)op, fromAPI); - - return 1; -} - -int -ink_aio_write(AIOCallback *op, int fromAPI) -{ - op->aiocb.aio_lio_opcode = LIO_WRITE; - aio_queue_req((AIOCallbackInternal *)op, fromAPI); - - return 1; -} - bool ink_aio_thread_num_set(int thread_num) { @@ -530,14 +538,13 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info) return nullptr; } -#elif AIO_MODE == AIO_MODE_IO_URING - +#if TS_USE_LINUX_IO_URING #include "I_IO_URING.h" void -ink_aiocb::handle_complete(io_uring_cqe *cqe) +AIOCallbackInternal::handle_complete(io_uring_cqe *cqe) { - AIOCallback *op = this_op; + AIOCallbackInternal *op = this_op; op->aio_result = static_cast(cqe->res); op->link.prev = nullptr; @@ -553,8 +560,8 @@ ink_aiocb::handle_complete(io_uring_cqe *cqe) } // the last op in the linked ops will have the original op stored in the aiocb - if (op->aiocb.aio_op) { - op = op->aiocb.aio_op; + if (aio_op) { + op = op->aio_op; if (op->thread == AIO_CALLBACK_THREAD_AIO) { SCOPED_MUTEX_LOCK(lock, op->mutex, this_ethread()); op->handleEvent(EVENT_NONE, nullptr); @@ -565,50 +572,68 @@ ink_aiocb::handle_complete(io_uring_cqe *cqe) } } } +#endif int -ink_aio_read(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */) +ink_aio_read(AIOCallback *op_in, int fromAPI) { - IOUringContext *ur = IOUringContext::local_context(); - AIOCallback *op = op_in; - while (op) { - op->aiocb.this_op = op; - io_uring_sqe *sqe = ur->next_sqe(&op->aiocb); - io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset); - op->aiocb.aio_lio_opcode = LIO_READ; - if (op->then) { - sqe->flags |= IOSQE_IO_LINK; - } else { - op->aiocb.aio_op = op_in; - } +#if TS_USE_LINUX_IO_URING + if (use_io_uring) { + IOUringContext *ur = IOUringContext::local_context(); + AIOCallbackInternal *op = static_cast(op_in); + while (op) { + op->this_op = op; + io_uring_sqe *sqe = ur->next_sqe(op); + io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset); + op->aiocb.aio_lio_opcode = LIO_READ; + if (op->then) { + sqe->flags |= IOSQE_IO_LINK; + } else { + op->aio_op = static_cast(op_in); + } - op = op->then; + op = static_cast(op->then); + } + return 1; } +#endif + op_in->aiocb.aio_lio_opcode = LIO_READ; + aio_queue_req(static_cast(op_in), fromAPI); + return 1; } int 
-ink_aio_write(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */) +ink_aio_write(AIOCallback *op_in, int fromAPI) { - IOUringContext *ur = IOUringContext::local_context(); - AIOCallback *op = op_in; - while (op) { - op->aiocb.this_op = op; - io_uring_sqe *sqe = ur->next_sqe(&op->aiocb); - io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset); - op->aiocb.aio_lio_opcode = LIO_WRITE; - if (op->then) { - sqe->flags |= IOSQE_IO_LINK; - } else { - op->aiocb.aio_op = op_in; - } +#if TS_USE_LINUX_IO_URING + if (use_io_uring) { + IOUringContext *ur = IOUringContext::local_context(); + AIOCallbackInternal *op = static_cast(op_in); + while (op) { + op->this_op = op; + io_uring_sqe *sqe = ur->next_sqe(op); + io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset); + op->aiocb.aio_lio_opcode = LIO_WRITE; + if (op->then) { + sqe->flags |= IOSQE_IO_LINK; + } else { + op->aio_op = static_cast(op_in); + } - op = op->then; + op = static_cast(op->then); + } + return 1; } +#endif + op_in->aiocb.aio_lio_opcode = LIO_WRITE; + aio_queue_req(static_cast(op_in), fromAPI); + return 1; } +#endif -#else +#if AIO_MODE == AIO_MODE_NATIVE int DiskHandler::startAIOEvent(int /* event ATS_UNUSED */, Event *e) { diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h index d978f8d0eec..917909dd1ec 100644 --- a/iocore/aio/I_AIO.h +++ b/iocore/aio/I_AIO.h @@ -38,21 +38,24 @@ static constexpr ts::ModuleVersion AIO_MODULE_PUBLIC_VERSION(1, 0, ts::ModuleVer #define AIO_EVENT_DONE (AIO_EVENT_EVENTS_START + 0) -#define AIO_MODE_THREAD 0 -#define AIO_MODE_NATIVE 1 -#define AIO_MODE_IO_URING 2 +#define AIO_MODE_DEFAULT 0 +#define AIO_MODE_NATIVE 1 #if TS_USE_LINUX_NATIVE_AIO #define AIO_MODE AIO_MODE_NATIVE -#elif TS_USE_LINUX_IO_URING -#define AIO_MODE AIO_MODE_IO_URING #else -#define AIO_MODE AIO_MODE_THREAD +#define AIO_MODE AIO_MODE_DEFAULT #endif #define LIO_READ 0x1 #define LIO_WRITE 0x2 +enum AIOBackend { + AIO_BACKEND_AUTO = 0, + AIO_BACKEND_THREAD = 1, + AIO_BACKEND_IO_URING = 2, +}; + #if AIO_MODE == AIO_MODE_NATIVE #include @@ -67,24 +70,6 @@ using ink_io_event_t = struct io_event; #define aio_offset u.c.offset #define aio_buf u.c.buf -#elif AIO_MODE == AIO_MODE_IO_URING -#include "I_IO_URING.h" - -struct AIOCallback; -struct ink_aiocb : public IOUringCompletionHandler { - int aio_fildes = -1; /* file descriptor or status: AIO_NOT_IN_PROGRESS */ - void *aio_buf = nullptr; /* buffer location */ - size_t aio_nbytes = 0; /* length of transfer */ - off_t aio_offset = 0; /* file offset */ - - int aio_lio_opcode = 0; /* listio operation */ - int aio_state = 0; /* state flag for List I/O */ - AIOCallback *this_op = nullptr; - AIOCallback *aio_op = nullptr; - - void handle_complete(io_uring_cqe *) override; -}; - #else struct ink_aiocb { @@ -95,11 +80,9 @@ struct ink_aiocb { int aio_lio_opcode = 0; /* listio operation */ int aio_state = 0; /* state flag for List I/O */ - int aio__pad[1]; /* extension padding */ }; bool ink_aio_thread_num_set(int thread_num); - #endif // AIOCallback::thread special values @@ -157,16 +140,15 @@ struct DiskHandler : public Continuation { private: inline static DbgCtl _dbg_ctl_aio{"aio"}; }; +int ink_aio_readv(AIOCallback *op, + int fromAPI = 0); // fromAPI is a boolean to indicate if this is from an API call such as upload proxy feature +int ink_aio_writev(AIOCallback *op, int fromAPI = 0); #endif -void ink_aio_init(ts::ModuleVersion version); -int ink_aio_start(); +void 
ink_aio_init(ts::ModuleVersion version, AIOBackend backend = AIO_BACKEND_AUTO); void ink_aio_set_callback(Continuation *error_callback); int ink_aio_read(AIOCallback *op, int fromAPI = 0); // fromAPI is a boolean to indicate if this is from an API call such as upload proxy feature int ink_aio_write(AIOCallback *op, int fromAPI = 0); -int ink_aio_readv(AIOCallback *op, - int fromAPI = 0); // fromAPI is a boolean to indicate if this is from an API call such as upload proxy feature -int ink_aio_writev(AIOCallback *op, int fromAPI = 0); AIOCallback *new_AIOCallback(); diff --git a/iocore/aio/P_AIO.h b/iocore/aio/P_AIO.h index d7b7fd25f20..467f4eb87f3 100644 --- a/iocore/aio/P_AIO.h +++ b/iocore/aio/P_AIO.h @@ -33,6 +33,10 @@ #include "P_EventSystem.h" #include "I_AIO.h" +#if TS_USE_LINUX_IO_URING +#include "I_IO_URING.h" +#endif + // for debugging // #define AIO_STATS 1 @@ -79,10 +83,20 @@ AIOVec::mainEvent(int /* event */, Event *) struct AIO_Reqs; +#if TS_USE_LINUX_IO_URING +struct AIOCallbackInternal : public AIOCallback, public IOUringCompletionHandler { +#else struct AIOCallbackInternal : public AIOCallback { +#endif AIO_Reqs *aio_req = nullptr; ink_hrtime sleep_time = 0; SLINK(AIOCallbackInternal, alink); /* for AIO_Reqs::aio_temp_list */ +#if TS_USE_LINUX_IO_URING + AIOCallbackInternal *this_op = nullptr; + AIOCallbackInternal *aio_op = nullptr; + + void handle_complete(io_uring_cqe *) override; +#endif int io_complete(int event, void *data); @@ -148,7 +162,7 @@ enum aio_stat_enum { AIO_STAT_KB_READ_PER_SEC, AIO_STAT_WRITE_PER_SEC, AIO_STAT_KB_WRITE_PER_SEC, -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING AIO_STAT_IO_URING_SUBMITTED, AIO_STAT_IO_URING_COMPLETED, #endif diff --git a/iocore/aio/sample.cfg b/iocore/aio/sample.cfg index 054127c1719..d8799318fc0 100644 --- a/iocore/aio/sample.cfg +++ b/iocore/aio/sample.cfg @@ -15,5 +15,5 @@ chains 1 delete_disks 1 disk_path ./aio.tst io_uring_queue_entries 32 -num_processors 1 - +num_processors 5 +io_uring_force_thread 0 diff --git a/iocore/aio/test_AIO.cc b/iocore/aio/test_AIO.cc index a4fd41d3b57..8ab99eed6d9 100644 --- a/iocore/aio/test_AIO.cc +++ b/iocore/aio/test_AIO.cc @@ -72,12 +72,13 @@ int delete_disks = 0; int max_size = 0; int use_lseek = 0; int num_processors = 0; -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING int io_uring_queue_entries = 32; int io_uring_sq_poll_ms = 0; int io_uring_attach_wq = 0; int io_uring_wq_bounded = 0; int io_uring_wq_unbounded = 0; +int io_uring_force_thread = 0; #endif int chains = 1; @@ -218,7 +219,7 @@ dump_summary() printf("%0.2f total mbytes/sec\n", sr + sw + rr); printf("----------------------------------------------------------\n"); -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING printf("-----------------\n"); printf("IO_URING results\n"); printf("-----------------\n"); @@ -392,12 +393,13 @@ read_config(const char *config_filename) PARAM(threads_per_disk) PARAM(delete_disks) PARAM(num_processors) -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING PARAM(io_uring_queue_entries) PARAM(io_uring_sq_poll_ms) PARAM(io_uring_attach_wq) PARAM(io_uring_wq_bounded) PARAM(io_uring_wq_unbounded) + PARAM(io_uring_force_thread) #endif else if (strcmp(field_name, "disk_path") == 0) { @@ -429,7 +431,7 @@ read_config(const char *config_filename) return (1); } -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING class IOUringLoopTailHandler : public EThread::LoopTailHandler { @@ -468,7 +470,9 @@ main(int /* argc ATS_UNUSED */, char *argv[]) } 
printf("Using %d processor threads\n", num_processors); -#if AIO_MODE == AIO_MODE_IO_URING + AIOBackend backend = AIOBackend::AIO_BACKEND_AUTO; + +#if TS_USE_LINUX_IO_URING { IOUringConfig cfg; @@ -479,6 +483,10 @@ main(int /* argc ATS_UNUSED */, char *argv[]) cfg.wq_unbounded = io_uring_wq_unbounded; IOUringContext::set_config(cfg); + + if (io_uring_force_thread) { + backend = AIOBackend::AIO_BACKEND_THREAD; + } }; #endif @@ -497,14 +505,16 @@ main(int /* argc ATS_UNUSED */, char *argv[]) et->schedule_imm(et->diskHandler); } #endif -#if AIO_MODE == AIO_MODE_IO_URING - for (EThread *et : eventProcessor.active_group_threads(ET_NET)) { - et->set_tail_handler(&uring_handler); +#if TS_USE_LINUX_IO_URING + if (!io_uring_force_thread) { + for (EThread *et : eventProcessor.active_group_threads(ET_NET)) { + et->set_tail_handler(&uring_handler); + } } #endif RecProcessStart(); - ink_aio_init(AIO_MODULE_PUBLIC_VERSION); + ink_aio_init(AIO_MODULE_PUBLIC_VERSION, backend); ts::Random::seed(time(nullptr)); max_size = seq_read_size; @@ -539,7 +549,7 @@ main(int /* argc ATS_UNUSED */, char *argv[]) } while (!TSSystemState::is_event_system_shut_down()) { -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING IOUringContext::local_context()->submit_and_wait(1 * HRTIME_SECOND); #else sleep(1); diff --git a/iocore/io_uring/I_IO_URING.h b/iocore/io_uring/I_IO_URING.h index 32520fb4555..cb76c0d3c84 100644 --- a/iocore/io_uring/I_IO_URING.h +++ b/iocore/io_uring/I_IO_URING.h @@ -28,7 +28,7 @@ Linux io_uring helper library #include "tscore/ink_hrtime.h" struct IOUringConfig { - int queue_entries = 1024; + int queue_entries = 32; int sq_poll_ms = 0; int attach_wq = 0; int wq_bounded = 0; @@ -59,6 +59,8 @@ class IOUringContext return result; } + bool supports_op(int op) const; + int set_wq_max_workers(unsigned int bounded, unsigned int unbounded); std::pair get_wq_max_workers(); @@ -75,8 +77,9 @@ class IOUringContext static int get_main_queue_fd(); private: - io_uring ring = {}; - int evfd = -1; + io_uring ring = {}; + io_uring_probe *probe = nullptr; + int evfd = -1; void handle_cqe(io_uring_cqe *); static IOUringConfig config; diff --git a/iocore/io_uring/io_uring.cc b/iocore/io_uring/io_uring.cc index c26f2751768..17cf7c91438 100644 --- a/iocore/io_uring/io_uring.cc +++ b/iocore/io_uring/io_uring.cc @@ -43,6 +43,9 @@ IOUringContext::set_config(const IOUringConfig &cfg) config = cfg; } +static io_uring_probe probe_unsupported = {}; +constexpr int MAX_SUPPORTED_OP_BEFORE_PROBE = 20; + IOUringContext::IOUringContext() { io_uring_params p{}; @@ -70,9 +73,11 @@ IOUringContext::IOUringContext() throw std::runtime_error("No SQPOLL sharing with nonfixed"); } - // assign this handler to the thread - // TODO(cmcfarlen): Assign in thread somewhere else - // this_ethread()->diskHandler = this; + // Fetch the probe info so we can check for op support + probe = io_uring_get_probe_ring(&ring); + if (probe == nullptr) { + probe = &probe_unsupported; + } } IOUringContext::~IOUringContext() @@ -81,6 +86,9 @@ IOUringContext::~IOUringContext() ::close(evfd); evfd = -1; } + if (probe != &probe_unsupported) { + io_uring_free_probe(probe); + } io_uring_queue_exit(&ring); } @@ -143,6 +151,11 @@ IOUringContext::service() cqe = nullptr; io_uring_peek_cqe(&ring, &cqe); } + + if (evfd != -1) { + uint64_t val = 0; + ::read(evfd, &val, sizeof(val)); + } } void @@ -182,3 +195,15 @@ IOUringContext::local_context() return &threadContext; } + +bool +IOUringContext::supports_op(int op) const +{ + // If we don't have a probe, we can only 
support the ops that were supported + // before the probe was added. + if (probe == &probe_unsupported) { + return op <= MAX_SUPPORTED_OP_BEFORE_PROBE; + } + + return io_uring_opcode_supported(probe, op); +} diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc index a8daf9cc70f..8ce096b9c63 100644 --- a/iocore/net/UnixNet.cc +++ b/iocore/net/UnixNet.cc @@ -462,7 +462,7 @@ int NetHandler::waitForActivity(ink_hrtime timeout) { EventIO *epd = nullptr; -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING IOUringContext *ur = IOUringContext::local_context(); bool servicedh = false; #endif @@ -472,7 +472,7 @@ NetHandler::waitForActivity(ink_hrtime timeout) process_enabled_list(); -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING ur->submit(); #endif @@ -527,7 +527,7 @@ NetHandler::waitForActivity(ink_hrtime timeout) net_signal_hook_callback(this->thread); } else if (epd->type == EVENTIO_NETACCEPT) { this->thread->schedule_imm(epd->data.na); -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING } else if (epd->type == EVENTIO_IO_URING) { servicedh = true; #endif @@ -539,7 +539,7 @@ NetHandler::waitForActivity(ink_hrtime timeout) process_ready_list(); -#if AIO_MODE == AIO_MODE_IO_URING +#if TS_USE_LINUX_IO_URING if (servicedh) { ur->service(); } diff --git a/src/records/RecordsConfig.cc b/src/records/RecordsConfig.cc index 039d5250e36..63713ea8d74 100644 --- a/src/records/RecordsConfig.cc +++ b/src/records/RecordsConfig.cc @@ -1524,14 +1524,17 @@ static const RecordElement RecordsConfig[] = //########### //# - //# AIO specific configuration + //# IO_URING specific configuration //# //########### - {RECT_CONFIG, "proxy.config.aio.io_uring.entries", RECD_INT, "1024", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, - {RECT_CONFIG, "proxy.config.aio.io_uring.sq_poll_ms", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, - {RECT_CONFIG, "proxy.config.aio.io_uring.attach_wq", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, "[0-1]", RECA_NULL}, - {RECT_CONFIG, "proxy.config.aio.io_uring.wq_workers_bounded", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, - {RECT_CONFIG, "proxy.config.aio.io_uring.wq_workers_unbounded", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, +#if TS_USE_LINUX_IO_URING + {RECT_CONFIG, "proxy.config.io_uring.entries", RECD_INT, "1024", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, + {RECT_CONFIG, "proxy.config.io_uring.sq_poll_ms", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, + {RECT_CONFIG, "proxy.config.io_uring.attach_wq", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, "[0-1]", RECA_NULL}, + {RECT_CONFIG, "proxy.config.io_uring.wq_workers_bounded", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, + {RECT_CONFIG, "proxy.config.io_uring.wq_workers_unbounded", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, + {RECT_CONFIG, "proxy.config.aio.mode", RECD_STRING, "auto", RECU_DYNAMIC, RR_NULL, RECC_NULL, "(auto|io_uring|thread)", RECA_NULL}, +#endif }; // clang-format on diff --git a/src/traffic_server/CMakeLists.txt b/src/traffic_server/CMakeLists.txt index f8e97e09ed5..b730f0cf4e3 100644 --- a/src/traffic_server/CMakeLists.txt +++ b/src/traffic_server/CMakeLists.txt @@ -64,6 +64,8 @@ target_link_libraries(traffic_server if (TS_USE_LINUX_IO_URING) target_link_libraries(traffic_server inkuring uring) endif (TS_USE_LINUX_IO_URING) +if (TS_USE_LINUX_NATIVE_AIO) + target_link_libraries(traffic_server aio) +endif 
(TS_USE_LINUX_NATIVE_AIO) install(TARGETS traffic_server) - diff --git a/src/traffic_server/traffic_server.cc b/src/traffic_server/traffic_server.cc index 64d83e6230a..919437e1249 100644 --- a/src/traffic_server/traffic_server.cc +++ b/src/traffic_server/traffic_server.cc @@ -1722,6 +1722,35 @@ bind_outputs(const char *bind_stdout_p, const char *bind_stderr_p) } } +#if TS_USE_LINUX_IO_URING +// Load config items for io_uring +static void +configure_io_uring() +{ + IOUringConfig cfg; + + RecInt aio_io_uring_queue_entries = cfg.queue_entries; + RecInt aio_io_uring_sq_poll_ms = cfg.sq_poll_ms; + RecInt aio_io_uring_attach_wq = cfg.attach_wq; + RecInt aio_io_uring_wq_bounded = cfg.wq_bounded; + RecInt aio_io_uring_wq_unbounded = cfg.wq_unbounded; + + REC_ReadConfigInteger(aio_io_uring_queue_entries, "proxy.config.io_uring.entries"); + REC_ReadConfigInteger(aio_io_uring_sq_poll_ms, "proxy.config.io_uring.sq_poll_ms"); + REC_ReadConfigInteger(aio_io_uring_attach_wq, "proxy.config.io_uring.attach_wq"); + REC_ReadConfigInteger(aio_io_uring_wq_bounded, "proxy.config.io_uring.wq_workers_bounded"); + REC_ReadConfigInteger(aio_io_uring_wq_unbounded, "proxy.config.io_uring.wq_workers_unbounded"); + + cfg.queue_entries = aio_io_uring_queue_entries; + cfg.sq_poll_ms = aio_io_uring_sq_poll_ms; + cfg.attach_wq = aio_io_uring_attach_wq; + cfg.wq_bounded = aio_io_uring_wq_bounded; + cfg.wq_unbounded = aio_io_uring_wq_unbounded; + + IOUringContext::set_config(cfg); +} +#endif + // // Main // @@ -2007,6 +2036,10 @@ main(int /* argc ATS_UNUSED */, const char **argv) REC_ReadConfigInteger(thread_max_heartbeat_mseconds, "proxy.config.thread.max_heartbeat_mseconds"); +#if TS_USE_LINUX_IO_URING + configure_io_uring(); +#endif + ink_event_system_init(ts::ModuleVersion(1, 0, ts::ModuleVersion::PRIVATE)); ink_net_init(ts::ModuleVersion(1, 0, ts::ModuleVersion::PRIVATE)); ink_aio_init(ts::ModuleVersion(1, 0, ts::ModuleVersion::PRIVATE)); @@ -2040,7 +2073,6 @@ main(int /* argc ATS_UNUSED */, const char **argv) } #if TS_USE_LINUX_IO_URING == 1 - Note("Using io_uring for AIO"); IOUringContext *ur = IOUringContext::local_context(); IOUringContext::set_main_queue(ur); auto [bounded, unbounded] = ur->get_wq_max_workers();
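// Reviewer sketch (not part of this patch): how a caller can select the AIO
// backend explicitly through the new ink_aio_init() signature introduced above.
// This mirrors the io_uring_force_thread handling in test_AIO.cc; the wrapper
// name start_aio() and its bool parameter are hypothetical.
#include "I_AIO.h"

static void
start_aio(bool force_thread)
{
  // AIO_BACKEND_AUTO consults proxy.config.aio.mode and then probes the local
  // IOUringContext for IORING_OP_READ/IORING_OP_WRITE support before choosing.
  AIOBackend backend = AIOBackend::AIO_BACKEND_AUTO;
  if (force_thread) {
    backend = AIOBackend::AIO_BACKEND_THREAD; // use the AIO thread pool even if io_uring is available
  }
  ink_aio_init(AIO_MODULE_PUBLIC_VERSION, backend);
}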