diff options
Diffstat (limited to 'module/zfs/bqueue.c')
-rw-r--r-- | module/zfs/bqueue.c | 73 |
1 files changed, 58 insertions, 15 deletions
diff --git a/module/zfs/bqueue.c b/module/zfs/bqueue.c index 3fc7fcaaa..22539efc4 100644 --- a/module/zfs/bqueue.c +++ b/module/zfs/bqueue.c @@ -13,7 +13,7 @@ * CDDL HEADER END */ /* - * Copyright (c) 2014 by Delphix. All rights reserved. + * Copyright (c) 2014, 2018 by Delphix. All rights reserved. */ #include <sys/bqueue.h> @@ -27,13 +27,27 @@ obj2node(bqueue_t *q, void *data) /* * Initialize a blocking queue The maximum capacity of the queue is set to - * size. Types that want to be stored in a bqueue must contain a bqueue_node_t, - * and offset should give its offset from the start of the struct. Return 0 on - * success, or -1 on failure. + * size. Types that are stored in a bqueue must contain a bqueue_node_t, + * and node_offset must be its offset from the start of the struct. + * fill_fraction is a performance tuning value; when the queue is full, any + * threads attempting to enqueue records will block. They will block until + * they're signaled, which will occur when the queue is at least 1/fill_fraction + * empty. Similar behavior occurs on dequeue; if the queue is empty, threads + * block. They will be signalled when the queue has 1/fill_fraction full, or + * when bqueue_flush is called. As a result, you must call bqueue_flush when + * you enqueue your final record on a thread, in case the dequeueing threads are + * currently blocked and that enqueue does not cause them to be awoken. + * Alternatively, this behavior can be disabled (causing signaling to happen + * immediately) by setting fill_fraction to any value larger than size. + * Return 0 on success, or -1 on failure. */ int -bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset) +bqueue_init(bqueue_t *q, uint64_t fill_fraction, uint64_t size, + size_t node_offset) { + if (fill_fraction == 0) { + return (-1); + } list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), node_offset + offsetof(bqueue_node_t, bqn_node)); cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); @@ -42,6 +56,7 @@ bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset) q->bq_node_offset = node_offset; q->bq_size = 0; q->bq_maxsize = size; + q->bq_fill_fraction = fill_fraction; return (0); } @@ -53,20 +68,18 @@ bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset) void bqueue_destroy(bqueue_t *q) { + mutex_enter(&q->bq_lock); ASSERT0(q->bq_size); cv_destroy(&q->bq_add_cv); cv_destroy(&q->bq_pop_cv); - mutex_destroy(&q->bq_lock); list_destroy(&q->bq_list); + mutex_exit(&q->bq_lock); + mutex_destroy(&q->bq_lock); } -/* - * Add data to q, consuming size units of capacity. If there is insufficient - * capacity to consume size units, block until capacity exists. Asserts size is - * > 0. - */ -void -bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) +static void +bqueue_enqueue_impl(bqueue_t *q, void *data, uint64_t item_size, + boolean_t flush) { ASSERT3U(item_size, >, 0); ASSERT3U(item_size, <=, q->bq_maxsize); @@ -77,9 +90,38 @@ bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) } q->bq_size += item_size; list_insert_tail(&q->bq_list, data); - cv_signal(&q->bq_pop_cv); + if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction) + cv_signal(&q->bq_pop_cv); + if (flush) + cv_broadcast(&q->bq_pop_cv); mutex_exit(&q->bq_lock); } + +/* + * Add data to q, consuming size units of capacity. If there is insufficient + * capacity to consume size units, block until capacity exists. Asserts size is + * > 0. + */ +void +bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) +{ + bqueue_enqueue_impl(q, data, item_size, B_FALSE); +} + +/* + * Enqueue an entry, and then flush the queue. This forces the popping threads + * to wake up, even if we're below the fill fraction. We have this in a single + * function, rather than having a separate call, because it prevents race + * conditions between the enqueuing thread and the dequeueing thread, where the + * enqueueing thread will wake up the dequeueing thread, that thread will + * destroy the condvar before the enqueuing thread is done. + */ +void +bqueue_enqueue_flush(bqueue_t *q, void *data, uint64_t item_size) +{ + bqueue_enqueue_impl(q, data, item_size, B_TRUE); +} + /* * Take the first element off of q. If there are no elements on the queue, wait * until one is put there. Return the removed element. @@ -97,7 +139,8 @@ bqueue_dequeue(bqueue_t *q) ASSERT3P(ret, !=, NULL); item_size = obj2node(q, ret)->bqn_size; q->bq_size -= item_size; - cv_signal(&q->bq_add_cv); + if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction)) + cv_signal(&q->bq_add_cv); mutex_exit(&q->bq_lock); return (ret); } |