diff options
author | Brian Behlendorf <[email protected]> | 2009-03-19 20:30:14 -0700 |
---|---|---|
committer | Brian Behlendorf <[email protected]> | 2009-03-19 20:30:14 -0700 |
commit | 29703a5b4e2e3f753199bd0b2dd6b29a1e99b25a (patch) | |
tree | 398c107daedbad5b9f01e42651c9d61dc2275d52 | |
parent | d164b2093561a9771db07346e6fffc9ca19427a2 (diff) |
Add 'feature-commit-cb' branch for DMU commit callbacks.
-rw-r--r-- | .topdeps | 1 | ||||
-rw-r--r-- | .topmsg | 8 | ||||
-rw-r--r-- | cmd/ztest/ztest.c | 223 | ||||
-rw-r--r-- | module/zfs/dmu_tx.c | 43 | ||||
-rw-r--r-- | module/zfs/include/sys/dmu.h | 20 | ||||
-rw-r--r-- | module/zfs/include/sys/dmu_tx.h | 12 | ||||
-rw-r--r-- | module/zfs/include/sys/txg.h | 3 | ||||
-rw-r--r-- | module/zfs/include/sys/txg_impl.h | 8 | ||||
-rw-r--r-- | module/zfs/txg.c | 108 |
9 files changed, 415 insertions, 11 deletions
diff --git a/.topdeps b/.topdeps new file mode 100644 index 000000000..1f7391f92 --- /dev/null +++ b/.topdeps @@ -0,0 +1 @@ +master diff --git a/.topmsg b/.topmsg new file mode 100644 index 000000000..33c1e1c91 --- /dev/null +++ b/.topmsg @@ -0,0 +1,8 @@ +From: Brian Behlendorf <[email protected]> +Subject: [PATCH] feature commit cb + +DMU commit callbacks. Provides an API to attach callbacks to transactions, +which are called by the DMU when they are safely committed to disk. +See Lustre bug 14117 for details. + +Signed-off-by: Brian Behlendorf <[email protected]> diff --git a/cmd/ztest/ztest.c b/cmd/ztest/ztest.c index 4503a3d02..efe4fd06b 100644 --- a/cmd/ztest/ztest.c +++ b/cmd/ztest/ztest.c @@ -164,6 +164,7 @@ typedef void ztest_func_t(ztest_args_t *); ztest_func_t ztest_dmu_read_write; ztest_func_t ztest_dmu_write_parallel; ztest_func_t ztest_dmu_object_alloc_free; +ztest_func_t ztest_dmu_commit_callbacks; ztest_func_t ztest_zap; ztest_func_t ztest_zap_parallel; ztest_func_t ztest_traverse; @@ -198,6 +199,7 @@ ztest_info_t ztest_info[] = { { ztest_dmu_read_write, 1, &zopt_always }, { ztest_dmu_write_parallel, 30, &zopt_always }, { ztest_dmu_object_alloc_free, 1, &zopt_always }, + { ztest_dmu_commit_callbacks, 10, &zopt_always }, { ztest_zap, 30, &zopt_always }, { ztest_zap_parallel, 100, &zopt_always }, { ztest_dsl_prop_get_set, 1, &zopt_sometimes }, @@ -218,6 +220,16 @@ ztest_info_t ztest_info[] = { #define ZTEST_SYNC_LOCKS 16 /* + * The following struct is used to hold a list of uncalled commit callbacks. + * + * The callbacks are ordered by txg number. + */ +typedef struct ztest_cb_list { + mutex_t zcl_callbacks_lock; + list_t zcl_callbacks; +} ztest_cb_list_t; + +/* * Stuff we need to share writably between parent and child. */ typedef struct ztest_shared { @@ -233,6 +245,7 @@ typedef struct ztest_shared { ztest_info_t zs_info[ZTEST_FUNCS]; mutex_t zs_sync_lock[ZTEST_SYNC_LOCKS]; uint64_t zs_seq[ZTEST_SYNC_LOCKS]; + ztest_cb_list_t zs_cb_list; } ztest_shared_t; static char ztest_dev_template[] = "%s/%s.%llua"; @@ -2433,6 +2446,205 @@ ztest_zap_parallel(ztest_args_t *za) dmu_tx_commit(tx); } +/* + * Commit callback data. + */ +typedef struct ztest_cb_data { + list_node_t zcd_node; + ztest_cb_list_t *zcd_zcl; + uint64_t zcd_txg; + int zcd_expected_err; + boolean_t zcd_added; + boolean_t zcd_called; + spa_t *zcd_spa; +} ztest_cb_data_t; + +static void +ztest_commit_callback(void *arg, int error) +{ + ztest_cb_data_t *data = arg; + ztest_cb_list_t *zcl; + uint64_t synced_txg; + + VERIFY(data != NULL); + VERIFY3S(data->zcd_expected_err, ==, error); + VERIFY(!data->zcd_called); + + synced_txg = spa_last_synced_txg(data->zcd_spa); + if (data->zcd_txg > synced_txg) + fatal(0, "commit callback of txg %llu prematurely called, last" + " synced txg = %llu\n", data->zcd_txg, synced_txg); + + zcl = data->zcd_zcl; + data->zcd_called = B_TRUE; + + if (error == ECANCELED) { + ASSERT3U(data->zcd_txg, ==, 0); + ASSERT(!data->zcd_added); + + /* + * The private callback data should be destroyed here, but + * since we are going to check the zcd_called field after + * dmu_tx_abort(), we will destroy it there. + */ + return; + } + + if (!data->zcd_added) + goto out; + + ASSERT3U(data->zcd_txg, !=, 0); + + /* Remove our callback from the list */ + (void) mutex_lock(&zcl->zcl_callbacks_lock); + list_remove(&zcl->zcl_callbacks, data); + (void) mutex_unlock(&zcl->zcl_callbacks_lock); + +out: + umem_free(data, sizeof (ztest_cb_data_t)); +} + +static ztest_cb_data_t * +ztest_create_cb_data(objset_t *os, ztest_cb_list_t *zcl, uint64_t txg) +{ + ztest_cb_data_t *cb_data; + + cb_data = umem_zalloc(sizeof (ztest_cb_data_t), UMEM_NOFAIL); + + cb_data->zcd_zcl = zcl; + cb_data->zcd_txg = txg; + cb_data->zcd_spa = os->os->os_spa; + + return (cb_data); +} + +/* + * If a number of txgs equal to this threshold have been created after a commit + * callback has been registered but not called, then we assume there is an + * implementation bug. + */ +#define ZTEST_COMMIT_CALLBACK_THRESH 3 + +/* + * Commit callback test. + */ +void +ztest_dmu_commit_callbacks(ztest_args_t *za) +{ + objset_t *os = za->za_os; + dmu_tx_t *tx; + ztest_cb_list_t *zcl = &ztest_shared->zs_cb_list; + ztest_cb_data_t *cb_data[3], *tmp_cb; + uint64_t old_txg, txg; + int i, error; + + tx = dmu_tx_create(os); + + cb_data[0] = ztest_create_cb_data(os, zcl, 0); + dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[0]); + + dmu_tx_hold_write(tx, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t)); + + /* Every once in a while, abort the transaction on purpose */ + if (ztest_random(100) == 0) + error = -1; + + if (!error) + error = dmu_tx_assign(tx, TXG_NOWAIT); + + txg = error ? 0 : dmu_tx_get_txg(tx); + + cb_data[1] = ztest_create_cb_data(os, zcl, txg); + dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[1]); + + if (error) { + /* + * It's not a strict requirement to call the registered + * callbacks from inside dmu_tx_abort(), but that's what + * happens in the current implementation so we will check for + * that. + */ + for (i = 0; i < 2; i++) { + cb_data[i]->zcd_expected_err = ECANCELED; + VERIFY(!cb_data[i]->zcd_called); + } + + dmu_tx_abort(tx); + + for (i = 0; i < 2; i++) { + VERIFY(cb_data[i]->zcd_called); + umem_free(cb_data[i], sizeof (ztest_cb_data_t)); + } + + return; + } + + cb_data[0]->zcd_txg = txg; + cb_data[2] = ztest_create_cb_data(os, zcl, txg); + dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[2]); + + /* + * Read existing data to make sure there isn't a future leak. + */ + VERIFY(0 == dmu_read(os, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t), + &old_txg)); + + if (old_txg > txg) + fatal(0, "future leak: got %llx, open txg is %llx", old_txg, + txg); + + dmu_write(os, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t), &txg, tx); + + (void) mutex_lock(&zcl->zcl_callbacks_lock); + + /* + * Since commit callbacks don't have any ordering requirement and since + * it is theoretically possible for a commit callback to be called + * after an arbitrary amount of time has elapsed since its txg has been + * synced, it is difficult to reliably determine whether a commit + * callback hasn't been called due to high load or due to a flawed + * implementation. + * + * In practice, we will assume that if after a certain number of txgs a + * commit callback hasn't been called, then most likely there's an + * implementation bug.. + */ + tmp_cb = list_head(&zcl->zcl_callbacks); + if (tmp_cb != NULL) + VERIFY3U(tmp_cb->zcd_txg, >, + txg - ZTEST_COMMIT_CALLBACK_THRESH); + + /* + * Let's find the place to insert our callbacks. + * + * Even though the list is ordered by txg, it is possible for the + * insertion point to not be the end because our txg may already be + * quiescing at this point and other callbacks in the open txg may + * have sneaked in. + */ + tmp_cb = list_tail(&zcl->zcl_callbacks); + while (tmp_cb != NULL && tmp_cb->zcd_txg > txg) + tmp_cb = list_prev(&zcl->zcl_callbacks, tmp_cb); + + /* Add the 3 callbacks to the list */ + for (i = 0; i < 3; i++) { + if (tmp_cb == NULL) + list_insert_head(&zcl->zcl_callbacks, cb_data[i]); + else + list_insert_after(&zcl->zcl_callbacks, tmp_cb, + cb_data[i]); + + cb_data[i]->zcd_added = B_TRUE; + VERIFY(!cb_data[i]->zcd_called); + + tmp_cb = cb_data[i]; + } + + (void) mutex_unlock(&zcl->zcl_callbacks_lock); + + dmu_tx_commit(tx); +} + void ztest_dsl_prop_get_set(ztest_args_t *za) { @@ -3041,6 +3253,11 @@ ztest_run(char *pool) (void) _mutex_init(&zs->zs_vdev_lock, USYNC_THREAD, NULL); (void) rwlock_init(&zs->zs_name_lock, USYNC_THREAD, NULL); + (void) _mutex_init(&zs->zs_cb_list.zcl_callbacks_lock, USYNC_THREAD, + NULL); + + list_create(&zs->zs_cb_list.zcl_callbacks, sizeof (ztest_cb_data_t), + offsetof(ztest_cb_data_t, zcd_node)); for (t = 0; t < ZTEST_SYNC_LOCKS; t++) (void) _mutex_init(&zs->zs_sync_lock[t], USYNC_THREAD, NULL); @@ -3240,6 +3457,12 @@ ztest_run(char *pool) spa_close(spa, FTAG); kernel_fini(); + + list_destroy(&zs->zs_cb_list.zcl_callbacks); + + (void) _mutex_destroy(&zs->zs_cb_list.zcl_callbacks_lock); + (void) rwlock_destroy(&zs->zs_name_lock); + (void) _mutex_destroy(&zs->zs_vdev_lock); } void diff --git a/module/zfs/dmu_tx.c b/module/zfs/dmu_tx.c index bf560e565..0af089198 100644 --- a/module/zfs/dmu_tx.c +++ b/module/zfs/dmu_tx.c @@ -48,6 +48,8 @@ dmu_tx_create_dd(dsl_dir_t *dd) tx->tx_pool = dd->dd_pool; list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t), offsetof(dmu_tx_hold_t, txh_node)); + list_create(&tx->tx_callbacks, sizeof (dmu_tx_callback_t), + offsetof(dmu_tx_callback_t, dcb_node)); #ifdef ZFS_DEBUG refcount_create(&tx->tx_space_written); refcount_create(&tx->tx_space_freed); @@ -1020,8 +1022,13 @@ dmu_tx_commit(dmu_tx_t *tx) if (tx->tx_tempreserve_cookie) dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx); + if (!list_is_empty(&tx->tx_callbacks)) + txg_register_callbacks(&tx->tx_txgh, &tx->tx_callbacks); + if (tx->tx_anyobj == FALSE) txg_rele_to_sync(&tx->tx_txgh); + + list_destroy(&tx->tx_callbacks); list_destroy(&tx->tx_holds); #ifdef ZFS_DEBUG dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n", @@ -1050,6 +1057,14 @@ dmu_tx_abort(dmu_tx_t *tx) if (dn != NULL) dnode_rele(dn, tx); } + + /* + * Call any registered callbacks with an error code. + */ + if (!list_is_empty(&tx->tx_callbacks)) + dmu_tx_callback(&tx->tx_callbacks, ECANCELED); + + list_destroy(&tx->tx_callbacks); list_destroy(&tx->tx_holds); #ifdef ZFS_DEBUG refcount_destroy_many(&tx->tx_space_written, @@ -1066,3 +1081,31 @@ dmu_tx_get_txg(dmu_tx_t *tx) ASSERT(tx->tx_txg != 0); return (tx->tx_txg); } + +void +dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *func, void *data) +{ + dmu_tx_callback_t *dcb; + + dcb = kmem_alloc(sizeof (dmu_tx_callback_t), KM_SLEEP); + + dcb->dcb_func = func; + dcb->dcb_data = data; + + list_insert_tail(&tx->tx_callbacks, dcb); +} + +/* + * Call all the commit callbacks on a list, with a given error code. + */ +void +dmu_tx_callback(list_t *cb_list, int error) +{ + dmu_tx_callback_t *dcb; + + while (dcb = list_head(cb_list)) { + list_remove(cb_list, dcb); + dcb->dcb_func(dcb->dcb_data, error); + kmem_free(dcb, sizeof (dmu_tx_callback_t)); + } +} diff --git a/module/zfs/include/sys/dmu.h b/module/zfs/include/sys/dmu.h index 3b1e5c8fb..392431a18 100644 --- a/module/zfs/include/sys/dmu.h +++ b/module/zfs/include/sys/dmu.h @@ -430,6 +430,26 @@ void dmu_tx_wait(dmu_tx_t *tx); void dmu_tx_commit(dmu_tx_t *tx); /* + * To register a commit callback, dmu_tx_callback_register() must be called. + * + * dcb_data is a pointer to caller private data that is passed on as a + * callback parameter. The caller is responsible for properly allocating and + * freeing it. + * + * When registering a callback, the transaction must be already created, but + * it cannot be committed or aborted. It can be assigned to a txg or not. + * + * The callback will be called after the transaction has been safely written + * to stable storage and will also be called if the dmu_tx is aborted. + * If there is any error which prevents the transaction from being committed to + * disk, the callback will be called with a value of error != 0. + */ +typedef void dmu_tx_callback_func_t(void *dcb_data, int error); + +void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func, + void *dcb_data); + +/* * Free up the data blocks for a defined range of a file. If size is * zero, the range from offset to end-of-file is freed. */ diff --git a/module/zfs/include/sys/dmu_tx.h b/module/zfs/include/sys/dmu_tx.h index 2727daaaa..7fcab936f 100644 --- a/module/zfs/include/sys/dmu_tx.h +++ b/module/zfs/include/sys/dmu_tx.h @@ -26,8 +26,6 @@ #ifndef _SYS_DMU_TX_H #define _SYS_DMU_TX_H -#pragma ident "%Z%%M% %I% %E% SMI" - #include <sys/inttypes.h> #include <sys/dmu.h> #include <sys/txg.h> @@ -59,6 +57,7 @@ struct dmu_tx { txg_handle_t tx_txgh; void *tx_tempreserve_cookie; struct dmu_tx_hold *tx_needassign_txh; + list_t tx_callbacks; /* list of dmu_tx_callback_t on this dmu_tx */ uint8_t tx_anyobj; int tx_err; #ifdef ZFS_DEBUG @@ -98,6 +97,11 @@ typedef struct dmu_tx_hold { #endif } dmu_tx_hold_t; +typedef struct dmu_tx_callback { + list_node_t dcb_node; /* linked to tx_callbacks list */ + dmu_tx_callback_func_t *dcb_func; /* caller function pointer */ + void *dcb_data; /* caller private data */ +} dmu_tx_callback_t; /* * These routines are defined in dmu.h, and are called by the user. @@ -109,6 +113,10 @@ void dmu_tx_abort(dmu_tx_t *tx); uint64_t dmu_tx_get_txg(dmu_tx_t *tx); void dmu_tx_wait(dmu_tx_t *tx); +void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func, + void *dcb_data); +void dmu_tx_callback(list_t *cb_list, int error); + /* * These routines are defined in dmu_spa.h, and are called by the SPA. */ diff --git a/module/zfs/include/sys/txg.h b/module/zfs/include/sys/txg.h index 23bdff211..e679898db 100644 --- a/module/zfs/include/sys/txg.h +++ b/module/zfs/include/sys/txg.h @@ -26,8 +26,6 @@ #ifndef _SYS_TXG_H #define _SYS_TXG_H -#pragma ident "%Z%%M% %I% %E% SMI" - #include <sys/spa.h> #include <sys/zfs_context.h> @@ -71,6 +69,7 @@ extern void txg_sync_stop(struct dsl_pool *dp); extern uint64_t txg_hold_open(struct dsl_pool *dp, txg_handle_t *txghp); extern void txg_rele_to_quiesce(txg_handle_t *txghp); extern void txg_rele_to_sync(txg_handle_t *txghp); +extern void txg_register_callbacks(txg_handle_t *txghp, list_t *tx_callbacks); extern void txg_suspend(struct dsl_pool *dp); extern void txg_resume(struct dsl_pool *dp); diff --git a/module/zfs/include/sys/txg_impl.h b/module/zfs/include/sys/txg_impl.h index 7413c662b..bc7d7c7e5 100644 --- a/module/zfs/include/sys/txg_impl.h +++ b/module/zfs/include/sys/txg_impl.h @@ -33,10 +33,16 @@ extern "C" { #endif +typedef struct tx_cb { + tx_cpu_t *tcb_tc; + uint64_t tcb_txg; +} tx_cb_t; + struct tx_cpu { kmutex_t tc_lock; kcondvar_t tc_cv[TXG_SIZE]; uint64_t tc_count[TXG_SIZE]; + list_t tc_callbacks[TXG_SIZE]; /* commit cb list */ char tc_pad[16]; }; @@ -64,6 +70,8 @@ typedef struct tx_state { kthread_t *tx_sync_thread; kthread_t *tx_quiesce_thread; + + taskq_t *tx_commit_cb_taskq; /* commit callback taskq */ } tx_state_t; #ifdef __cplusplus diff --git a/module/zfs/txg.c b/module/zfs/txg.c index e3c0e2a13..b5fcc8c4a 100644 --- a/module/zfs/txg.c +++ b/module/zfs/txg.c @@ -26,6 +26,7 @@ #include <sys/zfs_context.h> #include <sys/txg_impl.h> #include <sys/dmu_impl.h> +#include <sys/dmu_tx.h> #include <sys/dsl_pool.h> #include <sys/callb.h> @@ -57,6 +58,9 @@ txg_init(dsl_pool_t *dp, uint64_t txg) for (i = 0; i < TXG_SIZE; i++) { cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, NULL); + list_create(&tx->tx_cpu[c].tc_callbacks[i], + sizeof (dmu_tx_callback_t), + offsetof(dmu_tx_callback_t, dcb_node)); } } @@ -96,10 +100,15 @@ txg_fini(dsl_pool_t *dp) int i; mutex_destroy(&tx->tx_cpu[c].tc_lock); - for (i = 0; i < TXG_SIZE; i++) + for (i = 0; i < TXG_SIZE; i++) { cv_destroy(&tx->tx_cpu[c].tc_cv[i]); + list_destroy(&tx->tx_cpu[c].tc_callbacks[i]); + } } + if (tx->tx_commit_cb_taskq != NULL) + taskq_destroy(tx->tx_commit_cb_taskq); + kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); bzero(tx, sizeof (tx_state_t)); @@ -229,25 +238,55 @@ txg_rele_to_quiesce(txg_handle_t *th) } void -txg_rele_to_sync(txg_handle_t *th) +txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks) { tx_cpu_t *tc = th->th_cpu; int g = th->th_txg & TXG_MASK; mutex_enter(&tc->tc_lock); + list_move_tail(&tc->tc_callbacks[g], tx_callbacks); + mutex_exit(&tc->tc_lock); +} + +static void +txg_exit(tx_cpu_t *tc, uint64_t txg) +{ + int g = txg & TXG_MASK; + + mutex_enter(&tc->tc_lock); ASSERT(tc->tc_count[g] != 0); if (--tc->tc_count[g] == 0) cv_broadcast(&tc->tc_cv[g]); mutex_exit(&tc->tc_lock); +} + +void +txg_rele_to_sync(txg_handle_t *th) +{ + txg_exit(th->th_cpu, th->th_txg); th->th_cpu = NULL; /* defensive */ } static void +txg_wait_exit(tx_state_t *tx, uint64_t txg) +{ + int g = txg & TXG_MASK; + int c; + + for (c = 0; c < max_ncpus; c++) { + tx_cpu_t *tc = &tx->tx_cpu[c]; + mutex_enter(&tc->tc_lock); + while (tc->tc_count[g] != 0) + cv_wait(&tc->tc_cv[g], &tc->tc_lock); + mutex_exit(&tc->tc_lock); + } +} + +static void txg_quiesce(dsl_pool_t *dp, uint64_t txg) { tx_state_t *tx = &dp->dp_tx; - int g = txg & TXG_MASK; int c; /* @@ -269,12 +308,60 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg) /* * Quiesce the transaction group by waiting for everyone to txg_exit(). */ + txg_wait_exit(tx, txg); +} + +static void +txg_callback(tx_cb_t *tcb) +{ + tx_cpu_t *tc = tcb->tcb_tc; + int g = tcb->tcb_txg & TXG_MASK; + + dmu_tx_callback(&tc->tc_callbacks[g], 0); + + txg_exit(tc, tcb->tcb_txg); + + kmem_free(tcb, sizeof (tx_cb_t)); +} + +/* + * Dispatch the commit callbacks registered on this txg to worker threads. + */ +static void +txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg) +{ + int c; + tx_state_t *tx = &dp->dp_tx; + tx_cb_t *tcb; + for (c = 0; c < max_ncpus; c++) { tx_cpu_t *tc = &tx->tx_cpu[c]; - mutex_enter(&tc->tc_lock); - while (tc->tc_count[g] != 0) - cv_wait(&tc->tc_cv[g], &tc->tc_lock); - mutex_exit(&tc->tc_lock); + /* No need to lock tx_cpu_t at this point */ + + int g = txg & TXG_MASK; + + if (list_is_empty(&tc->tc_callbacks[g])) + continue; + + if (tx->tx_commit_cb_taskq == NULL) { + /* + * Commit callback taskq hasn't been created yet. + */ + tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb", + max_ncpus, minclsyspri, max_ncpus, max_ncpus * 4, + TASKQ_PREPOPULATE); + } + + tcb = kmem_alloc(sizeof (tx_cb_t), KM_SLEEP); + tcb->tcb_txg = txg; + tcb->tcb_tc = tc; + + /* There shouldn't be any holders on this txg at this point */ + ASSERT3U(tc->tc_count[g], ==, 0); + tc->tc_count[g]++; + + (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *) + txg_callback, tcb, TQ_SLEEP); } } @@ -345,6 +432,13 @@ txg_sync_thread(dsl_pool_t *dp) spa_sync(dp->dp_spa, txg); delta = lbolt - start; + /* + * Dispatch commit callbacks to worker threads and wait for + * them to finish. + */ + txg_dispatch_callbacks(dp, txg); + txg_wait_exit(tx, txg); + mutex_enter(&tx->tx_sync_lock); rw_enter(&tx->tx_suspend, RW_WRITER); tx->tx_synced_txg = txg; |