Diffstat (limited to 'module/zfs')
-rw-r--r--  module/zfs/dmu_tx.c                61
-rw-r--r--  module/zfs/include/sys/dmu.h       27
-rw-r--r--  module/zfs/include/sys/dmu_impl.h  13
-rw-r--r--  module/zfs/include/sys/dmu_tx.h     6
-rw-r--r--  module/zfs/include/sys/txg.h        1
-rw-r--r--  module/zfs/include/sys/txg_impl.h   1
-rw-r--r--  module/zfs/txg.c                   84
7 files changed, 191 insertions(+), 2 deletions(-)
diff --git a/module/zfs/dmu_tx.c b/module/zfs/dmu_tx.c
index bf560e565..ea065951f 100644
--- a/module/zfs/dmu_tx.c
+++ b/module/zfs/dmu_tx.c
@@ -48,6 +48,8 @@ dmu_tx_create_dd(dsl_dir_t *dd)
tx->tx_pool = dd->dd_pool;
list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
offsetof(dmu_tx_hold_t, txh_node));
+ list_create(&tx->tx_callbacks, sizeof (dmu_callback_t),
+ offsetof(dmu_callback_t, dcb_node));
#ifdef ZFS_DEBUG
refcount_create(&tx->tx_space_written);
refcount_create(&tx->tx_space_freed);
@@ -1020,6 +1022,9 @@ dmu_tx_commit(dmu_tx_t *tx)
if (tx->tx_tempreserve_cookie)
dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
+ if (!list_is_empty(&tx->tx_callbacks))
+ txg_rele_commit_cb(&tx->tx_txgh, &tx->tx_callbacks);
+
if (tx->tx_anyobj == FALSE)
txg_rele_to_sync(&tx->tx_txgh);
list_destroy(&tx->tx_holds);
@@ -1032,6 +1037,8 @@ dmu_tx_commit(dmu_tx_t *tx)
refcount_destroy_many(&tx->tx_space_freed,
refcount_count(&tx->tx_space_freed));
#endif
+ ASSERT(list_is_empty(&tx->tx_callbacks));
+ list_destroy(&tx->tx_callbacks);
kmem_free(tx, sizeof (dmu_tx_t));
}
@@ -1039,6 +1046,7 @@ void
dmu_tx_abort(dmu_tx_t *tx)
{
dmu_tx_hold_t *txh;
+ dmu_callback_t *dcb;
ASSERT(tx->tx_txg == 0);
@@ -1050,6 +1058,16 @@ dmu_tx_abort(dmu_tx_t *tx)
if (dn != NULL)
dnode_rele(dn, tx);
}
+
+ while ((dcb = list_head(&tx->tx_callbacks))) {
+ list_remove(&tx->tx_callbacks, dcb);
+
+ /*
+ * Call the callback with ECANCELED to signal the abort. The
+ * callback is expected to call dmu_tx_callback_data_destroy()
+ * to free the memory.
+ */
+ dcb->dcb_func(dcb->dcb_data, ECANCELED);
+ }
list_destroy(&tx->tx_holds);
#ifdef ZFS_DEBUG
refcount_destroy_many(&tx->tx_space_written,
@@ -1057,6 +1075,7 @@ dmu_tx_abort(dmu_tx_t *tx)
refcount_destroy_many(&tx->tx_space_freed,
refcount_count(&tx->tx_space_freed));
#endif
+ list_destroy(&tx->tx_callbacks);
kmem_free(tx, sizeof (dmu_tx_t));
}
@@ -1066,3 +1085,45 @@ dmu_tx_get_txg(dmu_tx_t *tx)
ASSERT(tx->tx_txg != 0);
return (tx->tx_txg);
}
+
+void *
+dmu_tx_callback_data_create(size_t bytes)
+{
+ dmu_callback_t *dcb;
+
+ dcb = kmem_alloc(sizeof (dmu_callback_t) + bytes, KM_SLEEP);
+
+ dcb->dcb_magic = DMU_CALLBACK_MAGIC;
+ dcb->dcb_bytes = bytes;
+
+ return (&dcb->dcb_data);
+}
+
+int
+dmu_tx_callback_commit_add(dmu_tx_t *tx, dmu_callback_func_t *dcb_func,
+ void *dcb_data)
+{
+ dmu_callback_t *dcb = container_of(dcb_data, dmu_callback_t, dcb_data);
+
+ if (dcb->dcb_magic != DMU_CALLBACK_MAGIC)
+ return (EINVAL);
+
+ dcb->dcb_func = dcb_func;
+
+ list_insert_tail(&tx->tx_callbacks, dcb);
+
+ return (0);
+}
+
+int
+dmu_tx_callback_data_destroy(void *dcb_data)
+{
+ dmu_callback_t *dcb = container_of(dcb_data, dmu_callback_t, dcb_data);
+
+ if (dcb->dcb_magic != DMU_CALLBACK_MAGIC)
+ return (EINVAL);
+
+ kmem_free(dcb, sizeof (dmu_callback_t) + dcb->dcb_bytes);
+
+ return (0);
+}
diff --git a/module/zfs/include/sys/dmu.h b/module/zfs/include/sys/dmu.h
index 3b1e5c8fb..e4bcdfbe0 100644
--- a/module/zfs/include/sys/dmu.h
+++ b/module/zfs/include/sys/dmu.h
@@ -64,6 +64,7 @@ struct objset_impl;
typedef struct objset objset_t;
typedef struct dmu_tx dmu_tx_t;
typedef struct dsl_dir dsl_dir_t;
+typedef void dmu_callback_func_t(void *dcb_data, int error);
typedef enum dmu_object_type {
DMU_OT_NONE,
@@ -430,6 +431,32 @@ void dmu_tx_wait(dmu_tx_t *tx);
void dmu_tx_commit(dmu_tx_t *tx);
/*
+ * To add a commit callback, first call dmu_tx_callback_data_create().
+ * This returns a pointer to a memory area of "bytes" bytes (which can
+ * be 0, or just sizeof (void *) when the callback only needs a pointer
+ * to a large or existing external data structure) that the caller and
+ * the callback can use to exchange data.
+ *
+ * The callback is then registered by calling dmu_tx_callback_commit_add()
+ * with the pointer returned by dmu_tx_callback_data_create() passed as
+ * the dcb_data argument. The transaction must already be created, but
+ * must not yet be committed or aborted; it may or may not yet be
+ * assigned to a txg.
+ *
+ * The callback will be called after the transaction has been safely
+ * written to stable storage, and will also be called if the dmu_tx is
+ * aborted. If any error prevents the transaction from reaching disk,
+ * the callback is invoked with a nonzero error argument (ECANCELED for
+ * an aborted tx).
+ *
+ * When the callback data is no longer needed, it must be freed with
+ * dmu_tx_callback_data_destroy(), typically at the end of the callback
+ * function.
+ */
+void *dmu_tx_callback_data_create(size_t bytes);
+int dmu_tx_callback_commit_add(dmu_tx_t *tx, dmu_callback_func_t *dcb_func,
+ void *dcb_data);
+int dmu_tx_callback_data_destroy(void *dcb_data);
+
+/*
* Free up the data blocks for a defined range of a file. If size is
* zero, the range from offset to end-of-file is freed.
*/
diff --git a/module/zfs/include/sys/dmu_impl.h b/module/zfs/include/sys/dmu_impl.h
index 96ce688e1..f32ab6ad7 100644
--- a/module/zfs/include/sys/dmu_impl.h
+++ b/module/zfs/include/sys/dmu_impl.h
@@ -232,6 +232,19 @@ extern "C" {
struct objset;
struct dmu_pool;
+#define DMU_CALLBACK_MAGIC 0xca11bac0ca11bacfull
+
+#define container_of(ptr, type, member) \
+ ((type *)((char *)(ptr) - offsetof(type, member)))
+
+typedef struct dmu_callback {
+ list_node_t dcb_node; /* linked to tx_callbacks list */
+ uint64_t dcb_magic; /* magic number to verify header */
+ dmu_callback_func_t *dcb_func; /* caller function pointer */
+ size_t dcb_bytes; /* caller private data size */
+ char dcb_data[0]; /* caller private data */
+} dmu_callback_t;
+
#ifdef __cplusplus
}
#endif
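
The split between the hidden dmu_callback_t header and the caller-visible dcb_data payload relies on container_of() walking back from the payload pointer. Below is a self-contained userspace sketch of the same pattern; hdr_t and the magic value are illustrative, and the C99 flexible array member h_data[] is the portable spelling of the patch's GNU-style dcb_data[0]:

```c
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

typedef struct hdr {
	unsigned long h_magic;
	size_t h_bytes;
	char h_data[];			/* payload handed to the caller */
} hdr_t;

int
main(void)
{
	hdr_t *h = malloc(sizeof (hdr_t) + 16);
	void *data;

	h->h_magic = 0xca11bac0UL;
	h->h_bytes = 16;
	data = h->h_data;		/* all the caller ever sees */

	/* Recover the hidden header from the payload pointer alone. */
	hdr_t *back = container_of(data, hdr_t, h_data);
	printf("magic ok: %d\n", back->h_magic == 0xca11bac0UL);

	free(back);
	return (0);
}
```

The dcb_magic check in dmu_tx_callback_commit_add() and dmu_tx_callback_data_destroy() guards against a caller passing a pointer that did not come from dmu_tx_callback_data_create().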
diff --git a/module/zfs/include/sys/dmu_tx.h b/module/zfs/include/sys/dmu_tx.h
index 2727daaaa..47b9bcaa5 100644
--- a/module/zfs/include/sys/dmu_tx.h
+++ b/module/zfs/include/sys/dmu_tx.h
@@ -59,6 +59,7 @@ struct dmu_tx {
txg_handle_t tx_txgh;
void *tx_tempreserve_cookie;
struct dmu_tx_hold *tx_needassign_txh;
+ list_t tx_callbacks; /* list of dmu_callback_t on this dmu_tx */
uint8_t tx_anyobj;
int tx_err;
#ifdef ZFS_DEBUG
@@ -109,6 +110,11 @@ void dmu_tx_abort(dmu_tx_t *tx);
uint64_t dmu_tx_get_txg(dmu_tx_t *tx);
void dmu_tx_wait(dmu_tx_t *tx);
+void *dmu_tx_callback_data_create(size_t bytes);
+int dmu_tx_callback_commit_add(dmu_tx_t *tx, dmu_callback_func_t *dcb_func,
+ void *dcb_data);
+int dmu_tx_callback_data_destroy(void *dcb_data);
+
/*
* These routines are defined in dmu_spa.h, and are called by the SPA.
*/
diff --git a/module/zfs/include/sys/txg.h b/module/zfs/include/sys/txg.h
index 23bdff211..1349bd4dc 100644
--- a/module/zfs/include/sys/txg.h
+++ b/module/zfs/include/sys/txg.h
@@ -71,6 +71,7 @@ extern void txg_sync_stop(struct dsl_pool *dp);
extern uint64_t txg_hold_open(struct dsl_pool *dp, txg_handle_t *txghp);
extern void txg_rele_to_quiesce(txg_handle_t *txghp);
extern void txg_rele_to_sync(txg_handle_t *txghp);
+extern void txg_rele_commit_cb(txg_handle_t *txghp, list_t *tx_callbacks);
extern void txg_suspend(struct dsl_pool *dp);
extern void txg_resume(struct dsl_pool *dp);
diff --git a/module/zfs/include/sys/txg_impl.h b/module/zfs/include/sys/txg_impl.h
index 7413c662b..a9a7c358b 100644
--- a/module/zfs/include/sys/txg_impl.h
+++ b/module/zfs/include/sys/txg_impl.h
@@ -37,6 +37,7 @@ struct tx_cpu {
kmutex_t tc_lock;
kcondvar_t tc_cv[TXG_SIZE];
uint64_t tc_count[TXG_SIZE];
+ list_t tc_callbacks[TXG_SIZE]; /* post-commit callbacks */
char tc_pad[16];
};
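
Each tx_cpu_t now carries one callback list per in-flight txg, indexed with txg & TXG_MASK like the rest of the txg machinery. With TXG_SIZE of 4, at most four txgs are in flight at once (open, quiescing, syncing), so consecutive txgs land in distinct slots and a slot is reused only after its previous txg has synced and been drained. A trivial sketch of the mapping:

```c
#include <stdio.h>

#define TXG_SIZE	4		/* as in sys/txg.h */
#define TXG_MASK	(TXG_SIZE - 1)

int
main(void)
{
	unsigned long long txg;

	/* txgs 40..45 map to slots 0, 1, 2, 3, 0, 1 */
	for (txg = 40; txg < 46; txg++)
		printf("txg %llu -> tc_callbacks[%llu]\n",
		    txg, txg & TXG_MASK);
	return (0);
}
```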
diff --git a/module/zfs/txg.c b/module/zfs/txg.c
index 2bbf2f086..f3b0fc92e 100644
--- a/module/zfs/txg.c
+++ b/module/zfs/txg.c
@@ -26,6 +26,7 @@
#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
+#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>
@@ -57,6 +58,9 @@ txg_init(dsl_pool_t *dp, uint64_t txg)
for (i = 0; i < TXG_SIZE; i++) {
cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
NULL);
+ list_create(&tx->tx_cpu[c].tc_callbacks[i],
+ sizeof (dmu_callback_t), offsetof(dmu_callback_t,
+ dcb_node));
}
}
@@ -84,8 +88,11 @@ txg_fini(dsl_pool_t *dp)
int i;
mutex_destroy(&tx->tx_cpu[c].tc_lock);
- for (i = 0; i < TXG_SIZE; i++)
+ for (i = 0; i < TXG_SIZE; i++) {
cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
+ ASSERT(list_is_empty(&tx->tx_cpu[c].tc_callbacks[i]));
+ list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
+ }
}
kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
@@ -231,6 +238,21 @@ txg_rele_to_sync(txg_handle_t *th)
th->th_cpu = NULL; /* defensive */
}
+void
+txg_rele_commit_cb(txg_handle_t *th, list_t *tx_callbacks)
+{
+ dmu_callback_t *dcb;
+ tx_cpu_t *tc = th->th_cpu;
+ int g = th->th_txg & TXG_MASK;
+
+ mutex_enter(&tc->tc_lock);
+ while ((dcb = list_head(tx_callbacks))) {
+ list_remove(tx_callbacks, dcb);
+ list_insert_tail(&tc->tc_callbacks[g], dcb);
+ }
+ mutex_exit(&tc->tc_lock);
+}
+
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
@@ -271,7 +293,8 @@ txg_sync_thread(dsl_pool_t *dp)
{
tx_state_t *tx = &dp->dp_tx;
callb_cpr_t cpr;
- uint64_t start, delta;
+ uint64_t timeout, start, delta, timer, written;
+ int c, target;
txg_thread_enter(tx, &cpr);
@@ -333,6 +356,63 @@ txg_sync_thread(dsl_pool_t *dp)
spa_sync(dp->dp_spa, txg);
delta = lbolt - start;
+ /*
+ * Call all the callbacks for this txg. The callbacks must
+ * call dmu_tx_callback_data_destroy to free memory.
+ */
+ for (c = 0; c < max_ncpus; c++) {
+ dmu_callback_t *dcb;
+ tx_cpu_t *tc = &tx->tx_cpu[c];
+ int g = txg & TXG_MASK;
+ /*
+ * No need to take tc_lock here: this txg has synced, so
+ * no still-open tx can be adding callbacks to this slot.
+ */
+
+ while ((dcb = list_head(&tc->tc_callbacks[g]))) {
+ list_remove(&tc->tc_callbacks[g], dcb);
+ dcb->dcb_func(dcb->dcb_data, 0);
+ }
+ }
+
+ written = dp->dp_space_towrite[txg & TXG_MASK];
+ dp->dp_space_towrite[txg & TXG_MASK] = 0;
+ ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);
+
+ /*
+ * If the write limit max has not been explicitly set, set it
+ * to a fraction of available phisical memory (default 1/8th).
+ * Note that we must inflate the limit because the spa
+ * inflates write sizes to account for data replication.
+ * Check this each sync phase to catch changing memory size.
+ */
+ if (zfs_write_limit_inflated == 0 ||
+ (zfs_write_limit_shift && zfs_write_limit_max !=
+ physmem * PAGESIZE >> zfs_write_limit_shift)) {
+ zfs_write_limit_max =
+ physmem * PAGESIZE >> zfs_write_limit_shift;
+ zfs_write_limit_inflated =
+ spa_get_asize(dp->dp_spa, zfs_write_limit_max);
+ if (zfs_write_limit_min > zfs_write_limit_inflated)
+ zfs_write_limit_inflated = zfs_write_limit_min;
+ }
+
+ /*
+ * Attempt to keep the sync time consistent by adjusting the
+ * amount of write traffic allowed into each transaction group.
+ */
+ target = zfs_txg_synctime * hz;
+ if (delta > target) {
+ uint64_t old = MIN(dp->dp_write_limit, written);
+
+ dp->dp_write_limit = MAX(zfs_write_limit_min,
+ old * target / delta);
+ } else if (written >= dp->dp_write_limit &&
+ delta >> 3 < target >> 3) {
+ uint64_t rescale =
+ MIN((100 * target) / delta, 200);
+
+ dp->dp_write_limit = MIN(zfs_write_limit_inflated,
+ written * rescale / 100);
+ }
+
mutex_enter(&tx->tx_sync_lock);
rw_enter(&tx->tx_suspend, RW_WRITER);
tx->tx_synced_txg = txg;
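
The write-limit adjustment added above acts as a simple proportional controller: if the sync took longer than the target (zfs_txg_synctime * hz ticks), the limit shrinks by the ratio target/delta; if the sync finished comfortably early and the limit was the actual bottleneck (written >= dp_write_limit), the limit is raised toward up to twice what was written, capped at zfs_write_limit_inflated. A standalone sketch of the shrink branch with made-up numbers (5-second target at hz = 100, a 10-second sync, 800 MB written; all names and values are illustrative):

```c
#include <stdio.h>

#define MIN(a, b)	((a) < (b) ? (a) : (b))
#define MAX(a, b)	((a) > (b) ? (a) : (b))

int
main(void)
{
	unsigned long long target = 5 * 100;		/* 5s at hz = 100 */
	unsigned long long delta = 10 * 100;		/* sync took 10s */
	unsigned long long written = 800ULL << 20;	/* 800 MB */
	unsigned long long write_limit = 1024ULL << 20;
	unsigned long long limit_min = 32ULL << 20;

	if (delta > target) {
		/* Too slow: scale the limit down by target/delta. */
		unsigned long long old = MIN(write_limit, written);

		write_limit = MAX(limit_min, old * target / delta);
	}
	/* 800 MB * 500 / 1000 = 400 MB */
	printf("new write limit: %llu MB\n", write_limit >> 20);
	return (0);
}
```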