Skip to content

Commit

Permalink
ioq: Implement async close() and closedir()
Browse files Browse the repository at this point in the history
  • Loading branch information
tavianator committed Jul 4, 2023
1 parent b1ae0f6 commit 067e674
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 37 deletions.
37 changes: 21 additions & 16 deletions src/bftw.c
Original file line number Diff line number Diff line change
Expand Up @@ -902,28 +902,33 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) {
}

struct bftw_cache *cache = &state->cache;
++cache->capacity;
struct bftw_file *file;
struct bfs_dir *dir;

struct bftw_file *file = res->ptr;
file->ioqueued = false;
enum ioq_op op = res->op;
if (op == IOQ_OPENDIR) {
file = res->ptr;
file->ioqueued = false;

if (file->parent) {
bftw_cache_unpin(cache, file->parent);
}

if (res->error) {
arena_free(&cache->dirs, res->dir);
} else {
bftw_file_set_dir(cache, file, res->dir);
}
++cache->capacity;
if (file->parent) {
bftw_cache_unpin(cache, file->parent);
}

ioq_free(ioq, res);
dir = res->opendir.dir;
if (res->error) {
arena_free(&cache->dirs, dir);
} else {
bftw_file_set_dir(cache, file, dir);
}

if (!(state->flags & BFTW_SORT)) {
SLIST_PREPEND(&state->dirs, file);
if (!(state->flags & BFTW_SORT)) {
SLIST_PREPEND(&state->dirs, file);
}
}

return 0;
ioq_free(ioq, res);
return op;
}

/** Push a directory onto the queue. */
Expand Down
121 changes: 102 additions & 19 deletions src/ioq.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,29 @@
* An I/O queue request.
*/
struct ioq_req {
/** Directory allocation. */
struct bfs_dir *dir;
/** Base file descriptor for openat(). */
int dfd;
/** Path to open, relative to dfd. */
const char *path;
/** The operation. */
enum ioq_op op;

/** Arbitrary user data. */
void *ptr;

/** Arguments. */
union {
/** ioq_close() args. */
struct {
int fd;
} close;
/** ioq_opendir() args. */
struct {
struct bfs_dir *dir;
int dfd;
const char *path;
} opendir;
/** ioq_closedir() args. */
struct {
struct bfs_dir *dir;
} closedir;
};
};

/**
Expand Down Expand Up @@ -301,16 +315,45 @@ static void *ioq_work(void *ptr) {
sanitize_uninit(cmd);

struct ioq_res *res = &cmd->res;
res->op = req.op;
res->ptr = req.ptr;
res->dir = req.dir;
res->error = 0;

int ret = -1;

switch (req.op) {
case IOQ_CLOSE:
// Always close(), even if we're cancelled, just like a real EINTR
ret = xclose(req.close.fd);
break;

case IOQ_OPENDIR:
res->opendir.dir = req.opendir.dir;
if (cancel) {
break;
}
ret = bfs_opendir(req.opendir.dir, req.opendir.dfd, req.opendir.path);
if (ret == 0) {
bfs_polldir(req.opendir.dir);
}
break;

case IOQ_CLOSEDIR:
res->closedir.dir = req.closedir.dir;
ret = bfs_closedir(req.closedir.dir);
break;

default:
bfs_bug("Unknown ioq_op %d", (int)req.op);
errno = ENOSYS;
break;
}

if (cancel) {
res->error = EINTR;
} else if (bfs_opendir(req.dir, req.dfd, req.path) != 0) {
} else if (ret != 0) {
res->error = errno;
} else {
res->error = 0;
bfs_polldir(res->dir);
bfs_assert(res->error != 0);
}

ioqq_push(ioq->ready, cmd);
Expand Down Expand Up @@ -359,24 +402,64 @@ size_t ioq_capacity(const struct ioq *ioq) {
return ioq->depth - ioq->size;
}

int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) {
static struct ioq_req *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) {
if (load(&ioq->cancel, relaxed)) {
errno = EINTR;
return NULL;
}

if (ioq->size >= ioq->depth) {
return -1;
errno = EAGAIN;
return NULL;
}

union ioq_cmd *cmd = arena_alloc(&ioq->cmds);
if (!cmd) {
return -1;
return NULL;
}

struct ioq_req *req = &cmd->req;
req->dir = dir;
req->dfd = dfd;
req->path = path;
req->op = op;
req->ptr = ptr;

ioqq_push(ioq->pending, cmd);
++ioq->size;
return req;
}

int ioq_close(struct ioq *ioq, int fd, void *ptr) {
struct ioq_req *req = ioq_request(ioq, IOQ_CLOSE, ptr);
if (!req) {
return -1;
}

req->close.fd = fd;

ioqq_push(ioq->pending, (union ioq_cmd *)req);
return 0;
}

int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) {
struct ioq_req *req = ioq_request(ioq, IOQ_OPENDIR, ptr);
if (!req) {
return -1;
}

req->opendir.dir = dir;
req->opendir.dfd = dfd;
req->opendir.path = path;

ioqq_push(ioq->pending, (union ioq_cmd *)req);
return 0;
}

int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {
struct ioq_req *req = ioq_request(ioq, IOQ_CLOSEDIR, ptr);
if (!req) {
return -1;
}

req->closedir.dir = dir;

ioqq_push(ioq->pending, (union ioq_cmd *)req);
return 0;
}

Expand Down
59 changes: 57 additions & 2 deletions src/ioq.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,44 @@
*/
struct ioq;

/**
* I/O queue operations.
*/
enum ioq_op {
/** ioq_close(). */
IOQ_CLOSE,
/** ioq_opendir(). */
IOQ_OPENDIR,
/** ioq_closedir(). */
IOQ_CLOSEDIR,
};

/**
* An I/O queue response.
*/
struct ioq_res {
/** The opened directory. */
struct bfs_dir *dir;
/** The I/O operation. */
enum ioq_op op;

/** The error code, if the operation failed. */
int error;

/** Arbitrary user data. */
void *ptr;

/** Operation-specific responses. */
union {
/** ioq_opendir() response. */
struct {
/** The opened directory. */
struct bfs_dir *dir;
} opendir;
/** ioq_closedir() response. */
struct {
/** The closed directory. */
struct bfs_dir *dir;
} closedir;
};
};

/**
Expand All @@ -45,6 +72,20 @@ struct ioq *ioq_create(size_t depth, size_t nthreads);
*/
size_t ioq_capacity(const struct ioq *ioq);

/**
* Asynchronous close().
*
* @param ioq
* The I/O queue.
* @param fd
* The fd to close.
* @param ptr
* An arbitrary pointer to associate with the request.
* @return
* 0 on success, or -1 on failure.
*/
int ioq_close(struct ioq *ioq, int fd, void *ptr);

/**
* Asynchronous bfs_opendir().
*
Expand All @@ -63,6 +104,20 @@ size_t ioq_capacity(const struct ioq *ioq);
*/
int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr);

/**
* Asynchronous bfs_closedir().
*
* @param ioq
* The I/O queue.
* @param dir
* The directory to close.
* @param ptr
* An arbitrary pointer to associate with the request.
* @return
* 0 on success, or -1 on failure.
*/
int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr);

/**
* Pop a response from the queue.
*
Expand Down

0 comments on commit 067e674

Please sign in to comment.