author | behlendo <behlendo@7e1ea52c-4ff2-0310-8f11-9dd32ca42a1c> | 2008-04-25 22:10:47 +0000
---|---|---
committer | behlendo <behlendo@7e1ea52c-4ff2-0310-8f11-9dd32ca42a1c> | 2008-04-25 22:10:47 +0000
commit | bcd68186d8009957b751720a801e4c16bb272e83 (patch) |
tree | b5b0e1055f5bad5308e07c73bbf99892ec9188ed |
parent | 839d8b438e1d877fb4a625eed51f556433cbd6b6 (diff) |
A new and improved taskq implementation for the SPL. It allows a
configurable number of threads, like the Solaris version, and almost
all of the Solaris options are supported. Unfortunately, it appears to
have made absolutely no difference to our performance numbers, so I
need to keep looking for where we are bottlenecking.
git-svn-id: https://outreach.scidac.gov/svn/spl/trunk@93 7e1ea52c-4ff2-0310-8f11-9dd32ca42a1c
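
For context, a minimal consumer of the new interface might look like the sketch below. This is not part of the patch; it is modeled on the taskq.h macros and the updated splat tests in the diff, and the `example_func`/`example_use_taskq` names are invented for illustration.

```c
#include <sys/taskq.h>
#include <asm/atomic.h>

/* Hypothetical task function; any void (*)(void *) will do. */
static void
example_func(void *arg)
{
        atomic_t *count = (atomic_t *)arg;
        atomic_inc(count);
}

static int
example_use_taskq(void)
{
        taskq_t *tq;
        taskqid_t id;
        atomic_t count = ATOMIC_INIT(0);

        /* 4 threads, max class priority, 50..INT_MAX task_t pool,
         * prepopulated free list -- the same settings the splat tests use. */
        tq = taskq_create("example", 4, maxclsyspri, 50, INT_MAX,
                          TASKQ_PREPOPULATE);
        if (tq == NULL)
                return -1;

        /* TQ_SLEEP may block while allocating a task_t; 0 means failure. */
        id = taskq_dispatch(tq, example_func, &count, TQ_SLEEP);
        if (id == 0) {
                taskq_destroy(tq);
                return -1;
        }

        taskq_wait(tq);         /* wait for all dispatched tasks to finish */
        taskq_destroy(tq);      /* drain, stop the threads, free the taskq */

        return atomic_read(&count) == 1 ? 0 : -1;
}
```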
-rw-r--r-- | include/sys/condvar.h | 18 |
-rw-r--r-- | include/sys/mutex.h | 20 |
-rw-r--r-- | include/sys/taskq.h | 109 |
-rw-r--r-- | modules/spl/spl-taskq.c | 458 |
-rw-r--r-- | modules/splat/splat-taskq.c | 18 |
5 files changed, 484 insertions, 139 deletions
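
One behavior worth noting before the raw diff: the new `__taskq_dispatch()` returns 0 on failure (inactive taskq, no idle thread with TQ_NOQUEUE, or no task_t available) and must not be called with TQ_SLEEP from atomic context. A hedged sketch of a caller that cannot sleep (the helper name is invented, not part of the patch):

```c
#include <sys/taskq.h>

/* Hypothetical helper for contexts that must not sleep, e.g. while
 * holding a spinlock: dispatch with TQ_NOSLEEP and, if no task_t can
 * be queued, fall back to running the function synchronously. */
static void
example_dispatch_nosleep(taskq_t *tq, task_func_t *func, void *arg)
{
        if (taskq_dispatch(tq, func, arg, TQ_NOSLEEP) == 0)
                func(arg);
}
```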
diff --git a/include/sys/condvar.h b/include/sys/condvar.h index fd845d9c1..2878b68a8 100644 --- a/include/sys/condvar.h +++ b/include/sys/condvar.h @@ -28,6 +28,7 @@ typedef enum { CV_DEFAULT=0, CV_DRIVER } kcv_type_t; static __inline__ void cv_init(kcondvar_t *cvp, char *name, kcv_type_t type, void *arg) { + ENTRY; ASSERT(cvp); ASSERT(type == CV_DEFAULT); ASSERT(arg == NULL); @@ -44,11 +45,14 @@ cv_init(kcondvar_t *cvp, char *name, kcv_type_t type, void *arg) if (cvp->cv_name) strcpy(cvp->cv_name, name); } + + EXIT; } static __inline__ void cv_destroy(kcondvar_t *cvp) { + ENTRY; ASSERT(cvp); ASSERT(cvp->cv_magic == CV_MAGIC); spin_lock(&cvp->cv_lock); @@ -60,12 +64,14 @@ cv_destroy(kcondvar_t *cvp) memset(cvp, CV_POISON, sizeof(*cvp)); spin_unlock(&cvp->cv_lock); + EXIT; } static __inline__ void cv_wait(kcondvar_t *cvp, kmutex_t *mtx) { DEFINE_WAIT(wait); + ENTRY; ASSERT(cvp); ASSERT(mtx); @@ -93,6 +99,7 @@ cv_wait(kcondvar_t *cvp, kmutex_t *mtx) atomic_dec(&cvp->cv_waiters); finish_wait(&cvp->cv_event, &wait); + EXIT; } /* 'expire_time' argument is an absolute wall clock time in jiffies. @@ -103,6 +110,7 @@ cv_timedwait(kcondvar_t *cvp, kmutex_t *mtx, clock_t expire_time) { DEFINE_WAIT(wait); clock_t time_left; + ENTRY; ASSERT(cvp); ASSERT(mtx); @@ -120,7 +128,7 @@ cv_timedwait(kcondvar_t *cvp, kmutex_t *mtx, clock_t expire_time) /* XXX - Does not handle jiffie wrap properly */ time_left = expire_time - jiffies; if (time_left <= 0) - return -1; + RETURN(-1); prepare_to_wait_exclusive(&cvp->cv_event, &wait, TASK_UNINTERRUPTIBLE); @@ -136,12 +144,13 @@ cv_timedwait(kcondvar_t *cvp, kmutex_t *mtx, clock_t expire_time) atomic_dec(&cvp->cv_waiters); finish_wait(&cvp->cv_event, &wait); - return (time_left > 0 ? time_left : -1); + RETURN(time_left > 0 ? time_left : -1); } static __inline__ void cv_signal(kcondvar_t *cvp) { + ENTRY; ASSERT(cvp); ASSERT(cvp->cv_magic == CV_MAGIC); @@ -151,6 +160,8 @@ cv_signal(kcondvar_t *cvp) * the wait queue to ensure we don't race waking up processes. */ if (atomic_read(&cvp->cv_waiters) > 0) wake_up(&cvp->cv_event); + + EXIT; } static __inline__ void @@ -158,10 +169,13 @@ cv_broadcast(kcondvar_t *cvp) { ASSERT(cvp); ASSERT(cvp->cv_magic == CV_MAGIC); + ENTRY; /* Wake_up_all() will wake up all waiters even those which * have the WQ_FLAG_EXCLUSIVE flag set. 
*/ if (atomic_read(&cvp->cv_waiters) > 0) wake_up_all(&cvp->cv_event); + + EXIT; } #endif /* _SPL_CONDVAR_H */ diff --git a/include/sys/mutex.h b/include/sys/mutex.h index d7036b566..045842d72 100644 --- a/include/sys/mutex.h +++ b/include/sys/mutex.h @@ -36,6 +36,7 @@ typedef struct { static __inline__ void mutex_init(kmutex_t *mp, char *name, int type, void *ibc) { + ENTRY; ASSERT(mp); ASSERT(ibc == NULL); /* XXX - Spin mutexes not needed */ ASSERT(type == MUTEX_DEFAULT); /* XXX - Only default type supported */ @@ -51,12 +52,14 @@ mutex_init(kmutex_t *mp, char *name, int type, void *ibc) if (mp->km_name) strcpy(mp->km_name, name); } + EXIT; } #undef mutex_destroy static __inline__ void mutex_destroy(kmutex_t *mp) { + ENTRY; ASSERT(mp); ASSERT(mp->km_magic == KM_MAGIC); spin_lock(&mp->km_lock); @@ -66,11 +69,13 @@ mutex_destroy(kmutex_t *mp) memset(mp, KM_POISON, sizeof(*mp)); spin_unlock(&mp->km_lock); + EXIT; } static __inline__ void mutex_enter(kmutex_t *mp) { + ENTRY; ASSERT(mp); ASSERT(mp->km_magic == KM_MAGIC); spin_lock(&mp->km_lock); @@ -91,6 +96,7 @@ mutex_enter(kmutex_t *mp) ASSERT(mp->km_owner == NULL); mp->km_owner = current; spin_unlock(&mp->km_lock); + EXIT; } /* Return 1 if we acquired the mutex, else zero. */ @@ -98,6 +104,7 @@ static __inline__ int mutex_tryenter(kmutex_t *mp) { int rc; + ENTRY; ASSERT(mp); ASSERT(mp->km_magic == KM_MAGIC); @@ -118,14 +125,16 @@ mutex_tryenter(kmutex_t *mp) ASSERT(mp->km_owner == NULL); mp->km_owner = current; spin_unlock(&mp->km_lock); - return 1; + RETURN(1); } - return 0; + + RETURN(0); } static __inline__ void mutex_exit(kmutex_t *mp) { + ENTRY; ASSERT(mp); ASSERT(mp->km_magic == KM_MAGIC); spin_lock(&mp->km_lock); @@ -134,6 +143,7 @@ mutex_exit(kmutex_t *mp) mp->km_owner = NULL; spin_unlock(&mp->km_lock); up(&mp->km_sem); + EXIT; } /* Return 1 if mutex is held by current process, else zero. */ @@ -141,6 +151,7 @@ static __inline__ int mutex_owned(kmutex_t *mp) { int rc; + ENTRY; ASSERT(mp); ASSERT(mp->km_magic == KM_MAGIC); @@ -148,7 +159,7 @@ mutex_owned(kmutex_t *mp) rc = (mp->km_owner == current); spin_unlock(&mp->km_lock); - return rc; + RETURN(rc); } /* Return owner if mutex is owned, else NULL. */ @@ -156,6 +167,7 @@ static __inline__ kthread_t * mutex_owner(kmutex_t *mp) { kthread_t *thr; + ENTRY; ASSERT(mp); ASSERT(mp->km_magic == KM_MAGIC); @@ -163,7 +175,7 @@ mutex_owner(kmutex_t *mp) thr = mp->km_owner; spin_unlock(&mp->km_lock); - return thr; + RETURN(thr); } #ifdef __cplusplus diff --git a/include/sys/taskq.h b/include/sys/taskq.h index 14ba33280..ae8fbdb7b 100644 --- a/include/sys/taskq.h +++ b/include/sys/taskq.h @@ -5,82 +5,75 @@ extern "C" { #endif -/* - * Task Queues - As of linux 2.6.x task queues have been replaced by a - * similar construct called work queues. The big difference on the linux - * side is that functions called from work queues run in process context - * and not interrupt context. - * - * One nice feature of Solaris which does not exist in linux work - * queues in the notion of a dynamic work queue. Rather than implementing - * this in the shim layer I'm hardcoding one-thread per work queue. - * - * XXX - This may end up being a significant performance penalty which - * forces us to implement dynamic workqueues. Which is all very doable - * with a little effort. 
- */ #include <linux/module.h> -#include <linux/workqueue.h> #include <linux/gfp.h> #include <linux/slab.h> #include <linux/interrupt.h> +#include <linux/kthread.h> #include <sys/types.h> +#include <sys/kmem.h> -#undef DEBUG_TASKQ_UNIMPLEMENTED +#define TASKQ_NAMELEN 31 -#define TASKQ_NAMELEN 31 -#define taskq_t workq_t +#define TASKQ_PREPOPULATE 0x00000001 +#define TASKQ_CPR_SAFE 0x00000002 +#define TASKQ_DYNAMIC 0x00000004 -typedef struct workqueue_struct workq_t; typedef unsigned long taskqid_t; -typedef void (*task_func_t)(void *); - -/* - * Public flags for taskq_create(): bit range 0-15 - */ -#define TASKQ_PREPOPULATE 0x0000 /* XXX - Workqueues fully populate */ -#define TASKQ_CPR_SAFE 0x0000 /* XXX - No analog */ -#define TASKQ_DYNAMIC 0x0000 /* XXX - Worksqueues not dynamic */ +typedef void (task_func_t)(void *); /* * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as - * KM_SLEEP/KM_NOSLEEP. + * KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly + * large so as not to conflict with already used GFP_* defines. */ -#define TQ_SLEEP 0x00 /* XXX - Workqueues don't support */ -#define TQ_NOSLEEP 0x00 /* these sorts of flags. They */ -#define TQ_NOQUEUE 0x00 /* always run in application */ -#define TQ_NOALLOC 0x00 /* context and can sleep. */ - - -#ifdef DEBUG_TASKQ_UNIMPLEMENTED -static __inline__ void taskq_init(void) { -#error "taskq_init() not implemented" -} - -static __inline__ taskq_t * -taskq_create_instance(const char *, int, int, pri_t, int, int, uint_t) { -#error "taskq_create_instance() not implemented" -} - -extern void nulltask(void *); -extern void taskq_suspend(taskq_t *); -extern int taskq_suspended(taskq_t *); -extern void taskq_resume(taskq_t *); - -#endif /* DEBUG_TASKQ_UNIMPLEMENTED */ +#define TQ_SLEEP KM_SLEEP +#define TQ_NOSLEEP KM_NOSLEEP +#define TQ_NOQUEUE 0x01000000 +#define TQ_NOALLOC 0x02000000 +#define TQ_NEW 0x04000000 +#define TQ_ACTIVE 0x80000000 + +typedef struct task { + spinlock_t t_lock; + struct list_head t_list; + taskqid_t t_id; + task_func_t *t_func; + void *t_arg; +} task_t; + +typedef struct taskq { + spinlock_t tq_lock; /* protects taskq_t */ + struct task_struct **tq_threads; /* thread pointers */ + const char *tq_name; /* taskq name */ + int tq_nactive; /* # of active threads */ + int tq_nthreads; /* # of total threads */ + int tq_pri; /* priority */ + int tq_minalloc; /* min task_t pool size */ + int tq_maxalloc; /* max task_t pool size */ + int tq_nalloc; /* cur task_t pool size */ + uint_t tq_flags; /* flags */ + taskqid_t tq_next_id; /* next pend/work id */ + taskqid_t tq_lowest_id; /* lowest pend/work id */ + struct list_head tq_free_list; /* free task_t's */ + struct list_head tq_work_list; /* work task_t's */ + struct list_head tq_pend_list; /* pending task_t's */ + wait_queue_head_t tq_work_waitq; /* new work waitq */ + wait_queue_head_t tq_wait_waitq; /* wait waitq */ +} taskq_t; extern taskqid_t __taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); extern taskq_t *__taskq_create(const char *, int, pri_t, int, int, uint_t); extern void __taskq_destroy(taskq_t *); extern void __taskq_wait(taskq_t *); - -#define taskq_create(name, thr, pri, min, max, flags) \ - __taskq_create(name, thr, pri, min, max, flags) -#define taskq_dispatch(tq, func, priv, flags) \ - __taskq_dispatch(tq, (task_func_t)func, priv, flags) -#define taskq_destroy(tq) __taskq_destroy(tq) -#define taskq_wait(tq) __taskq_wait(tq) -#define taskq_member(tq, kthr) 1 /* XXX -Just be true */ +extern int __taskq_member(taskq_t *, void *); + 
+#define taskq_member(tq, t) __taskq_member(tq, t) +#define taskq_wait_id(tq, id) __taskq_wait_id(tq, id) +#define taskq_wait(tq) __taskq_wait(tq) +#define taskq_dispatch(tq, f, p, fl) __taskq_dispatch(tq, f, p, fl) +#define taskq_create(n, th, p, mi, ma, fl) __taskq_create(n, th, p, mi, ma, fl) +#define taskq_destroy(tq) __taskq_destroy(tq) #ifdef __cplusplus } diff --git a/modules/spl/spl-taskq.c b/modules/spl/spl-taskq.c index d26b40db8..ad9be695b 100644 --- a/modules/spl/spl-taskq.c +++ b/modules/spl/spl-taskq.c @@ -6,109 +6,427 @@ #define DEBUG_SUBSYSTEM S_TASKQ -/* - * Task queue interface - * - * The taskq_work_wrapper functions are used to manage the work_structs - * which must be submitted to linux. The shim layer allocates a wrapper - * structure for all items which contains a pointer to itself as well as - * the real work to be performed. When the work item run the generic - * handle is called which calls the real work function and then using - * the self pointer frees the work_struct. +/* NOTE: Must be called with tq->tq_lock held, returns a list_t which + * is not attached to the free, work, or pending taskq lists. */ -typedef struct taskq_work_wrapper { - struct work_struct tww_work; - task_func_t tww_func; - void * tww_priv; -} taskq_work_wrapper_t; +static task_t * +task_alloc(taskq_t *tq, uint_t flags) +{ + task_t *t; + int count = 0; + ENTRY; + + ASSERT(tq); + ASSERT(flags & (TQ_SLEEP | TQ_NOSLEEP)); /* One set */ + ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */ + ASSERT(spin_is_locked(&tq->tq_lock)); +retry: + /* Aquire task_t's from free list if available */ + if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) { + t = list_entry(tq->tq_free_list.next, task_t, t_list); + list_del_init(&t->t_list); + RETURN(t); + } + + /* Free list is empty and memory allocs are prohibited */ + if (flags & TQ_NOALLOC) + RETURN(NULL); + + /* Hit maximum task_t pool size */ + if (tq->tq_nalloc >= tq->tq_maxalloc) { + if (flags & TQ_NOSLEEP) + RETURN(NULL); + + /* Sleep periodically polling the free list for an available + * task_t. If a full second passes and we have not found + * one gives up and return a NULL to the caller. */ + if (flags & TQ_SLEEP) { + spin_unlock_irq(&tq->tq_lock); + schedule_timeout(HZ / 100); + spin_lock_irq(&tq->tq_lock); + if (count < 100) + GOTO(retry, count++); + + RETURN(NULL); + } + + /* Unreachable, TQ_SLEEP xor TQ_NOSLEEP */ + SBUG(); + } + + spin_unlock_irq(&tq->tq_lock); + t = kmem_alloc(sizeof(task_t), flags & (TQ_SLEEP | TQ_NOSLEEP)); + spin_lock_irq(&tq->tq_lock); + + if (t) { + spin_lock_init(&t->t_lock); + INIT_LIST_HEAD(&t->t_list); + t->t_id = 0; + t->t_func = NULL; + t->t_arg = NULL; + tq->tq_nalloc++; + } + + RETURN(t); +} + +/* NOTE: Must be called with tq->tq_lock held, expectes the task_t + * to already be removed from the free, work, or pending taskq lists. + */ +static void +task_free(taskq_t *tq, task_t *t) +{ + ENTRY; + + ASSERT(tq); + ASSERT(t); + ASSERT(spin_is_locked(&tq->tq_lock)); + ASSERT(list_empty(&t->t_list)); + + kmem_free(t, sizeof(task_t)); + tq->tq_nalloc--; + EXIT; +} + +/* NOTE: Must be called with tq->tq_lock held, either destroyes the + * task_t if too many exist or moves it to the free list for later use. 
+ */ static void -taskq_work_handler(void *priv) +task_done(taskq_t *tq, task_t *t) { - taskq_work_wrapper_t *tww = priv; + ENTRY; + ASSERT(tq); + ASSERT(t); + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_del_init(&t->t_list); - ASSERT(tww); - ASSERT(tww->tww_func); + if (tq->tq_nalloc <= tq->tq_minalloc) { + t->t_id = 0; + t->t_func = NULL; + t->t_arg = NULL; + list_add(&t->t_list, &tq->tq_free_list); + } else { + task_free(tq, t); + } - /* Call the real function and free the wrapper */ - tww->tww_func(tww->tww_priv); - kfree(tww); + EXIT; } -/* XXX - All flags currently ignored */ -taskqid_t -__taskq_dispatch(taskq_t *tq, task_func_t func, void *priv, uint_t flags) +/* Taskqid's are handed out in a monotonically increasing fashion per + * taskq_t. We don't handle taskqid wrapping yet, but fortuntely it isi + * a 64-bit value so this is probably never going to happen. The lowest + * pending taskqid is stored in the taskq_t to make it easy for any + * taskq_wait()'ers to know if the tasks they're waiting for have + * completed. Unfortunately, tq_task_lowest is kept up to date is + * a pretty brain dead way, something more clever should be done. + */ +static int +taskq_wait_check(taskq_t *tq, taskqid_t id) +{ + RETURN(tq->tq_lowest_id >= id); +} + +/* Expected to wait for all previously scheduled tasks to complete. We do + * not need to wait for tasked scheduled after this call to complete. In + * otherwords we do not need to drain the entire taskq. */ +void +__taskq_wait_id(taskq_t *tq, taskqid_t id) { - struct workqueue_struct *wq = tq; - taskq_work_wrapper_t *tww; - int rc; ENTRY; + ASSERT(tq); + + wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id)); + + EXIT; +} +EXPORT_SYMBOL(__taskq_wait_id); + +void +__taskq_wait(taskq_t *tq) +{ + taskqid_t id; + ENTRY; + ASSERT(tq); + + spin_lock_irq(&tq->tq_lock); + id = tq->tq_next_id; + spin_unlock_irq(&tq->tq_lock); + + __taskq_wait_id(tq, id); + + EXIT; + +} +EXPORT_SYMBOL(__taskq_wait); + +int +__taskq_member(taskq_t *tq, void *t) +{ + int i; + ENTRY; + + ASSERT(tq); + ASSERT(t); + + for (i = 0; i < tq->tq_nthreads; i++) + if (tq->tq_threads[i] == (struct task_struct *)t) + RETURN(1); + + RETURN(0); +} +EXPORT_SYMBOL(__taskq_member); + +taskqid_t +__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) +{ + task_t *t; + taskqid_t rc = 0; + ENTRY; ASSERT(tq); ASSERT(func); + if (unlikely(in_atomic() && (flags & TQ_SLEEP))) { + CERROR("May schedule while atomic: %s/0x%08x/%d\n", + current->comm, preempt_count(), current->pid); + SBUG(); + } - /* Use GFP_ATOMIC since this may be called in interrupt context */ - tww = (taskq_work_wrapper_t *)kmalloc(sizeof(*tww), GFP_ATOMIC); - if (!tww) - RETURN((taskqid_t)0); + spin_lock_irq(&tq->tq_lock); - INIT_WORK(&(tww->tww_work), taskq_work_handler, tww); - tww->tww_func = func; - tww->tww_priv = priv; + /* Taskq being destroyed and all tasks drained */ + if (!(tq->tq_flags & TQ_ACTIVE)) + GOTO(out, rc = 0); - rc = queue_work(wq, &(tww->tww_work)); - if (!rc) { - kfree(tww); - RETURN((taskqid_t)0); - } + /* Do not queue the task unless there is idle thread for it */ + ASSERT(tq->tq_nactive <= tq->tq_nthreads); + if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) + GOTO(out, rc = 0); + + if ((t = task_alloc(tq, flags)) == NULL) + GOTO(out, rc = 0); - RETURN((taskqid_t)wq); + + spin_lock(&t->t_lock); + list_add(&t->t_list, &tq->tq_pend_list); + t->t_id = rc = tq->tq_next_id; + tq->tq_next_id++; + t->t_func = func; + t->t_arg = arg; + spin_unlock(&t->t_lock); + + 
wake_up(&tq->tq_work_waitq); +out: + spin_unlock_irq(&tq->tq_lock); + RETURN(rc); } EXPORT_SYMBOL(__taskq_dispatch); -/* XXX - We must fully implement dynamic workqueues since they make a - * significant impact in terms of performance. For now I've made - * a trivial compromise. If you ask for one thread you get one - * thread, if you ask for more than that you get one per core. - * It's unclear if you ever really need/want more than one per-core - * anyway. More analysis is required. - * - * name - Workqueue names are limited to 10 chars - * pri - Ignore priority - * min - Ignored until this is a dynamic thread pool - * max - Ignored until this is a dynamic thread pool - * flags - Ignored until this is a dynamic thread_pool - */ +/* NOTE: Must be called with tq->tq_lock held */ +static taskqid_t +taskq_lowest_id(taskq_t *tq) +{ + taskqid_t lowest_id = ~0; + task_t *t; + ENTRY; + + ASSERT(tq); + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_for_each_entry(t, &tq->tq_pend_list, t_list) + if (t->t_id < lowest_id) + lowest_id = t->t_id; + + list_for_each_entry(t, &tq->tq_work_list, t_list) + if (t->t_id < lowest_id) + lowest_id = t->t_id; + + RETURN(lowest_id); +} + +static int +taskq_thread(void *args) +{ + DECLARE_WAITQUEUE(wait, current); + sigset_t blocked; + taskqid_t id; + taskq_t *tq = args; + task_t *t; + ENTRY; + + ASSERT(tq); + current->flags |= PF_NOFREEZE; + + sigfillset(&blocked); + sigprocmask(SIG_BLOCK, &blocked, NULL); + flush_signals(current); + + spin_lock_irq(&tq->tq_lock); + tq->tq_nthreads++; + wake_up(&tq->tq_wait_waitq); + set_current_state(TASK_INTERRUPTIBLE); + + while (!kthread_should_stop()) { + + add_wait_queue(&tq->tq_work_waitq, &wait); + if (list_empty(&tq->tq_pend_list)) { + spin_unlock_irq(&tq->tq_lock); + schedule(); + spin_lock_irq(&tq->tq_lock); + } else { + __set_current_state(TASK_RUNNING); + } + + remove_wait_queue(&tq->tq_work_waitq, &wait); + if (!list_empty(&tq->tq_pend_list)) { + t = list_entry(tq->tq_pend_list.next, task_t, t_list); + list_del_init(&t->t_list); + list_add(&t->t_list, &tq->tq_work_list); + tq->tq_nactive++; + spin_unlock_irq(&tq->tq_lock); + + /* Perform the requested task */ + t->t_func(t->t_arg); + + spin_lock_irq(&tq->tq_lock); + tq->tq_nactive--; + id = t->t_id; + task_done(tq, t); + + /* Update the lowest remaining taskqid yet to run */ + if (tq->tq_lowest_id == id) { + tq->tq_lowest_id = taskq_lowest_id(tq); + ASSERT(tq->tq_lowest_id > id); + } + + wake_up_all(&tq->tq_wait_waitq); + } + + set_current_state(TASK_INTERRUPTIBLE); + + } + + __set_current_state(TASK_RUNNING); + tq->tq_nthreads--; + spin_unlock_irq(&tq->tq_lock); + + RETURN(0); +} + taskq_t * __taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, int maxalloc, uint_t flags) { - taskq_t *tq; - ENTRY; + taskq_t *tq; + struct task_struct *t; + int rc = 0, i, j = 0; + ENTRY; + + ASSERT(name != NULL); + ASSERT(pri <= maxclsyspri); + ASSERT(minalloc >= 0); + ASSERT(maxalloc <= INT_MAX); + ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */ + + tq = kmem_alloc(sizeof(*tq), KM_SLEEP); + if (tq == NULL) + RETURN(NULL); + + tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP); + if (tq->tq_threads == NULL) { + kmem_free(tq, sizeof(*tq)); + RETURN(NULL); + } + + spin_lock_init(&tq->tq_lock); + spin_lock_irq(&tq->tq_lock); + tq->tq_name = name; + tq->tq_nactive = 0; + tq->tq_nthreads = 0; + tq->tq_pri = pri; + tq->tq_minalloc = minalloc; + tq->tq_maxalloc = maxalloc; + tq->tq_nalloc = 0; + tq->tq_flags = (flags | TQ_ACTIVE); + 
tq->tq_next_id = 1; + tq->tq_lowest_id = 1; + INIT_LIST_HEAD(&tq->tq_free_list); + INIT_LIST_HEAD(&tq->tq_work_list); + INIT_LIST_HEAD(&tq->tq_pend_list); + init_waitqueue_head(&tq->tq_work_waitq); + init_waitqueue_head(&tq->tq_wait_waitq); + + if (flags & TASKQ_PREPOPULATE) + for (i = 0; i < minalloc; i++) + task_done(tq, task_alloc(tq, TQ_SLEEP | TQ_NEW)); - if (nthreads == 1) - tq = create_singlethread_workqueue(name); - else - tq = create_workqueue(name); + spin_unlock_irq(&tq->tq_lock); - return tq; + for (i = 0; i < nthreads; i++) { + t = kthread_create(taskq_thread, tq, "%s/%d", name, i); + if (t) { + tq->tq_threads[i] = t; + kthread_bind(t, i % num_online_cpus()); + set_user_nice(t, PRIO_TO_NICE(pri)); + wake_up_process(t); + j++; + } else { + tq->tq_threads[i] = NULL; + rc = 1; + } + } + + /* Wait for all threads to be started before potential destroy */ + wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j); + + if (rc) { + __taskq_destroy(tq); + tq = NULL; + } + + RETURN(tq); } EXPORT_SYMBOL(__taskq_create); void __taskq_destroy(taskq_t *tq) { + task_t *t; + int i, nthreads; ENTRY; - destroy_workqueue(tq); - EXIT; -} -EXPORT_SYMBOL(__taskq_destroy); -void -__taskq_wait(taskq_t *tq) -{ - ENTRY; - flush_workqueue(tq); + ASSERT(tq); + spin_lock_irq(&tq->tq_lock); + tq->tq_flags &= ~TQ_ACTIVE; + spin_unlock_irq(&tq->tq_lock); + + /* TQ_ACTIVE cleared prevents new tasks being added to pending */ + __taskq_wait(tq); + + nthreads = tq->tq_nthreads; + for (i = 0; i < nthreads; i++) + if (tq->tq_threads[i]) + kthread_stop(tq->tq_threads[i]); + + spin_lock_irq(&tq->tq_lock); + + while (!list_empty(&tq->tq_free_list)) { + t = list_entry(tq->tq_free_list.next, task_t, t_list); + list_del_init(&t->t_list); + task_free(tq, t); + } + + ASSERT(tq->tq_nthreads == 0); + ASSERT(tq->tq_nalloc == 0); + ASSERT(list_empty(&tq->tq_free_list)); + ASSERT(list_empty(&tq->tq_work_list)); + ASSERT(list_empty(&tq->tq_pend_list)); + + spin_unlock_irq(&tq->tq_lock); + kmem_free(tq->tq_threads, nthreads * sizeof(task_t *)); + kmem_free(tq, sizeof(taskq_t)); + EXIT; } -EXPORT_SYMBOL(__taskq_wait); +EXPORT_SYMBOL(__taskq_destroy); diff --git a/modules/splat/splat-taskq.c b/modules/splat/splat-taskq.c index 6effc4cab..b4d07c95d 100644 --- a/modules/splat/splat-taskq.c +++ b/modules/splat/splat-taskq.c @@ -43,7 +43,8 @@ splat_taskq_test1(struct file *file, void *arg) splat_vprint(file, SPLAT_TASKQ_TEST1_NAME, "Taskq '%s' creating\n", SPLAT_TASKQ_TEST1_NAME); - if ((tq = taskq_create(SPLAT_TASKQ_TEST1_NAME, 1, 0, 0, 0, 0)) == NULL) { + if ((tq = taskq_create(SPLAT_TASKQ_TEST1_NAME, 1, maxclsyspri, + 50, INT_MAX, TASKQ_PREPOPULATE)) == NULL) { splat_vprint(file, SPLAT_TASKQ_TEST1_NAME, "Taskq '%s' create failed\n", SPLAT_TASKQ_TEST1_NAME); @@ -58,7 +59,8 @@ splat_taskq_test1(struct file *file, void *arg) splat_vprint(file, SPLAT_TASKQ_TEST1_NAME, "Taskq '%s' function '%s' dispatching\n", tq_arg.name, sym2str(splat_taskq_test1_func)); - if ((id = taskq_dispatch(tq, splat_taskq_test1_func, &tq_arg, 0)) == 0) { + if ((id = taskq_dispatch(tq, splat_taskq_test1_func, + &tq_arg, TQ_SLEEP)) == 0) { splat_vprint(file, SPLAT_TASKQ_TEST1_NAME, "Taskq '%s' function '%s' dispatch failed\n", tq_arg.name, sym2str(splat_taskq_test1_func)); @@ -109,6 +111,8 @@ splat_taskq_test2_func2(void *arg) } #define TEST2_TASKQS 8 +#define TEST2_THREADS_PER_TASKQ 4 + static int splat_taskq_test2(struct file *file, void *arg) { taskq_t *tq[TEST2_TASKQS] = { NULL }; @@ -121,7 +125,9 @@ splat_taskq_test2(struct file *file, void *arg) { 
splat_vprint(file, SPLAT_TASKQ_TEST2_NAME, "Taskq '%s/%d' " "creating\n", SPLAT_TASKQ_TEST2_NAME, i); if ((tq[i] = taskq_create(SPLAT_TASKQ_TEST2_NAME, - 1, 0, 0, 0, 0)) == NULL) { + TEST2_THREADS_PER_TASKQ, + maxclsyspri, 50, INT_MAX, + TASKQ_PREPOPULATE)) == NULL) { splat_vprint(file, SPLAT_TASKQ_TEST2_NAME, "Taskq '%s/%d' create failed\n", SPLAT_TASKQ_TEST2_NAME, i); @@ -139,7 +145,8 @@ splat_taskq_test2(struct file *file, void *arg) { tq_args[i].name, tq_args[i].id, sym2str(splat_taskq_test2_func1)); if ((id = taskq_dispatch( - tq[i], splat_taskq_test2_func1, &tq_args[i], 0)) == 0) { + tq[i], splat_taskq_test2_func1, + &tq_args[i], TQ_SLEEP)) == 0) { splat_vprint(file, SPLAT_TASKQ_TEST2_NAME, "Taskq '%s/%d' function '%s' dispatch " "failed\n", tq_args[i].name, tq_args[i].id, @@ -153,7 +160,8 @@ splat_taskq_test2(struct file *file, void *arg) { tq_args[i].name, tq_args[i].id, sym2str(splat_taskq_test2_func2)); if ((id = taskq_dispatch( - tq[i], splat_taskq_test2_func2, &tq_args[i], 0)) == 0) { + tq[i], splat_taskq_test2_func2, + &tq_args[i], TQ_SLEEP)) == 0) { splat_vprint(file, SPLAT_TASKQ_TEST2_NAME, "Taskq '%s/%d' function '%s' dispatch failed\n", tq_args[i].name, tq_args[i].id, |
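
Finally, a note on the completion tracking introduced in spl-taskq.c above: taskqids are handed out monotonically by `__taskq_dispatch()`, `tq_lowest_id` holds the smallest id still on the pend or work lists, and `__taskq_wait()` simply snapshots `tq_next_id` and waits for `tq_lowest_id` to catch up to it. The standalone sketch below restates that invariant; it uses invented names and plain C, and is not the SPL code itself.

```c
#include <stdbool.h>

/* Simplified stand-in for the id bookkeeping carried in taskq_t. */
struct idq {
        unsigned long next_id;    /* id given to the next dispatched task */
        unsigned long lowest_id;  /* smallest id still pending or running */
};

/* Dispatch side: ids are strictly increasing, as in __taskq_dispatch(). */
static unsigned long
idq_dispatch(struct idq *q)
{
        return q->next_id++;
}

/* Completion side: when the task holding lowest_id finishes, lowest_id
 * is recomputed as the minimum over the remaining pend/work entries
 * (supplied by the caller here), mirroring taskq_lowest_id(). */
static void
idq_complete(struct idq *q, unsigned long done_id, unsigned long new_lowest)
{
        if (q->lowest_id == done_id)
                q->lowest_id = new_lowest;
}

/* Wait side: the condition used by taskq_wait_check() -- every task
 * with an id below 'id' has been retired, so a waiter that snapshots
 * next_id at dispatch time knows all earlier work is done. */
static bool
idq_wait_check(const struct idq *q, unsigned long id)
{
        return q->lowest_id >= id;
}
```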