summaryrefslogtreecommitdiffstats
path: root/module
diff options
context:
space:
mode:
authorBrian Behlendorf <[email protected]>2009-03-19 20:59:12 -0700
committerBrian Behlendorf <[email protected]>2009-03-19 20:59:12 -0700
commiteb811f7fa91e495862a7f86525841b9f682f1715 (patch)
treef1d09f593fadee2b92514cfe8a6e60499c8a3acf /module
parenta71161249ab773b526cc316cf84cfbe44cf1e245 (diff)
parentc89153393fb221e5f6073674cf4660b0bdb44be2 (diff)
Merge branch 'feature-branch' into refs/top-bases/zfs-branch
Conflicts: module/zfs/include/sys/dmu_tx.h module/zfs/include/sys/txg.h
Diffstat (limited to 'module')
-rw-r--r--module/zfs/dmu_tx.c43
-rw-r--r--module/zfs/include/sys/dmu.h20
-rw-r--r--module/zfs/include/sys/dmu_tx.h12
-rw-r--r--module/zfs/include/sys/txg.h3
-rw-r--r--module/zfs/include/sys/txg_impl.h8
-rw-r--r--module/zfs/txg.c108
6 files changed, 183 insertions, 11 deletions
diff --git a/module/zfs/dmu_tx.c b/module/zfs/dmu_tx.c
index 18a640d6d..bd4548741 100644
--- a/module/zfs/dmu_tx.c
+++ b/module/zfs/dmu_tx.c
@@ -48,6 +48,8 @@ dmu_tx_create_dd(dsl_dir_t *dd)
tx->tx_pool = dd->dd_pool;
list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
offsetof(dmu_tx_hold_t, txh_node));
+ list_create(&tx->tx_callbacks, sizeof (dmu_tx_callback_t),
+ offsetof(dmu_tx_callback_t, dcb_node));
#ifdef ZFS_DEBUG
refcount_create(&tx->tx_space_written);
refcount_create(&tx->tx_space_freed);
@@ -1020,8 +1022,13 @@ dmu_tx_commit(dmu_tx_t *tx)
if (tx->tx_tempreserve_cookie)
dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
+ if (!list_is_empty(&tx->tx_callbacks))
+ txg_register_callbacks(&tx->tx_txgh, &tx->tx_callbacks);
+
if (tx->tx_anyobj == FALSE)
txg_rele_to_sync(&tx->tx_txgh);
+
+ list_destroy(&tx->tx_callbacks);
list_destroy(&tx->tx_holds);
#ifdef ZFS_DEBUG
dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
@@ -1050,6 +1057,14 @@ dmu_tx_abort(dmu_tx_t *tx)
if (dn != NULL)
dnode_rele(dn, tx);
}
+
+ /*
+ * Call any registered callbacks with an error code.
+ */
+ if (!list_is_empty(&tx->tx_callbacks))
+ dmu_tx_callback(&tx->tx_callbacks, ECANCELED);
+
+ list_destroy(&tx->tx_callbacks);
list_destroy(&tx->tx_holds);
#ifdef ZFS_DEBUG
refcount_destroy_many(&tx->tx_space_written,
@@ -1066,3 +1081,31 @@ dmu_tx_get_txg(dmu_tx_t *tx)
ASSERT(tx->tx_txg != 0);
return (tx->tx_txg);
}
+
+/*
+ * Queue a commit callback on the given transaction.  The (func, data) pair
+ * is stored in a freshly allocated dmu_tx_callback_t that is appended to
+ * tx->tx_callbacks.
+ */
+void
+dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *func, void *data)
+{
+	dmu_tx_callback_t *entry = kmem_alloc(sizeof (*entry), KM_SLEEP);
+
+	entry->dcb_data = data;
+	entry->dcb_func = func;
+	list_insert_tail(&tx->tx_callbacks, entry);
+}
+
+/*
+ * Invoke every commit callback on cb_list with the given error code, in
+ * list order.  Each entry is unlinked before its function is called and is
+ * freed afterwards, so cb_list is empty when this returns.
+ *
+ * error is handed to each callback unchanged: 0 when called from the txg
+ * commit path, ECANCELED when called from dmu_tx_abort().
+ */
+void
+dmu_tx_callback(list_t *cb_list, int error)
+{
+	dmu_tx_callback_t *dcb;
+
+	/* The assignment in the loop condition is intentional. */
+	while ((dcb = list_head(cb_list)) != NULL) {
+		list_remove(cb_list, dcb);
+		dcb->dcb_func(dcb->dcb_data, error);
+		kmem_free(dcb, sizeof (dmu_tx_callback_t));
+	}
+}
diff --git a/module/zfs/include/sys/dmu.h b/module/zfs/include/sys/dmu.h
index 3b1e5c8fb..392431a18 100644
--- a/module/zfs/include/sys/dmu.h
+++ b/module/zfs/include/sys/dmu.h
@@ -430,6 +430,26 @@ void dmu_tx_wait(dmu_tx_t *tx);
void dmu_tx_commit(dmu_tx_t *tx);
/*
+ * To register a commit callback, dmu_tx_callback_register() must be called.
+ *
+ * dcb_data is a pointer to caller private data that is passed on as a
+ * callback parameter. The caller is responsible for properly allocating and
+ * freeing it.
+ *
+ * A callback may only be registered on a transaction that has already been
+ * created and has not yet been committed or aborted.  The transaction may or
+ * may not have been assigned to a txg at that point.
+ *
+ * The callback is called after the transaction has been safely written to
+ * stable storage, and is also called if the dmu_tx is aborted.  If any error
+ * prevents the transaction from being committed to disk, the callback is
+ * called with error != 0 (ECANCELED when the dmu_tx is aborted).
+ */
+typedef void dmu_tx_callback_func_t(void *dcb_data, int error);
+
+void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func,
+ void *dcb_data);
+
+/*
* Free up the data blocks for a defined range of a file. If size is
* zero, the range from offset to end-of-file is freed.
*/
diff --git a/module/zfs/include/sys/dmu_tx.h b/module/zfs/include/sys/dmu_tx.h
index 2fc1fee17..7fcab936f 100644
--- a/module/zfs/include/sys/dmu_tx.h
+++ b/module/zfs/include/sys/dmu_tx.h
@@ -26,8 +26,6 @@
#ifndef _SYS_DMU_TX_H
#define _SYS_DMU_TX_H
-
-
#include <sys/inttypes.h>
#include <sys/dmu.h>
#include <sys/txg.h>
@@ -59,6 +57,7 @@ struct dmu_tx {
txg_handle_t tx_txgh;
void *tx_tempreserve_cookie;
struct dmu_tx_hold *tx_needassign_txh;
+ list_t tx_callbacks; /* list of dmu_tx_callback_t on this dmu_tx */
uint8_t tx_anyobj;
int tx_err;
#ifdef ZFS_DEBUG
@@ -98,6 +97,11 @@ typedef struct dmu_tx_hold {
#endif
} dmu_tx_hold_t;
+typedef struct dmu_tx_callback {
+ list_node_t dcb_node; /* linkage on a tx_callbacks or tc_callbacks list */
+ dmu_tx_callback_func_t *dcb_func; /* called as dcb_func(dcb_data, error) */
+ void *dcb_data; /* caller private data, passed to dcb_func */
+} dmu_tx_callback_t;
/*
* These routines are defined in dmu.h, and are called by the user.
@@ -109,6 +113,10 @@ void dmu_tx_abort(dmu_tx_t *tx);
uint64_t dmu_tx_get_txg(dmu_tx_t *tx);
void dmu_tx_wait(dmu_tx_t *tx);
+void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func,
+ void *dcb_data);
+void dmu_tx_callback(list_t *cb_list, int error);
+
/*
* These routines are defined in dmu_spa.h, and are called by the SPA.
*/
diff --git a/module/zfs/include/sys/txg.h b/module/zfs/include/sys/txg.h
index ccc2cc5c6..e679898db 100644
--- a/module/zfs/include/sys/txg.h
+++ b/module/zfs/include/sys/txg.h
@@ -26,8 +26,6 @@
#ifndef _SYS_TXG_H
#define _SYS_TXG_H
-
-
#include <sys/spa.h>
#include <sys/zfs_context.h>
@@ -71,6 +69,7 @@ extern void txg_sync_stop(struct dsl_pool *dp);
extern uint64_t txg_hold_open(struct dsl_pool *dp, txg_handle_t *txghp);
extern void txg_rele_to_quiesce(txg_handle_t *txghp);
extern void txg_rele_to_sync(txg_handle_t *txghp);
+extern void txg_register_callbacks(txg_handle_t *txghp, list_t *tx_callbacks);
extern void txg_suspend(struct dsl_pool *dp);
extern void txg_resume(struct dsl_pool *dp);
diff --git a/module/zfs/include/sys/txg_impl.h b/module/zfs/include/sys/txg_impl.h
index 7413c662b..bc7d7c7e5 100644
--- a/module/zfs/include/sys/txg_impl.h
+++ b/module/zfs/include/sys/txg_impl.h
@@ -33,10 +33,16 @@
extern "C" {
#endif
+typedef struct tx_cb {
+ tx_cpu_t *tcb_tc; /* per-CPU state whose callback list is run */
+ uint64_t tcb_txg; /* txg whose callbacks are run */
+} tx_cb_t;
+
struct tx_cpu {
kmutex_t tc_lock;
kcondvar_t tc_cv[TXG_SIZE];
uint64_t tc_count[TXG_SIZE];
+ list_t tc_callbacks[TXG_SIZE]; /* commit cb list */
char tc_pad[16];
};
@@ -64,6 +70,8 @@ typedef struct tx_state {
kthread_t *tx_sync_thread;
kthread_t *tx_quiesce_thread;
+
+ taskq_t *tx_commit_cb_taskq; /* commit callback taskq */
} tx_state_t;
#ifdef __cplusplus
diff --git a/module/zfs/txg.c b/module/zfs/txg.c
index d87b053ed..e0bc524a3 100644
--- a/module/zfs/txg.c
+++ b/module/zfs/txg.c
@@ -26,6 +26,7 @@
#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
+#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>
@@ -57,6 +58,9 @@ txg_init(dsl_pool_t *dp, uint64_t txg)
for (i = 0; i < TXG_SIZE; i++) {
cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
NULL);
+ list_create(&tx->tx_cpu[c].tc_callbacks[i],
+ sizeof (dmu_tx_callback_t),
+ offsetof(dmu_tx_callback_t, dcb_node));
}
}
@@ -96,10 +100,15 @@ txg_fini(dsl_pool_t *dp)
int i;
mutex_destroy(&tx->tx_cpu[c].tc_lock);
- for (i = 0; i < TXG_SIZE; i++)
+ for (i = 0; i < TXG_SIZE; i++) {
cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
+ list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
+ }
}
+ if (tx->tx_commit_cb_taskq != NULL)
+ taskq_destroy(tx->tx_commit_cb_taskq);
+
kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
bzero(tx, sizeof (tx_state_t));
@@ -229,25 +238,55 @@ txg_rele_to_quiesce(txg_handle_t *th)
}
void
-txg_rele_to_sync(txg_handle_t *th)
+txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
tx_cpu_t *tc = th->th_cpu;
int g = th->th_txg & TXG_MASK;
mutex_enter(&tc->tc_lock);
+ list_move_tail(&tc->tc_callbacks[g], tx_callbacks); /* onto this CPU's list for the txg */
+ mutex_exit(&tc->tc_lock);
+}
+
+static void
+txg_exit(tx_cpu_t *tc, uint64_t txg)
+{
+ int g = txg & TXG_MASK; /* this txg's slot in tc_count[] and tc_cv[] */
+
+ mutex_enter(&tc->tc_lock); /* drop one hold; wake waiters when it hits 0 */
ASSERT(tc->tc_count[g] != 0);
if (--tc->tc_count[g] == 0)
cv_broadcast(&tc->tc_cv[g]);
mutex_exit(&tc->tc_lock);
+}
+
+void
+txg_rele_to_sync(txg_handle_t *th)
+{
+ txg_exit(th->th_cpu, th->th_txg); /* release this handle's hold on its txg */
th->th_cpu = NULL; /* defensive */
}
+/*
+ * Block until tc_count[] for the given txg's slot has dropped to zero on
+ * every tx_cpu_t, i.e. until every holder of the txg has called txg_exit().
+ */
static void
+txg_wait_exit(tx_state_t *tx, uint64_t txg)
+{
+	int slot = txg & TXG_MASK;
+	tx_cpu_t *tc;
+
+	for (tc = tx->tx_cpu; tc < &tx->tx_cpu[max_ncpus]; tc++) {
+		mutex_enter(&tc->tc_lock);
+		while (tc->tc_count[slot] != 0)
+			cv_wait(&tc->tc_cv[slot], &tc->tc_lock);
+		mutex_exit(&tc->tc_lock);
+	}
+}
+
+static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
tx_state_t *tx = &dp->dp_tx;
- int g = txg & TXG_MASK;
int c;
/*
@@ -269,12 +308,60 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg)
/*
* Quiesce the transaction group by waiting for everyone to txg_exit().
*/
+ txg_wait_exit(tx, txg);
+}
+
+/*
+ * Taskq entry point: run the commit callbacks queued on one tx_cpu_t for one
+ * txg, then drop the hold that txg_dispatch_callbacks() took on that txg.
+ *
+ * arg is the tx_cb_t allocated by the dispatcher; it is freed here.  The
+ * parameter is declared void * so that the function has the same type the
+ * taskq calls it through, rather than depending on a function-pointer cast.
+ */
+static void
+txg_callback(void *arg)
+{
+	tx_cb_t *tcb = arg;
+	tx_cpu_t *tc = tcb->tcb_tc;
+	int g = tcb->tcb_txg & TXG_MASK;
+
+	/* These callbacks are dispatched after spa_sync(): report success. */
+	dmu_tx_callback(&tc->tc_callbacks[g], 0);
+
+	/* Lets txg_wait_exit() in the sync thread make progress. */
+	txg_exit(tc, tcb->tcb_txg);
+
+	kmem_free(tcb, sizeof (tx_cb_t));
+}
+
+/*
+ * Dispatch the commit callbacks registered on this txg to worker threads.
+ *
+ * For each tx_cpu_t with a non-empty callback list for the txg, a tx_cb_t is
+ * allocated and handed to txg_callback() on tx_commit_cb_taskq (created here
+ * on first use).  tc_count[] is incremented for each dispatched task and
+ * decremented again by txg_callback() via txg_exit(), so a txg_wait_exit()
+ * issued after this function returns only once all dispatched tasks finish.
+ */
+static void
+txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
+{
+ int c;
+ tx_state_t *tx = &dp->dp_tx;
+ tx_cb_t *tcb;
+
for (c = 0; c < max_ncpus; c++) {
tx_cpu_t *tc = &tx->tx_cpu[c];
- mutex_enter(&tc->tc_lock);
- while (tc->tc_count[g] != 0)
- cv_wait(&tc->tc_cv[g], &tc->tc_lock);
- mutex_exit(&tc->tc_lock);
+ /* No need to lock tx_cpu_t at this point */
+
+ int g = txg & TXG_MASK;
+
+ if (list_is_empty(&tc->tc_callbacks[g]))
+ continue;
+
+ if (tx->tx_commit_cb_taskq == NULL) {
+ /*
+ * Commit callback taskq hasn't been created yet.
+ */
+ tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
+ max_ncpus, minclsyspri, max_ncpus, max_ncpus * 4,
+ TASKQ_PREPOPULATE);
+ }
+
+ tcb = kmem_alloc(sizeof (tx_cb_t), KM_SLEEP);
+ tcb->tcb_txg = txg;
+ tcb->tcb_tc = tc;
+
+ /*
+ * There shouldn't be any holders on this txg at this point.
+ * Take one hold on behalf of the task; txg_callback() drops it.
+ */
+ ASSERT3U(tc->tc_count[g], ==, 0);
+ tc->tc_count[g]++;
+
+ (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
+ txg_callback, tcb, TQ_SLEEP);
}
}
@@ -345,6 +432,13 @@ txg_sync_thread(dsl_pool_t *dp)
spa_sync(dp->dp_spa, txg);
delta = lbolt - start;
+ /*
+ * Dispatch commit callbacks to worker threads and wait for
+ * them to finish.
+ */
+ txg_dispatch_callbacks(dp, txg);
+ txg_wait_exit(tx, txg);
+
mutex_enter(&tx->tx_sync_lock);
rw_enter(&tx->tx_suspend, RW_WRITER);
tx->tx_synced_txg = txg;