Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ check_symbol_exists(kqueue "sys/event.h" TS_USE_KQUEUE)
set(CMAKE_REQUIRED_LIBRARIES uring)
check_symbol_exists(io_uring_queue_init "liburing.h" HAVE_IOURING)
unset(CMAKE_REQUIRED_LIBRARIES)
set(CMAKE_REQUIRED_LIBRARIES aio)
check_symbol_exists(io_submit "libaio.h" HAVE_LIBAIO)
unset(CMAKE_REQUIRED_LIBRARIES)
check_symbol_exists(getresuid unistd.h HAVE_GETRESUID)
check_symbol_exists(getresgid unistd.h HAVE_GETRESGID)
check_symbol_exists(accept4 sys/socket.h HAVE_ACCEPT4)
Expand All @@ -118,6 +121,12 @@ if (HAVE_IOURING AND USE_IOURING)
set(TS_USE_LINUX_IO_URING 1)
endif(HAVE_IOURING AND USE_IOURING)

option(USE_LIBAIO "Use experimental libaio (linux only)" 0)
if (HAVE_LIBAIO AND USE_LIBAIO)
message(Using libaio)
set(TS_USE_LINUX_NATIVE_AIO 1)
endif(HAVE_LIBAIO AND USE_LIBAIO)

# Check ssl functionality
list(APPEND CMAKE_REQUIRED_INCLUDES ${OPENSSL_INCLUDE_DIR})
list(APPEND CMAKE_REQUIRED_LIBRARIES ${OPENSSL_CRYPTO_LIBRARY})
Expand Down
42 changes: 42 additions & 0 deletions doc/admin-guide/files/records.yaml.en.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5196,3 +5196,45 @@ Sockets

.. _Traffic Shaping:
https://cwiki.apache.org/confluence/display/TS/Traffic+Shaping

IO_URING
========

.. ts:cv:: CONFIG proxy.config.aio.io_uring.entries INT 1024

Specify the number of entries in each io_uring. There will be one io_uring instance per thread that uses io_uring
for IO. This parameter is passed to io_uring_queue_init.

.. ts:cv:: CONFIG proxy.config.aio.io_uring.sq_poll_ms INT 0

If this value is >0 then use submit queue polling mode. The value will be used to specify the sq_thread_idle parameter
to io_uring setup. More information about submit queue polling mode can be found here: https://unixism.net/loti/tutorial/sq_poll.html

.. ts:cv:: CONFIG proxy.config.aio.io_uring.attach_wq INT 0

Set this to 1 if you want io_uring to re-use the same worker queue backend for each thread.

.. ts:cv:: CONFIG proxy.config.aio.io_uring.wq_workers_bounded INT 0
.. ts:cv:: CONFIG proxy.config.aio.io_uring.wq_workers_unbounded INT 0

These settings configure the number of threads for the io_uring worker queue backend. See the manpage for
io_uring_register_iowq_max_workers for more information.

AIO
===

.. ts:cv:: CONFIG proxy.config.aio.mode STRING auto

(Only if io_uring is enabled in the build)
Normally, ATS will detect if io_uring can be used for async disk IO. Using this config item, the AIO mode
can instead be specified. The value can be one of:

============ ======================================================================
Value Description
============ ======================================================================
``auto`` Use the default detection logic
``thread`` Use the AIO thread pool for disk IO
``io_uring`` Use io_uring for disk IO
============ ======================================================================

Note: If you force the backend to use io_uring, you might experience failures on some older (pre-5.4) kernel versions.
2 changes: 1 addition & 1 deletion include/tscore/ink_config.h.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ const int DEFAULT_STACKSIZE = @DEFAULT_STACK_SIZE@;
#cmakedefine01 TS_USE_EPOLL
#cmakedefine01 TS_USE_KQUEUE
#cmakedefine01 TS_USE_LINUX_IO_URING
#cmakedefine01 TS_USE_LINUX_NATIVE_AIO
#cmakedefine01 TS_USE_SET_RBIO
#cmakedefine01 TS_USE_DIAGS
#cmakedefine01 TS_USE_GET_DH_2048_256

#define TS_BUILD_CANONICAL_HOST "@CMAKE_HOST@"

#cmakedefine YAMLCPP_LIB_VERSION "@YAMLCPP_LIB_VERSION@"

195 changes: 110 additions & 85 deletions iocore/aio/AIO.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

#include "P_AIO.h"

#if AIO_MODE == AIO_MODE_IO_URING
#if defined(HAVE_EVENTFD) && AIO_MODE == AIO_MODE_IO_URING
#include <sys/eventfd.h>
#endif

Expand All @@ -43,33 +43,25 @@
#define MAX_DISKS_POSSIBLE 100

// globals
#if TS_USE_LINUX_IO_URING
static bool use_io_uring = false;
#endif

int ts_config_with_inkdiskio = 0;
/* structure to hold information about each file descriptor */
AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
/* number of unique file descriptors in the aio_reqs array */
int num_filedes = 1;

#if AIO_MODE == AIO_MODE_THREAD
// acquire this mutex before inserting a new entry in the aio_reqs array.
// Don't need to acquire this for searching the array
static ink_mutex insert_mutex;
#endif

int thread_is_created = 0;
#endif // AIO_MODE == AIO_MODE_NATIVE

RecInt cache_config_threads_per_disk = 12;
RecInt api_config_threads_per_disk = 12;

// config for io_uring mode
#if AIO_MODE == AIO_MODE_IO_URING
RecInt aio_io_uring_queue_entries = 1024;
RecInt aio_io_uring_sq_poll_ms = 0;
RecInt aio_io_uring_attach_wq = 0;
RecInt aio_io_uring_wq_bounded = 0;
RecInt aio_io_uring_wq_unbounded = 0;
#endif

RecRawStatBlock *aio_rsb = nullptr;
Continuation *aio_err_callbck = nullptr;
// AIO Stats
Expand Down Expand Up @@ -116,7 +108,7 @@ aio_stats_cb(const char * /* name ATS_UNUSED */, RecDataT data_type, RecData *da
case AIO_STAT_KB_WRITE_PER_SEC:
new_val = aio_bytes_written.load() >> 10;
break;
#if AIO_MODE == AIO_MODE_IO_URING
#if TS_USE_LINUX_IO_URING
case AIO_STAT_IO_URING_SUBMITTED:
new_val = io_uring_submissions.load();
break;
Expand Down Expand Up @@ -171,7 +163,7 @@ ink_aio_set_callback(Continuation *callback)
}

void
ink_aio_init(ts::ModuleVersion v)
ink_aio_init(ts::ModuleVersion v, AIOBackend backend)
{
ink_release_assert(v.check(AIO_MODULE_INTERNAL_VERSION));

Expand All @@ -190,35 +182,69 @@ ink_aio_init(ts::ModuleVersion v)
RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.io_uring.completed", RECD_FLOAT, RECP_PERSISTENT,
(int)AIO_STAT_IO_URING_COMPLETED, aio_stats_cb);
#endif
#if AIO_MODE == AIO_MODE_THREAD
#if AIO_MODE == AIO_MODE_DEFAULT
memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
ink_mutex_init(&insert_mutex);
#endif
REC_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
#if TS_USE_LINUX_NATIVE_AIO
Warning("Running with Linux AIO, there are known issues with this feature");
Warning(
"Running with Linux libaio is deprecated. There are known issues with this feature and it is being replaced with io_uring");
#endif

#if AIO_MODE == AIO_MODE_DEFAULT
#if TS_USE_LINUX_IO_URING
REC_ReadConfigInteger(aio_io_uring_queue_entries, "proxy.config.aio.io_uring.entries");
REC_ReadConfigInteger(aio_io_uring_sq_poll_ms, "proxy.config.aio.io_uring.sq_poll_ms");
REC_ReadConfigInteger(aio_io_uring_attach_wq, "proxy.config.aio.io_uring.attach_wq");
REC_ReadConfigInteger(aio_io_uring_wq_bounded, "proxy.config.aio.io_uring.wq_workers_bounded");
REC_ReadConfigInteger(aio_io_uring_wq_unbounded, "proxy.config.aio.io_uring.wq_workers_unbounded");
#endif
}
// If the caller specified auto backend, check for config to force a backend
if (backend == AIOBackend::AIO_BACKEND_AUTO) {
RecString aio_mode = nullptr;
REC_ReadConfigStringAlloc(aio_mode, "proxy.config.aio.mode");
if (aio_mode) {
if (strcasecmp(aio_mode, "auto") == 0) {
backend = AIOBackend::AIO_BACKEND_AUTO;
} else if (strcasecmp(aio_mode, "thread") == 0) {
// force thread mode
backend = AIOBackend::AIO_BACKEND_THREAD;
} else if (strcasecmp(aio_mode, "io_uring") == 0) {
// force io_uring mode
backend = AIOBackend::AIO_BACKEND_IO_URING;
} else {
Warning("Invalid value '%s' for proxy.config.aio.mode. autodetecting", aio_mode);
}

int
ink_aio_start()
{
#ifdef AIO_STATS
data = new AIOTestData();
eventProcessor.schedule_in(data, HRTIME_MSECONDS(100), ET_CALL);
ats_free(aio_mode);
}
}

switch (backend) {
case AIOBackend::AIO_BACKEND_AUTO: {
// detect if io_uring is available and can support the required features
auto *ctx = IOUringContext::local_context();
if (ctx && ctx->supports_op(IORING_OP_WRITE) && ctx->supports_op(IORING_OP_READ)) {
use_io_uring = true;
} else {
Note("AIO using thread backend as required io_uring ops are not supported");
use_io_uring = false;
}
break;
}
case AIOBackend::AIO_BACKEND_IO_URING:
use_io_uring = true;
break;
case AIOBackend::AIO_BACKEND_THREAD:
use_io_uring = false;
break;
}

if (use_io_uring) {
Note("Using io_uring for AIO");
} else {
Note("Using thread for AIO");
}
#endif
#endif
return 0;
}

#if AIO_MODE == AIO_MODE_THREAD
#if AIO_MODE == AIO_MODE_DEFAULT

struct AIOThreadInfo : public Continuation {
AIO_Reqs *req;
Expand Down Expand Up @@ -444,24 +470,6 @@ cache_op(AIOCallbackInternal *op)
return 1;
}

int
ink_aio_read(AIOCallback *op, int fromAPI)
{
op->aiocb.aio_lio_opcode = LIO_READ;
aio_queue_req((AIOCallbackInternal *)op, fromAPI);

return 1;
}

int
ink_aio_write(AIOCallback *op, int fromAPI)
{
op->aiocb.aio_lio_opcode = LIO_WRITE;
aio_queue_req((AIOCallbackInternal *)op, fromAPI);

return 1;
}

bool
ink_aio_thread_num_set(int thread_num)
{
Expand Down Expand Up @@ -530,14 +538,13 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info)
return nullptr;
}

#elif AIO_MODE == AIO_MODE_IO_URING

#if TS_USE_LINUX_IO_URING
#include "I_IO_URING.h"

void
ink_aiocb::handle_complete(io_uring_cqe *cqe)
AIOCallbackInternal::handle_complete(io_uring_cqe *cqe)
{
AIOCallback *op = this_op;
AIOCallbackInternal *op = this_op;

op->aio_result = static_cast<int64_t>(cqe->res);
op->link.prev = nullptr;
Expand All @@ -553,8 +560,8 @@ ink_aiocb::handle_complete(io_uring_cqe *cqe)
}

// the last op in the linked ops will have the original op stored in the aiocb
if (op->aiocb.aio_op) {
op = op->aiocb.aio_op;
if (aio_op) {
op = op->aio_op;
if (op->thread == AIO_CALLBACK_THREAD_AIO) {
SCOPED_MUTEX_LOCK(lock, op->mutex, this_ethread());
op->handleEvent(EVENT_NONE, nullptr);
Expand All @@ -565,50 +572,68 @@ ink_aiocb::handle_complete(io_uring_cqe *cqe)
}
}
}
#endif

int
ink_aio_read(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */)
ink_aio_read(AIOCallback *op_in, int fromAPI)
{
IOUringContext *ur = IOUringContext::local_context();
AIOCallback *op = op_in;
while (op) {
op->aiocb.this_op = op;
io_uring_sqe *sqe = ur->next_sqe(&op->aiocb);
io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset);
op->aiocb.aio_lio_opcode = LIO_READ;
if (op->then) {
sqe->flags |= IOSQE_IO_LINK;
} else {
op->aiocb.aio_op = op_in;
}
#if TS_USE_LINUX_IO_URING
if (use_io_uring) {
IOUringContext *ur = IOUringContext::local_context();
AIOCallbackInternal *op = static_cast<AIOCallbackInternal *>(op_in);
while (op) {
op->this_op = op;
io_uring_sqe *sqe = ur->next_sqe(op);
io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset);
op->aiocb.aio_lio_opcode = LIO_READ;
if (op->then) {
sqe->flags |= IOSQE_IO_LINK;
} else {
op->aio_op = static_cast<AIOCallbackInternal *>(op_in);
}

op = op->then;
op = static_cast<AIOCallbackInternal *>(op->then);
}
return 1;
}
#endif
op_in->aiocb.aio_lio_opcode = LIO_READ;
aio_queue_req(static_cast<AIOCallbackInternal *>(op_in), fromAPI);

return 1;
}

int
ink_aio_write(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */)
ink_aio_write(AIOCallback *op_in, int fromAPI)
{
IOUringContext *ur = IOUringContext::local_context();
AIOCallback *op = op_in;
while (op) {
op->aiocb.this_op = op;
io_uring_sqe *sqe = ur->next_sqe(&op->aiocb);
io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset);
op->aiocb.aio_lio_opcode = LIO_WRITE;
if (op->then) {
sqe->flags |= IOSQE_IO_LINK;
} else {
op->aiocb.aio_op = op_in;
}
#if TS_USE_LINUX_IO_URING
if (use_io_uring) {
IOUringContext *ur = IOUringContext::local_context();
AIOCallbackInternal *op = static_cast<AIOCallbackInternal *>(op_in);
while (op) {
op->this_op = op;
io_uring_sqe *sqe = ur->next_sqe(op);
io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset);
op->aiocb.aio_lio_opcode = LIO_WRITE;
if (op->then) {
sqe->flags |= IOSQE_IO_LINK;
} else {
op->aio_op = static_cast<AIOCallbackInternal *>(op_in);
}

op = op->then;
op = static_cast<AIOCallbackInternal *>(op->then);
}
return 1;
}
#endif
op_in->aiocb.aio_lio_opcode = LIO_WRITE;
aio_queue_req(static_cast<AIOCallbackInternal *>(op_in), fromAPI);

return 1;
}
#endif

#else
#if AIO_MODE == AIO_MODE_NATIVE
int
DiskHandler::startAIOEvent(int /* event ATS_UNUSED */, Event *e)
{
Expand Down
Loading