aboutsummaryrefslogtreecommitdiffstats
path: root/src/gallium/auxiliary/util
diff options
context:
space:
mode:
authorMarek Olšák <[email protected]>2016-06-11 15:40:28 +0200
committerMarek Olšák <[email protected]>2016-06-24 12:24:40 +0200
commit404d0d50d8aaf60597668e65a2d7c96cdea53aea (patch)
tree4bc6013efc181d8678a00ac5f434bc4433772172 /src/gallium/auxiliary/util
parent4358f6dd130680d60d48d6646959c11c8d7ca13d (diff)
gallium/u_queue: add an option to have multiple worker threads
independent jobs don't have to be stuck on only one thread v2: use CALLOC & FREE Reviewed-by: Nicolai Hähnle <[email protected]>
Diffstat (limited to 'src/gallium/auxiliary/util')
-rw-r--r--src/gallium/auxiliary/util/u_queue.c66
-rw-r--r--src/gallium/auxiliary/util/u_queue.h12
2 files changed, 63 insertions, 15 deletions
diff --git a/src/gallium/auxiliary/util/u_queue.c b/src/gallium/auxiliary/util/u_queue.c
index 7a67c1169d9..a958c04d5db 100644
--- a/src/gallium/auxiliary/util/u_queue.c
+++ b/src/gallium/auxiliary/util/u_queue.c
@@ -25,6 +25,7 @@
*/
#include "u_queue.h"
+#include "u_memory.h"
#include "os/os_time.h"
static void
@@ -48,15 +49,23 @@ util_queue_job_wait(struct util_queue_fence *fence)
pipe_mutex_unlock(fence->mutex);
}
-static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
+struct thread_input {
+ struct util_queue *queue;
+ int thread_index;
+};
+
+static PIPE_THREAD_ROUTINE(util_queue_thread_func, input)
{
- struct util_queue *queue = (struct util_queue*)param;
+ struct util_queue *queue = ((struct thread_input*)input)->queue;
+ int thread_index = ((struct thread_input*)input)->thread_index;
+
+ FREE(input);
while (1) {
struct util_queue_job job;
pipe_semaphore_wait(&queue->queued);
- if (queue->kill_thread)
+ if (queue->kill_threads)
break;
pipe_mutex_lock(queue->lock);
@@ -68,7 +77,7 @@ static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
pipe_semaphore_signal(&queue->has_space);
if (job.job) {
- queue->execute_job(job.job);
+ queue->execute_job(job.job, thread_index);
util_queue_fence_signal(job.fence);
}
}
@@ -88,9 +97,13 @@ static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
bool
util_queue_init(struct util_queue *queue,
unsigned max_jobs,
- void (*execute_job)(void *))
+ unsigned num_threads,
+ void (*execute_job)(void *, int))
{
+ unsigned i;
+
memset(queue, 0, sizeof(*queue));
+ queue->num_threads = num_threads;
queue->max_jobs = max_jobs;
queue->jobs = (struct util_queue_job*)
@@ -103,13 +116,36 @@ util_queue_init(struct util_queue *queue,
pipe_semaphore_init(&queue->has_space, max_jobs);
pipe_semaphore_init(&queue->queued, 0);
- queue->thread = pipe_thread_create(util_queue_thread_func, queue);
- if (!queue->thread)
+ queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread));
+ if (!queue->threads)
goto fail;
+ /* start threads */
+ for (i = 0; i < num_threads; i++) {
+ struct thread_input *input = MALLOC_STRUCT(thread_input);
+ input->queue = queue;
+ input->thread_index = i;
+
+ queue->threads[i] = pipe_thread_create(util_queue_thread_func, input);
+
+ if (!queue->threads[i]) {
+ FREE(input);
+
+ if (i == 0) {
+ /* no threads created, fail */
+ goto fail;
+ } else {
+ /* at least one thread created, so use it */
+ queue->num_threads = i+1;
+ break;
+ }
+ }
+ }
return true;
fail:
+ FREE(queue->threads);
+
if (queue->jobs) {
pipe_semaphore_destroy(&queue->has_space);
pipe_semaphore_destroy(&queue->queued);
@@ -124,13 +160,23 @@ fail:
void
util_queue_destroy(struct util_queue *queue)
{
- queue->kill_thread = 1;
- pipe_semaphore_signal(&queue->queued);
- pipe_thread_wait(queue->thread);
+ unsigned i;
+
+ /* Signal all threads to terminate. */
+ pipe_mutex_lock(queue->queued.mutex);
+ queue->kill_threads = 1;
+ queue->queued.counter = queue->num_threads;
+ pipe_condvar_broadcast(queue->queued.cond);
+ pipe_mutex_unlock(queue->queued.mutex);
+
+ for (i = 0; i < queue->num_threads; i++)
+ pipe_thread_wait(queue->threads[i]);
+
pipe_semaphore_destroy(&queue->has_space);
pipe_semaphore_destroy(&queue->queued);
pipe_mutex_destroy(queue->lock);
FREE(queue->jobs);
+ FREE(queue->threads);
}
void
diff --git a/src/gallium/auxiliary/util/u_queue.h b/src/gallium/auxiliary/util/u_queue.h
index acebb51382f..f3aa4f6f5c6 100644
--- a/src/gallium/auxiliary/util/u_queue.h
+++ b/src/gallium/auxiliary/util/u_queue.h
@@ -54,17 +54,19 @@ struct util_queue {
pipe_mutex lock;
pipe_semaphore has_space;
pipe_semaphore queued;
- pipe_thread thread;
- int kill_thread;
+ pipe_thread *threads;
+ unsigned num_threads;
+ int kill_threads;
int max_jobs;
int write_idx, read_idx; /* ring buffer pointers */
struct util_queue_job *jobs;
- void (*execute_job)(void *job);
+ void (*execute_job)(void *job, int thread_index);
};
bool util_queue_init(struct util_queue *queue,
unsigned max_jobs,
- void (*execute_job)(void *));
+ unsigned num_threads,
+ void (*execute_job)(void *, int));
void util_queue_destroy(struct util_queue *queue);
void util_queue_fence_init(struct util_queue_fence *fence);
void util_queue_fence_destroy(struct util_queue_fence *fence);
@@ -78,7 +80,7 @@ void util_queue_job_wait(struct util_queue_fence *fence);
static inline bool
util_queue_is_initialized(struct util_queue *queue)
{
- return queue->thread != 0;
+ return queue->threads != NULL;
}
static inline bool