diff --git a/src/bftw.c b/src/bftw.c index 2bdf12d6..823a2a69 100644 --- a/src/bftw.c +++ b/src/bftw.c @@ -149,29 +149,6 @@ static void bftw_file_close(struct bftw_cache *cache, struct bftw_file *file) { bftw_cache_remove(cache, file); } -/** Free an open directory. */ -static void bftw_file_freedir(struct bftw_cache *cache, struct bftw_file *file) { - if (!file->dir) { - return; - } - - // Try to keep an open fd if any children exist - bool reffed = file->refcount > 1; - // Keep the fd the same if it's pinned - bool pinned = file->pincount > 0; - - if (reffed || pinned) { - int fd = bfs_fdclosedir(file->dir, pinned); - if (fd >= 0) { - file->fd = fd; - arena_free(&cache->dirs, file->dir); - file->dir = NULL; - } - } else { - bftw_file_close(cache, file); - } -} - /** Pop the least recently used directory from the cache. */ static int bftw_cache_pop(struct bftw_cache *cache) { struct bftw_file *file = cache->tail; @@ -228,7 +205,6 @@ static void bftw_cache_unpin(struct bftw_cache *cache, struct bftw_file *file) { if (--file->pincount == 0) { bftw_lru_add(cache, file); - bftw_file_freedir(cache, file); } } @@ -889,6 +865,86 @@ static enum bftw_action bftw_call_back(struct bftw_state *state, const char *nam } } +/** Pop a response from the I/O queue. */ +static size_t bftw_ioq_pop(struct bftw_state *state, bool block) { + struct bftw_cache *cache = &state->cache; + struct ioq *ioq = state->ioq; + if (!ioq) { + return 0; + } + + size_t ret = 0; + while (true) { + struct ioq_res *res = block ? ioq_pop(ioq) : ioq_trypop(ioq); + if (!res) { + break; + } + + struct bftw_file *file; + struct bfs_dir *dir; + + switch (res->op) { + case IOQ_CLOSE: + ++cache->capacity; + break; + + case IOQ_OPENDIR: + file = res->ptr; + file->ioqueued = false; + + ++cache->capacity; + if (file->parent) { + bftw_cache_unpin(cache, file->parent); + } + + dir = res->opendir.dir; + if (res->error) { + arena_free(&cache->dirs, dir); + } else { + bftw_file_set_dir(cache, file, dir); + } + + if (!(state->flags & BFTW_SORT)) { + SLIST_PREPEND(&state->dirs, file); + } + + block = false; + break; + + case IOQ_CLOSEDIR: + ++cache->capacity; + dir = res->closedir.dir; + arena_free(&cache->dirs, dir); + break; + } + + ioq_free(ioq, res); + ++ret; + } + + return ret; +} + +/** Try to make capacity available in the cache. */ +static int bftw_ensure_capacity(struct bftw_state *state) { + struct bftw_cache *cache = &state->cache; + if (cache->capacity > 0) { + return 0; + } + + bftw_ioq_pop(state, false); + if (cache->capacity > 0) { + return 0; + } + + if (bftw_cache_pop(cache) != 0) { + return -1; + } + + bfs_assert(cache->capacity > 0); + return 0; +} + /** Push a directory onto the queue. */ static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) { bfs_assert(file->type == BFS_DIR); @@ -908,10 +964,8 @@ static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) { bftw_cache_pin(cache, file->parent); } - if (cache->capacity == 0) { - if (bftw_cache_pop(cache) != 0) { - goto unpin; - } + if (bftw_ensure_capacity(state) != 0) { + goto unpin; } struct bfs_dir *dir = arena_alloc(&state->cache.dirs); @@ -942,46 +996,96 @@ static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) { SLIST_APPEND(&state->dirs, file); } -/** Pop a response from the I/O queue. */ -static int bftw_ioq_pop(struct bftw_state *state, bool block) { - if (!state->ioq) { - return -1; +/** Close a directory, asynchronously if possible. */ +static int bftw_ioq_closedir(struct bftw_state *state, struct bfs_dir *dir) { + struct ioq *ioq = state->ioq; + if (ioq && ioq_closedir(ioq, dir, NULL) == 0) { + return 0; + } + + struct bftw_cache *cache = &state->cache; + int ret = bfs_closedir(dir); + arena_free(&cache->dirs, dir); + ++cache->capacity; + return ret; +} + +/** Close a file descriptor, asynchronously if possible. */ +static int bftw_ioq_close(struct bftw_state *state, int fd) { + struct ioq *ioq = state->ioq; + if (ioq && ioq_close(ioq, fd, NULL) == 0) { + return 0; } - struct ioq_res *res; - if (block) { - res = ioq_pop(state->ioq); + struct bftw_cache *cache = &state->cache; + int ret = xclose(fd); + ++cache->capacity; + return ret; +} + +/** Close a file, asynchronously if possible. */ +static int bftw_close(struct bftw_state *state, struct bftw_file *file) { + bfs_assert(file->fd >= 0); + bfs_assert(file->pincount == 0); + + struct bfs_dir *dir = file->dir; + int fd = file->fd; + + bftw_lru_remove(&state->cache, file); + file->dir = NULL; + file->fd = -1; + + if (dir) { + return bftw_ioq_closedir(state, dir); } else { - res = ioq_trypop(state->ioq); + return bftw_ioq_close(state, fd); } +} - if (!res) { - return -1; +/** Free an open directory. */ +static int bftw_freedir(struct bftw_state *state, struct bftw_file *file) { + struct bfs_dir *dir = file->dir; + if (!dir) { + return 0; } struct bftw_cache *cache = &state->cache; - ++cache->capacity; - struct bftw_file *file = res->ptr; - file->ioqueued = false; + // Try to keep an open fd if any children exist + bool reffed = file->refcount > 1; + // Keep the fd the same if it's pinned + bool pinned = file->pincount > 0; - if (file->parent) { - bftw_cache_unpin(cache, file->parent); +#if BFS_USE_FDCLOSEDIR + if (reffed || pinned) { + bfs_fdclosedir(dir); + arena_free(&cache->dirs, dir); + file->dir = NULL; + return 0; } +#else + if (pinned) { + return -1; + } +#endif - if (res->error) { - arena_free(&state->cache.dirs, res->dir); - } else { - bftw_file_set_dir(cache, file, res->dir); + if (!reffed) { + return bftw_close(state, file); } - ioq_free(state->ioq, res); + if (bftw_ensure_capacity(state) != 0) { + return -1; + } - if (!(state->flags & BFTW_SORT)) { - SLIST_PREPEND(&state->dirs, file); + int fd = dup_cloexec(file->fd); + if (fd < 0) { + return -1; } + --cache->capacity; - return 0; + file->dir = NULL; + file->fd = fd; + return bftw_ioq_closedir(state, dir); } /** Pop a directory to read from the queue. */ @@ -1094,7 +1198,7 @@ static int bftw_gc(struct bftw_state *state, enum bftw_gc_flags flags) { if (state->dir) { bftw_cache_unpin(&state->cache, state->file); - bftw_file_freedir(&state->cache, state->file); + bftw_freedir(state, state->file); } state->dir = NULL; state->de = NULL; @@ -1131,8 +1235,12 @@ static int bftw_gc(struct bftw_state *state, enum bftw_gc_flags flags) { if (state->previous == file) { state->previous = parent; } - bftw_file_free(&state->cache, file); state->file = parent; + + if (file->fd >= 0) { + bftw_close(state, file); + } + bftw_file_free(&state->cache, file); } return ret; @@ -1147,8 +1255,11 @@ static int bftw_gc(struct bftw_state *state, enum bftw_gc_flags flags) { static int bftw_state_destroy(struct bftw_state *state) { dstrfree(state->path); - if (state->ioq) { - ioq_cancel(state->ioq); + struct ioq *ioq = state->ioq; + if (ioq) { + ioq_cancel(ioq); + while (bftw_ioq_pop(state, true) > 0); + state->ioq = NULL; } SLIST_EXTEND(&state->files, &state->batch); @@ -1156,7 +1267,7 @@ static int bftw_state_destroy(struct bftw_state *state) { bftw_gc(state, BFTW_VISIT_NONE); } while (bftw_pop_dir(state) || bftw_pop_file(state)); - ioq_destroy(state->ioq); + ioq_destroy(ioq); bftw_cache_destroy(&state->cache); diff --git a/src/dir.c b/src/dir.c index 685bac59..ae105105 100644 --- a/src/dir.c +++ b/src/dir.c @@ -15,10 +15,6 @@ #include #include -#ifndef BFS_USE_GETDENTS -# define BFS_USE_GETDENTS (__linux__ || __FreeBSD__) -#endif - #if BFS_USE_GETDENTS # if __linux__ # include @@ -296,25 +292,17 @@ int bfs_closedir(struct bfs_dir *dir) { return ret; } -int bfs_fdclosedir(struct bfs_dir *dir, bool same_fd) { +#if BFS_USE_FDCLOSEDIR +int bfs_fdclosedir(struct bfs_dir *dir) { #if BFS_USE_GETDENTS int ret = dir->fd; #elif __FreeBSD__ int ret = fdclosedir(dir->dir); #else - if (same_fd) { - errno = ENOTSUP; - return -1; - } - - int ret = dup_cloexec(dirfd(dir->dir)); - if (ret < 0) { - return -1; - } - - bfs_closedir(dir); +# error "No implementation of fdclosedir()" #endif sanitize_uninit(dir, DIR_SIZE); return ret; } +#endif diff --git a/src/dir.h b/src/dir.h index 16f592ed..0ccfa81a 100644 --- a/src/dir.h +++ b/src/dir.h @@ -12,6 +12,14 @@ #include "config.h" #include +/** + * Whether the implementation uses the getdents() syscall directly, rather than + * libc's readdir(). + */ +#ifndef BFS_USE_GETDENTS +# define BFS_USE_GETDENTS (__linux__ || __FreeBSD__) +#endif + /** * A directory. */ @@ -128,19 +136,23 @@ int bfs_readdir(struct bfs_dir *dir, struct bfs_dirent *de); */ int bfs_closedir(struct bfs_dir *dir); +/** + * Whether the bfs_fdclosedir() function is supported. + */ +#ifndef BFS_USE_FDCLOSEDIR +# define BFS_USE_FDCLOSEDIR (BFS_USE_GETDENTS || __FreeBSD__) +#endif + +#if BFS_USE_FDCLOSEDIR /** * Extract the file descriptor from an open directory. * * @param dir - * The directory to free. - * @param same_fd - * If true, require that the returned file descriptor is the same one - * that bfs_dirfd() would have returned. Otherwise, it may be a new - * file descriptor for the same directory. + * The directory to detach. * @return - * On success, a file descriptor for the directory is returned. On - * failure, -1 is returned, and the directory remains open. + * The file descriptor of the directory. */ -int bfs_fdclosedir(struct bfs_dir *dir, bool same_fd); +int bfs_fdclosedir(struct bfs_dir *dir); +#endif #endif // BFS_DIR_H diff --git a/src/ioq.c b/src/ioq.c index 1160b340..1a5d7046 100644 --- a/src/ioq.c +++ b/src/ioq.c @@ -20,15 +20,29 @@ * An I/O queue request. */ struct ioq_req { - /** Directory allocation. */ - struct bfs_dir *dir; - /** Base file descriptor for openat(). */ - int dfd; - /** Path to open, relative to dfd. */ - const char *path; + /** The operation. */ + enum ioq_op op; /** Arbitrary user data. */ void *ptr; + + /** Arguments. */ + union { + /** ioq_close() args. */ + struct { + int fd; + } close; + /** ioq_opendir() args. */ + struct { + struct bfs_dir *dir; + int dfd; + const char *path; + } opendir; + /** ioq_closedir() args. */ + struct { + struct bfs_dir *dir; + } closedir; + }; }; /** @@ -297,16 +311,39 @@ static void *ioq_work(void *ptr) { sanitize_uninit(cmd); struct ioq_res *res = &cmd->res; + res->op = req.op; res->ptr = req.ptr; - res->dir = req.dir; + res->error = 0; + + int ret; + switch (req.op) { + case IOQ_CLOSE: + // Always close(), even if we're cancelled, just like a real EINTR + ret = xclose(req.close.fd); + break; + + case IOQ_OPENDIR: + res->opendir.dir = req.opendir.dir; + if (cancel) { + break; + } + ret = bfs_opendir(req.opendir.dir, req.opendir.dfd, req.opendir.path); + if (ret == 0) { + bfs_polldir(req.opendir.dir); + } + break; + + case IOQ_CLOSEDIR: + res->closedir.dir = req.closedir.dir; + ret = bfs_closedir(req.closedir.dir); + break; + } if (cancel) { res->error = EINTR; - } else if (bfs_opendir(req.dir, req.dfd, req.path) != 0) { + } else if (ret != 0) { res->error = errno; - } else { - res->error = 0; - bfs_polldir(res->dir); + bfs_assert(res->error != 0); } ioqq_push(ioq->ready, cmd); @@ -351,24 +388,64 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) { return NULL; } -int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) { +static struct ioq_req *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) { + if (load(&ioq->cancel, relaxed)) { + errno = EINTR; + return NULL; + } + if (ioq->size >= ioq->depth) { - return -1; + errno = EAGAIN; + return NULL; } union ioq_cmd *cmd = arena_alloc(&ioq->cmds); if (!cmd) { - return -1; + return NULL; } struct ioq_req *req = &cmd->req; - req->dir = dir; - req->dfd = dfd; - req->path = path; + req->op = op; req->ptr = ptr; - - ioqq_push(ioq->pending, cmd); ++ioq->size; + return req; +} + +int ioq_close(struct ioq *ioq, int fd, void *ptr) { + struct ioq_req *req = ioq_request(ioq, IOQ_CLOSE, ptr); + if (!req) { + return -1; + } + + req->close.fd = fd; + + ioqq_push(ioq->pending, (union ioq_cmd *)req); + return 0; +} + +int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) { + struct ioq_req *req = ioq_request(ioq, IOQ_OPENDIR, ptr); + if (!req) { + return -1; + } + + req->opendir.dir = dir; + req->opendir.dfd = dfd; + req->opendir.path = path; + + ioqq_push(ioq->pending, (union ioq_cmd *)req); + return 0; +} + +int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) { + struct ioq_req *req = ioq_request(ioq, IOQ_CLOSEDIR, ptr); + if (!req) { + return -1; + } + + req->closedir.dir = dir; + + ioqq_push(ioq->pending, (union ioq_cmd *)req); return 0; } diff --git a/src/ioq.h b/src/ioq.h index 064e2e2f..00741ef1 100644 --- a/src/ioq.h +++ b/src/ioq.h @@ -15,17 +15,44 @@ */ struct ioq; +/** + * I/O queue operations. + */ +enum ioq_op { + /** ioq_close(). */ + IOQ_CLOSE, + /** ioq_opendir(). */ + IOQ_OPENDIR, + /** ioq_closedir(). */ + IOQ_CLOSEDIR, +}; + /** * An I/O queue response. */ struct ioq_res { - /** The opened directory. */ - struct bfs_dir *dir; + /** The I/O operation. */ + enum ioq_op op; + /** The error code, if the operation failed. */ int error; /** Arbitrary user data. */ void *ptr; + + /** Operation-specific responses. */ + union { + /** ioq_opendir() response. */ + struct { + /** The opened directory. */ + struct bfs_dir *dir; + } opendir; + /** ioq_closedir() response. */ + struct { + /** The closed directory. */ + struct bfs_dir *dir; + } closedir; + }; }; /** @@ -40,6 +67,20 @@ struct ioq_res { */ struct ioq *ioq_create(size_t depth, size_t nthreads); +/** + * Asynchronous close(). + * + * @param ioq + * The I/O queue. + * @param fd + * The fd to close. + * @param ptr + * An arbitrary pointer to associate with the request. + * @return + * 0 on success, or -1 on failure. + */ +int ioq_close(struct ioq *ioq, int fd, void *ptr); + /** * Asynchronous bfs_opendir(). * @@ -58,6 +99,20 @@ struct ioq *ioq_create(size_t depth, size_t nthreads); */ int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr); +/** + * Asynchronous bfs_closedir(). + * + * @param ioq + * The I/O queue. + * @param dir + * The directory to close. + * @param ptr + * An arbitrary pointer to associate with the request. + * @return + * 0 on success, or -1 on failure. + */ +int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr); + /** * Pop a response from the queue. *