diff options
author | Brian Behlendorf <[email protected]> | 2015-06-08 14:36:27 -0700 |
---|---|---|
committer | Brian Behlendorf <[email protected]> | 2015-06-24 15:14:18 -0700 |
commit | f7a973d99b5e272f4a6223b8fb7db4fc6d363b41 (patch) | |
tree | 551121411cae17bf6d2c158ca5bd87d0d6347f75 /module/spl | |
parent | 5acb2307b2edc55335996bf0ff78f6bdca24a98d (diff) |
Add TASKQ_DYNAMIC feature
Setting the TASKQ_DYNAMIC flag will create a taskq with dynamic
semantics. Initially only a single worker thread will be created
to service tasks dispatched to the queue. As additional threads
are needed they will be dynamically spawned up to the max number
specified by 'nthreads'. When the threads are no longer needed,
because the taskq is empty, they will automatically terminate.
Due to the low cost of creating and destroying threads under Linux
by default new threads and spawned and terminated aggressively.
There are two modules options which can be tuned to adjust this
behavior if needed.
* spl_taskq_thread_sequential - The number of sequential tasks,
without interruption, which needed to be handled by a worker
thread before a new worker thread is spawned. Default 4.
* spl_taskq_thread_dynamic - Provides the ability to completely
disable the use of dynamic taskqs on the system. This is provided
for the purposes of debugging and troubleshooting. Default 1
(enabled).
This behavior is fundamentally consistent with the dynamic taskq
implementation found in both illumos and FreeBSD.
Signed-off-by: Brian Behlendorf <[email protected]>
Signed-off-by: Tim Chase <[email protected]>
Closes #458
Diffstat (limited to 'module/spl')
-rw-r--r-- | module/spl/spl-taskq.c | 292 |
1 files changed, 226 insertions, 66 deletions
diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 49bb40a25..9cd193369 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -31,10 +31,24 @@ int spl_taskq_thread_bind = 0; module_param(spl_taskq_thread_bind, int, 0644); MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default"); + +int spl_taskq_thread_dynamic = 1; +module_param(spl_taskq_thread_dynamic, int, 0644); +MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads"); + +int spl_taskq_thread_sequential = 4; +module_param(spl_taskq_thread_sequential, int, 0644); +MODULE_PARM_DESC(spl_taskq_thread_sequential, + "Create new taskq threads after N sequential tasks"); + /* Global system-wide dynamic task queue available for all consumers */ taskq_t *system_taskq; EXPORT_SYMBOL(system_taskq); +/* Private dedicated taskq for creating new taskq threads on demand. */ +static taskq_t *dynamic_taskq; +static taskq_thread_t *taskq_thread_create(taskq_t *); + static int task_km_flags(uint_t flags) { @@ -434,17 +448,22 @@ taskq_member(taskq_t *tq, void *t) { struct list_head *l; taskq_thread_t *tqt; + int found = 0; ASSERT(tq); ASSERT(t); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); list_for_each(l, &tq->tq_thread_list) { tqt = list_entry(l, taskq_thread_t, tqt_thread_list); - if (tqt->tqt_thread == (struct task_struct *)t) - return (1); + if (tqt->tqt_thread == (struct task_struct *)t) { + found = 1; + break; + } } + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - return (0); + return (found); } EXPORT_SYMBOL(taskq_member); @@ -516,7 +535,7 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); /* Taskq being destroyed and all tasks drained */ - if (!(tq->tq_flags & TQ_ACTIVE)) + if (!(tq->tq_flags & TASKQ_ACTIVE)) goto out; /* Do not queue the task unless there is idle thread for it */ @@ -568,7 +587,7 @@ taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); /* Taskq being destroyed and all tasks drained */ - if (!(tq->tq_flags & TQ_ACTIVE)) + if (!(tq->tq_flags & TASKQ_ACTIVE)) goto out; if ((t = task_alloc(tq, flags)) == NULL) @@ -604,12 +623,11 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, { ASSERT(tq); ASSERT(func); - ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); /* Taskq being destroyed and all tasks drained */ - if (!(tq->tq_flags & TQ_ACTIVE)) { + if (!(tq->tq_flags & TASKQ_ACTIVE)) { t->tqent_id = 0; goto out; } @@ -664,6 +682,97 @@ taskq_init_ent(taskq_ent_t *t) } EXPORT_SYMBOL(taskq_init_ent); +/* + * Return the next pending task, preference is given to tasks on the + * priority list which were dispatched with TQ_FRONT. + */ +static taskq_ent_t * +taskq_next_ent(taskq_t *tq) +{ + struct list_head *list; + + ASSERT(spin_is_locked(&tq->tq_lock)); + + if (!list_empty(&tq->tq_prio_list)) + list = &tq->tq_prio_list; + else if (!list_empty(&tq->tq_pend_list)) + list = &tq->tq_pend_list; + else + return (NULL); + + return (list_entry(list->next, taskq_ent_t, tqent_list)); +} + +/* + * Spawns a new thread for the specified taskq. + */ +static void +taskq_thread_spawn_task(void *arg) +{ + taskq_t *tq = (taskq_t *)arg; + + (void) taskq_thread_create(tq); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + tq->tq_nspawn--; + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); +} + +/* + * Spawn addition threads for dynamic taskqs (TASKQ_DYNMAIC) the current + * number of threads is insufficient to handle the pending tasks. These + * new threads must be created by the dedicated dynamic_taskq to avoid + * deadlocks between thread creation and memory reclaim. The system_taskq + * which is also a dynamic taskq cannot be safely used for this. + */ +static int +taskq_thread_spawn(taskq_t *tq, int seq_tasks) +{ + int spawning = 0; + + if (!(tq->tq_flags & TASKQ_DYNAMIC)) + return (0); + + if ((seq_tasks > spl_taskq_thread_sequential) && + (tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) && + (tq->tq_flags & TASKQ_ACTIVE)) { + spawning = (++tq->tq_nspawn); + taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task, + tq, TQ_NOSLEEP); + } + + return (spawning); +} + +/* + * Threads in a dynamic taskq should only exit once it has been completely + * drained and no other threads are actively servicing tasks. This prevents + * threads from being created and destroyed more than is required. + * + * The first thread is the thread list is treated as the primary thread. + * There is nothing special about the primary thread but in order to avoid + * all the taskq pids from changing we opt to make it long running. + */ +static int +taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt) +{ + ASSERT(spin_is_locked(&tq->tq_lock)); + + if (!(tq->tq_flags & TASKQ_DYNAMIC)) + return (0); + + if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t, + tqt_thread_list) == tqt) + return (0); + + return + ((tq->tq_nspawn == 0) && /* No threads are being spawned */ + (tq->tq_nactive == 0) && /* No threads are handling tasks */ + (tq->tq_nthreads > 1) && /* More than 1 thread is running */ + (!taskq_next_ent(tq)) && /* There are no pending tasks */ + (spl_taskq_thread_dynamic));/* Dynamic taskqs are allowed */ +} + static int taskq_thread(void *args) { @@ -672,7 +781,7 @@ taskq_thread(void *args) taskq_thread_t *tqt = args; taskq_t *tq; taskq_ent_t *t; - struct list_head *pend_list; + int seq_tasks = 0; ASSERT(tqt); tq = tqt->tqt_tq; @@ -683,7 +792,13 @@ taskq_thread(void *args) flush_signals(current); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + /* Immediately exit if more threads than allowed were created. */ + if (tq->tq_nthreads >= tq->tq_maxthreads) + goto error; + tq->tq_nthreads++; + list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list); wake_up(&tq->tq_wait_waitq); set_current_state(TASK_INTERRUPTIBLE); @@ -691,25 +806,25 @@ taskq_thread(void *args) if (list_empty(&tq->tq_pend_list) && list_empty(&tq->tq_prio_list)) { + + if (taskq_thread_should_stop(tq, tqt)) { + wake_up_all(&tq->tq_wait_waitq); + break; + } + add_wait_queue_exclusive(&tq->tq_work_waitq, &wait); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + schedule(); + seq_tasks = 0; + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); remove_wait_queue(&tq->tq_work_waitq, &wait); } else { __set_current_state(TASK_RUNNING); } - - if (!list_empty(&tq->tq_prio_list)) - pend_list = &tq->tq_prio_list; - else if (!list_empty(&tq->tq_pend_list)) - pend_list = &tq->tq_pend_list; - else - pend_list = NULL; - - if (pend_list) { - t = list_entry(pend_list->next,taskq_ent_t,tqent_list); + if ((t = taskq_next_ent(tq)) != NULL) { list_del_init(&t->tqent_list); /* In order to support recursively dispatching a @@ -738,8 +853,7 @@ taskq_thread(void *args) tqt->tqt_task = NULL; /* For prealloc'd tasks, we don't free anything. */ - if ((tq->tq_flags & TASKQ_DYNAMIC) || - !(tqt->tqt_flags & TQENT_FLAG_PREALLOC)) + if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC)) task_done(tq, t); /* When the current lowest outstanding taskqid is @@ -749,9 +863,16 @@ taskq_thread(void *args) ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id); } + /* Spawn additional taskq threads if required. */ + if (taskq_thread_spawn(tq, ++seq_tasks)) + seq_tasks = 0; + tqt->tqt_id = 0; tqt->tqt_flags = 0; wake_up_all(&tq->tq_wait_waitq); + } else { + if (taskq_thread_should_stop(tq, tqt)) + break; } set_current_state(TASK_INTERRUPTIBLE); @@ -761,27 +882,56 @@ taskq_thread(void *args) __set_current_state(TASK_RUNNING); tq->tq_nthreads--; list_del_init(&tqt->tqt_thread_list); - kmem_free(tqt, sizeof(taskq_thread_t)); - +error: + kmem_free(tqt, sizeof (taskq_thread_t)); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); return (0); } +static taskq_thread_t * +taskq_thread_create(taskq_t *tq) +{ + static int last_used_cpu = 0; + taskq_thread_t *tqt; + + tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE); + INIT_LIST_HEAD(&tqt->tqt_thread_list); + INIT_LIST_HEAD(&tqt->tqt_active_list); + tqt->tqt_tq = tq; + tqt->tqt_id = 0; + + tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt, + "%s", tq->tq_name); + if (tqt->tqt_thread == NULL) { + kmem_free(tqt, sizeof (taskq_thread_t)); + return (NULL); + } + + if (spl_taskq_thread_bind) { + last_used_cpu = (last_used_cpu + 1) % num_online_cpus(); + kthread_bind(tqt->tqt_thread, last_used_cpu); + } + + set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri)); + wake_up_process(tqt->tqt_thread); + + return (tqt); +} + taskq_t * taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, int maxalloc, uint_t flags) { - static int last_used_cpu = 0; taskq_t *tq; taskq_thread_t *tqt; - int rc = 0, i, j = 0; + int count = 0, rc = 0, i; ASSERT(name != NULL); ASSERT(pri <= maxclsyspri); ASSERT(minalloc >= 0); ASSERT(maxalloc <= INT_MAX); - ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */ + ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */ /* Scale the number of threads using nthreads as a percentage */ if (flags & TASKQ_THREADS_CPU_PCT) { @@ -792,24 +942,25 @@ taskq_create(const char *name, int nthreads, pri_t pri, nthreads = MAX((num_online_cpus() * nthreads) / 100, 1); } - tq = kmem_alloc(sizeof(*tq), KM_PUSHPAGE); + tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE); if (tq == NULL) return (NULL); spin_lock_init(&tq->tq_lock); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); INIT_LIST_HEAD(&tq->tq_thread_list); INIT_LIST_HEAD(&tq->tq_active_list); - tq->tq_name = name; - tq->tq_nactive = 0; - tq->tq_nthreads = 0; - tq->tq_pri = pri; - tq->tq_minalloc = minalloc; - tq->tq_maxalloc = maxalloc; - tq->tq_nalloc = 0; - tq->tq_flags = (flags | TQ_ACTIVE); - tq->tq_next_id = 1; - tq->tq_lowest_id = 1; + tq->tq_name = strdup(name); + tq->tq_nactive = 0; + tq->tq_nthreads = 0; + tq->tq_nspawn = 0; + tq->tq_maxthreads = nthreads; + tq->tq_pri = pri; + tq->tq_minalloc = minalloc; + tq->tq_maxalloc = maxalloc; + tq->tq_nalloc = 0; + tq->tq_flags = (flags | TASKQ_ACTIVE); + tq->tq_next_id = 1; + tq->tq_lowest_id = 1; INIT_LIST_HEAD(&tq->tq_free_list); INIT_LIST_HEAD(&tq->tq_pend_list); INIT_LIST_HEAD(&tq->tq_prio_list); @@ -817,38 +968,28 @@ taskq_create(const char *name, int nthreads, pri_t pri, init_waitqueue_head(&tq->tq_work_waitq); init_waitqueue_head(&tq->tq_wait_waitq); - if (flags & TASKQ_PREPOPULATE) + if (flags & TASKQ_PREPOPULATE) { + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + for (i = 0; i < minalloc; i++) task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW)); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + } + + if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) + nthreads = 1; for (i = 0; i < nthreads; i++) { - tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE); - INIT_LIST_HEAD(&tqt->tqt_thread_list); - INIT_LIST_HEAD(&tqt->tqt_active_list); - tqt->tqt_tq = tq; - tqt->tqt_id = 0; - - tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt, - "%s/%d", name, i); - if (tqt->tqt_thread) { - list_add(&tqt->tqt_thread_list, &tq->tq_thread_list); - if (spl_taskq_thread_bind) { - last_used_cpu = (last_used_cpu + 1) % num_online_cpus(); - kthread_bind(tqt->tqt_thread, last_used_cpu); - } - set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri)); - wake_up_process(tqt->tqt_thread); - j++; - } else { - kmem_free(tqt, sizeof(taskq_thread_t)); + tqt = taskq_thread_create(tq); + if (tqt == NULL) rc = 1; - } + else + count++; } /* Wait for all threads to be started before potential destroy */ - wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j); + wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count); if (rc) { taskq_destroy(tq); @@ -868,10 +1009,16 @@ taskq_destroy(taskq_t *tq) ASSERT(tq); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - tq->tq_flags &= ~TQ_ACTIVE; + tq->tq_flags &= ~TASKQ_ACTIVE; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - /* TQ_ACTIVE cleared prevents new tasks being added to pending */ + /* + * When TASKQ_ACTIVE is clear new tasks may not be added nor may + * new worker threads be spawned for dynamic taskq. + */ + if (dynamic_taskq != NULL) + taskq_wait_outstanding(dynamic_taskq, 0); + taskq_wait(tq); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); @@ -884,7 +1031,7 @@ taskq_destroy(taskq_t *tq) */ while (!list_empty(&tq->tq_thread_list)) { tqt = list_entry(tq->tq_thread_list.next, - taskq_thread_t, tqt_thread_list); + taskq_thread_t, tqt_thread_list); thread = tqt->tqt_thread; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); @@ -902,8 +1049,9 @@ taskq_destroy(taskq_t *tq) task_free(tq, t); } - ASSERT(tq->tq_nthreads == 0); - ASSERT(tq->tq_nalloc == 0); + ASSERT0(tq->tq_nthreads); + ASSERT0(tq->tq_nalloc); + ASSERT0(tq->tq_nspawn); ASSERT(list_empty(&tq->tq_thread_list)); ASSERT(list_empty(&tq->tq_active_list)); ASSERT(list_empty(&tq->tq_free_list)); @@ -913,7 +1061,8 @@ taskq_destroy(taskq_t *tq) spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - kmem_free(tq, sizeof(taskq_t)); + strfree(tq->tq_name); + kmem_free(tq, sizeof (taskq_t)); } EXPORT_SYMBOL(taskq_destroy); @@ -927,11 +1076,22 @@ spl_taskq_init(void) if (system_taskq == NULL) return (1); + dynamic_taskq = taskq_create("spl_dynamic_taskq", 1, + minclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE); + if (dynamic_taskq == NULL) { + taskq_destroy(system_taskq); + return (1); + } + return (0); } void spl_taskq_fini(void) { + taskq_destroy(dynamic_taskq); + dynamic_taskq = NULL; + taskq_destroy(system_taskq); + system_taskq = NULL; } |