diff options
author | Brian Behlendorf <[email protected]> | 2009-03-19 20:59:12 -0700 |
---|---|---|
committer | Brian Behlendorf <[email protected]> | 2009-03-19 20:59:12 -0700 |
commit | eb811f7fa91e495862a7f86525841b9f682f1715 (patch) | |
tree | f1d09f593fadee2b92514cfe8a6e60499c8a3acf /module | |
parent | a71161249ab773b526cc316cf84cfbe44cf1e245 (diff) | |
parent | c89153393fb221e5f6073674cf4660b0bdb44be2 (diff) |
Merge branch 'feature-branch' into refs/top-bases/zfs-branch
Conflicts:
module/zfs/include/sys/dmu_tx.h
module/zfs/include/sys/txg.h
Diffstat (limited to 'module')
-rw-r--r-- | module/zfs/dmu_tx.c | 43 | ||||
-rw-r--r-- | module/zfs/include/sys/dmu.h | 20 | ||||
-rw-r--r-- | module/zfs/include/sys/dmu_tx.h | 12 | ||||
-rw-r--r-- | module/zfs/include/sys/txg.h | 3 | ||||
-rw-r--r-- | module/zfs/include/sys/txg_impl.h | 8 | ||||
-rw-r--r-- | module/zfs/txg.c | 108 |
6 files changed, 183 insertions, 11 deletions
diff --git a/module/zfs/dmu_tx.c b/module/zfs/dmu_tx.c index 18a640d6d..bd4548741 100644 --- a/module/zfs/dmu_tx.c +++ b/module/zfs/dmu_tx.c @@ -48,6 +48,8 @@ dmu_tx_create_dd(dsl_dir_t *dd) tx->tx_pool = dd->dd_pool; list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t), offsetof(dmu_tx_hold_t, txh_node)); + list_create(&tx->tx_callbacks, sizeof (dmu_tx_callback_t), + offsetof(dmu_tx_callback_t, dcb_node)); #ifdef ZFS_DEBUG refcount_create(&tx->tx_space_written); refcount_create(&tx->tx_space_freed); @@ -1020,8 +1022,13 @@ dmu_tx_commit(dmu_tx_t *tx) if (tx->tx_tempreserve_cookie) dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx); + if (!list_is_empty(&tx->tx_callbacks)) + txg_register_callbacks(&tx->tx_txgh, &tx->tx_callbacks); + if (tx->tx_anyobj == FALSE) txg_rele_to_sync(&tx->tx_txgh); + + list_destroy(&tx->tx_callbacks); list_destroy(&tx->tx_holds); #ifdef ZFS_DEBUG dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n", @@ -1050,6 +1057,14 @@ dmu_tx_abort(dmu_tx_t *tx) if (dn != NULL) dnode_rele(dn, tx); } + + /* + * Call any registered callbacks with an error code. + */ + if (!list_is_empty(&tx->tx_callbacks)) + dmu_tx_callback(&tx->tx_callbacks, ECANCELED); + + list_destroy(&tx->tx_callbacks); list_destroy(&tx->tx_holds); #ifdef ZFS_DEBUG refcount_destroy_many(&tx->tx_space_written, @@ -1066,3 +1081,31 @@ dmu_tx_get_txg(dmu_tx_t *tx) ASSERT(tx->tx_txg != 0); return (tx->tx_txg); } + +void +dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *func, void *data) +{ + dmu_tx_callback_t *dcb; + + dcb = kmem_alloc(sizeof (dmu_tx_callback_t), KM_SLEEP); + + dcb->dcb_func = func; + dcb->dcb_data = data; + + list_insert_tail(&tx->tx_callbacks, dcb); +} + +/* + * Call all the commit callbacks on a list, with a given error code. + */ +void +dmu_tx_callback(list_t *cb_list, int error) +{ + dmu_tx_callback_t *dcb; + + while (dcb = list_head(cb_list)) { + list_remove(cb_list, dcb); + dcb->dcb_func(dcb->dcb_data, error); + kmem_free(dcb, sizeof (dmu_tx_callback_t)); + } +} diff --git a/module/zfs/include/sys/dmu.h b/module/zfs/include/sys/dmu.h index 3b1e5c8fb..392431a18 100644 --- a/module/zfs/include/sys/dmu.h +++ b/module/zfs/include/sys/dmu.h @@ -430,6 +430,26 @@ void dmu_tx_wait(dmu_tx_t *tx); void dmu_tx_commit(dmu_tx_t *tx); /* + * To register a commit callback, dmu_tx_callback_register() must be called. + * + * dcb_data is a pointer to caller private data that is passed on as a + * callback parameter. The caller is responsible for properly allocating and + * freeing it. + * + * When registering a callback, the transaction must be already created, but + * it cannot be committed or aborted. It can be assigned to a txg or not. + * + * The callback will be called after the transaction has been safely written + * to stable storage and will also be called if the dmu_tx is aborted. + * If there is any error which prevents the transaction from being committed to + * disk, the callback will be called with a value of error != 0. + */ +typedef void dmu_tx_callback_func_t(void *dcb_data, int error); + +void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func, + void *dcb_data); + +/* * Free up the data blocks for a defined range of a file. If size is * zero, the range from offset to end-of-file is freed. */ diff --git a/module/zfs/include/sys/dmu_tx.h b/module/zfs/include/sys/dmu_tx.h index 2fc1fee17..7fcab936f 100644 --- a/module/zfs/include/sys/dmu_tx.h +++ b/module/zfs/include/sys/dmu_tx.h @@ -26,8 +26,6 @@ #ifndef _SYS_DMU_TX_H #define _SYS_DMU_TX_H - - #include <sys/inttypes.h> #include <sys/dmu.h> #include <sys/txg.h> @@ -59,6 +57,7 @@ struct dmu_tx { txg_handle_t tx_txgh; void *tx_tempreserve_cookie; struct dmu_tx_hold *tx_needassign_txh; + list_t tx_callbacks; /* list of dmu_tx_callback_t on this dmu_tx */ uint8_t tx_anyobj; int tx_err; #ifdef ZFS_DEBUG @@ -98,6 +97,11 @@ typedef struct dmu_tx_hold { #endif } dmu_tx_hold_t; +typedef struct dmu_tx_callback { + list_node_t dcb_node; /* linked to tx_callbacks list */ + dmu_tx_callback_func_t *dcb_func; /* caller function pointer */ + void *dcb_data; /* caller private data */ +} dmu_tx_callback_t; /* * These routines are defined in dmu.h, and are called by the user. @@ -109,6 +113,10 @@ void dmu_tx_abort(dmu_tx_t *tx); uint64_t dmu_tx_get_txg(dmu_tx_t *tx); void dmu_tx_wait(dmu_tx_t *tx); +void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func, + void *dcb_data); +void dmu_tx_callback(list_t *cb_list, int error); + /* * These routines are defined in dmu_spa.h, and are called by the SPA. */ diff --git a/module/zfs/include/sys/txg.h b/module/zfs/include/sys/txg.h index ccc2cc5c6..e679898db 100644 --- a/module/zfs/include/sys/txg.h +++ b/module/zfs/include/sys/txg.h @@ -26,8 +26,6 @@ #ifndef _SYS_TXG_H #define _SYS_TXG_H - - #include <sys/spa.h> #include <sys/zfs_context.h> @@ -71,6 +69,7 @@ extern void txg_sync_stop(struct dsl_pool *dp); extern uint64_t txg_hold_open(struct dsl_pool *dp, txg_handle_t *txghp); extern void txg_rele_to_quiesce(txg_handle_t *txghp); extern void txg_rele_to_sync(txg_handle_t *txghp); +extern void txg_register_callbacks(txg_handle_t *txghp, list_t *tx_callbacks); extern void txg_suspend(struct dsl_pool *dp); extern void txg_resume(struct dsl_pool *dp); diff --git a/module/zfs/include/sys/txg_impl.h b/module/zfs/include/sys/txg_impl.h index 7413c662b..bc7d7c7e5 100644 --- a/module/zfs/include/sys/txg_impl.h +++ b/module/zfs/include/sys/txg_impl.h @@ -33,10 +33,16 @@ extern "C" { #endif +typedef struct tx_cb { + tx_cpu_t *tcb_tc; + uint64_t tcb_txg; +} tx_cb_t; + struct tx_cpu { kmutex_t tc_lock; kcondvar_t tc_cv[TXG_SIZE]; uint64_t tc_count[TXG_SIZE]; + list_t tc_callbacks[TXG_SIZE]; /* commit cb list */ char tc_pad[16]; }; @@ -64,6 +70,8 @@ typedef struct tx_state { kthread_t *tx_sync_thread; kthread_t *tx_quiesce_thread; + + taskq_t *tx_commit_cb_taskq; /* commit callback taskq */ } tx_state_t; #ifdef __cplusplus diff --git a/module/zfs/txg.c b/module/zfs/txg.c index d87b053ed..e0bc524a3 100644 --- a/module/zfs/txg.c +++ b/module/zfs/txg.c @@ -26,6 +26,7 @@ #include <sys/zfs_context.h> #include <sys/txg_impl.h> #include <sys/dmu_impl.h> +#include <sys/dmu_tx.h> #include <sys/dsl_pool.h> #include <sys/callb.h> @@ -57,6 +58,9 @@ txg_init(dsl_pool_t *dp, uint64_t txg) for (i = 0; i < TXG_SIZE; i++) { cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, NULL); + list_create(&tx->tx_cpu[c].tc_callbacks[i], + sizeof (dmu_tx_callback_t), + offsetof(dmu_tx_callback_t, dcb_node)); } } @@ -96,10 +100,15 @@ txg_fini(dsl_pool_t *dp) int i; mutex_destroy(&tx->tx_cpu[c].tc_lock); - for (i = 0; i < TXG_SIZE; i++) + for (i = 0; i < TXG_SIZE; i++) { cv_destroy(&tx->tx_cpu[c].tc_cv[i]); + list_destroy(&tx->tx_cpu[c].tc_callbacks[i]); + } } + if (tx->tx_commit_cb_taskq != NULL) + taskq_destroy(tx->tx_commit_cb_taskq); + kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); bzero(tx, sizeof (tx_state_t)); @@ -229,25 +238,55 @@ txg_rele_to_quiesce(txg_handle_t *th) } void -txg_rele_to_sync(txg_handle_t *th) +txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks) { tx_cpu_t *tc = th->th_cpu; int g = th->th_txg & TXG_MASK; mutex_enter(&tc->tc_lock); + list_move_tail(&tc->tc_callbacks[g], tx_callbacks); + mutex_exit(&tc->tc_lock); +} + +static void +txg_exit(tx_cpu_t *tc, uint64_t txg) +{ + int g = txg & TXG_MASK; + + mutex_enter(&tc->tc_lock); ASSERT(tc->tc_count[g] != 0); if (--tc->tc_count[g] == 0) cv_broadcast(&tc->tc_cv[g]); mutex_exit(&tc->tc_lock); +} + +void +txg_rele_to_sync(txg_handle_t *th) +{ + txg_exit(th->th_cpu, th->th_txg); th->th_cpu = NULL; /* defensive */ } static void +txg_wait_exit(tx_state_t *tx, uint64_t txg) +{ + int g = txg & TXG_MASK; + int c; + + for (c = 0; c < max_ncpus; c++) { + tx_cpu_t *tc = &tx->tx_cpu[c]; + mutex_enter(&tc->tc_lock); + while (tc->tc_count[g] != 0) + cv_wait(&tc->tc_cv[g], &tc->tc_lock); + mutex_exit(&tc->tc_lock); + } +} + +static void txg_quiesce(dsl_pool_t *dp, uint64_t txg) { tx_state_t *tx = &dp->dp_tx; - int g = txg & TXG_MASK; int c; /* @@ -269,12 +308,60 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg) /* * Quiesce the transaction group by waiting for everyone to txg_exit(). */ + txg_wait_exit(tx, txg); +} + +static void +txg_callback(tx_cb_t *tcb) +{ + tx_cpu_t *tc = tcb->tcb_tc; + int g = tcb->tcb_txg & TXG_MASK; + + dmu_tx_callback(&tc->tc_callbacks[g], 0); + + txg_exit(tc, tcb->tcb_txg); + + kmem_free(tcb, sizeof (tx_cb_t)); +} + +/* + * Dispatch the commit callbacks registered on this txg to worker threads. + */ +static void +txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg) +{ + int c; + tx_state_t *tx = &dp->dp_tx; + tx_cb_t *tcb; + for (c = 0; c < max_ncpus; c++) { tx_cpu_t *tc = &tx->tx_cpu[c]; - mutex_enter(&tc->tc_lock); - while (tc->tc_count[g] != 0) - cv_wait(&tc->tc_cv[g], &tc->tc_lock); - mutex_exit(&tc->tc_lock); + /* No need to lock tx_cpu_t at this point */ + + int g = txg & TXG_MASK; + + if (list_is_empty(&tc->tc_callbacks[g])) + continue; + + if (tx->tx_commit_cb_taskq == NULL) { + /* + * Commit callback taskq hasn't been created yet. + */ + tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb", + max_ncpus, minclsyspri, max_ncpus, max_ncpus * 4, + TASKQ_PREPOPULATE); + } + + tcb = kmem_alloc(sizeof (tx_cb_t), KM_SLEEP); + tcb->tcb_txg = txg; + tcb->tcb_tc = tc; + + /* There shouldn't be any holders on this txg at this point */ + ASSERT3U(tc->tc_count[g], ==, 0); + tc->tc_count[g]++; + + (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *) + txg_callback, tcb, TQ_SLEEP); } } @@ -345,6 +432,13 @@ txg_sync_thread(dsl_pool_t *dp) spa_sync(dp->dp_spa, txg); delta = lbolt - start; + /* + * Dispatch commit callbacks to worker threads and wait for + * them to finish. + */ + txg_dispatch_callbacks(dp, txg); + txg_wait_exit(tx, txg); + mutex_enter(&tx->tx_sync_lock); rw_enter(&tx->tx_suspend, RW_WRITER); tx->tx_synced_txg = txg; |