Skip to content

Commit

Permalink
ioq: Implement async close() and closedir()
Browse files Browse the repository at this point in the history
  • Loading branch information
tavianator committed Jun 30, 2023
1 parent 1864ca8 commit c5bcf2e
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 101 deletions.
223 changes: 167 additions & 56 deletions src/bftw.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,29 +149,6 @@ static void bftw_file_close(struct bftw_cache *cache, struct bftw_file *file) {
bftw_cache_remove(cache, file);
}

/** Free an open directory. */
static void bftw_file_freedir(struct bftw_cache *cache, struct bftw_file *file) {
if (!file->dir) {
return;
}

// Try to keep an open fd if any children exist
bool reffed = file->refcount > 1;
// Keep the fd the same if it's pinned
bool pinned = file->pincount > 0;

if (reffed || pinned) {
int fd = bfs_fdclosedir(file->dir, pinned);
if (fd >= 0) {
file->fd = fd;
arena_free(&cache->dirs, file->dir);
file->dir = NULL;
}
} else {
bftw_file_close(cache, file);
}
}

/** Pop the least recently used directory from the cache. */
static int bftw_cache_pop(struct bftw_cache *cache) {
struct bftw_file *file = cache->tail;
Expand Down Expand Up @@ -228,7 +205,6 @@ static void bftw_cache_unpin(struct bftw_cache *cache, struct bftw_file *file) {

if (--file->pincount == 0) {
bftw_lru_add(cache, file);
bftw_file_freedir(cache, file);
}
}

Expand Down Expand Up @@ -889,6 +865,86 @@ static enum bftw_action bftw_call_back(struct bftw_state *state, const char *nam
}
}

/** Pop a response from the I/O queue. */
static size_t bftw_ioq_pop(struct bftw_state *state, bool block) {
struct bftw_cache *cache = &state->cache;
struct ioq *ioq = state->ioq;
if (!ioq) {
return 0;
}

size_t ret = 0;
while (true) {
struct ioq_res *res = block ? ioq_pop(ioq) : ioq_trypop(ioq);
if (!res) {
break;
}

struct bftw_file *file;
struct bfs_dir *dir;

switch (res->op) {
case IOQ_CLOSE:
++cache->capacity;
break;

case IOQ_OPENDIR:
file = res->ptr;
file->ioqueued = false;

++cache->capacity;
if (file->parent) {
bftw_cache_unpin(cache, file->parent);
}

dir = res->opendir.dir;
if (res->error) {
arena_free(&cache->dirs, dir);
} else {
bftw_file_set_dir(cache, file, dir);
}

if (!(state->flags & BFTW_SORT)) {
SLIST_PREPEND(&state->dirs, file);
}

block = false;
break;

case IOQ_CLOSEDIR:
++cache->capacity;
dir = res->closedir.dir;
arena_free(&cache->dirs, dir);
break;
}

ioq_free(ioq, res);
++ret;
}

return ret;
}

/** Try to make capacity available in the cache. */
static int bftw_ensure_capacity(struct bftw_state *state) {
struct bftw_cache *cache = &state->cache;
if (cache->capacity > 0) {
return 0;
}

bftw_ioq_pop(state, false);
if (cache->capacity > 0) {
return 0;
}

if (bftw_cache_pop(cache) != 0) {
return -1;
}

bfs_assert(cache->capacity > 0);
return 0;
}

/** Push a directory onto the queue. */
static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) {
bfs_assert(file->type == BFS_DIR);
Expand All @@ -908,10 +964,8 @@ static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) {
bftw_cache_pin(cache, file->parent);
}

if (cache->capacity == 0) {
if (bftw_cache_pop(cache) != 0) {
goto unpin;
}
if (bftw_ensure_capacity(state) != 0) {
goto unpin;
}

struct bfs_dir *dir = arena_alloc(&state->cache.dirs);
Expand Down Expand Up @@ -942,46 +996,96 @@ static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) {
SLIST_APPEND(&state->dirs, file);
}

/** Pop a response from the I/O queue. */
static int bftw_ioq_pop(struct bftw_state *state, bool block) {
if (!state->ioq) {
return -1;
/** Close a directory, asynchronously if possible. */
static int bftw_ioq_closedir(struct bftw_state *state, struct bfs_dir *dir) {
struct ioq *ioq = state->ioq;
if (ioq && ioq_closedir(ioq, dir, NULL) == 0) {
return 0;
}

struct bftw_cache *cache = &state->cache;
int ret = bfs_closedir(dir);
arena_free(&cache->dirs, dir);
++cache->capacity;
return ret;
}

/** Close a file descriptor, asynchronously if possible. */
static int bftw_ioq_close(struct bftw_state *state, int fd) {
struct ioq *ioq = state->ioq;
if (ioq && ioq_close(ioq, fd, NULL) == 0) {
return 0;
}

struct ioq_res *res;
if (block) {
res = ioq_pop(state->ioq);
struct bftw_cache *cache = &state->cache;
int ret = xclose(fd);
++cache->capacity;
return ret;
}

/** Close a file, asynchronously if possible. */
static int bftw_close(struct bftw_state *state, struct bftw_file *file) {
bfs_assert(file->fd >= 0);
bfs_assert(file->pincount == 0);

struct bfs_dir *dir = file->dir;
int fd = file->fd;

bftw_lru_remove(&state->cache, file);
file->dir = NULL;
file->fd = -1;

if (dir) {
return bftw_ioq_closedir(state, dir);
} else {
res = ioq_trypop(state->ioq);
return bftw_ioq_close(state, fd);
}
}

if (!res) {
return -1;
/** Free an open directory. */
static int bftw_freedir(struct bftw_state *state, struct bftw_file *file) {
struct bfs_dir *dir = file->dir;
if (!dir) {
return 0;
}

struct bftw_cache *cache = &state->cache;
++cache->capacity;

struct bftw_file *file = res->ptr;
file->ioqueued = false;
// Try to keep an open fd if any children exist
bool reffed = file->refcount > 1;
// Keep the fd the same if it's pinned
bool pinned = file->pincount > 0;

if (file->parent) {
bftw_cache_unpin(cache, file->parent);
#if BFS_USE_FDCLOSEDIR
if (reffed || pinned) {
bfs_fdclosedir(dir);
arena_free(&cache->dirs, dir);
file->dir = NULL;
return 0;
}
#else
if (pinned) {
return -1;
}
#endif

if (res->error) {
arena_free(&state->cache.dirs, res->dir);
} else {
bftw_file_set_dir(cache, file, res->dir);
if (!reffed) {
return bftw_close(state, file);
}

ioq_free(state->ioq, res);
if (bftw_ensure_capacity(state) != 0) {
return -1;
}

if (!(state->flags & BFTW_SORT)) {
SLIST_PREPEND(&state->dirs, file);
int fd = dup_cloexec(file->fd);
if (fd < 0) {
return -1;
}
--cache->capacity;

return 0;
file->dir = NULL;
file->fd = fd;
return bftw_ioq_closedir(state, dir);
}

/** Pop a directory to read from the queue. */
Expand Down Expand Up @@ -1094,7 +1198,7 @@ static int bftw_gc(struct bftw_state *state, enum bftw_gc_flags flags) {

if (state->dir) {
bftw_cache_unpin(&state->cache, state->file);
bftw_file_freedir(&state->cache, state->file);
bftw_freedir(state, state->file);
}
state->dir = NULL;
state->de = NULL;
Expand Down Expand Up @@ -1131,8 +1235,12 @@ static int bftw_gc(struct bftw_state *state, enum bftw_gc_flags flags) {
if (state->previous == file) {
state->previous = parent;
}
bftw_file_free(&state->cache, file);
state->file = parent;

if (file->fd >= 0) {
bftw_close(state, file);
}
bftw_file_free(&state->cache, file);
}

return ret;
Expand All @@ -1147,16 +1255,19 @@ static int bftw_gc(struct bftw_state *state, enum bftw_gc_flags flags) {
static int bftw_state_destroy(struct bftw_state *state) {
dstrfree(state->path);

if (state->ioq) {
ioq_cancel(state->ioq);
struct ioq *ioq = state->ioq;
if (ioq) {
ioq_cancel(ioq);
while (bftw_ioq_pop(state, true) > 0);
state->ioq = NULL;
}

SLIST_EXTEND(&state->files, &state->batch);
do {
bftw_gc(state, BFTW_VISIT_NONE);
} while (bftw_pop_dir(state) || bftw_pop_file(state));

ioq_destroy(state->ioq);
ioq_destroy(ioq);

bftw_cache_destroy(&state->cache);

Expand Down
20 changes: 4 additions & 16 deletions src/dir.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
#include <sys/stat.h>
#include <unistd.h>

#ifndef BFS_USE_GETDENTS
# define BFS_USE_GETDENTS (__linux__ || __FreeBSD__)
#endif

#if BFS_USE_GETDENTS
# if __linux__
# include <sys/syscall.h>
Expand Down Expand Up @@ -296,25 +292,17 @@ int bfs_closedir(struct bfs_dir *dir) {
return ret;
}

int bfs_fdclosedir(struct bfs_dir *dir, bool same_fd) {
#if BFS_USE_FDCLOSEDIR
int bfs_fdclosedir(struct bfs_dir *dir) {
#if BFS_USE_GETDENTS
int ret = dir->fd;
#elif __FreeBSD__
int ret = fdclosedir(dir->dir);
#else
if (same_fd) {
errno = ENOTSUP;
return -1;
}

int ret = dup_cloexec(dirfd(dir->dir));
if (ret < 0) {
return -1;
}

bfs_closedir(dir);
# error "No implementation of fdclosedir()"
#endif

sanitize_uninit(dir, DIR_SIZE);
return ret;
}
#endif
28 changes: 20 additions & 8 deletions src/dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
#include "config.h"
#include <sys/types.h>

/**
* Whether the implementation uses the getdents() syscall directly, rather than
* libc's readdir().
*/
#ifndef BFS_USE_GETDENTS
# define BFS_USE_GETDENTS (__linux__ || __FreeBSD__)
#endif

/**
* A directory.
*/
Expand Down Expand Up @@ -128,19 +136,23 @@ int bfs_readdir(struct bfs_dir *dir, struct bfs_dirent *de);
*/
int bfs_closedir(struct bfs_dir *dir);

/**
* Whether the bfs_fdclosedir() function is supported.
*/
#ifndef BFS_USE_FDCLOSEDIR
# define BFS_USE_FDCLOSEDIR (BFS_USE_GETDENTS || __FreeBSD__)
#endif

#if BFS_USE_FDCLOSEDIR
/**
* Extract the file descriptor from an open directory.
*
* @param dir
* The directory to free.
* @param same_fd
* If true, require that the returned file descriptor is the same one
* that bfs_dirfd() would have returned. Otherwise, it may be a new
* file descriptor for the same directory.
* The directory to detach.
* @return
* On success, a file descriptor for the directory is returned. On
* failure, -1 is returned, and the directory remains open.
* The file descriptor of the directory.
*/
int bfs_fdclosedir(struct bfs_dir *dir, bool same_fd);
int bfs_fdclosedir(struct bfs_dir *dir);
#endif

#endif // BFS_DIR_H
Loading

0 comments on commit c5bcf2e

Please sign in to comment.