summaryrefslogtreecommitdiffstats
path: root/module/zfs/zil.c
diff options
context:
space:
mode:
authorBrian Behlendorf <[email protected]>2010-05-28 13:45:14 -0700
committerBrian Behlendorf <[email protected]>2010-05-28 13:45:14 -0700
commit428870ff734fdaccc342b33fc53cf94724409a46 (patch)
tree164e83c0ceda52a843795ed7cd9e95637d02c177 /module/zfs/zil.c
parent6119cb885a976e175a6e827894accf657ff1984f (diff)
Update core ZFS code from build 121 to build 141.
Diffstat (limited to 'module/zfs/zil.c')
-rw-r--r--module/zfs/zil.c1083
1 files changed, 588 insertions, 495 deletions
diff --git a/module/zfs/zil.c b/module/zfs/zil.c
index db3822f5a..4aa4d10b0 100644
--- a/module/zfs/zil.c
+++ b/module/zfs/zil.c
@@ -19,13 +19,13 @@
* CDDL HEADER END
*/
/*
- * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
- * Use is subject to license terms.
+ * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
*/
+/* Portions Copyright 2010 Robert Milkowski */
+
#include <sys/zfs_context.h>
#include <sys/spa.h>
-#include <sys/spa_impl.h>
#include <sys/dmu.h>
#include <sys/zap.h>
#include <sys/arc.h>
@@ -36,6 +36,7 @@
#include <sys/dsl_dataset.h>
#include <sys/vdev.h>
#include <sys/dmu_tx.h>
+#include <sys/dsl_pool.h>
/*
* The zfs intent log (ZIL) saves transaction records of system calls
@@ -66,7 +67,7 @@
/*
* This global ZIL switch affects all pools
*/
-int zil_disable = 0; /* disable intent logging */
+int zil_replay_disable = 0; /* disable intent logging replay */
/*
* Tunable parameter for debugging or performance analysis. Setting
@@ -77,11 +78,17 @@ boolean_t zfs_nocacheflush = B_FALSE;
static kmem_cache_t *zil_lwb_cache;
+static boolean_t zil_empty(zilog_t *zilog);
+
+#define LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
+ sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
+
+
static int
-zil_dva_compare(const void *x1, const void *x2)
+zil_bp_compare(const void *x1, const void *x2)
{
- const dva_t *dva1 = x1;
- const dva_t *dva2 = x2;
+ const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
+ const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
return (-1);
@@ -97,34 +104,37 @@ zil_dva_compare(const void *x1, const void *x2)
}
static void
-zil_dva_tree_init(avl_tree_t *t)
+zil_bp_tree_init(zilog_t *zilog)
{
- avl_create(t, zil_dva_compare, sizeof (zil_dva_node_t),
- offsetof(zil_dva_node_t, zn_node));
+ avl_create(&zilog->zl_bp_tree, zil_bp_compare,
+ sizeof (zil_bp_node_t), offsetof(zil_bp_node_t, zn_node));
}
static void
-zil_dva_tree_fini(avl_tree_t *t)
+zil_bp_tree_fini(zilog_t *zilog)
{
- zil_dva_node_t *zn;
+ avl_tree_t *t = &zilog->zl_bp_tree;
+ zil_bp_node_t *zn;
void *cookie = NULL;
while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
- kmem_free(zn, sizeof (zil_dva_node_t));
+ kmem_free(zn, sizeof (zil_bp_node_t));
avl_destroy(t);
}
-static int
-zil_dva_tree_add(avl_tree_t *t, dva_t *dva)
+int
+zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
{
- zil_dva_node_t *zn;
+ avl_tree_t *t = &zilog->zl_bp_tree;
+ const dva_t *dva = BP_IDENTITY(bp);
+ zil_bp_node_t *zn;
avl_index_t where;
if (avl_find(t, dva, &where) != NULL)
return (EEXIST);
- zn = kmem_alloc(sizeof (zil_dva_node_t), KM_SLEEP);
+ zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
zn->zn_dva = *dva;
avl_insert(t, zn, where);
@@ -149,35 +159,31 @@ zil_init_log_chain(zilog_t *zilog, blkptr_t *bp)
}
/*
- * Read a log block, make sure it's valid, and byteswap it if necessary.
+ * Read a log block and make sure it's valid.
*/
static int
-zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, arc_buf_t **abufpp)
+zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
+ char **end)
{
- blkptr_t blk = *bp;
- zbookmark_t zb;
+ enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
uint32_t aflags = ARC_WAIT;
+ arc_buf_t *abuf = NULL;
+ zbookmark_t zb;
int error;
- zb.zb_objset = bp->blk_cksum.zc_word[ZIL_ZC_OBJSET];
- zb.zb_object = 0;
- zb.zb_level = -1;
- zb.zb_blkid = bp->blk_cksum.zc_word[ZIL_ZC_SEQ];
+ if (zilog->zl_header->zh_claim_txg == 0)
+ zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
- *abufpp = NULL;
+ if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
+ zio_flags |= ZIO_FLAG_SPECULATIVE;
- /*
- * We shouldn't be doing any scrubbing while we're doing log
- * replay, it's OK to not lock.
- */
- error = arc_read_nolock(NULL, zilog->zl_spa, &blk,
- arc_getbuf_func, abufpp, ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL |
- ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB, &aflags, &zb);
+ SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
+ ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
+
+ error = dsl_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
+ ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
if (error == 0) {
- char *data = (*abufpp)->b_data;
- uint64_t blksz = BP_GET_LSIZE(bp);
- zil_trailer_t *ztp = (zil_trailer_t *)(data + blksz) - 1;
zio_cksum_t cksum = bp->blk_cksum;
/*
@@ -190,43 +196,102 @@ zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, arc_buf_t **abufpp)
*/
cksum.zc_word[ZIL_ZC_SEQ]++;
- if (bcmp(&cksum, &ztp->zit_next_blk.blk_cksum,
- sizeof (cksum)) || BP_IS_HOLE(&ztp->zit_next_blk) ||
- (ztp->zit_nused > (blksz - sizeof (zil_trailer_t)))) {
- error = ECKSUM;
- }
+ if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
+ zil_chain_t *zilc = abuf->b_data;
+ char *lr = (char *)(zilc + 1);
+ uint64_t len = zilc->zc_nused - sizeof (zil_chain_t);
- if (error) {
- VERIFY(arc_buf_remove_ref(*abufpp, abufpp) == 1);
- *abufpp = NULL;
+ if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
+ sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
+ error = ECKSUM;
+ } else {
+ bcopy(lr, dst, len);
+ *end = (char *)dst + len;
+ *nbp = zilc->zc_next_blk;
+ }
+ } else {
+ char *lr = abuf->b_data;
+ uint64_t size = BP_GET_LSIZE(bp);
+ zil_chain_t *zilc = (zil_chain_t *)(lr + size) - 1;
+
+ if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
+ sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk) ||
+ (zilc->zc_nused > (size - sizeof (*zilc)))) {
+ error = ECKSUM;
+ } else {
+ bcopy(lr, dst, zilc->zc_nused);
+ *end = (char *)dst + zilc->zc_nused;
+ *nbp = zilc->zc_next_blk;
+ }
}
+
+ VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
+ }
+
+ return (error);
+}
+
+/*
+ * Read a TX_WRITE log data block.
+ */
+static int
+zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
+{
+ enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
+ const blkptr_t *bp = &lr->lr_blkptr;
+ uint32_t aflags = ARC_WAIT;
+ arc_buf_t *abuf = NULL;
+ zbookmark_t zb;
+ int error;
+
+ if (BP_IS_HOLE(bp)) {
+ if (wbuf != NULL)
+ bzero(wbuf, MAX(BP_GET_LSIZE(bp), lr->lr_length));
+ return (0);
}
- dprintf("error %d on %llu:%llu\n", error, zb.zb_objset, zb.zb_blkid);
+ if (zilog->zl_header->zh_claim_txg == 0)
+ zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
+
+ SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
+ ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
+
+ error = arc_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
+ ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
+
+ if (error == 0) {
+ if (wbuf != NULL)
+ bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
+ (void) arc_buf_remove_ref(abuf, &abuf);
+ }
return (error);
}
/*
* Parse the intent log, and call parse_func for each valid record within.
- * Return the highest sequence number.
*/
-uint64_t
+int
zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
{
const zil_header_t *zh = zilog->zl_header;
- uint64_t claim_seq = zh->zh_claim_seq;
- uint64_t seq = 0;
- uint64_t max_seq = 0;
- blkptr_t blk = zh->zh_log;
- arc_buf_t *abuf;
+ boolean_t claimed = !!zh->zh_claim_txg;
+ uint64_t claim_blk_seq = claimed ? zh->zh_claim_blk_seq : UINT64_MAX;
+ uint64_t claim_lr_seq = claimed ? zh->zh_claim_lr_seq : UINT64_MAX;
+ uint64_t max_blk_seq = 0;
+ uint64_t max_lr_seq = 0;
+ uint64_t blk_count = 0;
+ uint64_t lr_count = 0;
+ blkptr_t blk, next_blk;
char *lrbuf, *lrp;
- zil_trailer_t *ztp;
- int reclen, error;
+ int error = 0;
- if (BP_IS_HOLE(&blk))
- return (max_seq);
+ /*
+ * Old logs didn't record the maximum zh_claim_lr_seq.
+ */
+ if (!(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
+ claim_lr_seq = UINT64_MAX;
/*
* Starting at the block pointed to by zh_log we read the log chain.
@@ -237,105 +302,156 @@ zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
* If the log has been claimed, stop if we encounter a sequence
* number greater than the highest claimed sequence number.
*/
- zil_dva_tree_init(&zilog->zl_dva_tree);
- for (;;) {
- seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
+ lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
+ zil_bp_tree_init(zilog);
- if (claim_seq != 0 && seq > claim_seq)
- break;
-
- ASSERT(max_seq < seq);
- max_seq = seq;
+ for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
+ uint64_t blk_seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
+ int reclen;
+ char *end;
- error = zil_read_log_block(zilog, &blk, &abuf);
+ if (blk_seq > claim_blk_seq)
+ break;
+ if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
+ break;
+ ASSERT3U(max_blk_seq, <, blk_seq);
+ max_blk_seq = blk_seq;
+ blk_count++;
- if (parse_blk_func != NULL)
- parse_blk_func(zilog, &blk, arg, txg);
+ if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
+ break;
+ error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
if (error)
break;
- lrbuf = abuf->b_data;
- ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
- blk = ztp->zit_next_blk;
-
- if (parse_lr_func == NULL) {
- VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
- continue;
- }
-
- for (lrp = lrbuf; lrp < lrbuf + ztp->zit_nused; lrp += reclen) {
+ for (lrp = lrbuf; lrp < end; lrp += reclen) {
lr_t *lr = (lr_t *)lrp;
reclen = lr->lrc_reclen;
ASSERT3U(reclen, >=, sizeof (lr_t));
- parse_lr_func(zilog, lr, arg, txg);
+ if (lr->lrc_seq > claim_lr_seq)
+ goto done;
+ if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
+ goto done;
+ ASSERT3U(max_lr_seq, <, lr->lrc_seq);
+ max_lr_seq = lr->lrc_seq;
+ lr_count++;
}
- VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
}
- zil_dva_tree_fini(&zilog->zl_dva_tree);
+done:
+ zilog->zl_parse_error = error;
+ zilog->zl_parse_blk_seq = max_blk_seq;
+ zilog->zl_parse_lr_seq = max_lr_seq;
+ zilog->zl_parse_blk_count = blk_count;
+ zilog->zl_parse_lr_count = lr_count;
+
+ ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
+ (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
+
+ zil_bp_tree_fini(zilog);
+ zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
- return (max_seq);
+ return (error);
}
-/* ARGSUSED */
-static void
+static int
zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
{
- spa_t *spa = zilog->zl_spa;
- int err;
-
/*
* Claim log block if not already committed and not already claimed.
+ * If tx == NULL, just verify that the block is claimable.
*/
- if (bp->blk_birth >= first_txg &&
- zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp)) == 0) {
- err = zio_wait(zio_claim(NULL, spa, first_txg, bp, NULL, NULL,
- ZIO_FLAG_MUSTSUCCEED));
- ASSERT(err == 0);
- }
+ if (bp->blk_birth < first_txg || zil_bp_tree_add(zilog, bp) != 0)
+ return (0);
+
+ return (zio_wait(zio_claim(NULL, zilog->zl_spa,
+ tx == NULL ? 0 : first_txg, bp, spa_claim_notify, NULL,
+ ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB)));
}
-static void
+static int
zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
{
- if (lrc->lrc_txtype == TX_WRITE) {
- lr_write_t *lr = (lr_write_t *)lrc;
- zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg);
- }
+ lr_write_t *lr = (lr_write_t *)lrc;
+ int error;
+
+ if (lrc->lrc_txtype != TX_WRITE)
+ return (0);
+
+ /*
+ * If the block is not readable, don't claim it. This can happen
+ * in normal operation when a log block is written to disk before
+ * some of the dmu_sync() blocks it points to. In this case, the
+ * transaction cannot have been committed to anyone (we would have
+ * waited for all writes to be stable first), so it is semantically
+ * correct to declare this the end of the log.
+ */
+ if (lr->lr_blkptr.blk_birth >= first_txg &&
+ (error = zil_read_log_data(zilog, lr, NULL)) != 0)
+ return (error);
+ return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
}
/* ARGSUSED */
-static void
+static int
zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
{
- zio_free_blk(zilog->zl_spa, bp, dmu_tx_get_txg(tx));
+ zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
+
+ return (0);
}
-static void
+static int
zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
{
+ lr_write_t *lr = (lr_write_t *)lrc;
+ blkptr_t *bp = &lr->lr_blkptr;
+
/*
* If we previously claimed it, we need to free it.
*/
- if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE) {
- lr_write_t *lr = (lr_write_t *)lrc;
- blkptr_t *bp = &lr->lr_blkptr;
- if (bp->blk_birth >= claim_txg &&
- !zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp))) {
- (void) arc_free(NULL, zilog->zl_spa,
- dmu_tx_get_txg(tx), bp, NULL, NULL, ARC_WAIT);
- }
+ if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
+ bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0)
+ zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
+
+ return (0);
+}
+
+static lwb_t *
+zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg)
+{
+ lwb_t *lwb;
+
+ lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
+ lwb->lwb_zilog = zilog;
+ lwb->lwb_blk = *bp;
+ lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
+ lwb->lwb_max_txg = txg;
+ lwb->lwb_zio = NULL;
+ lwb->lwb_tx = NULL;
+ if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
+ lwb->lwb_nused = sizeof (zil_chain_t);
+ lwb->lwb_sz = BP_GET_LSIZE(bp);
+ } else {
+ lwb->lwb_nused = 0;
+ lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
}
+
+ mutex_enter(&zilog->zl_lock);
+ list_insert_tail(&zilog->zl_lwb_list, lwb);
+ mutex_exit(&zilog->zl_lock);
+
+ return (lwb);
}
/*
* Create an on-disk intent log.
*/
-static void
+static lwb_t *
zil_create(zilog_t *zilog)
{
const zil_header_t *zh = zilog->zl_header;
- lwb_t *lwb;
+ lwb_t *lwb = NULL;
uint64_t txg = 0;
dmu_tx_t *tx = NULL;
blkptr_t blk;
@@ -352,22 +468,23 @@ zil_create(zilog_t *zilog)
blk = zh->zh_log;
/*
- * If we don't already have an initial log block or we have one
- * but it's the wrong endianness then allocate one.
+ * Allocate an initial log block if:
+ * - there isn't one already
+ * - the existing block is the wrong endianess
*/
if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
tx = dmu_tx_create(zilog->zl_os);
- (void) dmu_tx_assign(tx, TXG_WAIT);
+ VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
txg = dmu_tx_get_txg(tx);
if (!BP_IS_HOLE(&blk)) {
- zio_free_blk(zilog->zl_spa, &blk, txg);
+ zio_free_zil(zilog->zl_spa, txg, &blk);
BP_ZERO(&blk);
}
- error = zio_alloc_blk(zilog->zl_spa, ZIL_MIN_BLKSZ, &blk,
- NULL, txg);
+ error = zio_alloc_zil(zilog->zl_spa, txg, &blk, NULL,
+ ZIL_MIN_BLKSZ, zilog->zl_logbias == ZFS_LOGBIAS_LATENCY);
if (error == 0)
zil_init_log_chain(zilog, &blk);
@@ -376,20 +493,8 @@ zil_create(zilog_t *zilog)
/*
* Allocate a log write buffer (lwb) for the first log block.
*/
- if (error == 0) {
- lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
- lwb->lwb_zilog = zilog;
- lwb->lwb_blk = blk;
- lwb->lwb_nused = 0;
- lwb->lwb_sz = BP_GET_LSIZE(&lwb->lwb_blk);
- lwb->lwb_buf = zio_buf_alloc(lwb->lwb_sz);
- lwb->lwb_max_txg = txg;
- lwb->lwb_zio = NULL;
-
- mutex_enter(&zilog->zl_lock);
- list_insert_tail(&zilog->zl_lwb_list, lwb);
- mutex_exit(&zilog->zl_lock);
- }
+ if (error == 0)
+ lwb = zil_alloc_lwb(zilog, &blk, txg);
/*
* If we just allocated the first log block, commit our transaction
@@ -402,6 +507,8 @@ zil_create(zilog_t *zilog)
}
ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
+
+ return (lwb);
}
/*
@@ -426,26 +533,18 @@ zil_destroy(zilog_t *zilog, boolean_t keep_first)
*/
txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
+ zilog->zl_old_header = *zh; /* debugging aid */
+
if (BP_IS_HOLE(&zh->zh_log))
return;
tx = dmu_tx_create(zilog->zl_os);
- (void) dmu_tx_assign(tx, TXG_WAIT);
+ VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
txg = dmu_tx_get_txg(tx);
mutex_enter(&zilog->zl_lock);
- /*
- * It is possible for the ZIL to get the previously mounted zilog
- * structure of the same dataset if quickly remounted and the dbuf
- * eviction has not completed. In this case we can see a non
- * empty lwb list and keep_first will be set. We fix this by
- * clearing the keep_first. This will be slower but it's very rare.
- */
- if (!list_is_empty(&zilog->zl_lwb_list) && keep_first)
- keep_first = B_FALSE;
-
ASSERT3U(zilog->zl_destroy_txg, <, txg);
zilog->zl_destroy_txg = txg;
zilog->zl_keep_first = keep_first;
@@ -457,41 +556,20 @@ zil_destroy(zilog_t *zilog, boolean_t keep_first)
list_remove(&zilog->zl_lwb_list, lwb);
if (lwb->lwb_buf != NULL)
zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
- zio_free_blk(zilog->zl_spa, &lwb->lwb_blk, txg);
+ zio_free_zil(zilog->zl_spa, txg, &lwb->lwb_blk);
kmem_cache_free(zil_lwb_cache, lwb);
}
- } else {
- if (!keep_first) {
- (void) zil_parse(zilog, zil_free_log_block,
- zil_free_log_record, tx, zh->zh_claim_txg);
- }
+ } else if (!keep_first) {
+ (void) zil_parse(zilog, zil_free_log_block,
+ zil_free_log_record, tx, zh->zh_claim_txg);
}
mutex_exit(&zilog->zl_lock);
dmu_tx_commit(tx);
}
-/*
- * return true if the initial log block is not valid
- */
-static boolean_t
-zil_empty(zilog_t *zilog)
-{
- const zil_header_t *zh = zilog->zl_header;
- arc_buf_t *abuf = NULL;
-
- if (BP_IS_HOLE(&zh->zh_log))
- return (B_TRUE);
-
- if (zil_read_log_block(zilog, &zh->zh_log, &abuf) != 0)
- return (B_TRUE);
-
- VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
- return (B_FALSE);
-}
-
int
-zil_claim(char *osname, void *txarg)
+zil_claim(const char *osname, void *txarg)
{
dmu_tx_t *tx = txarg;
uint64_t first_txg = dmu_tx_get_txg(tx);
@@ -500,7 +578,7 @@ zil_claim(char *osname, void *txarg)
objset_t *os;
int error;
- error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
+ error = dmu_objset_hold(osname, FTAG, &os);
if (error) {
cmn_err(CE_WARN, "can't open objset for %s", osname);
return (0);
@@ -509,28 +587,13 @@ zil_claim(char *osname, void *txarg)
zilog = dmu_objset_zil(os);
zh = zil_header_in_syncing_context(zilog);
- if (zilog->zl_spa->spa_log_state == SPA_LOG_CLEAR) {
+ if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
if (!BP_IS_HOLE(&zh->zh_log))
- zio_free_blk(zilog->zl_spa, &zh->zh_log, first_txg);
+ zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
BP_ZERO(&zh->zh_log);
dsl_dataset_dirty(dmu_objset_ds(os), tx);
- }
-
- /*
- * Record here whether the zil has any records to replay.
- * If the header block pointer is null or the block points
- * to the stubby then we know there are no valid log records.
- * We use the header to store this state as the the zilog gets
- * freed later in dmu_objset_close().
- * The flags (and the rest of the header fields) are cleared in
- * zil_sync() as a result of a zil_destroy(), after replaying the log.
- *
- * Note, the intent log can be empty but still need the
- * stubby to be claimed.
- */
- if (!zil_empty(zilog)) {
- zh->zh_flags |= ZIL_REPLAY_NEEDED;
- dsl_dataset_dirty(dmu_objset_ds(os), tx);
+ dmu_objset_rele(os, FTAG);
+ return (0);
}
/*
@@ -542,14 +605,19 @@ zil_claim(char *osname, void *txarg)
*/
ASSERT3U(zh->zh_claim_txg, <=, first_txg);
if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
- zh->zh_claim_txg = first_txg;
- zh->zh_claim_seq = zil_parse(zilog, zil_claim_log_block,
+ (void) zil_parse(zilog, zil_claim_log_block,
zil_claim_log_record, tx, first_txg);
+ zh->zh_claim_txg = first_txg;
+ zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
+ zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
+ if (zilog->zl_parse_lr_count || zilog->zl_parse_blk_count > 1)
+ zh->zh_flags |= ZIL_REPLAY_NEEDED;
+ zh->zh_flags |= ZIL_CLAIM_LR_SEQ_VALID;
dsl_dataset_dirty(dmu_objset_ds(os), tx);
}
ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
- dmu_objset_close(os);
+ dmu_objset_rele(os, FTAG);
return (0);
}
@@ -558,46 +626,36 @@ zil_claim(char *osname, void *txarg)
* Checksum errors are ok as they indicate the end of the chain.
* Any other error (no device or read failure) returns an error.
*/
-/* ARGSUSED */
int
-zil_check_log_chain(char *osname, void *txarg)
+zil_check_log_chain(const char *osname, void *tx)
{
zilog_t *zilog;
- zil_header_t *zh;
- blkptr_t blk;
- arc_buf_t *abuf;
objset_t *os;
- char *lrbuf;
- zil_trailer_t *ztp;
int error;
- error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
+ ASSERT(tx == NULL);
+
+ error = dmu_objset_hold(osname, FTAG, &os);
if (error) {
cmn_err(CE_WARN, "can't open objset for %s", osname);
return (0);
}
zilog = dmu_objset_zil(os);
- zh = zil_header_in_syncing_context(zilog);
- blk = zh->zh_log;
- if (BP_IS_HOLE(&blk)) {
- dmu_objset_close(os);
- return (0); /* no chain */
- }
- for (;;) {
- error = zil_read_log_block(zilog, &blk, &abuf);
- if (error)
- break;
- lrbuf = abuf->b_data;
- ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
- blk = ztp->zit_next_blk;
- VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
- }
- dmu_objset_close(os);
- if (error == ECKSUM)
- return (0); /* normal end of chain */
- return (error);
+ /*
+ * Because tx == NULL, zil_claim_log_block() will not actually claim
+ * any blocks, but just determine whether it is possible to do so.
+ * In addition to checking the log chain, zil_claim_log_block()
+ * will invoke zio_claim() with a done func of spa_claim_notify(),
+ * which will update spa_max_claim_txg. See spa_load() for details.
+ */
+ error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
+ zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
+
+ dmu_objset_rele(os, FTAG);
+
+ return ((error == ECKSUM || error == ENOENT) ? 0 : error);
}
static int
@@ -615,7 +673,7 @@ zil_vdev_compare(const void *x1, const void *x2)
}
void
-zil_add_block(zilog_t *zilog, blkptr_t *bp)
+zil_add_block(zilog_t *zilog, const blkptr_t *bp)
{
avl_tree_t *t = &zilog->zl_vdev_tree;
avl_index_t where;
@@ -691,9 +749,9 @@ zil_lwb_write_done(zio_t *zio)
{
lwb_t *lwb = zio->io_private;
zilog_t *zilog = lwb->lwb_zilog;
+ dmu_tx_t *tx = lwb->lwb_tx;
ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
- ASSERT(BP_GET_CHECKSUM(zio->io_bp) == ZIO_CHECKSUM_ZILOG);
ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
@@ -712,17 +770,15 @@ zil_lwb_write_done(zio_t *zio)
zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
mutex_enter(&zilog->zl_lock);
lwb->lwb_buf = NULL;
- if (zio->io_error)
- zilog->zl_log_error = B_TRUE;
+ lwb->lwb_tx = NULL;
+ mutex_exit(&zilog->zl_lock);
/*
* Now that we've written this log block, we have a stable pointer
* to the next block in the chain, so it's OK to let the txg in
- * which we allocated the next block sync. We still have the
- * zl_lock to ensure zil_sync doesn't kmem free the lwb.
+ * which we allocated the next block sync.
*/
- txg_rele_to_sync(&lwb->lwb_txgh);
- mutex_exit(&zilog->zl_lock);
+ dmu_tx_commit(tx);
}
/*
@@ -733,10 +789,9 @@ zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
{
zbookmark_t zb;
- zb.zb_objset = lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET];
- zb.zb_object = 0;
- zb.zb_level = -1;
- zb.zb_blkid = lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
+ SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
+ ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
+ lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
if (zilog->zl_root_zio == NULL) {
zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
@@ -744,118 +799,147 @@ zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
}
if (lwb->lwb_zio == NULL) {
lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
- 0, &lwb->lwb_blk, lwb->lwb_buf, lwb->lwb_sz,
+ 0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
zil_lwb_write_done, lwb, ZIO_PRIORITY_LOG_WRITE,
- ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE, &zb);
+ ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
}
}
/*
+ * Define a limited set of intent log block sizes.
+ * These must be a multiple of 4KB. Note only the amount used (again
+ * aligned to 4KB) actually gets written. However, we can't always just
+ * allocate SPA_MAXBLOCKSIZE as the slog space could be exhausted.
+ */
+uint64_t zil_block_buckets[] = {
+ 4096, /* non TX_WRITE */
+ 8192+4096, /* data base */
+ 32*1024 + 4096, /* NFS writes */
+ UINT64_MAX
+};
+
+/*
+ * Use the slog as long as the logbias is 'latency' and the current commit size
+ * is less than the limit or the total list size is less than 2X the limit.
+ * Limit checking is disabled by setting zil_slog_limit to UINT64_MAX.
+ */
+uint64_t zil_slog_limit = 1024 * 1024;
+#define USE_SLOG(zilog) (((zilog)->zl_logbias == ZFS_LOGBIAS_LATENCY) && \
+ (((zilog)->zl_cur_used < zil_slog_limit) || \
+ ((zilog)->zl_itx_list_sz < (zil_slog_limit << 1))))
+
+/*
* Start a log block write and advance to the next log block.
* Calls are serialized.
*/
static lwb_t *
zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
{
- lwb_t *nlwb;
- zil_trailer_t *ztp = (zil_trailer_t *)(lwb->lwb_buf + lwb->lwb_sz) - 1;
+ lwb_t *nlwb = NULL;
+ zil_chain_t *zilc;
spa_t *spa = zilog->zl_spa;
- blkptr_t *bp = &ztp->zit_next_blk;
+ blkptr_t *bp;
+ dmu_tx_t *tx;
uint64_t txg;
- uint64_t zil_blksz;
- int error;
+ uint64_t zil_blksz, wsz;
+ int i, error;
+
+ if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
+ zilc = (zil_chain_t *)lwb->lwb_buf;
+ bp = &zilc->zc_next_blk;
+ } else {
+ zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
+ bp = &zilc->zc_next_blk;
+ }
- ASSERT(lwb->lwb_nused <= ZIL_BLK_DATA_SZ(lwb));
+ ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
/*
* Allocate the next block and save its address in this block
* before writing it in order to establish the log chain.
* Note that if the allocation of nlwb synced before we wrote
* the block that points at it (lwb), we'd leak it if we crashed.
- * Therefore, we don't do txg_rele_to_sync() until zil_lwb_write_done().
+ * Therefore, we don't do dmu_tx_commit() until zil_lwb_write_done().
+ * We dirty the dataset to ensure that zil_sync() will be called
+ * to clean up in the event of allocation failure or I/O failure.
*/
- txg = txg_hold_open(zilog->zl_dmu_pool, &lwb->lwb_txgh);
- txg_rele_to_quiesce(&lwb->lwb_txgh);
+ tx = dmu_tx_create(zilog->zl_os);
+ VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
+ dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
+ txg = dmu_tx_get_txg(tx);
+
+ lwb->lwb_tx = tx;
/*
- * Pick a ZIL blocksize. We request a size that is the
- * maximum of the previous used size, the current used size and
- * the amount waiting in the queue.
+ * Log blocks are pre-allocated. Here we select the size of the next
+ * block, based on size used in the last block.
+ * - first find the smallest bucket that will fit the block from a
+ * limited set of block sizes. This is because it's faster to write
+ * blocks allocated from the same metaslab as they are adjacent or
+ * close.
+ * - next find the maximum from the new suggested size and an array of
+ * previous sizes. This lessens a picket fence effect of wrongly
+ * guesssing the size if we have a stream of say 2k, 64k, 2k, 64k
+ * requests.
+ *
+ * Note we only write what is used, but we can't just allocate
+ * the maximum block size because we can exhaust the available
+ * pool log space.
*/
- zil_blksz = MAX(zilog->zl_prev_used,
- zilog->zl_cur_used + sizeof (*ztp));
- zil_blksz = MAX(zil_blksz, zilog->zl_itx_list_sz + sizeof (*ztp));
- zil_blksz = P2ROUNDUP_TYPED(zil_blksz, ZIL_MIN_BLKSZ, uint64_t);
- if (zil_blksz > ZIL_MAX_BLKSZ)
- zil_blksz = ZIL_MAX_BLKSZ;
+ zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
+ for (i = 0; zil_blksz > zil_block_buckets[i]; i++)
+ continue;
+ zil_blksz = zil_block_buckets[i];
+ if (zil_blksz == UINT64_MAX)
+ zil_blksz = SPA_MAXBLOCKSIZE;
+ zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
+ for (i = 0; i < ZIL_PREV_BLKS; i++)
+ zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
+ zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
BP_ZERO(bp);
/* pass the old blkptr in order to spread log blocks across devs */
- error = zio_alloc_blk(spa, zil_blksz, bp, &lwb->lwb_blk, txg);
- if (error) {
- dmu_tx_t *tx = dmu_tx_create_assigned(zilog->zl_dmu_pool, txg);
+ error = zio_alloc_zil(spa, txg, bp, &lwb->lwb_blk, zil_blksz,
+ USE_SLOG(zilog));
+ if (!error) {
+ ASSERT3U(bp->blk_birth, ==, txg);
+ bp->blk_cksum = lwb->lwb_blk.blk_cksum;
+ bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
/*
- * We dirty the dataset to ensure that zil_sync() will
- * be called to remove this lwb from our zl_lwb_list.
- * Failing to do so, may leave an lwb with a NULL lwb_buf
- * hanging around on the zl_lwb_list.
+ * Allocate a new log write buffer (lwb).
*/
- dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
- dmu_tx_commit(tx);
-
- /*
- * Since we've just experienced an allocation failure so we
- * terminate the current lwb and send it on its way.
- */
- ztp->zit_pad = 0;
- ztp->zit_nused = lwb->lwb_nused;
- ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
- zio_nowait(lwb->lwb_zio);
+ nlwb = zil_alloc_lwb(zilog, bp, txg);
- /*
- * By returning NULL the caller will call tx_wait_synced()
- */
- return (NULL);
+ /* Record the block for later vdev flushing */
+ zil_add_block(zilog, &lwb->lwb_blk);
}
- ASSERT3U(bp->blk_birth, ==, txg);
- ztp->zit_pad = 0;
- ztp->zit_nused = lwb->lwb_nused;
- ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
- bp->blk_cksum = lwb->lwb_blk.blk_cksum;
- bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
+ if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
+ /* For Slim ZIL only write what is used. */
+ wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t);
+ ASSERT3U(wsz, <=, lwb->lwb_sz);
+ zio_shrink(lwb->lwb_zio, wsz);
- /*
- * Allocate a new log write buffer (lwb).
- */
- nlwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
+ } else {
+ wsz = lwb->lwb_sz;
+ }
- nlwb->lwb_zilog = zilog;
- nlwb->lwb_blk = *bp;
- nlwb->lwb_nused = 0;
- nlwb->lwb_sz = BP_GET_LSIZE(&nlwb->lwb_blk);
- nlwb->lwb_buf = zio_buf_alloc(nlwb->lwb_sz);
- nlwb->lwb_max_txg = txg;
- nlwb->lwb_zio = NULL;
+ zilc->zc_pad = 0;
+ zilc->zc_nused = lwb->lwb_nused;
+ zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
/*
- * Put new lwb at the end of the log chain
+ * clear unused data for security
*/
- mutex_enter(&zilog->zl_lock);
- list_insert_tail(&zilog->zl_lwb_list, nlwb);
- mutex_exit(&zilog->zl_lock);
+ bzero(lwb->lwb_buf + lwb->lwb_nused, wsz - lwb->lwb_nused);
- /* Record the block for later vdev flushing */
- zil_add_block(zilog, &lwb->lwb_blk);
+ zio_nowait(lwb->lwb_zio); /* Kick off the write for the old log block */
/*
- * kick off the write for the old log block
+ * If there was an allocation failure then nlwb will be null which
+ * forces a txg_wait_synced().
*/
- dprintf_bp(&lwb->lwb_blk, "lwb %p txg %llu: ", lwb, txg);
- ASSERT(lwb->lwb_zio);
- zio_nowait(lwb->lwb_zio);
-
return (nlwb);
}
@@ -863,20 +947,20 @@ static lwb_t *
zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
{
lr_t *lrc = &itx->itx_lr; /* common log record */
- lr_write_t *lr = (lr_write_t *)lrc;
+ lr_write_t *lrw = (lr_write_t *)lrc;
+ char *lr_buf;
uint64_t txg = lrc->lrc_txg;
uint64_t reclen = lrc->lrc_reclen;
- uint64_t dlen;
+ uint64_t dlen = 0;
if (lwb == NULL)
return (NULL);
+
ASSERT(lwb->lwb_buf != NULL);
if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
dlen = P2ROUNDUP_TYPED(
- lr->lr_length, sizeof (uint64_t), uint64_t);
- else
- dlen = 0;
+ lrw->lr_length, sizeof (uint64_t), uint64_t);
zilog->zl_cur_used += (reclen + dlen);
@@ -885,24 +969,22 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
/*
* If this record won't fit in the current log block, start a new one.
*/
- if (lwb->lwb_nused + reclen + dlen > ZIL_BLK_DATA_SZ(lwb)) {
+ if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
lwb = zil_lwb_write_start(zilog, lwb);
if (lwb == NULL)
return (NULL);
zil_lwb_write_init(zilog, lwb);
- ASSERT(lwb->lwb_nused == 0);
- if (reclen + dlen > ZIL_BLK_DATA_SZ(lwb)) {
+ ASSERT(LWB_EMPTY(lwb));
+ if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
txg_wait_synced(zilog->zl_dmu_pool, txg);
return (lwb);
}
}
- /*
- * Update the lrc_seq, to be log record sequence number. See zil.h
- * Then copy the record to the log buffer.
- */
- lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
- bcopy(lrc, lwb->lwb_buf + lwb->lwb_nused, reclen);
+ lr_buf = lwb->lwb_buf + lwb->lwb_nused;
+ bcopy(lrc, lr_buf, reclen);
+ lrc = (lr_t *)lr_buf;
+ lrw = (lr_write_t *)lrc;
/*
* If it's a write, fetch the data or get its blkptr as appropriate.
@@ -914,18 +996,16 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
char *dbuf;
int error;
- /* alignment is guaranteed */
- lr = (lr_write_t *)(lwb->lwb_buf + lwb->lwb_nused);
if (dlen) {
ASSERT(itx->itx_wr_state == WR_NEED_COPY);
- dbuf = lwb->lwb_buf + lwb->lwb_nused + reclen;
- lr->lr_common.lrc_reclen += dlen;
+ dbuf = lr_buf + reclen;
+ lrw->lr_common.lrc_reclen += dlen;
} else {
ASSERT(itx->itx_wr_state == WR_INDIRECT);
dbuf = NULL;
}
error = zilog->zl_get_data(
- itx->itx_private, lr, dbuf, lwb->lwb_zio);
+ itx->itx_private, lrw, dbuf, lwb->lwb_zio);
if (error == EIO) {
txg_wait_synced(zilog->zl_dmu_pool, txg);
return (lwb);
@@ -938,9 +1018,16 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
}
}
+ /*
+ * We're actually making an entry, so update lrc_seq to be the
+ * log record sequence number. Note that this is generally not
+ * equal to the itx sequence number because not all transactions
+ * are synchronous, and sometimes spa_sync() gets there first.
+ */
+ lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
lwb->lwb_nused += reclen + dlen;
lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
- ASSERT3U(lwb->lwb_nused, <=, ZIL_BLK_DATA_SZ(lwb));
+ ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);
return (lwb);
@@ -962,12 +1049,19 @@ zil_itx_create(uint64_t txtype, size_t lrsize)
return (itx);
}
+void
+zil_itx_destroy(itx_t *itx)
+{
+ kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
+}
+
uint64_t
zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
{
uint64_t seq;
ASSERT(itx->itx_lr.lrc_seq == 0);
+ ASSERT(!zilog->zl_replay);
mutex_enter(&zilog->zl_lock);
list_insert_tail(&zilog->zl_itx_list, itx);
@@ -1016,8 +1110,7 @@ zil_itx_clean(zilog_t *zilog)
/* destroy sync'd log transactions */
while ((itx = list_head(&clean_list)) != NULL) {
list_remove(&clean_list, itx);
- kmem_free(itx, offsetof(itx_t, itx_lr)
- + itx->itx_lr.lrc_reclen);
+ zil_itx_destroy(itx);
}
list_destroy(&clean_list);
}
@@ -1036,7 +1129,7 @@ zil_clean(zilog_t *zilog)
if ((itx != NULL) &&
(itx->itx_lr.lrc_txg <= spa_last_synced_txg(zilog->zl_spa))) {
(void) taskq_dispatch(zilog->zl_clean_taskq,
- (task_func_t *)zil_itx_clean, zilog, TQ_SLEEP);
+ (task_func_t *)zil_itx_clean, zilog, TQ_NOSLEEP);
}
mutex_exit(&zilog->zl_lock);
}
@@ -1046,9 +1139,10 @@ zil_commit_writer(zilog_t *zilog, uint64_t seq, uint64_t foid)
{
uint64_t txg;
uint64_t commit_seq = 0;
- itx_t *itx, *itx_next = (itx_t *)-1;
+ itx_t *itx, *itx_next;
lwb_t *lwb;
spa_t *spa;
+ int error = 0;
zilog->zl_writer = B_TRUE;
ASSERT(zilog->zl_root_zio == NULL);
@@ -1068,77 +1162,64 @@ zil_commit_writer(zilog_t *zilog, uint64_t seq, uint64_t foid)
return;
}
mutex_exit(&zilog->zl_lock);
- zil_create(zilog);
+ lwb = zil_create(zilog);
mutex_enter(&zilog->zl_lock);
- lwb = list_tail(&zilog->zl_lwb_list);
}
}
+ ASSERT(lwb == NULL || lwb->lwb_zio == NULL);
/* Loop through in-memory log transactions filling log blocks. */
DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
- for (;;) {
+
+ for (itx = list_head(&zilog->zl_itx_list); itx; itx = itx_next) {
/*
- * Find the next itx to push:
- * Push all transactions related to specified foid and all
- * other transactions except TX_WRITE, TX_TRUNCATE,
- * TX_SETATTR and TX_ACL for all other files.
+ * Save the next pointer. Even though we drop zl_lock below,
+ * all threads that can remove itx list entries (other writers
+ * and zil_itx_clean()) can't do so until they have zl_writer.
*/
- if (itx_next != (itx_t *)-1)
- itx = itx_next;
- else
- itx = list_head(&zilog->zl_itx_list);
- for (; itx != NULL; itx = list_next(&zilog->zl_itx_list, itx)) {
- if (foid == 0) /* push all foids? */
- break;
- if (itx->itx_sync) /* push all O_[D]SYNC */
- break;
- switch (itx->itx_lr.lrc_txtype) {
- case TX_SETATTR:
- case TX_WRITE:
- case TX_TRUNCATE:
- case TX_ACL:
- /* lr_foid is same offset for these records */
- if (((lr_write_t *)&itx->itx_lr)->lr_foid
- != foid) {
- continue; /* skip this record */
- }
- }
- break;
- }
- if (itx == NULL)
- break;
+ itx_next = list_next(&zilog->zl_itx_list, itx);
+
+ /*
+ * Determine whether to push this itx.
+ * Push all transactions related to specified foid and
+ * all other transactions except those that can be logged
+ * out of order (TX_WRITE, TX_TRUNCATE, TX_SETATTR, TX_ACL)
+ * for all other files.
+ *
+ * If foid == 0 (meaning "push all foids") or
+ * itx->itx_sync is set (meaning O_[D]SYNC), push regardless.
+ */
+ if (foid != 0 && !itx->itx_sync &&
+ TX_OOO(itx->itx_lr.lrc_txtype) &&
+ ((lr_ooo_t *)&itx->itx_lr)->lr_foid != foid)
+ continue; /* skip this record */
if ((itx->itx_lr.lrc_seq > seq) &&
- ((lwb == NULL) || (lwb->lwb_nused == 0) ||
- (lwb->lwb_nused + itx->itx_sod > ZIL_BLK_DATA_SZ(lwb)))) {
+ ((lwb == NULL) || (LWB_EMPTY(lwb)) ||
+ (lwb->lwb_nused + itx->itx_sod > lwb->lwb_sz)))
break;
- }
- /*
- * Save the next pointer. Even though we soon drop
- * zl_lock all threads that may change the list
- * (another writer or zil_itx_clean) can't do so until
- * they have zl_writer.
- */
- itx_next = list_next(&zilog->zl_itx_list, itx);
list_remove(&zilog->zl_itx_list, itx);
zilog->zl_itx_list_sz -= itx->itx_sod;
+
mutex_exit(&zilog->zl_lock);
+
txg = itx->itx_lr.lrc_txg;
ASSERT(txg);
if (txg > spa_last_synced_txg(spa) ||
txg > spa_freeze_txg(spa))
lwb = zil_lwb_commit(zilog, itx, lwb);
- kmem_free(itx, offsetof(itx_t, itx_lr)
- + itx->itx_lr.lrc_reclen);
+
+ zil_itx_destroy(itx);
+
mutex_enter(&zilog->zl_lock);
}
DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
/* determine commit sequence number */
itx = list_head(&zilog->zl_itx_list);
if (itx)
- commit_seq = itx->itx_lr.lrc_seq;
+ commit_seq = itx->itx_lr.lrc_seq - 1;
else
commit_seq = zilog->zl_itx_seq;
mutex_exit(&zilog->zl_lock);
@@ -1155,22 +1236,28 @@ zil_commit_writer(zilog_t *zilog, uint64_t seq, uint64_t foid)
*/
if (zilog->zl_root_zio) {
DTRACE_PROBE1(zil__cw3, zilog_t *, zilog);
- (void) zio_wait(zilog->zl_root_zio);
+ error = zio_wait(zilog->zl_root_zio);
zilog->zl_root_zio = NULL;
DTRACE_PROBE1(zil__cw4, zilog_t *, zilog);
zil_flush_vdevs(zilog);
}
- if (zilog->zl_log_error || lwb == NULL) {
- zilog->zl_log_error = 0;
+ if (error || lwb == NULL)
txg_wait_synced(zilog->zl_dmu_pool, 0);
- }
mutex_enter(&zilog->zl_lock);
zilog->zl_writer = B_FALSE;
ASSERT3U(commit_seq, >=, zilog->zl_commit_seq);
zilog->zl_commit_seq = commit_seq;
+
+ /*
+ * Remember the highest committed log sequence number for ztest.
+ * We only update this value when all the log writes succeeded,
+ * because ztest wants to ASSERT that it got the whole log chain.
+ */
+ if (error == 0 && lwb != NULL)
+ zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
}
/*
@@ -1181,7 +1268,7 @@ zil_commit_writer(zilog_t *zilog, uint64_t seq, uint64_t foid)
void
zil_commit(zilog_t *zilog, uint64_t seq, uint64_t foid)
{
- if (zilog == NULL || seq == 0)
+ if (zilog->zl_sync == ZFS_SYNC_DISABLED || seq == 0)
return;
mutex_enter(&zilog->zl_lock);
@@ -1190,7 +1277,7 @@ zil_commit(zilog_t *zilog, uint64_t seq, uint64_t foid)
while (zilog->zl_writer) {
cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
- if (seq < zilog->zl_commit_seq) {
+ if (seq <= zilog->zl_commit_seq) {
mutex_exit(&zilog->zl_lock);
return;
}
@@ -1202,6 +1289,33 @@ zil_commit(zilog_t *zilog, uint64_t seq, uint64_t foid)
}
/*
+ * Report whether all transactions are committed.
+ */
+static boolean_t
+zil_is_committed(zilog_t *zilog)
+{
+ lwb_t *lwb;
+ boolean_t committed;
+
+ mutex_enter(&zilog->zl_lock);
+
+ while (zilog->zl_writer)
+ cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
+
+ if (!list_is_empty(&zilog->zl_itx_list))
+ committed = B_FALSE; /* unpushed transactions */
+ else if ((lwb = list_head(&zilog->zl_lwb_list)) == NULL)
+ committed = B_TRUE; /* intent log never used */
+ else if (list_next(&zilog->zl_lwb_list, lwb) != NULL)
+ committed = B_FALSE; /* zil_sync() not done yet */
+ else
+ committed = B_TRUE; /* everything synced */
+
+ mutex_exit(&zilog->zl_lock);
+ return (committed);
+}
+
+/*
* Called in syncing context to free committed log blocks and update log header.
*/
void
@@ -1210,6 +1324,7 @@ zil_sync(zilog_t *zilog, dmu_tx_t *tx)
zil_header_t *zh = zil_header_in_syncing_context(zilog);
uint64_t txg = dmu_tx_get_txg(tx);
spa_t *spa = zilog->zl_spa;
+ uint64_t *replayed_seq = &zilog->zl_replayed_seq[txg & TXG_MASK];
lwb_t *lwb;
/*
@@ -1223,7 +1338,11 @@ zil_sync(zilog_t *zilog, dmu_tx_t *tx)
ASSERT(zilog->zl_stop_sync == 0);
- zh->zh_replay_seq = zilog->zl_replayed_seq[txg & TXG_MASK];
+ if (*replayed_seq != 0) {
+ ASSERT(zh->zh_replay_seq < *replayed_seq);
+ zh->zh_replay_seq = *replayed_seq;
+ *replayed_seq = 0;
+ }
if (zilog->zl_destroy_txg == txg) {
blkptr_t blk = zh->zh_log;
@@ -1252,7 +1371,7 @@ zil_sync(zilog_t *zilog, dmu_tx_t *tx)
if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
break;
list_remove(&zilog->zl_lwb_list, lwb);
- zio_free_blk(spa, &lwb->lwb_blk, txg);
+ zio_free_zil(spa, txg, &lwb->lwb_blk);
kmem_cache_free(zil_lwb_cache, lwb);
/*
@@ -1280,6 +1399,18 @@ zil_fini(void)
kmem_cache_destroy(zil_lwb_cache);
}
+void
+zil_set_sync(zilog_t *zilog, uint64_t sync)
+{
+ zilog->zl_sync = sync;
+}
+
+void
+zil_set_logbias(zilog_t *zilog, uint64_t logbias)
+{
+ zilog->zl_logbias = logbias;
+}
+
zilog_t *
zil_alloc(objset_t *os, zil_header_t *zh_phys)
{
@@ -1292,6 +1423,8 @@ zil_alloc(objset_t *os, zil_header_t *zh_phys)
zilog->zl_spa = dmu_objset_spa(os);
zilog->zl_dmu_pool = dmu_objset_pool(os);
zilog->zl_destroy_txg = TXG_INITIAL - 1;
+ zilog->zl_logbias = dmu_objset_logbias(os);
+ zilog->zl_sync = dmu_objset_syncprop(os);
mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);
@@ -1368,7 +1501,7 @@ zil_close(zilog_t *zilog)
if (!zil_is_committed(zilog)) {
uint64_t txg;
dmu_tx_t *tx = dmu_tx_create(zilog->zl_os);
- (void) dmu_tx_assign(tx, TXG_WAIT);
+ VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
txg = dmu_tx_get_txg(tx);
dmu_tx_commit(tx);
@@ -1442,88 +1575,88 @@ zil_resume(zilog_t *zilog)
}
typedef struct zil_replay_arg {
- objset_t *zr_os;
zil_replay_func_t **zr_replay;
void *zr_arg;
boolean_t zr_byteswap;
- char *zr_lrbuf;
+ char *zr_lr;
} zil_replay_arg_t;
-static void
+static int
+zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
+{
+ char name[MAXNAMELEN];
+
+ zilog->zl_replaying_seq--; /* didn't actually replay this one */
+
+ dmu_objset_name(zilog->zl_os, name);
+
+ cmn_err(CE_WARN, "ZFS replay transaction error %d, "
+ "dataset %s, seq 0x%llx, txtype %llu %s\n", error, name,
+ (u_longlong_t)lr->lrc_seq,
+ (u_longlong_t)(lr->lrc_txtype & ~TX_CI),
+ (lr->lrc_txtype & TX_CI) ? "CI" : "");
+
+ return (error);
+}
+
+static int
zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
{
zil_replay_arg_t *zr = zra;
const zil_header_t *zh = zilog->zl_header;
uint64_t reclen = lr->lrc_reclen;
uint64_t txtype = lr->lrc_txtype;
- char *name;
- int pass, error;
-
- if (!zilog->zl_replay) /* giving up */
- return;
+ int error = 0;
- if (lr->lrc_txg < claim_txg) /* already committed */
- return;
+ zilog->zl_replaying_seq = lr->lrc_seq;
if (lr->lrc_seq <= zh->zh_replay_seq) /* already replayed */
- return;
+ return (0);
+
+ if (lr->lrc_txg < claim_txg) /* already committed */
+ return (0);
/* Strip case-insensitive bit, still present in log record */
txtype &= ~TX_CI;
- if (txtype == 0 || txtype >= TX_MAX_TYPE) {
- error = EINVAL;
- goto bad;
+ if (txtype == 0 || txtype >= TX_MAX_TYPE)
+ return (zil_replay_error(zilog, lr, EINVAL));
+
+ /*
+ * If this record type can be logged out of order, the object
+ * (lr_foid) may no longer exist. That's legitimate, not an error.
+ */
+ if (TX_OOO(txtype)) {
+ error = dmu_object_info(zilog->zl_os,
+ ((lr_ooo_t *)lr)->lr_foid, NULL);
+ if (error == ENOENT || error == EEXIST)
+ return (0);
}
/*
* Make a copy of the data so we can revise and extend it.
*/
- bcopy(lr, zr->zr_lrbuf, reclen);
+ bcopy(lr, zr->zr_lr, reclen);
+
+ /*
+ * If this is a TX_WRITE with a blkptr, suck in the data.
+ */
+ if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
+ error = zil_read_log_data(zilog, (lr_write_t *)lr,
+ zr->zr_lr + reclen);
+ if (error)
+ return (zil_replay_error(zilog, lr, error));
+ }
/*
* The log block containing this lr may have been byteswapped
* so that we can easily examine common fields like lrc_txtype.
- * However, the log is a mix of different data types, and only the
+ * However, the log is a mix of different record types, and only the
* replay vectors know how to byteswap their records. Therefore, if
* the lr was byteswapped, undo it before invoking the replay vector.
*/
if (zr->zr_byteswap)
- byteswap_uint64_array(zr->zr_lrbuf, reclen);
-
- /*
- * If this is a TX_WRITE with a blkptr, suck in the data.
- */
- if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
- lr_write_t *lrw = (lr_write_t *)lr;
- blkptr_t *wbp = &lrw->lr_blkptr;
- uint64_t wlen = lrw->lr_length;
- char *wbuf = zr->zr_lrbuf + reclen;
-
- if (BP_IS_HOLE(wbp)) { /* compressed to a hole */
- bzero(wbuf, wlen);
- } else {
- /*
- * A subsequent write may have overwritten this block,
- * in which case wbp may have been been freed and
- * reallocated, and our read of wbp may fail with a
- * checksum error. We can safely ignore this because
- * the later write will provide the correct data.
- */
- zbookmark_t zb;
-
- zb.zb_objset = dmu_objset_id(zilog->zl_os);
- zb.zb_object = lrw->lr_foid;
- zb.zb_level = -1;
- zb.zb_blkid = lrw->lr_offset / BP_GET_LSIZE(wbp);
-
- (void) zio_wait(zio_read(NULL, zilog->zl_spa,
- wbp, wbuf, BP_GET_LSIZE(wbp), NULL, NULL,
- ZIO_PRIORITY_SYNC_READ,
- ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE, &zb));
- (void) memmove(wbuf, wbuf + lrw->lr_blkoff, wlen);
- }
- }
+ byteswap_uint64_array(zr->zr_lr, reclen);
/*
* We must now do two things atomically: replay this log record,
@@ -1531,42 +1664,30 @@ zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
* we did so. At the end of each replay function the sequence number
* is updated if we are in replay mode.
*/
- for (pass = 1; pass <= 2; pass++) {
- zilog->zl_replaying_seq = lr->lrc_seq;
- /* Only byteswap (if needed) on the 1st pass. */
- error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lrbuf,
- zr->zr_byteswap && pass == 1);
-
- if (!error)
- return;
-
+ error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, zr->zr_byteswap);
+ if (error) {
/*
* The DMU's dnode layer doesn't see removes until the txg
* commits, so a subsequent claim can spuriously fail with
* EEXIST. So if we receive any error we try syncing out
- * any removes then retry the transaction.
+ * any removes then retry the transaction. Note that we
+ * specify B_FALSE for byteswap now, so we don't do it twice.
*/
- if (pass == 1)
- txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
+ txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
+ error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, B_FALSE);
+ if (error)
+ return (zil_replay_error(zilog, lr, error));
}
-
-bad:
- ASSERT(error);
- name = kmem_alloc(MAXNAMELEN, KM_SLEEP);
- dmu_objset_name(zr->zr_os, name);
- cmn_err(CE_WARN, "ZFS replay transaction error %d, "
- "dataset %s, seq 0x%llx, txtype %llu %s\n",
- error, name, (u_longlong_t)lr->lrc_seq, (u_longlong_t)txtype,
- (lr->lrc_txtype & TX_CI) ? "CI" : "");
- zilog->zl_replay = B_FALSE;
- kmem_free(name, MAXNAMELEN);
+ return (0);
}
/* ARGSUSED */
-static void
+static int
zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
{
zilog->zl_replay_blks++;
+
+ return (0);
}
/*
@@ -1584,11 +1705,10 @@ zil_replay(objset_t *os, void *arg, zil_replay_func_t *replay_func[TX_MAX_TYPE])
return;
}
- zr.zr_os = os;
zr.zr_replay = replay_func;
zr.zr_arg = arg;
zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
- zr.zr_lrbuf = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
+ zr.zr_lr = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
/*
* Wait for in-progress removes to sync before starting replay.
@@ -1596,69 +1716,42 @@ zil_replay(objset_t *os, void *arg, zil_replay_func_t *replay_func[TX_MAX_TYPE])
txg_wait_synced(zilog->zl_dmu_pool, 0);
zilog->zl_replay = B_TRUE;
- zilog->zl_replay_time = lbolt;
+ zilog->zl_replay_time = ddi_get_lbolt();
ASSERT(zilog->zl_replay_blks == 0);
(void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
zh->zh_claim_txg);
- kmem_free(zr.zr_lrbuf, 2 * SPA_MAXBLOCKSIZE);
+ kmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
zil_destroy(zilog, B_FALSE);
txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
zilog->zl_replay = B_FALSE;
}
-/*
- * Report whether all transactions are committed
- */
-int
-zil_is_committed(zilog_t *zilog)
+boolean_t
+zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
{
- lwb_t *lwb;
- int ret;
-
- mutex_enter(&zilog->zl_lock);
- while (zilog->zl_writer)
- cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
-
- /* recent unpushed intent log transactions? */
- if (!list_is_empty(&zilog->zl_itx_list)) {
- ret = B_FALSE;
- goto out;
- }
-
- /* intent log never used? */
- lwb = list_head(&zilog->zl_lwb_list);
- if (lwb == NULL) {
- ret = B_TRUE;
- goto out;
- }
+ if (zilog->zl_sync == ZFS_SYNC_DISABLED)
+ return (B_TRUE);
- /*
- * more than 1 log buffer means zil_sync() hasn't yet freed
- * entries after a txg has committed
- */
- if (list_next(&zilog->zl_lwb_list, lwb)) {
- ret = B_FALSE;
- goto out;
+ if (zilog->zl_replay) {
+ dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
+ zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
+ zilog->zl_replaying_seq;
+ return (B_TRUE);
}
- ASSERT(zil_empty(zilog));
- ret = B_TRUE;
-out:
- cv_broadcast(&zilog->zl_cv_writer);
- mutex_exit(&zilog->zl_lock);
- return (ret);
+ return (B_FALSE);
}
/* ARGSUSED */
int
-zil_vdev_offline(char *osname, void *arg)
+zil_vdev_offline(const char *osname, void *arg)
{
objset_t *os;
zilog_t *zilog;
int error;
- error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
+ error = dmu_objset_hold(osname, FTAG, &os);
if (error)
return (error);
@@ -1667,6 +1760,6 @@ zil_vdev_offline(char *osname, void *arg)
error = EEXIST;
else
zil_resume(zilog);
- dmu_objset_close(os);
+ dmu_objset_rele(os, FTAG);
return (error);
}