From bcd68186d8009957b751720a801e4c16bb272e83 Mon Sep 17 00:00:00 2001 From: behlendo Date: Fri, 25 Apr 2008 22:10:47 +0000 Subject: New an improved taskq implementation for the SPL. It allows a configurable number of threads like the Solaris version and almost all of the options are supported. Unfortunately, it appears to have made absolutely no difference to our performance numbers. I need to keep looking for where we are bottle necking. git-svn-id: https://outreach.scidac.gov/svn/spl/trunk@93 7e1ea52c-4ff2-0310-8f11-9dd32ca42a1c --- modules/spl/spl-taskq.c | 458 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 388 insertions(+), 70 deletions(-) (limited to 'modules/spl') diff --git a/modules/spl/spl-taskq.c b/modules/spl/spl-taskq.c index d26b40db8..ad9be695b 100644 --- a/modules/spl/spl-taskq.c +++ b/modules/spl/spl-taskq.c @@ -6,109 +6,427 @@ #define DEBUG_SUBSYSTEM S_TASKQ -/* - * Task queue interface - * - * The taskq_work_wrapper functions are used to manage the work_structs - * which must be submitted to linux. The shim layer allocates a wrapper - * structure for all items which contains a pointer to itself as well as - * the real work to be performed. When the work item run the generic - * handle is called which calls the real work function and then using - * the self pointer frees the work_struct. +/* NOTE: Must be called with tq->tq_lock held, returns a list_t which + * is not attached to the free, work, or pending taskq lists. */ -typedef struct taskq_work_wrapper { - struct work_struct tww_work; - task_func_t tww_func; - void * tww_priv; -} taskq_work_wrapper_t; +static task_t * +task_alloc(taskq_t *tq, uint_t flags) +{ + task_t *t; + int count = 0; + ENTRY; + + ASSERT(tq); + ASSERT(flags & (TQ_SLEEP | TQ_NOSLEEP)); /* One set */ + ASSERT(!((flags & TQ_SLEEP) && (flags & TQ_NOSLEEP))); /* Not both */ + ASSERT(spin_is_locked(&tq->tq_lock)); +retry: + /* Aquire task_t's from free list if available */ + if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) { + t = list_entry(tq->tq_free_list.next, task_t, t_list); + list_del_init(&t->t_list); + RETURN(t); + } + + /* Free list is empty and memory allocs are prohibited */ + if (flags & TQ_NOALLOC) + RETURN(NULL); + + /* Hit maximum task_t pool size */ + if (tq->tq_nalloc >= tq->tq_maxalloc) { + if (flags & TQ_NOSLEEP) + RETURN(NULL); + + /* Sleep periodically polling the free list for an available + * task_t. If a full second passes and we have not found + * one gives up and return a NULL to the caller. */ + if (flags & TQ_SLEEP) { + spin_unlock_irq(&tq->tq_lock); + schedule_timeout(HZ / 100); + spin_lock_irq(&tq->tq_lock); + if (count < 100) + GOTO(retry, count++); + + RETURN(NULL); + } + + /* Unreachable, TQ_SLEEP xor TQ_NOSLEEP */ + SBUG(); + } + + spin_unlock_irq(&tq->tq_lock); + t = kmem_alloc(sizeof(task_t), flags & (TQ_SLEEP | TQ_NOSLEEP)); + spin_lock_irq(&tq->tq_lock); + + if (t) { + spin_lock_init(&t->t_lock); + INIT_LIST_HEAD(&t->t_list); + t->t_id = 0; + t->t_func = NULL; + t->t_arg = NULL; + tq->tq_nalloc++; + } + + RETURN(t); +} + +/* NOTE: Must be called with tq->tq_lock held, expectes the task_t + * to already be removed from the free, work, or pending taskq lists. + */ +static void +task_free(taskq_t *tq, task_t *t) +{ + ENTRY; + + ASSERT(tq); + ASSERT(t); + ASSERT(spin_is_locked(&tq->tq_lock)); + ASSERT(list_empty(&t->t_list)); + + kmem_free(t, sizeof(task_t)); + tq->tq_nalloc--; + EXIT; +} + +/* NOTE: Must be called with tq->tq_lock held, either destroyes the + * task_t if too many exist or moves it to the free list for later use. + */ static void -taskq_work_handler(void *priv) +task_done(taskq_t *tq, task_t *t) { - taskq_work_wrapper_t *tww = priv; + ENTRY; + ASSERT(tq); + ASSERT(t); + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_del_init(&t->t_list); - ASSERT(tww); - ASSERT(tww->tww_func); + if (tq->tq_nalloc <= tq->tq_minalloc) { + t->t_id = 0; + t->t_func = NULL; + t->t_arg = NULL; + list_add(&t->t_list, &tq->tq_free_list); + } else { + task_free(tq, t); + } - /* Call the real function and free the wrapper */ - tww->tww_func(tww->tww_priv); - kfree(tww); + EXIT; } -/* XXX - All flags currently ignored */ -taskqid_t -__taskq_dispatch(taskq_t *tq, task_func_t func, void *priv, uint_t flags) +/* Taskqid's are handed out in a monotonically increasing fashion per + * taskq_t. We don't handle taskqid wrapping yet, but fortuntely it isi + * a 64-bit value so this is probably never going to happen. The lowest + * pending taskqid is stored in the taskq_t to make it easy for any + * taskq_wait()'ers to know if the tasks they're waiting for have + * completed. Unfortunately, tq_task_lowest is kept up to date is + * a pretty brain dead way, something more clever should be done. + */ +static int +taskq_wait_check(taskq_t *tq, taskqid_t id) +{ + RETURN(tq->tq_lowest_id >= id); +} + +/* Expected to wait for all previously scheduled tasks to complete. We do + * not need to wait for tasked scheduled after this call to complete. In + * otherwords we do not need to drain the entire taskq. */ +void +__taskq_wait_id(taskq_t *tq, taskqid_t id) { - struct workqueue_struct *wq = tq; - taskq_work_wrapper_t *tww; - int rc; ENTRY; + ASSERT(tq); + + wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id)); + + EXIT; +} +EXPORT_SYMBOL(__taskq_wait_id); + +void +__taskq_wait(taskq_t *tq) +{ + taskqid_t id; + ENTRY; + ASSERT(tq); + + spin_lock_irq(&tq->tq_lock); + id = tq->tq_next_id; + spin_unlock_irq(&tq->tq_lock); + + __taskq_wait_id(tq, id); + + EXIT; + +} +EXPORT_SYMBOL(__taskq_wait); + +int +__taskq_member(taskq_t *tq, void *t) +{ + int i; + ENTRY; + + ASSERT(tq); + ASSERT(t); + + for (i = 0; i < tq->tq_nthreads; i++) + if (tq->tq_threads[i] == (struct task_struct *)t) + RETURN(1); + + RETURN(0); +} +EXPORT_SYMBOL(__taskq_member); + +taskqid_t +__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) +{ + task_t *t; + taskqid_t rc = 0; + ENTRY; ASSERT(tq); ASSERT(func); + if (unlikely(in_atomic() && (flags & TQ_SLEEP))) { + CERROR("May schedule while atomic: %s/0x%08x/%d\n", + current->comm, preempt_count(), current->pid); + SBUG(); + } - /* Use GFP_ATOMIC since this may be called in interrupt context */ - tww = (taskq_work_wrapper_t *)kmalloc(sizeof(*tww), GFP_ATOMIC); - if (!tww) - RETURN((taskqid_t)0); + spin_lock_irq(&tq->tq_lock); - INIT_WORK(&(tww->tww_work), taskq_work_handler, tww); - tww->tww_func = func; - tww->tww_priv = priv; + /* Taskq being destroyed and all tasks drained */ + if (!(tq->tq_flags & TQ_ACTIVE)) + GOTO(out, rc = 0); - rc = queue_work(wq, &(tww->tww_work)); - if (!rc) { - kfree(tww); - RETURN((taskqid_t)0); - } + /* Do not queue the task unless there is idle thread for it */ + ASSERT(tq->tq_nactive <= tq->tq_nthreads); + if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) + GOTO(out, rc = 0); + + if ((t = task_alloc(tq, flags)) == NULL) + GOTO(out, rc = 0); - RETURN((taskqid_t)wq); + + spin_lock(&t->t_lock); + list_add(&t->t_list, &tq->tq_pend_list); + t->t_id = rc = tq->tq_next_id; + tq->tq_next_id++; + t->t_func = func; + t->t_arg = arg; + spin_unlock(&t->t_lock); + + wake_up(&tq->tq_work_waitq); +out: + spin_unlock_irq(&tq->tq_lock); + RETURN(rc); } EXPORT_SYMBOL(__taskq_dispatch); -/* XXX - We must fully implement dynamic workqueues since they make a - * significant impact in terms of performance. For now I've made - * a trivial compromise. If you ask for one thread you get one - * thread, if you ask for more than that you get one per core. - * It's unclear if you ever really need/want more than one per-core - * anyway. More analysis is required. - * - * name - Workqueue names are limited to 10 chars - * pri - Ignore priority - * min - Ignored until this is a dynamic thread pool - * max - Ignored until this is a dynamic thread pool - * flags - Ignored until this is a dynamic thread_pool - */ +/* NOTE: Must be called with tq->tq_lock held */ +static taskqid_t +taskq_lowest_id(taskq_t *tq) +{ + taskqid_t lowest_id = ~0; + task_t *t; + ENTRY; + + ASSERT(tq); + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_for_each_entry(t, &tq->tq_pend_list, t_list) + if (t->t_id < lowest_id) + lowest_id = t->t_id; + + list_for_each_entry(t, &tq->tq_work_list, t_list) + if (t->t_id < lowest_id) + lowest_id = t->t_id; + + RETURN(lowest_id); +} + +static int +taskq_thread(void *args) +{ + DECLARE_WAITQUEUE(wait, current); + sigset_t blocked; + taskqid_t id; + taskq_t *tq = args; + task_t *t; + ENTRY; + + ASSERT(tq); + current->flags |= PF_NOFREEZE; + + sigfillset(&blocked); + sigprocmask(SIG_BLOCK, &blocked, NULL); + flush_signals(current); + + spin_lock_irq(&tq->tq_lock); + tq->tq_nthreads++; + wake_up(&tq->tq_wait_waitq); + set_current_state(TASK_INTERRUPTIBLE); + + while (!kthread_should_stop()) { + + add_wait_queue(&tq->tq_work_waitq, &wait); + if (list_empty(&tq->tq_pend_list)) { + spin_unlock_irq(&tq->tq_lock); + schedule(); + spin_lock_irq(&tq->tq_lock); + } else { + __set_current_state(TASK_RUNNING); + } + + remove_wait_queue(&tq->tq_work_waitq, &wait); + if (!list_empty(&tq->tq_pend_list)) { + t = list_entry(tq->tq_pend_list.next, task_t, t_list); + list_del_init(&t->t_list); + list_add(&t->t_list, &tq->tq_work_list); + tq->tq_nactive++; + spin_unlock_irq(&tq->tq_lock); + + /* Perform the requested task */ + t->t_func(t->t_arg); + + spin_lock_irq(&tq->tq_lock); + tq->tq_nactive--; + id = t->t_id; + task_done(tq, t); + + /* Update the lowest remaining taskqid yet to run */ + if (tq->tq_lowest_id == id) { + tq->tq_lowest_id = taskq_lowest_id(tq); + ASSERT(tq->tq_lowest_id > id); + } + + wake_up_all(&tq->tq_wait_waitq); + } + + set_current_state(TASK_INTERRUPTIBLE); + + } + + __set_current_state(TASK_RUNNING); + tq->tq_nthreads--; + spin_unlock_irq(&tq->tq_lock); + + RETURN(0); +} + taskq_t * __taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, int maxalloc, uint_t flags) { - taskq_t *tq; - ENTRY; + taskq_t *tq; + struct task_struct *t; + int rc = 0, i, j = 0; + ENTRY; + + ASSERT(name != NULL); + ASSERT(pri <= maxclsyspri); + ASSERT(minalloc >= 0); + ASSERT(maxalloc <= INT_MAX); + ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */ + + tq = kmem_alloc(sizeof(*tq), KM_SLEEP); + if (tq == NULL) + RETURN(NULL); + + tq->tq_threads = kmem_alloc(nthreads * sizeof(t), KM_SLEEP); + if (tq->tq_threads == NULL) { + kmem_free(tq, sizeof(*tq)); + RETURN(NULL); + } + + spin_lock_init(&tq->tq_lock); + spin_lock_irq(&tq->tq_lock); + tq->tq_name = name; + tq->tq_nactive = 0; + tq->tq_nthreads = 0; + tq->tq_pri = pri; + tq->tq_minalloc = minalloc; + tq->tq_maxalloc = maxalloc; + tq->tq_nalloc = 0; + tq->tq_flags = (flags | TQ_ACTIVE); + tq->tq_next_id = 1; + tq->tq_lowest_id = 1; + INIT_LIST_HEAD(&tq->tq_free_list); + INIT_LIST_HEAD(&tq->tq_work_list); + INIT_LIST_HEAD(&tq->tq_pend_list); + init_waitqueue_head(&tq->tq_work_waitq); + init_waitqueue_head(&tq->tq_wait_waitq); + + if (flags & TASKQ_PREPOPULATE) + for (i = 0; i < minalloc; i++) + task_done(tq, task_alloc(tq, TQ_SLEEP | TQ_NEW)); - if (nthreads == 1) - tq = create_singlethread_workqueue(name); - else - tq = create_workqueue(name); + spin_unlock_irq(&tq->tq_lock); - return tq; + for (i = 0; i < nthreads; i++) { + t = kthread_create(taskq_thread, tq, "%s/%d", name, i); + if (t) { + tq->tq_threads[i] = t; + kthread_bind(t, i % num_online_cpus()); + set_user_nice(t, PRIO_TO_NICE(pri)); + wake_up_process(t); + j++; + } else { + tq->tq_threads[i] = NULL; + rc = 1; + } + } + + /* Wait for all threads to be started before potential destroy */ + wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j); + + if (rc) { + __taskq_destroy(tq); + tq = NULL; + } + + RETURN(tq); } EXPORT_SYMBOL(__taskq_create); void __taskq_destroy(taskq_t *tq) { + task_t *t; + int i, nthreads; ENTRY; - destroy_workqueue(tq); - EXIT; -} -EXPORT_SYMBOL(__taskq_destroy); -void -__taskq_wait(taskq_t *tq) -{ - ENTRY; - flush_workqueue(tq); + ASSERT(tq); + spin_lock_irq(&tq->tq_lock); + tq->tq_flags &= ~TQ_ACTIVE; + spin_unlock_irq(&tq->tq_lock); + + /* TQ_ACTIVE cleared prevents new tasks being added to pending */ + __taskq_wait(tq); + + nthreads = tq->tq_nthreads; + for (i = 0; i < nthreads; i++) + if (tq->tq_threads[i]) + kthread_stop(tq->tq_threads[i]); + + spin_lock_irq(&tq->tq_lock); + + while (!list_empty(&tq->tq_free_list)) { + t = list_entry(tq->tq_free_list.next, task_t, t_list); + list_del_init(&t->t_list); + task_free(tq, t); + } + + ASSERT(tq->tq_nthreads == 0); + ASSERT(tq->tq_nalloc == 0); + ASSERT(list_empty(&tq->tq_free_list)); + ASSERT(list_empty(&tq->tq_work_list)); + ASSERT(list_empty(&tq->tq_pend_list)); + + spin_unlock_irq(&tq->tq_lock); + kmem_free(tq->tq_threads, nthreads * sizeof(task_t *)); + kmem_free(tq, sizeof(taskq_t)); + EXIT; } -EXPORT_SYMBOL(__taskq_wait); +EXPORT_SYMBOL(__taskq_destroy); -- cgit v1.2.3