ioq: Use io_uring
Closes #65.
tavianator committed Jul 11, 2023
1 parent 6299b8d commit e26e209
Showing 2 changed files with 275 additions and 32 deletions.
src/bftw.c: 15 changes (12 additions & 3 deletions)
@@ -490,15 +490,14 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg
errno = EMFILE;
return -1;
}
size_t nopenfd = args->nopenfd;

state->path = dstralloc(0);
if (!state->path) {
return -1;
}

-	bftw_cache_init(&state->cache, args->nopenfd);
-
-	size_t qdepth = args->nopenfd - 1;
size_t qdepth = nopenfd;
if (qdepth > 1024) {
qdepth = 1024;
}
@@ -508,6 +507,14 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg
nthreads = qdepth;
}

#if BFS_USE_LIBURING
// io_uring uses one fd per ring, ioq uses one ring per thread
if (nthreads >= nopenfd) {
nthreads = nopenfd - 1;
}
nopenfd -= nthreads;
#endif

state->ioq = NULL;
if (nthreads > 0) {
state->ioq = ioq_create(qdepth, nthreads);
@@ -518,6 +525,8 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg
}
state->nthreads = nthreads;

bftw_cache_init(&state->cache, nopenfd);

SLIST_INIT(&state->dirs);
SLIST_INIT(&state->files);
SLIST_INIT(&state->batch);
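
Aside, not part of the diff: the fd budgeting in bftw_state_init() above carves one file descriptor per worker thread out of nopenfd for the io_uring rings and leaves the rest for the directory fd cache. A small illustrative sketch of that split (split_fds and its parameters are hypothetical names; the numbers in the comments are examples):

#include <stddef.h>

// Each io_uring ring costs one open fd, and there is one ring per thread,
// so the rings come out of the nopenfd budget before the cache is sized.
static void split_fds(size_t nopenfd, size_t *nthreads, size_t *cache_fds) {
	// e.g. nopenfd = 16 and *nthreads = 16 on entry
	if (*nthreads >= nopenfd) {
		*nthreads = nopenfd - 1; // clamp to 15 so at least one fd remains
	}
	*cache_fds = nopenfd - *nthreads; // 1 fd left for the directory cache
}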
src/ioq.c: 292 changes (263 additions & 29 deletions)
@@ -16,6 +16,10 @@
#include <pthread.h>
#include <stdlib.h>

#if BFS_USE_LIBURING
# include <liburing.h>
#endif

/**
* A monitor for an I/O queue slot.
*/
@@ -266,6 +270,21 @@ static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
/** Sentinel stop command. */
static struct ioq_ent IOQ_STOP;

/** I/O queue thread-specific data. */
struct ioq_thread {
/** The thread handle. */
pthread_t id;
/** Pointer back to the I/O queue. */
struct ioq *parent;

#if BFS_USE_LIBURING
/** io_uring instance. */
struct io_uring ring;
/** Any error that occurred initializing the ring. */
int ring_err;
#endif
};

struct ioq {
/** The depth of the queue. */
size_t depth;
@@ -285,60 +304,251 @@ struct ioq {
/** The number of background threads. */
size_t nthreads;
/** The background threads themselves. */
-	pthread_t threads[];
struct ioq_thread threads[];
};

-/** Background thread entry point. */
-static void *ioq_work(void *ptr) {
-	struct ioq *ioq = ptr;
/** Cancel a request if we need to. */
static bool ioq_check_cancel(struct ioq *ioq, struct ioq_ent *ent) {
if (!load(&ioq->cancel, relaxed)) {
return false;
}

// Always close(), even if we're cancelled, just like a real EINTR
if (ent->op == IOQ_CLOSE || ent->op == IOQ_CLOSEDIR) {
return false;
}

ent->ret = -1;
ent->error = EINTR;
ioqq_push(ioq->ready, ent);
return true;
}

/** Handle a single request synchronously. */
static void ioq_handle(struct ioq *ioq, struct ioq_ent *ent) {
int ret;

switch (ent->op) {
case IOQ_CLOSE:
ret = xclose(ent->close.fd);
break;

case IOQ_OPENDIR:
ret = bfs_opendir(ent->opendir.dir, ent->opendir.dfd, ent->opendir.path);
if (ret == 0) {
bfs_polldir(ent->opendir.dir);
}
break;

case IOQ_CLOSEDIR:
ret = bfs_closedir(ent->closedir.dir);
break;

default:
bfs_bug("Unknown ioq_op %d", (int)ent->op);
ret = -1;
errno = ENOSYS;
break;
}

ent->ret = ret;
ent->error = ret == 0 ? 0 : errno;

ioqq_push(ioq->ready, ent);
}

#if BFS_USE_LIBURING
/** io_uring worker state. */
struct ioq_ring_state {
/** The I/O queue. */
struct ioq *ioq;
/** The io_uring. */
struct io_uring *ring;
/** The current ioq->pending slot. */
ioq_slot *slot;
/** Saved ioq_ent if the submission queue overflows. */
struct ioq_ent *saved;
/** Number of prepped, unsubmitted SQEs. */
size_t prepped;
/** Number of submitted, unreaped SQEs. */
size_t submitted;
};

/** Pop a request for ioq_ring_prep(). */
static struct ioq_ent *ioq_ring_pop(struct ioq_ring_state *state) {
// Retry the saved request after submission queue overflow
struct ioq_ent *ret = state->saved;
state->saved = NULL;
if (ret) {
return ret;
}

// Advance to the next slot if necessary
struct ioq *ioq = state->ioq;
if (!state->slot) {
state->slot = ioqq_read(ioq->pending);
}

// Block if we have nothing else to do
bool block = !state->prepped && !state->submitted;
ret = ioq_slot_pop(ioq->pending, state->slot, block);

if (ret) {
// Got an entry, move to the next slot next time
state->slot = NULL;
}

return ret;
}

/** Prep a batch of SQEs. */
static bool ioq_ring_prep(struct ioq_ring_state *state) {
struct ioq *ioq = state->ioq;

while (true) {
-		struct ioq_ent *ent = ioqq_pop(ioq->pending);
-		if (ent == &IOQ_STOP) {
struct ioq_ent *ent = ioq_ring_pop(state);
if (!ent || ent == &IOQ_STOP) {
// Save IOQ_STOP forever
state->saved = ent;
break;
}

-		bool cancel = load(&ioq->cancel, relaxed);
if (ioq_check_cancel(ioq, ent)) {
continue;
}

-		ent->ret = -1;
#if !BFS_USE_UNWRAPDIR
if (ent->op == IOQ_CLOSEDIR) {
ioq_handle(ioq, ent);
continue;
}
#endif

struct io_uring_sqe *sqe = io_uring_get_sqe(state->ring);
if (!sqe) {
// Submission queue overflow, save for next time
state->saved = ent;
break;
}
io_uring_sqe_set_data(sqe, ent);

switch (ent->op) {
case IOQ_CLOSE:
-			// Always close(), even if we're cancelled, just like a real EINTR
-			ent->ret = xclose(ent->close.fd);
io_uring_prep_close(sqe, ent->close.fd);
break;

case IOQ_OPENDIR:
-			if (!cancel) {
-				struct ioq_opendir *args = &ent->opendir;
-				ent->ret = bfs_opendir(args->dir, args->dfd, args->path);
-				if (ent->ret == 0) {
-					bfs_polldir(args->dir);
-				}
-			}
io_uring_prep_openat(sqe, ent->opendir.dfd, ent->opendir.path, O_RDONLY | O_CLOEXEC | O_DIRECTORY, 0);
break;

#if BFS_USE_UNWRAPDIR
case IOQ_CLOSEDIR:
-			ent->ret = bfs_closedir(ent->closedir.dir);
io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir));
break;

#endif
default:
bfs_bug("Unknown ioq_op %d", (int)ent->op);
errno = ENOSYS;
io_uring_prep_nop(sqe);
break;
}

-		if (cancel) {
-			ent->error = EINTR;
-		} else if (ent->ret < 0) {
-			ent->error = errno;
++state->prepped;
}

return state->prepped || state->submitted;
}

/** Reap a batch of SQEs. */
static void ioq_ring_reap(struct ioq_ring_state *state) {
struct ioq *ioq = state->ioq;
struct io_uring *ring = state->ring;

// Always reap at least one CQE
bool reaped = false;

while (state->prepped || state->submitted) {
if (!state->submitted) {
// Try to save a syscall in io_uring_wait_cqe() below
int ret = io_uring_submit_and_wait(ring, reaped ? 0 : 1);
if (ret > 0) {
state->prepped -= ret;
state->submitted += ret;
} else {
continue;
}
}

struct io_uring_cqe *cqe;
if (reaped) {
if (io_uring_peek_cqe(ring, &cqe) < 0) {
break;
}
} else {
-			ent->error = 0;
if (io_uring_wait_cqe(ring, &cqe) < 0) {
continue;
}
}

struct ioq_ent *ent = io_uring_cqe_get_data(cqe);
ent->ret = cqe->res >= 0 ? cqe->res : -1;
ent->error = cqe->res < 0 ? -cqe->res : 0;
io_uring_cqe_seen(ring, cqe);

if (ent->op == IOQ_OPENDIR && ent->ret >= 0) {
ent->ret = bfs_opendir(ent->opendir.dir, ent->ret, NULL);
if (ent->ret == 0) {
// TODO: io_uring_prep_getdents()
bfs_polldir(ent->opendir.dir);
} else {
ent->error = errno;
}
}

ioqq_push(ioq->ready, ent);
--state->submitted;
reaped = true;
}
}

/** io_uring worker loop. */
static void ioq_ring_work(struct ioq_thread *thread) {
struct ioq_ring_state state = {
.ioq = thread->parent,
.ring = &thread->ring,
};

while (ioq_ring_prep(&state)) {
ioq_ring_reap(&state);
}
}
#endif

/** Synchronous syscall loop. */
static void ioq_sync_work(struct ioq_thread *thread) {
struct ioq *ioq = thread->parent;

while (true) {
struct ioq_ent *ent = ioqq_pop(ioq->pending);
if (ent == &IOQ_STOP) {
break;
}

if (!ioq_check_cancel(ioq, ent)) {
ioq_handle(ioq, ent);
}
}
}

/** Background thread entry point. */
static void *ioq_work(void *ptr) {
struct ioq_thread *thread = ptr;

#if BFS_USE_LIBURING
if (thread->ring_err == 0) {
ioq_ring_work(thread);
return NULL;
}
#endif

ioq_sync_work(thread);
return NULL;
}

@@ -362,7 +572,27 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
}

for (size_t i = 0; i < nthreads; ++i) {
if (thread_create(&ioq->threads[i], NULL, ioq_work, ioq) != 0) {
struct ioq_thread *thread = &ioq->threads[i];
thread->parent = ioq;

#if BFS_USE_LIBURING
struct ioq_thread *prev = i ? &ioq->threads[i - 1] : NULL;
if (prev && prev->ring_err) {
thread->ring_err = prev->ring_err;
} else {
// Share io-wq workers between rings
struct io_uring_params params = {0};
if (prev) {
params.flags |= IORING_SETUP_ATTACH_WQ;
params.wq_fd = prev->ring.ring_fd;
}

// We only need enough io_uring entries for one batch at a time
thread->ring_err = -io_uring_queue_init_params(16, &thread->ring, &params);
}
#endif

if (thread_create(&thread->id, NULL, ioq_work, thread) != 0) {
goto fail;
}
++ioq->nthreads;
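
For reference: each new ring above attaches to the previous thread's ring with IORING_SETUP_ATTACH_WQ, so all the rings end up sharing one io-wq worker pool. A standalone sketch of that liburing pattern (ring_init_shared is a hypothetical helper, not from the diff):

#include <liburing.h>

// Initialize `ring`, sharing io-wq workers with `prev` when one exists.
static int ring_init_shared(struct io_uring *ring, struct io_uring *prev) {
	struct io_uring_params params = {0};
	if (prev) {
		params.flags |= IORING_SETUP_ATTACH_WQ;
		params.wq_fd = prev->ring_fd;
	}
	// A small submission queue suffices when each thread submits one batch at a time
	return io_uring_queue_init_params(16, ring, &params);
}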
@@ -482,7 +712,11 @@ void ioq_destroy(struct ioq *ioq) {
ioq_cancel(ioq);

for (size_t i = 0; i < ioq->nthreads; ++i) {
-		thread_join(ioq->threads[i], NULL);
struct ioq_thread *thread = &ioq->threads[i];
thread_join(thread->id, NULL);
#if BFS_USE_LIBURING
io_uring_queue_exit(&thread->ring);
#endif
}

ioqq_destroy(ioq->ready);
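
Stepping back from the diff: the worker added to ioq.c follows the standard liburing prep/submit/reap cycle, with ioq_ring_prep() batching SQEs and ioq_ring_reap() draining CQEs. A minimal self-contained sketch of that cycle for a single directory open (illustrative only; error handling and batching trimmed):

#include <fcntl.h>
#include <liburing.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
	struct io_uring ring;
	if (io_uring_queue_init(16, &ring, 0) < 0) {
		return 1;
	}

	// Prep: describe one O_DIRECTORY open, like IOQ_OPENDIR above
	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
	io_uring_prep_openat(sqe, AT_FDCWD, ".", O_RDONLY | O_CLOEXEC | O_DIRECTORY, 0);
	io_uring_sqe_set_data(sqe, NULL);

	// Submit and wait for at least one completion
	io_uring_submit_and_wait(&ring, 1);

	// Reap: a negative res carries the negated errno, as ioq_ring_reap() assumes
	struct io_uring_cqe *cqe;
	if (io_uring_wait_cqe(&ring, &cqe) == 0) {
		int fd = cqe->res;
		io_uring_cqe_seen(&ring, cqe);
		if (fd >= 0) {
			printf("opened fd %d\n", fd);
			close(fd);
		} else {
			printf("openat failed: %d\n", -fd);
		}
	}

	io_uring_queue_exit(&ring);
	return 0;
}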