diff options
author | Marek Olšák <[email protected]> | 2016-06-11 15:40:28 +0200 |
---|---|---|
committer | Marek Olšák <[email protected]> | 2016-06-24 12:24:40 +0200 |
commit | 404d0d50d8aaf60597668e65a2d7c96cdea53aea (patch) | |
tree | 4bc6013efc181d8678a00ac5f434bc4433772172 /src/gallium/auxiliary/util | |
parent | 4358f6dd130680d60d48d6646959c11c8d7ca13d (diff) |
gallium/u_queue: add an option to have multiple worker threads
independent jobs don't have to be stuck on only one thread
v2: use CALLOC & FREE
Reviewed-by: Nicolai Hähnle <[email protected]>
Diffstat (limited to 'src/gallium/auxiliary/util')
-rw-r--r-- | src/gallium/auxiliary/util/u_queue.c | 66 | ||||
-rw-r--r-- | src/gallium/auxiliary/util/u_queue.h | 12 |
2 files changed, 63 insertions, 15 deletions
diff --git a/src/gallium/auxiliary/util/u_queue.c b/src/gallium/auxiliary/util/u_queue.c index 7a67c1169d9..a958c04d5db 100644 --- a/src/gallium/auxiliary/util/u_queue.c +++ b/src/gallium/auxiliary/util/u_queue.c @@ -25,6 +25,7 @@ */ #include "u_queue.h" +#include "u_memory.h" #include "os/os_time.h" static void @@ -48,15 +49,23 @@ util_queue_job_wait(struct util_queue_fence *fence) pipe_mutex_unlock(fence->mutex); } -static PIPE_THREAD_ROUTINE(util_queue_thread_func, param) +struct thread_input { + struct util_queue *queue; + int thread_index; +}; + +static PIPE_THREAD_ROUTINE(util_queue_thread_func, input) { - struct util_queue *queue = (struct util_queue*)param; + struct util_queue *queue = ((struct thread_input*)input)->queue; + int thread_index = ((struct thread_input*)input)->thread_index; + + FREE(input); while (1) { struct util_queue_job job; pipe_semaphore_wait(&queue->queued); - if (queue->kill_thread) + if (queue->kill_threads) break; pipe_mutex_lock(queue->lock); @@ -68,7 +77,7 @@ static PIPE_THREAD_ROUTINE(util_queue_thread_func, param) pipe_semaphore_signal(&queue->has_space); if (job.job) { - queue->execute_job(job.job); + queue->execute_job(job.job, thread_index); util_queue_fence_signal(job.fence); } } @@ -88,9 +97,13 @@ static PIPE_THREAD_ROUTINE(util_queue_thread_func, param) bool util_queue_init(struct util_queue *queue, unsigned max_jobs, - void (*execute_job)(void *)) + unsigned num_threads, + void (*execute_job)(void *, int)) { + unsigned i; + memset(queue, 0, sizeof(*queue)); + queue->num_threads = num_threads; queue->max_jobs = max_jobs; queue->jobs = (struct util_queue_job*) @@ -103,13 +116,36 @@ util_queue_init(struct util_queue *queue, pipe_semaphore_init(&queue->has_space, max_jobs); pipe_semaphore_init(&queue->queued, 0); - queue->thread = pipe_thread_create(util_queue_thread_func, queue); - if (!queue->thread) + queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread)); + if (!queue->threads) goto fail; + /* start threads */ + for (i = 0; i < num_threads; i++) { + struct thread_input *input = MALLOC_STRUCT(thread_input); + input->queue = queue; + input->thread_index = i; + + queue->threads[i] = pipe_thread_create(util_queue_thread_func, input); + + if (!queue->threads[i]) { + FREE(input); + + if (i == 0) { + /* no threads created, fail */ + goto fail; + } else { + /* at least one thread created, so use it */ + queue->num_threads = i+1; + break; + } + } + } return true; fail: + FREE(queue->threads); + if (queue->jobs) { pipe_semaphore_destroy(&queue->has_space); pipe_semaphore_destroy(&queue->queued); @@ -124,13 +160,23 @@ fail: void util_queue_destroy(struct util_queue *queue) { - queue->kill_thread = 1; - pipe_semaphore_signal(&queue->queued); - pipe_thread_wait(queue->thread); + unsigned i; + + /* Signal all threads to terminate. */ + pipe_mutex_lock(queue->queued.mutex); + queue->kill_threads = 1; + queue->queued.counter = queue->num_threads; + pipe_condvar_broadcast(queue->queued.cond); + pipe_mutex_unlock(queue->queued.mutex); + + for (i = 0; i < queue->num_threads; i++) + pipe_thread_wait(queue->threads[i]); + pipe_semaphore_destroy(&queue->has_space); pipe_semaphore_destroy(&queue->queued); pipe_mutex_destroy(queue->lock); FREE(queue->jobs); + FREE(queue->threads); } void diff --git a/src/gallium/auxiliary/util/u_queue.h b/src/gallium/auxiliary/util/u_queue.h index acebb51382f..f3aa4f6f5c6 100644 --- a/src/gallium/auxiliary/util/u_queue.h +++ b/src/gallium/auxiliary/util/u_queue.h @@ -54,17 +54,19 @@ struct util_queue { pipe_mutex lock; pipe_semaphore has_space; pipe_semaphore queued; - pipe_thread thread; - int kill_thread; + pipe_thread *threads; + unsigned num_threads; + int kill_threads; int max_jobs; int write_idx, read_idx; /* ring buffer pointers */ struct util_queue_job *jobs; - void (*execute_job)(void *job); + void (*execute_job)(void *job, int thread_index); }; bool util_queue_init(struct util_queue *queue, unsigned max_jobs, - void (*execute_job)(void *)); + unsigned num_threads, + void (*execute_job)(void *, int)); void util_queue_destroy(struct util_queue *queue); void util_queue_fence_init(struct util_queue_fence *fence); void util_queue_fence_destroy(struct util_queue_fence *fence); @@ -78,7 +80,7 @@ void util_queue_job_wait(struct util_queue_fence *fence); static inline bool util_queue_is_initialized(struct util_queue *queue) { - return queue->thread != 0; + return queue->threads != NULL; } static inline bool |