summaryrefslogtreecommitdiffstats
path: root/module/zfs/txg.c
diff options
context:
space:
mode:
Diffstat (limited to 'module/zfs/txg.c')
-rw-r--r--module/zfs/txg.c707
1 files changed, 707 insertions, 0 deletions
diff --git a/module/zfs/txg.c b/module/zfs/txg.c
new file mode 100644
index 000000000..f3b0fc92e
--- /dev/null
+++ b/module/zfs/txg.c
@@ -0,0 +1,707 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
+ *
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
+ */
+
+#include <sys/zfs_context.h>
+#include <sys/txg_impl.h>
+#include <sys/dmu_impl.h>
+#include <sys/dmu_tx.h>
+#include <sys/dsl_pool.h>
+#include <sys/callb.h>
+
+/*
+ * Pool-wide transaction groups.
+ */
+
+static void txg_sync_thread(dsl_pool_t *dp);
+static void txg_quiesce_thread(dsl_pool_t *dp);
+
+int zfs_txg_timeout = 30; /* max seconds worth of delta per txg */
+
+/*
+ * Prepare the txg subsystem.
+ */
+void
+txg_init(dsl_pool_t *dp, uint64_t txg)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ int c;
+ bzero(tx, sizeof (tx_state_t));
+
+ tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
+
+ for (c = 0; c < max_ncpus; c++) {
+ int i;
+
+ mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
+ for (i = 0; i < TXG_SIZE; i++) {
+ cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
+ NULL);
+ list_create(&tx->tx_cpu[c].tc_callbacks[i],
+ sizeof (dmu_callback_t), offsetof(dmu_callback_t,
+ dcb_node));
+ }
+ }
+
+ rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
+ mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
+
+ tx->tx_open_txg = txg;
+}
+
+/*
+ * Close down the txg subsystem.
+ */
+void
+txg_fini(dsl_pool_t *dp)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ int c;
+
+ ASSERT(tx->tx_threads == 0);
+
+ rw_destroy(&tx->tx_suspend);
+ mutex_destroy(&tx->tx_sync_lock);
+
+ for (c = 0; c < max_ncpus; c++) {
+ int i;
+
+ mutex_destroy(&tx->tx_cpu[c].tc_lock);
+ for (i = 0; i < TXG_SIZE; i++) {
+ cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
+ ASSERT(list_is_empty(&tx->tx_cpu[c].tc_callbacks[i]));
+ list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
+ }
+ }
+
+ kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
+
+ bzero(tx, sizeof (tx_state_t));
+}
+
+/*
+ * Start syncing transaction groups.
+ */
+void
+txg_sync_start(dsl_pool_t *dp)
+{
+ tx_state_t *tx = &dp->dp_tx;
+
+ mutex_enter(&tx->tx_sync_lock);
+
+ dprintf("pool %p\n", dp);
+
+ ASSERT(tx->tx_threads == 0);
+
+ tx->tx_threads = 2;
+
+ tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
+ dp, 0, &p0, TS_RUN, minclsyspri);
+
+ /*
+ * The sync thread can need a larger-than-default stack size on
+ * 32-bit x86. This is due in part to nested pools and
+ * scrub_visitbp() recursion.
+ */
+ tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
+ dp, 0, &p0, TS_RUN, minclsyspri);
+
+ mutex_exit(&tx->tx_sync_lock);
+}
+
+static void
+txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
+{
+ CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
+ mutex_enter(&tx->tx_sync_lock);
+}
+
+static void
+txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
+{
+ ASSERT(*tpp != NULL);
+ *tpp = NULL;
+ tx->tx_threads--;
+ cv_broadcast(&tx->tx_exit_cv);
+ CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */
+ thread_exit();
+}
+
+static void
+txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
+{
+ CALLB_CPR_SAFE_BEGIN(cpr);
+
+ if (time)
+ (void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
+ else
+ cv_wait(cv, &tx->tx_sync_lock);
+
+ CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
+}
+
+/*
+ * Stop syncing transaction groups.
+ */
+void
+txg_sync_stop(dsl_pool_t *dp)
+{
+ tx_state_t *tx = &dp->dp_tx;
+
+ dprintf("pool %p\n", dp);
+ /*
+ * Finish off any work in progress.
+ */
+ ASSERT(tx->tx_threads == 2);
+ txg_wait_synced(dp, 0);
+
+ /*
+ * Wake all sync threads and wait for them to die.
+ */
+ mutex_enter(&tx->tx_sync_lock);
+
+ ASSERT(tx->tx_threads == 2);
+
+ tx->tx_exiting = 1;
+
+ cv_broadcast(&tx->tx_quiesce_more_cv);
+ cv_broadcast(&tx->tx_quiesce_done_cv);
+ cv_broadcast(&tx->tx_sync_more_cv);
+
+ while (tx->tx_threads != 0)
+ cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
+
+ tx->tx_exiting = 0;
+
+ mutex_exit(&tx->tx_sync_lock);
+}
+
+uint64_t
+txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
+ uint64_t txg;
+
+ mutex_enter(&tc->tc_lock);
+
+ txg = tx->tx_open_txg;
+ tc->tc_count[txg & TXG_MASK]++;
+
+ th->th_cpu = tc;
+ th->th_txg = txg;
+
+ return (txg);
+}
+
+void
+txg_rele_to_quiesce(txg_handle_t *th)
+{
+ tx_cpu_t *tc = th->th_cpu;
+
+ mutex_exit(&tc->tc_lock);
+}
+
+void
+txg_rele_to_sync(txg_handle_t *th)
+{
+ tx_cpu_t *tc = th->th_cpu;
+ int g = th->th_txg & TXG_MASK;
+
+ mutex_enter(&tc->tc_lock);
+ ASSERT(tc->tc_count[g] != 0);
+ if (--tc->tc_count[g] == 0)
+ cv_broadcast(&tc->tc_cv[g]);
+ mutex_exit(&tc->tc_lock);
+
+ th->th_cpu = NULL; /* defensive */
+}
+
+void
+txg_rele_commit_cb(txg_handle_t *th, list_t *tx_callbacks)
+{
+ dmu_callback_t *dcb;
+ tx_cpu_t *tc = th->th_cpu;
+ int g = th->th_txg & TXG_MASK;
+
+ mutex_enter(&tc->tc_lock);
+ while ((dcb = list_head(tx_callbacks))) {
+ list_remove(tx_callbacks, dcb);
+ list_insert_tail(&tc->tc_callbacks[g], dcb);
+ }
+ mutex_exit(&tc->tc_lock);
+}
+
+static void
+txg_quiesce(dsl_pool_t *dp, uint64_t txg)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ int g = txg & TXG_MASK;
+ int c;
+
+ /*
+ * Grab all tx_cpu locks so nobody else can get into this txg.
+ */
+ for (c = 0; c < max_ncpus; c++)
+ mutex_enter(&tx->tx_cpu[c].tc_lock);
+
+ ASSERT(txg == tx->tx_open_txg);
+ tx->tx_open_txg++;
+
+ /*
+ * Now that we've incremented tx_open_txg, we can let threads
+ * enter the next transaction group.
+ */
+ for (c = 0; c < max_ncpus; c++)
+ mutex_exit(&tx->tx_cpu[c].tc_lock);
+
+ /*
+ * Quiesce the transaction group by waiting for everyone to txg_exit().
+ */
+ for (c = 0; c < max_ncpus; c++) {
+ tx_cpu_t *tc = &tx->tx_cpu[c];
+ mutex_enter(&tc->tc_lock);
+ while (tc->tc_count[g] != 0)
+ cv_wait(&tc->tc_cv[g], &tc->tc_lock);
+ mutex_exit(&tc->tc_lock);
+ }
+}
+
+static void
+txg_sync_thread(dsl_pool_t *dp)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ callb_cpr_t cpr;
+ uint64_t timeout, start, delta, timer;
+ int c, target;
+
+ txg_thread_enter(tx, &cpr);
+
+ start = delta = 0;
+ for (;;) {
+ uint64_t timer, timeout = zfs_txg_timeout * hz;
+ uint64_t txg;
+
+ /*
+ * We sync when we're scrubbing, there's someone waiting
+ * on us, or the quiesce thread has handed off a txg to
+ * us, or we have reached our timeout.
+ */
+ timer = (delta >= timeout ? 0 : timeout - delta);
+ while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
+ spa_shutting_down(dp->dp_spa)) &&
+ !tx->tx_exiting && timer > 0 &&
+ tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
+ tx->tx_quiesced_txg == 0) {
+ dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
+ tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
+ txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
+ delta = lbolt - start;
+ timer = (delta > timeout ? 0 : timeout - delta);
+ }
+
+ /*
+ * Wait until the quiesce thread hands off a txg to us,
+ * prompting it to do so if necessary.
+ */
+ while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
+ if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
+ tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
+ cv_broadcast(&tx->tx_quiesce_more_cv);
+ txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
+ }
+
+ if (tx->tx_exiting)
+ txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
+
+ rw_enter(&tx->tx_suspend, RW_WRITER);
+
+ /*
+ * Consume the quiesced txg which has been handed off to
+ * us. This may cause the quiescing thread to now be
+ * able to quiesce another txg, so we must signal it.
+ */
+ txg = tx->tx_quiesced_txg;
+ tx->tx_quiesced_txg = 0;
+ tx->tx_syncing_txg = txg;
+ cv_broadcast(&tx->tx_quiesce_more_cv);
+ rw_exit(&tx->tx_suspend);
+
+ dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
+ txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
+ mutex_exit(&tx->tx_sync_lock);
+
+ start = lbolt;
+ spa_sync(dp->dp_spa, txg);
+ delta = lbolt - start;
+
+ /*
+ * Call all the callbacks for this txg. The callbacks must
+ * call dmu_tx_callback_data_destroy to free memory.
+ */
+ for (c = 0; c < max_ncpus; c++) {
+ dmu_callback_t *dcb;
+ tx_cpu_t *tc = &tx->tx_cpu[c];
+ int g = txg & TXG_MASK;
+ /* No need to lock tx_cpu_t */
+
+ while ((dcb = list_head(&tc->tc_callbacks[g]))) {
+ list_remove(&tc->tc_callbacks[g], dcb);
+ dcb->dcb_func(dcb->dcb_data, 0);
+ }
+ }
+
+ written = dp->dp_space_towrite[txg & TXG_MASK];
+ dp->dp_space_towrite[txg & TXG_MASK] = 0;
+ ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);
+
+ /*
+ * If the write limit max has not been explicitly set, set it
+ * to a fraction of available phisical memory (default 1/8th).
+ * Note that we must inflate the limit because the spa
+ * inflates write sizes to account for data replication.
+ * Check this each sync phase to catch changing memory size.
+ */
+ if (zfs_write_limit_inflated == 0 ||
+ (zfs_write_limit_shift && zfs_write_limit_max !=
+ physmem * PAGESIZE >> zfs_write_limit_shift)) {
+ zfs_write_limit_max =
+ physmem * PAGESIZE >> zfs_write_limit_shift;
+ zfs_write_limit_inflated =
+ spa_get_asize(dp->dp_spa, zfs_write_limit_max);
+ if (zfs_write_limit_min > zfs_write_limit_inflated)
+ zfs_write_limit_inflated = zfs_write_limit_min;
+ }
+
+ /*
+ * Attempt to keep the sync time consistant by adjusting the
+ * amount of write traffic allowed into each transaction group.
+ */
+ target = zfs_txg_synctime * hz;
+ if (delta > target) {
+ uint64_t old = MIN(dp->dp_write_limit, written);
+
+ dp->dp_write_limit = MAX(zfs_write_limit_min,
+ old * target / delta);
+ } else if (written >= dp->dp_write_limit &&
+ delta >> 3 < target >> 3) {
+ uint64_t rescale =
+ MIN((100 * target) / delta, 200);
+
+ dp->dp_write_limit = MIN(zfs_write_limit_inflated,
+ written * rescale / 100);
+ }
+
+ mutex_enter(&tx->tx_sync_lock);
+ rw_enter(&tx->tx_suspend, RW_WRITER);
+ tx->tx_synced_txg = txg;
+ tx->tx_syncing_txg = 0;
+ rw_exit(&tx->tx_suspend);
+ cv_broadcast(&tx->tx_sync_done_cv);
+ }
+}
+
+static void
+txg_quiesce_thread(dsl_pool_t *dp)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ callb_cpr_t cpr;
+
+ txg_thread_enter(tx, &cpr);
+
+ for (;;) {
+ uint64_t txg;
+
+ /*
+ * We quiesce when there's someone waiting on us.
+ * However, we can only have one txg in "quiescing" or
+ * "quiesced, waiting to sync" state. So we wait until
+ * the "quiesced, waiting to sync" txg has been consumed
+ * by the sync thread.
+ */
+ while (!tx->tx_exiting &&
+ (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
+ tx->tx_quiesced_txg != 0))
+ txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
+
+ if (tx->tx_exiting)
+ txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
+
+ txg = tx->tx_open_txg;
+ dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
+ txg, tx->tx_quiesce_txg_waiting,
+ tx->tx_sync_txg_waiting);
+ mutex_exit(&tx->tx_sync_lock);
+ txg_quiesce(dp, txg);
+ mutex_enter(&tx->tx_sync_lock);
+
+ /*
+ * Hand this txg off to the sync thread.
+ */
+ dprintf("quiesce done, handing off txg %llu\n", txg);
+ tx->tx_quiesced_txg = txg;
+ cv_broadcast(&tx->tx_sync_more_cv);
+ cv_broadcast(&tx->tx_quiesce_done_cv);
+ }
+}
+
+/*
+ * Delay this thread by 'ticks' if we are still in the open transaction
+ * group and there is already a waiting txg quiesing or quiesced. Abort
+ * the delay if this txg stalls or enters the quiesing state.
+ */
+void
+txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ int timeout = lbolt + ticks;
+
+ /* don't delay if this txg could transition to quiesing immediately */
+ if (tx->tx_open_txg > txg ||
+ tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
+ return;
+
+ mutex_enter(&tx->tx_sync_lock);
+ if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
+ mutex_exit(&tx->tx_sync_lock);
+ return;
+ }
+
+ while (lbolt < timeout &&
+ tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
+ (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
+ timeout);
+
+ mutex_exit(&tx->tx_sync_lock);
+}
+
+void
+txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
+{
+ tx_state_t *tx = &dp->dp_tx;
+
+ mutex_enter(&tx->tx_sync_lock);
+ ASSERT(tx->tx_threads == 2);
+ if (txg == 0)
+ txg = tx->tx_open_txg;
+ if (tx->tx_sync_txg_waiting < txg)
+ tx->tx_sync_txg_waiting = txg;
+ dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
+ txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
+ while (tx->tx_synced_txg < txg) {
+ dprintf("broadcasting sync more "
+ "tx_synced=%llu waiting=%llu dp=%p\n",
+ tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
+ cv_broadcast(&tx->tx_sync_more_cv);
+ cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
+ }
+ mutex_exit(&tx->tx_sync_lock);
+}
+
+void
+txg_wait_open(dsl_pool_t *dp, uint64_t txg)
+{
+ tx_state_t *tx = &dp->dp_tx;
+
+ mutex_enter(&tx->tx_sync_lock);
+ ASSERT(tx->tx_threads == 2);
+ if (txg == 0)
+ txg = tx->tx_open_txg + 1;
+ if (tx->tx_quiesce_txg_waiting < txg)
+ tx->tx_quiesce_txg_waiting = txg;
+ dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
+ txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
+ while (tx->tx_open_txg < txg) {
+ cv_broadcast(&tx->tx_quiesce_more_cv);
+ cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
+ }
+ mutex_exit(&tx->tx_sync_lock);
+}
+
+boolean_t
+txg_stalled(dsl_pool_t *dp)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
+}
+
+boolean_t
+txg_sync_waiting(dsl_pool_t *dp)
+{
+ tx_state_t *tx = &dp->dp_tx;
+
+ return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
+ tx->tx_quiesced_txg != 0);
+}
+
+void
+txg_suspend(dsl_pool_t *dp)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ /* XXX some code paths suspend when they are already suspended! */
+ rw_enter(&tx->tx_suspend, RW_READER);
+}
+
+void
+txg_resume(dsl_pool_t *dp)
+{
+ tx_state_t *tx = &dp->dp_tx;
+ rw_exit(&tx->tx_suspend);
+}
+
+/*
+ * Per-txg object lists.
+ */
+void
+txg_list_create(txg_list_t *tl, size_t offset)
+{
+ int t;
+
+ mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
+
+ tl->tl_offset = offset;
+
+ for (t = 0; t < TXG_SIZE; t++)
+ tl->tl_head[t] = NULL;
+}
+
+void
+txg_list_destroy(txg_list_t *tl)
+{
+ int t;
+
+ for (t = 0; t < TXG_SIZE; t++)
+ ASSERT(txg_list_empty(tl, t));
+
+ mutex_destroy(&tl->tl_lock);
+}
+
+int
+txg_list_empty(txg_list_t *tl, uint64_t txg)
+{
+ return (tl->tl_head[txg & TXG_MASK] == NULL);
+}
+
+/*
+ * Add an entry to the list.
+ * Returns 0 if it's a new entry, 1 if it's already there.
+ */
+int
+txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
+{
+ int t = txg & TXG_MASK;
+ txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
+ int already_on_list;
+
+ mutex_enter(&tl->tl_lock);
+ already_on_list = tn->tn_member[t];
+ if (!already_on_list) {
+ tn->tn_member[t] = 1;
+ tn->tn_next[t] = tl->tl_head[t];
+ tl->tl_head[t] = tn;
+ }
+ mutex_exit(&tl->tl_lock);
+
+ return (already_on_list);
+}
+
+/*
+ * Remove the head of the list and return it.
+ */
+void *
+txg_list_remove(txg_list_t *tl, uint64_t txg)
+{
+ int t = txg & TXG_MASK;
+ txg_node_t *tn;
+ void *p = NULL;
+
+ mutex_enter(&tl->tl_lock);
+ if ((tn = tl->tl_head[t]) != NULL) {
+ p = (char *)tn - tl->tl_offset;
+ tl->tl_head[t] = tn->tn_next[t];
+ tn->tn_next[t] = NULL;
+ tn->tn_member[t] = 0;
+ }
+ mutex_exit(&tl->tl_lock);
+
+ return (p);
+}
+
+/*
+ * Remove a specific item from the list and return it.
+ */
+void *
+txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
+{
+ int t = txg & TXG_MASK;
+ txg_node_t *tn, **tp;
+
+ mutex_enter(&tl->tl_lock);
+
+ for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
+ if ((char *)tn - tl->tl_offset == p) {
+ *tp = tn->tn_next[t];
+ tn->tn_next[t] = NULL;
+ tn->tn_member[t] = 0;
+ mutex_exit(&tl->tl_lock);
+ return (p);
+ }
+ }
+
+ mutex_exit(&tl->tl_lock);
+
+ return (NULL);
+}
+
+int
+txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
+{
+ int t = txg & TXG_MASK;
+ txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
+
+ return (tn->tn_member[t]);
+}
+
+/*
+ * Walk a txg list -- only safe if you know it's not changing.
+ */
+void *
+txg_list_head(txg_list_t *tl, uint64_t txg)
+{
+ int t = txg & TXG_MASK;
+ txg_node_t *tn = tl->tl_head[t];
+
+ return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
+}
+
+void *
+txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
+{
+ int t = txg & TXG_MASK;
+ txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
+
+ tn = tn->tn_next[t];
+
+ return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
+}