diff --git a/include/sys/bqueue.h b/include/sys/bqueue.h index b9621966027a..bd66e172c66e 100644 --- a/include/sys/bqueue.h +++ b/include/sys/bqueue.h @@ -27,10 +27,14 @@ extern "C" { typedef struct bqueue { list_t bq_list; + size_t bq_size; + list_t bq_deqing_list; + size_t bq_deqing_size; + list_t bq_enqing_list; + size_t bq_enqing_size; kmutex_t bq_lock; kcondvar_t bq_add_cv; kcondvar_t bq_pop_cv; - size_t bq_size; size_t bq_maxsize; uint_t bq_fill_fraction; size_t bq_node_offset; @@ -47,7 +51,6 @@ void bqueue_destroy(bqueue_t *); void bqueue_enqueue(bqueue_t *, void *, size_t); void bqueue_enqueue_flush(bqueue_t *, void *, size_t); void *bqueue_dequeue(bqueue_t *); -boolean_t bqueue_empty(bqueue_t *); #ifdef __cplusplus } diff --git a/module/zfs/bqueue.c b/module/zfs/bqueue.c index ec5ce4388ec8..a1a3b31186d4 100644 --- a/module/zfs/bqueue.c +++ b/module/zfs/bqueue.c @@ -27,19 +27,26 @@ obj2node(bqueue_t *q, void *data) /* * Initialize a blocking queue The maximum capacity of the queue is set to - * size. Types that are stored in a bqueue must contain a bqueue_node_t, - * and node_offset must be its offset from the start of the struct. - * fill_fraction is a performance tuning value; when the queue is full, any - * threads attempting to enqueue records will block. They will block until - * they're signaled, which will occur when the queue is at least 1/fill_fraction + * size. Types that are stored in a bqueue must contain a bqueue_node_t, and + * node_offset must be its offset from the start of the struct. fill_fraction + * is a performance tuning value; when the queue is full, any threads + * attempting to enqueue records will block. They will block until they're + * signaled, which will occur when the queue is at least 1/fill_fraction * empty. Similar behavior occurs on dequeue; if the queue is empty, threads - * block. They will be signalled when the queue has 1/fill_fraction full, or - * when bqueue_flush is called. As a result, you must call bqueue_flush when - * you enqueue your final record on a thread, in case the dequeueing threads are - * currently blocked and that enqueue does not cause them to be awoken. - * Alternatively, this behavior can be disabled (causing signaling to happen - * immediately) by setting fill_fraction to any value larger than size. - * Return 0 on success, or -1 on failure. + * block. They will be signalled when the queue has 1/fill_fraction full. + * As a result, you must call bqueue_enqueue_flush() when you enqueue your + * final record on a thread, in case the dequeuing threads are currently + * blocked and that enqueue does not cause them to be woken. Alternatively, + * this behavior can be disabled (causing signaling to happen immediately) by + * setting fill_fraction to any value larger than size. Return 0 on success, + * or -1 on failure. + * + * Note: The caller must ensure that for a given bqueue_t, there's only a + * single call to bqueue_enqueue() running at a time (e.g. by calling only + * from a single thread, or with locking around the call). Similarly, the + * caller must ensure that there's only a single call to bqueue_dequeue() + * running at a time. However, the one call to bqueue_enqueue() may be + * invoked concurrently with the one call to bqueue_dequeue(). */ int bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset) @@ -49,11 +56,17 @@ bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset) } list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), node_offset + offsetof(bqueue_node_t, bqn_node)); + list_create(&q->bq_deqing_list, node_offset + sizeof (bqueue_node_t), + node_offset + offsetof(bqueue_node_t, bqn_node)); + list_create(&q->bq_enqing_list, node_offset + sizeof (bqueue_node_t), + node_offset + offsetof(bqueue_node_t, bqn_node)); cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); q->bq_node_offset = node_offset; q->bq_size = 0; + q->bq_deqing_size = 0; + q->bq_enqing_size = 0; q->bq_maxsize = size; q->bq_fill_fraction = fill_fraction; return (0); @@ -69,9 +82,13 @@ bqueue_destroy(bqueue_t *q) { mutex_enter(&q->bq_lock); ASSERT0(q->bq_size); + ASSERT0(q->bq_deqing_size); + ASSERT0(q->bq_enqing_size); cv_destroy(&q->bq_add_cv); cv_destroy(&q->bq_pop_cv); list_destroy(&q->bq_list); + list_destroy(&q->bq_deqing_list); + list_destroy(&q->bq_enqing_list); mutex_exit(&q->bq_lock); mutex_destroy(&q->bq_lock); } @@ -81,23 +98,24 @@ bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush) { ASSERT3U(item_size, >, 0); ASSERT3U(item_size, <=, q->bq_maxsize); - mutex_enter(&q->bq_lock); + obj2node(q, data)->bqn_size = item_size; - while (q->bq_size && q->bq_size + item_size > q->bq_maxsize) { - /* - * Wake up bqueue_dequeue() thread if already sleeping in order - * to prevent the deadlock condition - */ - cv_signal(&q->bq_pop_cv); - cv_wait_sig(&q->bq_add_cv, &q->bq_lock); - } - q->bq_size += item_size; - list_insert_tail(&q->bq_list, data); - if (flush) + q->bq_enqing_size += item_size; + list_insert_tail(&q->bq_enqing_list, data); + + if (flush || + q->bq_enqing_size >= q->bq_maxsize / q->bq_fill_fraction) { + /* Append the enquing list to the shared list. */ + mutex_enter(&q->bq_lock); + while (q->bq_size > q->bq_maxsize) { + cv_wait_sig(&q->bq_add_cv, &q->bq_lock); + } + q->bq_size += q->bq_enqing_size; + list_move_tail(&q->bq_list, &q->bq_enqing_list); + q->bq_enqing_size = 0; cv_broadcast(&q->bq_pop_cv); - else if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction) - cv_signal(&q->bq_pop_cv); - mutex_exit(&q->bq_lock); + mutex_exit(&q->bq_lock); + } } /* @@ -115,8 +133,8 @@ bqueue_enqueue(bqueue_t *q, void *data, size_t item_size) * Enqueue an entry, and then flush the queue. This forces the popping threads * to wake up, even if we're below the fill fraction. We have this in a single * function, rather than having a separate call, because it prevents race - * conditions between the enqueuing thread and the dequeueing thread, where the - * enqueueing thread will wake up the dequeueing thread, that thread will + * conditions between the enqueuing thread and the dequeuing thread, where the + * enqueueing thread will wake up the dequeuing thread, that thread will * destroy the condvar before the enqueuing thread is done. */ void @@ -132,27 +150,26 @@ bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size) void * bqueue_dequeue(bqueue_t *q) { - void *ret = NULL; - size_t item_size; - mutex_enter(&q->bq_lock); - while (q->bq_size == 0) { - cv_wait_sig(&q->bq_pop_cv, &q->bq_lock); + void *ret = list_remove_head(&q->bq_deqing_list); + if (ret == NULL) { + /* + * Dequeuing list is empty. Wait for there to be something on + * the shared list, then move the entire shared list to the + * dequeuing list. + */ + mutex_enter(&q->bq_lock); + while (q->bq_size == 0) { + cv_wait_sig(&q->bq_pop_cv, &q->bq_lock); + } + ASSERT0(q->bq_deqing_size); + ASSERT(list_is_empty(&q->bq_deqing_list)); + list_move_tail(&q->bq_deqing_list, &q->bq_list); + q->bq_deqing_size = q->bq_size; + q->bq_size = 0; + cv_broadcast(&q->bq_add_cv); + mutex_exit(&q->bq_lock); + ret = list_remove_head(&q->bq_deqing_list); } - ret = list_remove_head(&q->bq_list); - ASSERT3P(ret, !=, NULL); - item_size = obj2node(q, ret)->bqn_size; - q->bq_size -= item_size; - if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction)) - cv_signal(&q->bq_add_cv); - mutex_exit(&q->bq_lock); + q->bq_deqing_size -= obj2node(q, ret)->bqn_size; return (ret); } - -/* - * Returns true if the space used is 0. - */ -boolean_t -bqueue_empty(bqueue_t *q) -{ - return (q->bq_size == 0); -}