diff options
-rw-r--r-- | src/util/u_queue.c | 50 | ||||
-rw-r--r-- | src/util/u_queue.h | 8 |
2 files changed, 52 insertions, 6 deletions
diff --git a/src/util/u_queue.c b/src/util/u_queue.c index 5e0c1095569..cd0b95b6ead 100644 --- a/src/util/u_queue.c +++ b/src/util/u_queue.c @@ -34,7 +34,8 @@ #include "u_process.h" static void -util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads); +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, + bool finish_locked); /**************************************************************************** * Wait for all queues to assert idle when exit() is called. @@ -55,7 +56,7 @@ atexit_handler(void) mtx_lock(&exit_mutex); /* Wait for all queues to assert idle. */ LIST_FOR_EACH_ENTRY(iter, &queue_list, head) { - util_queue_kill_threads(iter, 0); + util_queue_kill_threads(iter, 0, false); } mtx_unlock(&exit_mutex); } @@ -340,6 +341,39 @@ util_queue_create_thread(struct util_queue *queue, unsigned index) return true; } +void +util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads) +{ + num_threads = MIN2(num_threads, queue->max_threads); + num_threads = MAX2(num_threads, 1); + + mtx_lock(&queue->finish_lock); + unsigned old_num_threads = queue->num_threads; + + if (num_threads == old_num_threads) { + mtx_unlock(&queue->finish_lock); + return; + } + + if (num_threads < old_num_threads) { + util_queue_kill_threads(queue, num_threads, true); + mtx_unlock(&queue->finish_lock); + return; + } + + /* Create threads. + * + * We need to update num_threads first, because threads terminate + * when thread_index < num_threads. + */ + queue->num_threads = num_threads; + for (unsigned i = old_num_threads; i < num_threads; i++) { + if (!util_queue_create_thread(queue, i)) + break; + } + mtx_unlock(&queue->finish_lock); +} + bool util_queue_init(struct util_queue *queue, const char *name, @@ -378,6 +412,7 @@ util_queue_init(struct util_queue *queue, } queue->flags = flags; + queue->max_threads = num_threads; queue->num_threads = num_threads; queue->max_jobs = max_jobs; @@ -429,12 +464,14 @@ fail: } static void -util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads) +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, + bool finish_locked) { unsigned i; /* Signal all threads to terminate. */ - mtx_lock(&queue->finish_lock); + if (!finish_locked) + mtx_lock(&queue->finish_lock); if (keep_num_threads >= queue->num_threads) { mtx_unlock(&queue->finish_lock); @@ -453,13 +490,14 @@ util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads) for (i = keep_num_threads; i < old_num_threads; i++) thrd_join(queue->threads[i], NULL); - mtx_unlock(&queue->finish_lock); + if (!finish_locked) + mtx_unlock(&queue->finish_lock); } void util_queue_destroy(struct util_queue *queue) { - util_queue_kill_threads(queue, 0); + util_queue_kill_threads(queue, 0, false); remove_from_atexit_list(queue); cnd_destroy(&queue->has_space_cond); diff --git a/src/util/u_queue.h b/src/util/u_queue.h index 756fa53e1bf..2d269099c20 100644 --- a/src/util/u_queue.h +++ b/src/util/u_queue.h @@ -208,6 +208,7 @@ struct util_queue { thrd_t *threads; unsigned flags; int num_queued; + unsigned max_threads; unsigned num_threads; /* decreasing this number will terminate threads */ int max_jobs; int write_idx, read_idx; /* ring buffer pointers */ @@ -235,6 +236,13 @@ void util_queue_drop_job(struct util_queue *queue, void util_queue_finish(struct util_queue *queue); +/* Adjust the number of active threads. The new number of threads can't be + * greater than the initial number of threads at the creation of the queue, + * and it can't be less than 1. + */ +void +util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads); + int64_t util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index); |