diff options
author | Brian Behlendorf <[email protected]> | 2012-12-06 12:38:19 -0800 |
---|---|---|
committer | Brian Behlendorf <[email protected]> | 2012-12-12 09:54:07 -0800 |
commit | d9acd930b52503582425c6398fc8dbc1d7d4a01b (patch) | |
tree | 7944870d3add0b3fcea705569f52be96a6481329 /module/spl/spl-taskq.c | |
parent | aed8671cb0bfc18f6cd034ecad2e9cf49536d965 (diff) |
taskq delay/cancel functionality
Add the ability to dispatch a delayed task to a taskq. The desired
behavior is for the task to be queued but not executed by a worker
thread until the expiration time is reached. To achieve this two
new functions were added.
* taskq_dispatch_delay() -
This function behaves exactly like taskq_dispatch() however it
takes a third 'expire_time' argument. The caller should pass the
desired time the task should be executed as an absolute value in
jiffies. The task is guarenteed not to run before this time, it
may run slightly latter if all the worker threads are busy.
* taskq_cancel_id() -
Given a task id attempt to cancel the task before it gets executed.
This is primarily useful for canceling delay tasks but can be used for
canceling any previously dispatched task. There are three possible
return values.
0 - The task was found and canceled before it was executed.
ENOENT - The task was not found, either it was already run or an
invalid task id was supplied by the caller.
EBUSY - The task is currently executing any may not be canceled.
This function will block until the task has been completed.
* taskq_wait_all() -
The taskq_wait_id() function was renamed taskq_wait_all() to more
clearly reflect its actual behavior. It is only curreny used by
the splat taskq regression tests.
* taskq_wait_id() -
Historically, the only difference between this function and
taskq_wait() was that you passed the task id. In both functions you
would block until ALL lower task ids which executed. This was
semantically correct but could be very slow particularly if there
were delay tasks submitted.
To better accomidate the delay tasks this function was reimplemnted.
It will now only block until the passed task id has been completed.
This is actually a fairly low risk change for a few reasons.
* Only new ZFS callers will make use of the new interfaces and
very little common code was changed to support the new functions.
* The existing taskq_wait() implementation was not changed just
slightly refactored.
* The newly optimized taskq_wait_id() implementation was never
used by ZFS we can't accidentally introduce a new bug there.
NOTE: This functionality does not exist in the Illumos taskqs.
Signed-off-by: Brian Behlendorf <[email protected]>
Diffstat (limited to 'module/spl/spl-taskq.c')
-rw-r--r-- | module/spl/spl-taskq.c | 452 |
1 files changed, 356 insertions, 96 deletions
diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 2007cf084..c9ae0a50b 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -69,6 +69,8 @@ retry: t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL)); + ASSERT(!timer_pending(&t->tqent_timer)); list_del_init(&t->tqent_list); SRETURN(t); @@ -126,6 +128,7 @@ task_free(taskq_t *tq, taskq_ent_t *t) ASSERT(t); ASSERT(spin_is_locked(&tq->tq_lock)); ASSERT(list_empty(&t->tqent_list)); + ASSERT(!timer_pending(&t->tqent_timer)); kmem_free(t, sizeof(taskq_ent_t)); tq->tq_nalloc--; @@ -145,6 +148,9 @@ task_done(taskq_t *tq, taskq_ent_t *t) ASSERT(t); ASSERT(spin_is_locked(&tq->tq_lock)); + /* Wake tasks blocked in taskq_wait_id() */ + wake_up_all(&t->tqent_waitq); + list_del_init(&t->tqent_list); if (tq->tq_nalloc <= tq->tq_minalloc) { @@ -162,56 +168,261 @@ task_done(taskq_t *tq, taskq_ent_t *t) } /* - * As tasks are submitted to the task queue they are assigned a - * monotonically increasing taskqid and added to the tail of the pending - * list. As worker threads become available the tasks are removed from - * the head of the pending or priority list, giving preference to the - * priority list. The tasks are then removed from their respective - * list, and the taskq_thread servicing the task is added to the active - * list, preserving the order using the serviced task's taskqid. - * Finally, as tasks complete the taskq_thread servicing the task is - * removed from the active list. This means that the pending task and - * active taskq_thread lists are always kept sorted by taskqid. Thus the - * lowest outstanding incomplete taskqid can be determined simply by - * checking the min taskqid for each head item on the pending, priority, - * and active taskq_thread list. This value is stored in - * tq->tq_lowest_id and only updated to the new lowest id when the - * previous lowest id completes. All taskqids lower than - * tq->tq_lowest_id must have completed. It is also possible larger - * taskqid's have completed because they may be processed in parallel by - * several worker threads. However, this is not a problem because the - * behavior of taskq_wait_id() is to block until all previously - * submitted taskqid's have completed. - * - * XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are - * 64-bit values so even if a taskq is processing 2^24 (16,777,216) - * taskqid_ts per second it will still take 2^40 seconds, 34,865 years, - * before the wrap occurs. I can live with that for now. + * When a delayed task timer expires remove it from the delay list and + * add it to the priority list in order for immediate processing. */ -static int -taskq_wait_check(taskq_t *tq, taskqid_t id) +static void +task_expire(unsigned long data) { - int rc; + taskq_ent_t *w, *t = (taskq_ent_t *)data; + taskq_t *tq = t->tqent_taskq; + struct list_head *l; spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - rc = (id < tq->tq_lowest_id); + + if (t->tqent_flags & TQENT_FLAG_CANCEL) { + ASSERT(list_empty(&t->tqent_list)); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + return; + } + + /* + * The priority list must be maintained in strict task id order + * from lowest to highest for lowest_id to be easily calculable. + */ + list_del(&t->tqent_list); + list_for_each_prev(l, &tq->tq_prio_list) { + w = list_entry(l, taskq_ent_t, tqent_list); + if (w->tqent_id < t->tqent_id) { + list_add(&t->tqent_list, l); + break; + } + } + if (l == &tq->tq_prio_list) + list_add(&t->tqent_list, &tq->tq_prio_list); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - SRETURN(rc); + wake_up(&tq->tq_work_waitq); +} + +/* + * Returns the lowest incomplete taskqid_t. The taskqid_t may + * be queued on the pending list, on the priority list, on the + * delay list, or on the work list currently being handled, but + * it is not 100% complete yet. + */ +static taskqid_t +taskq_lowest_id(taskq_t *tq) +{ + taskqid_t lowest_id = tq->tq_next_id; + taskq_ent_t *t; + taskq_thread_t *tqt; + SENTRY; + + ASSERT(tq); + ASSERT(spin_is_locked(&tq->tq_lock)); + + if (!list_empty(&tq->tq_pend_list)) { + t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list); + lowest_id = MIN(lowest_id, t->tqent_id); + } + + if (!list_empty(&tq->tq_prio_list)) { + t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list); + lowest_id = MIN(lowest_id, t->tqent_id); + } + + if (!list_empty(&tq->tq_delay_list)) { + t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list); + lowest_id = MIN(lowest_id, t->tqent_id); + } + + if (!list_empty(&tq->tq_active_list)) { + tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, + tqt_active_list); + ASSERT(tqt->tqt_id != 0); + lowest_id = MIN(lowest_id, tqt->tqt_id); + } + + SRETURN(lowest_id); +} + +/* + * Insert a task into a list keeping the list sorted by increasing taskqid. + */ +static void +taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) +{ + taskq_thread_t *w; + struct list_head *l; + + SENTRY; + ASSERT(tq); + ASSERT(tqt); + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_for_each_prev(l, &tq->tq_active_list) { + w = list_entry(l, taskq_thread_t, tqt_active_list); + if (w->tqt_id < tqt->tqt_id) { + list_add(&tqt->tqt_active_list, l); + break; + } + } + if (l == &tq->tq_active_list) + list_add(&tqt->tqt_active_list, &tq->tq_active_list); + + SEXIT; +} + +/* + * Find and return a task from the given list if it exists. The list + * must be in lowest to highest task id order. + */ +static taskq_ent_t * +taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id) +{ + struct list_head *l; + taskq_ent_t *t; + SENTRY; + + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_for_each(l, lh) { + t = list_entry(l, taskq_ent_t, tqent_list); + + if (t->tqent_id == id) + SRETURN(t); + + if (t->tqent_id > id) + break; + } + + SRETURN(NULL); } +/* + * Find an already dispatched task given the task id regardless of what + * state it is in. If a task is still pending or executing it will be + * returned and 'active' set appropriately. If the task has already + * been run then NULL is returned. + */ +static taskq_ent_t * +taskq_find(taskq_t *tq, taskqid_t id, int *active) +{ + taskq_thread_t *tqt; + struct list_head *l; + taskq_ent_t *t; + SENTRY; + + ASSERT(spin_is_locked(&tq->tq_lock)); + *active = 0; + + t = taskq_find_list(tq, &tq->tq_delay_list, id); + if (t) + SRETURN(t); + + t = taskq_find_list(tq, &tq->tq_prio_list, id); + if (t) + SRETURN(t); + + t = taskq_find_list(tq, &tq->tq_pend_list, id); + if (t) + SRETURN(t); + + list_for_each(l, &tq->tq_active_list) { + tqt = list_entry(l, taskq_thread_t, tqt_active_list); + if (tqt->tqt_id == id) { + t = tqt->tqt_task; + *active = 1; + SRETURN(t); + } + } + + SRETURN(NULL); +} + +/* + * The taskq_wait_id() function blocks until the passed task id completes. + * This does not guarantee that all lower task id's have completed. + */ void taskq_wait_id(taskq_t *tq, taskqid_t id) { + DEFINE_WAIT(wait); + taskq_ent_t *t; + int active = 0; SENTRY; + ASSERT(tq); + ASSERT(id > 0); - wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id)); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + t = taskq_find(tq, id, &active); + if (t) + prepare_to_wait(&t->tqent_waitq, &wait, TASK_UNINTERRUPTIBLE); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + /* + * We rely on the kernels autoremove_wake_function() function to + * remove us from the wait queue in the context of wake_up(). + * Once woken the taskq_ent_t pointer must never be accessed. + */ + if (t) { + t = NULL; + schedule(); + __set_current_state(TASK_RUNNING); + } SEXIT; } EXPORT_SYMBOL(taskq_wait_id); +/* + * The taskq_wait() function will block until all previously submitted + * tasks have been completed. A previously submitted task is defined as + * a task with a lower task id than the current task queue id. Note that + * all task id's are assigned monotonically at dispatch time. + * + * Waiting for all previous tasks to complete is accomplished by tracking + * the lowest outstanding task id. As tasks are dispatched they are added + * added to the tail of the pending, priority, or delay lists. And as + * worker threads become available the tasks are removed from the heads + * of these lists and linked to the worker threads. This ensures the + * lists are kept in lowest to highest task id order. + * + * Therefore the lowest outstanding task id can be quickly determined by + * checking the head item from all of these lists. This value is stored + * with the task queue as the lowest id. It only needs to be recalculated + * when either the task with the current lowest id completes or is canceled. + * + * By blocking until the lowest task id exceeds the current task id when + * the function was called we ensure all previous tasks have completed. + * + * NOTE: When there are multiple worked threads it is possible for larger + * task ids to complete before smaller ones. Conversely when the task + * queue contains delay tasks with small task ids, you may block for a + * considerable length of time waiting for them to expire and execute. + */ +static int +taskq_wait_check(taskq_t *tq, taskqid_t id) +{ + int rc; + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + rc = (id < tq->tq_lowest_id); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + SRETURN(rc); +} + +void +taskq_wait_all(taskq_t *tq, taskqid_t id) +{ + wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id)); +} +EXPORT_SYMBOL(taskq_wait_all); + void taskq_wait(taskq_t *tq) { @@ -224,7 +435,7 @@ taskq_wait(taskq_t *tq) id = tq->tq_next_id - 1; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - taskq_wait_id(tq, id); + taskq_wait_all(tq, id); SEXIT; @@ -251,6 +462,63 @@ taskq_member(taskq_t *tq, void *t) } EXPORT_SYMBOL(taskq_member); +/* + * Cancel an already dispatched task given the task id. Still pending tasks + * will be immediately canceled, and if the task is active the function will + * block until it completes. Preallocated tasks which are canceled must be + * freed by the caller. + */ +int +taskq_cancel_id(taskq_t *tq, taskqid_t id) +{ + taskq_ent_t *t; + int active = 0; + int rc = ENOENT; + SENTRY; + + ASSERT(tq); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + t = taskq_find(tq, id, &active); + if (t && !active) { + list_del_init(&t->tqent_list); + t->tqent_flags |= TQENT_FLAG_CANCEL; + + /* + * When canceling the lowest outstanding task id we + * must recalculate the new lowest outstanding id. + */ + if (tq->tq_lowest_id == t->tqent_id) { + tq->tq_lowest_id = taskq_lowest_id(tq); + ASSERT3S(tq->tq_lowest_id, >, t->tqent_id); + } + + /* + * The task_expire() function takes the tq->tq_lock so drop + * drop the lock before synchronously cancelling the timer. + */ + if (timer_pending(&t->tqent_timer)) { + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + del_timer_sync(&t->tqent_timer); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + } + + if (!(t->tqent_flags & TQENT_FLAG_PREALLOC)) + task_done(tq, t); + + rc = 0; + } + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + if (active) { + taskq_wait_id(tq, id); + rc = EBUSY; + } + + SRETURN(rc); +} +EXPORT_SYMBOL(taskq_cancel_id); + taskqid_t taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) { @@ -287,6 +555,10 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) tq->tq_next_id++; t->tqent_func = func; t->tqent_arg = arg; + t->tqent_taskq = tq; + t->tqent_timer.data = 0; + t->tqent_timer.function = NULL; + t->tqent_timer.expires = 0; ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); @@ -299,6 +571,50 @@ out: } EXPORT_SYMBOL(taskq_dispatch); +taskqid_t +taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, + uint_t flags, clock_t expire_time) +{ + taskq_ent_t *t; + taskqid_t rc = 0; + SENTRY; + + ASSERT(tq); + ASSERT(func); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + /* Taskq being destroyed and all tasks drained */ + if (!(tq->tq_flags & TQ_ACTIVE)) + SGOTO(out, rc = 0); + + if ((t = task_alloc(tq, flags)) == NULL) + SGOTO(out, rc = 0); + + spin_lock(&t->tqent_lock); + + /* Queue to the delay list for subsequent execution */ + list_add_tail(&t->tqent_list, &tq->tq_delay_list); + + t->tqent_id = rc = tq->tq_next_id; + tq->tq_next_id++; + t->tqent_func = func; + t->tqent_arg = arg; + t->tqent_taskq = tq; + t->tqent_timer.data = (unsigned long)t; + t->tqent_timer.function = task_expire; + t->tqent_timer.expires = (unsigned long)expire_time; + add_timer(&t->tqent_timer); + + ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + + spin_unlock(&t->tqent_lock); +out: + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + SRETURN(rc); +} +EXPORT_SYMBOL(taskq_dispatch_delay); + void taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, taskq_ent_t *t) @@ -335,6 +651,7 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, tq->tq_next_id++; t->tqent_func = func; t->tqent_arg = arg; + t->tqent_taskq = tq; spin_unlock(&t->tqent_lock); @@ -356,78 +673,17 @@ void taskq_init_ent(taskq_ent_t *t) { spin_lock_init(&t->tqent_lock); + init_waitqueue_head(&t->tqent_waitq); + init_timer(&t->tqent_timer); INIT_LIST_HEAD(&t->tqent_list); t->tqent_id = 0; t->tqent_func = NULL; t->tqent_arg = NULL; t->tqent_flags = 0; + t->tqent_taskq = NULL; } EXPORT_SYMBOL(taskq_init_ent); -/* - * Returns the lowest incomplete taskqid_t. The taskqid_t may - * be queued on the pending list, on the priority list, or on - * the work list currently being handled, but it is not 100% - * complete yet. - */ -static taskqid_t -taskq_lowest_id(taskq_t *tq) -{ - taskqid_t lowest_id = tq->tq_next_id; - taskq_ent_t *t; - taskq_thread_t *tqt; - SENTRY; - - ASSERT(tq); - ASSERT(spin_is_locked(&tq->tq_lock)); - - if (!list_empty(&tq->tq_pend_list)) { - t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list); - lowest_id = MIN(lowest_id, t->tqent_id); - } - - if (!list_empty(&tq->tq_prio_list)) { - t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list); - lowest_id = MIN(lowest_id, t->tqent_id); - } - - if (!list_empty(&tq->tq_active_list)) { - tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, - tqt_active_list); - ASSERT(tqt->tqt_id != 0); - lowest_id = MIN(lowest_id, tqt->tqt_id); - } - - SRETURN(lowest_id); -} - -/* - * Insert a task into a list keeping the list sorted by increasing taskqid. - */ -static void -taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) -{ - taskq_thread_t *w; - struct list_head *l; - - SENTRY; - ASSERT(tq); - ASSERT(tqt); - ASSERT(spin_is_locked(&tq->tq_lock)); - - list_for_each_prev(l, &tq->tq_active_list) { - w = list_entry(l, taskq_thread_t, tqt_active_list); - if (w->tqt_id < tqt->tqt_id) { - list_add(&tqt->tqt_active_list, l); - break; - } - } - if (l == &tq->tq_active_list) - list_add(&tqt->tqt_active_list, &tq->tq_active_list); - - SEXIT; -} - static int taskq_thread(void *args) { @@ -481,6 +737,7 @@ taskq_thread(void *args) * preallocated taskq_ent_t, tqent_id must be * stored prior to executing tqent_func. */ tqt->tqt_id = t->tqent_id; + tqt->tqt_task = t; /* We must store a copy of the flags prior to * servicing the task (servicing a prealloc'd task @@ -499,6 +756,7 @@ taskq_thread(void *args) spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); tq->tq_nactive--; list_del_init(&tqt->tqt_active_list); + tqt->tqt_task = NULL; /* For prealloc'd tasks, we don't free anything. */ if ((tq->tq_flags & TASKQ_DYNAMIC) || @@ -576,6 +834,7 @@ taskq_create(const char *name, int nthreads, pri_t pri, INIT_LIST_HEAD(&tq->tq_free_list); INIT_LIST_HEAD(&tq->tq_pend_list); INIT_LIST_HEAD(&tq->tq_prio_list); + INIT_LIST_HEAD(&tq->tq_delay_list); init_waitqueue_head(&tq->tq_work_waitq); init_waitqueue_head(&tq->tq_wait_waitq); @@ -669,6 +928,7 @@ taskq_destroy(taskq_t *tq) ASSERT(list_empty(&tq->tq_free_list)); ASSERT(list_empty(&tq->tq_pend_list)); ASSERT(list_empty(&tq->tq_prio_list)); + ASSERT(list_empty(&tq->tq_delay_list)); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); |