diff options
Diffstat (limited to 'module/zfs/bqueue.c')
-rw-r--r-- | module/zfs/bqueue.c | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/module/zfs/bqueue.c b/module/zfs/bqueue.c new file mode 100644 index 000000000..1ddc697b5 --- /dev/null +++ b/module/zfs/bqueue.c @@ -0,0 +1,111 @@ +/* + * CDDL HEADER START + * + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + * + * CDDL HEADER END + */ +/* + * Copyright (c) 2014 by Delphix. All rights reserved. + */ + +#include <sys/bqueue.h> +#include <sys/zfs_context.h> + +static inline bqueue_node_t * +obj2node(bqueue_t *q, void *data) +{ + return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); +} + +/* + * Initialize a blocking queue The maximum capacity of the queue is set to + * size. Types that want to be stored in a bqueue must contain a bqueue_node_t, + * and offset should give its offset from the start of the struct. Return 0 on + * success, or -1 on failure. + */ +int +bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset) +{ + list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), + node_offset + offsetof(bqueue_node_t, bqn_node)); + cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); + cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); + mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); + q->bq_node_offset = node_offset; + q->bq_size = 0; + q->bq_maxsize = size; + return (0); +} + +/* + * Destroy a blocking queue. This function asserts that there are no + * elements in the queue, and no one is blocked on the condition + * variables. + */ +void +bqueue_destroy(bqueue_t *q) +{ + ASSERT0(q->bq_size); + cv_destroy(&q->bq_add_cv); + cv_destroy(&q->bq_pop_cv); + mutex_destroy(&q->bq_lock); + list_destroy(&q->bq_list); +} + +/* + * Add data to q, consuming size units of capacity. If there is insufficient + * capacity to consume size units, block until capacity exists. Asserts size is + * > 0. + */ +void +bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) +{ + ASSERT3U(item_size, >, 0); + ASSERT3U(item_size, <, q->bq_maxsize); + mutex_enter(&q->bq_lock); + obj2node(q, data)->bqn_size = item_size; + while (q->bq_size + item_size > q->bq_maxsize) { + cv_wait(&q->bq_add_cv, &q->bq_lock); + } + q->bq_size += item_size; + list_insert_tail(&q->bq_list, data); + cv_signal(&q->bq_pop_cv); + mutex_exit(&q->bq_lock); +} +/* + * Take the first element off of q. If there are no elements on the queue, wait + * until one is put there. Return the removed element. + */ +void * +bqueue_dequeue(bqueue_t *q) +{ + void *ret; + uint64_t item_size; + mutex_enter(&q->bq_lock); + while (q->bq_size == 0) { + cv_wait(&q->bq_pop_cv, &q->bq_lock); + } + ret = list_remove_head(&q->bq_list); + item_size = obj2node(q, ret)->bqn_size; + q->bq_size -= item_size; + mutex_exit(&q->bq_lock); + cv_signal(&q->bq_add_cv); + return (ret); +} + +/* + * Returns true if the space used is 0. + */ +boolean_t +bqueue_empty(bqueue_t *q) +{ + return (q->bq_size == 0); +} |