aboutsummaryrefslogtreecommitdiffstats
path: root/module/zfs/zil.c
diff options
context:
space:
mode:
Diffstat (limited to 'module/zfs/zil.c')
-rw-r--r--module/zfs/zil.c428
1 files changed, 280 insertions, 148 deletions
diff --git a/module/zfs/zil.c b/module/zfs/zil.c
index d887e4900..f2798270a 100644
--- a/module/zfs/zil.c
+++ b/module/zfs/zil.c
@@ -146,6 +146,9 @@ static uint64_t zil_slog_bulk = 768 * 1024;
static kmem_cache_t *zil_lwb_cache;
static kmem_cache_t *zil_zcw_cache;
+static void zil_lwb_commit(zilog_t *zilog, lwb_t *lwb, itx_t *itx);
+static itx_t *zil_itx_clone(itx_t *oitx);
+
static int
zil_bp_compare(const void *x1, const void *x2)
{
@@ -747,20 +750,21 @@ zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, boolean_t slog, uint64_t txg,
lwb->lwb_blk = *bp;
lwb->lwb_fastwrite = fastwrite;
lwb->lwb_slog = slog;
+ lwb->lwb_indirect = B_FALSE;
+ if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
+ lwb->lwb_nused = lwb->lwb_nfilled = sizeof (zil_chain_t);
+ lwb->lwb_sz = BP_GET_LSIZE(bp);
+ } else {
+ lwb->lwb_nused = lwb->lwb_nfilled = 0;
+ lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
+ }
lwb->lwb_state = LWB_STATE_CLOSED;
lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
- lwb->lwb_max_txg = txg;
lwb->lwb_write_zio = NULL;
lwb->lwb_root_zio = NULL;
lwb->lwb_issued_timestamp = 0;
lwb->lwb_issued_txg = 0;
- if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
- lwb->lwb_nused = sizeof (zil_chain_t);
- lwb->lwb_sz = BP_GET_LSIZE(bp);
- } else {
- lwb->lwb_nused = 0;
- lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
- }
+ lwb->lwb_max_txg = txg;
mutex_enter(&zilog->zl_lock);
list_insert_tail(&zilog->zl_lwb_list, lwb);
@@ -1397,6 +1401,8 @@ zil_lwb_flush_vdevs_done(zio_t *zio)
zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
}
+ mutex_exit(&zilog->zl_lock);
+
while ((itx = list_remove_head(&lwb->lwb_itxs)) != NULL)
zil_itx_destroy(itx);
@@ -1429,8 +1435,6 @@ zil_lwb_flush_vdevs_done(zio_t *zio)
mutex_exit(&zcw->zcw_lock);
}
- mutex_exit(&zilog->zl_lock);
-
mutex_enter(&zilog->zl_lwb_io_lock);
txg = lwb->lwb_issued_txg;
ASSERT3U(zilog->zl_lwb_inflight[txg & TXG_MASK], >, 0);
@@ -1666,46 +1670,41 @@ zil_lwb_write_open(zilog_t *zilog, lwb_t *lwb)
EQUIV(lwb->lwb_root_zio == NULL, lwb->lwb_state == LWB_STATE_CLOSED);
EQUIV(lwb->lwb_root_zio != NULL, lwb->lwb_state == LWB_STATE_OPENED);
+ if (lwb->lwb_root_zio != NULL)
+ return;
+
+ lwb->lwb_root_zio = zio_root(zilog->zl_spa,
+ zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL);
+
+ abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf,
+ BP_GET_LSIZE(&lwb->lwb_blk));
+
+ if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
+ prio = ZIO_PRIORITY_SYNC_WRITE;
+ else
+ prio = ZIO_PRIORITY_ASYNC_WRITE;
+
SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
/* Lock so zil_sync() doesn't fastwrite_unmark after zio is created */
mutex_enter(&zilog->zl_lock);
- if (lwb->lwb_root_zio == NULL) {
- abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf,
- BP_GET_LSIZE(&lwb->lwb_blk));
-
- if (!lwb->lwb_fastwrite) {
- metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk);
- lwb->lwb_fastwrite = 1;
- }
-
- if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk)
- prio = ZIO_PRIORITY_SYNC_WRITE;
- else
- prio = ZIO_PRIORITY_ASYNC_WRITE;
-
- lwb->lwb_root_zio = zio_root(zilog->zl_spa,
- zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL);
- ASSERT3P(lwb->lwb_root_zio, !=, NULL);
+ if (!lwb->lwb_fastwrite) {
+ metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk);
+ lwb->lwb_fastwrite = 1;
+ }
- lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio,
- zilog->zl_spa, 0, &lwb->lwb_blk, lwb_abd,
- BP_GET_LSIZE(&lwb->lwb_blk), zil_lwb_write_done, lwb,
- prio, ZIO_FLAG_CANFAIL | ZIO_FLAG_FASTWRITE, &zb);
- ASSERT3P(lwb->lwb_write_zio, !=, NULL);
+ lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio, zilog->zl_spa, 0,
+ &lwb->lwb_blk, lwb_abd, BP_GET_LSIZE(&lwb->lwb_blk),
+ zil_lwb_write_done, lwb, prio,
+ ZIO_FLAG_CANFAIL | ZIO_FLAG_FASTWRITE, &zb);
- lwb->lwb_state = LWB_STATE_OPENED;
+ lwb->lwb_state = LWB_STATE_OPENED;
- zil_lwb_set_zio_dependency(zilog, lwb);
- zilog->zl_last_lwb_opened = lwb;
- }
+ zil_lwb_set_zio_dependency(zilog, lwb);
+ zilog->zl_last_lwb_opened = lwb;
mutex_exit(&zilog->zl_lock);
-
- ASSERT3P(lwb->lwb_root_zio, !=, NULL);
- ASSERT3P(lwb->lwb_write_zio, !=, NULL);
- ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
}
/*
@@ -1736,11 +1735,11 @@ static const struct {
static uint_t zil_maxblocksize = SPA_OLD_MAXBLOCKSIZE;
/*
- * Start a log block write and advance to the next log block.
- * Calls are serialized.
+ * Close the log block for being issued and allocate the next one.
+ * Has to be called under zl_issuer_lock to chain more lwbs.
*/
static lwb_t *
-zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
+zil_lwb_write_close(zilog_t *zilog, lwb_t *lwb)
{
lwb_t *nlwb = NULL;
zil_chain_t *zilc;
@@ -1748,7 +1747,7 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
blkptr_t *bp;
dmu_tx_t *tx;
uint64_t txg;
- uint64_t zil_blksz, wsz;
+ uint64_t zil_blksz;
int i, error;
boolean_t slog;
@@ -1757,16 +1756,17 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
ASSERT3P(lwb->lwb_write_zio, !=, NULL);
ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
- if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
- zilc = (zil_chain_t *)lwb->lwb_buf;
- bp = &zilc->zc_next_blk;
- } else {
- zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
- bp = &zilc->zc_next_blk;
+ /*
+ * If this lwb includes indirect writes, we have to commit before
+ * creating the transaction, otherwise we may end up in dead lock.
+ */
+ if (lwb->lwb_indirect) {
+ for (itx_t *itx = list_head(&lwb->lwb_itxs); itx;
+ itx = list_next(&lwb->lwb_itxs, itx))
+ zil_lwb_commit(zilog, lwb, itx);
+ lwb->lwb_nused = lwb->lwb_nfilled;
}
- ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
-
/*
* Allocate the next block and save its address in this block
* before writing it in order to establish the log chain.
@@ -1816,17 +1816,13 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
+ if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2)
+ zilc = (zil_chain_t *)lwb->lwb_buf;
+ else
+ zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
+ bp = &zilc->zc_next_blk;
BP_ZERO(bp);
error = zio_alloc_zil(spa, zilog->zl_os, txg, bp, zil_blksz, &slog);
- if (slog) {
- ZIL_STAT_BUMP(zilog, zil_itx_metaslab_slog_count);
- ZIL_STAT_INCR(zilog, zil_itx_metaslab_slog_bytes,
- lwb->lwb_nused);
- } else {
- ZIL_STAT_BUMP(zilog, zil_itx_metaslab_normal_count);
- ZIL_STAT_INCR(zilog, zil_itx_metaslab_normal_bytes,
- lwb->lwb_nused);
- }
if (error == 0) {
ASSERT3U(bp->blk_birth, ==, txg);
bp->blk_cksum = lwb->lwb_blk.blk_cksum;
@@ -1838,17 +1834,47 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
nlwb = zil_alloc_lwb(zilog, bp, slog, txg, TRUE);
}
+ lwb->lwb_state = LWB_STATE_ISSUED;
+
+ dmu_tx_commit(tx);
+
+ /*
+ * If there was an allocation failure then nlwb will be null which
+ * forces a txg_wait_synced().
+ */
+ return (nlwb);
+}
+
+/*
+ * Finalize previously closed block and issue the write zio.
+ * Does not require locking.
+ */
+static void
+zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
+{
+ zil_chain_t *zilc;
+ int wsz;
+
+ /* Actually fill the lwb with the data if not yet. */
+ if (!lwb->lwb_indirect) {
+ for (itx_t *itx = list_head(&lwb->lwb_itxs); itx;
+ itx = list_next(&lwb->lwb_itxs, itx))
+ zil_lwb_commit(zilog, lwb, itx);
+ lwb->lwb_nused = lwb->lwb_nfilled;
+ }
+
if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
/* For Slim ZIL only write what is used. */
- wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t);
- ASSERT3U(wsz, <=, lwb->lwb_sz);
+ wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, int);
+ ASSERT3S(wsz, <=, lwb->lwb_sz);
zio_shrink(lwb->lwb_write_zio, wsz);
wsz = lwb->lwb_write_zio->io_size;
+ zilc = (zil_chain_t *)lwb->lwb_buf;
} else {
wsz = lwb->lwb_sz;
+ zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
}
-
zilc->zc_pad = 0;
zilc->zc_nused = lwb->lwb_nused;
zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
@@ -1858,22 +1884,20 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb)
*/
memset(lwb->lwb_buf + lwb->lwb_nused, 0, wsz - lwb->lwb_nused);
+ if (lwb->lwb_slog) {
+ ZIL_STAT_BUMP(zilog, zil_itx_metaslab_slog_count);
+ ZIL_STAT_INCR(zilog, zil_itx_metaslab_slog_bytes,
+ lwb->lwb_nused);
+ } else {
+ ZIL_STAT_BUMP(zilog, zil_itx_metaslab_normal_count);
+ ZIL_STAT_INCR(zilog, zil_itx_metaslab_normal_bytes,
+ lwb->lwb_nused);
+ }
spa_config_enter(zilog->zl_spa, SCL_STATE, lwb, RW_READER);
-
zil_lwb_add_block(lwb, &lwb->lwb_blk);
lwb->lwb_issued_timestamp = gethrtime();
- lwb->lwb_state = LWB_STATE_ISSUED;
-
zio_nowait(lwb->lwb_root_zio);
zio_nowait(lwb->lwb_write_zio);
-
- dmu_tx_commit(tx);
-
- /*
- * If there was an allocation failure then nlwb will be null which
- * forces a txg_wait_synced().
- */
- return (nlwb);
}
/*
@@ -1909,13 +1933,19 @@ zil_max_copied_data(zilog_t *zilog)
sizeof (lr_write_t));
}
+/*
+ * Estimate space needed in the lwb for the itx. Allocate more lwbs or
+ * split the itx as needed, but don't touch the actual transaction data.
+ * Has to be called under zl_issuer_lock to call zil_lwb_write_close()
+ * to chain more lwbs.
+ */
static lwb_t *
-zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
+zil_lwb_assign(zilog_t *zilog, lwb_t *lwb, itx_t *itx, list_t *ilwbs)
{
- lr_t *lrcb, *lrc;
- lr_write_t *lrwb, *lrw;
- char *lr_buf;
- uint64_t dlen, dnow, dpad, lwb_sp, reclen, txg, max_log_data;
+ itx_t *citx;
+ lr_t *lr, *clr;
+ lr_write_t *lrw;
+ uint64_t dlen, dnow, lwb_sp, reclen, max_log_data;
ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock));
ASSERT3P(lwb, !=, NULL);
@@ -1923,8 +1953,8 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
zil_lwb_write_open(zilog, lwb);
- lrc = &itx->itx_lr;
- lrw = (lr_write_t *)lrc;
+ lr = &itx->itx_lr;
+ lrw = (lr_write_t *)lr;
/*
* A commit itx doesn't represent any on-disk state; instead
@@ -1938,24 +1968,23 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
*
* For more details, see the comment above zil_commit().
*/
- if (lrc->lrc_txtype == TX_COMMIT) {
+ if (lr->lrc_txtype == TX_COMMIT) {
mutex_enter(&zilog->zl_lock);
zil_commit_waiter_link_lwb(itx->itx_private, lwb);
itx->itx_private = NULL;
mutex_exit(&zilog->zl_lock);
+ list_insert_tail(&lwb->lwb_itxs, itx);
return (lwb);
}
- if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) {
+ if (lr->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) {
dlen = P2ROUNDUP_TYPED(
lrw->lr_length, sizeof (uint64_t), uint64_t);
- dpad = dlen - lrw->lr_length;
} else {
- dlen = dpad = 0;
+ dlen = 0;
}
- reclen = lrc->lrc_reclen;
+ reclen = lr->lrc_reclen;
zilog->zl_cur_used += (reclen + dlen);
- txg = lrc->lrc_txg;
cont:
/*
@@ -1968,7 +1997,8 @@ cont:
lwb_sp < zil_max_waste_space(zilog) &&
(dlen % max_log_data == 0 ||
lwb_sp < reclen + dlen % max_log_data))) {
- lwb = zil_lwb_write_issue(zilog, lwb);
+ list_insert_tail(ilwbs, lwb);
+ lwb = zil_lwb_write_close(zilog, lwb);
if (lwb == NULL)
return (NULL);
zil_lwb_write_open(zilog, lwb);
@@ -1987,19 +2017,99 @@ cont:
}
dnow = MIN(dlen, lwb_sp - reclen);
- lr_buf = lwb->lwb_buf + lwb->lwb_nused;
- memcpy(lr_buf, lrc, reclen);
- lrcb = (lr_t *)lr_buf; /* Like lrc, but inside lwb. */
- lrwb = (lr_write_t *)lrcb; /* Like lrw, but inside lwb. */
+ if (dlen > dnow) {
+ ASSERT3U(lr->lrc_txtype, ==, TX_WRITE);
+ ASSERT3U(itx->itx_wr_state, ==, WR_NEED_COPY);
+ citx = zil_itx_clone(itx);
+ clr = &citx->itx_lr;
+ lr_write_t *clrw = (lr_write_t *)clr;
+ clrw->lr_length = dnow;
+ lrw->lr_offset += dnow;
+ lrw->lr_length -= dnow;
+ } else {
+ citx = itx;
+ clr = lr;
+ }
+
+ /*
+ * We're actually making an entry, so update lrc_seq to be the
+ * log record sequence number. Note that this is generally not
+ * equal to the itx sequence number because not all transactions
+ * are synchronous, and sometimes spa_sync() gets there first.
+ */
+ clr->lrc_seq = ++zilog->zl_lr_seq;
+
+ lwb->lwb_nused += reclen + dnow;
+ ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
+ ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
+
+ zil_lwb_add_txg(lwb, lr->lrc_txg);
+ list_insert_tail(&lwb->lwb_itxs, citx);
+
+ dlen -= dnow;
+ if (dlen > 0) {
+ zilog->zl_cur_used += reclen;
+ goto cont;
+ }
+
+ /*
+ * We have to really issue all queued LWBs before we may have to
+ * wait for a txg sync. Otherwise we may end up in a dead lock.
+ */
+ if (lr->lrc_txtype == TX_WRITE) {
+ boolean_t frozen = lr->lrc_txg > spa_freeze_txg(zilog->zl_spa);
+ if (frozen || itx->itx_wr_state == WR_INDIRECT) {
+ lwb_t *tlwb;
+ while ((tlwb = list_remove_head(ilwbs)) != NULL)
+ zil_lwb_write_issue(zilog, tlwb);
+ }
+ if (itx->itx_wr_state == WR_INDIRECT)
+ lwb->lwb_indirect = B_TRUE;
+ if (frozen)
+ txg_wait_synced(zilog->zl_dmu_pool, lr->lrc_txg);
+ }
+
+ return (lwb);
+}
+
+/*
+ * Fill the actual transaction data into the lwb, following zil_lwb_assign().
+ * Does not require locking.
+ */
+static void
+zil_lwb_commit(zilog_t *zilog, lwb_t *lwb, itx_t *itx)
+{
+ lr_t *lr, *lrb;
+ lr_write_t *lrw, *lrwb;
+ char *lr_buf;
+ uint64_t dlen, reclen;
+
+ lr = &itx->itx_lr;
+ lrw = (lr_write_t *)lr;
+
+ if (lr->lrc_txtype == TX_COMMIT)
+ return;
+
+ if (lr->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) {
+ dlen = P2ROUNDUP_TYPED(
+ lrw->lr_length, sizeof (uint64_t), uint64_t);
+ } else {
+ dlen = 0;
+ }
+ reclen = lr->lrc_reclen;
+ ASSERT3U(reclen + dlen, <=, lwb->lwb_nused - lwb->lwb_nfilled);
+
+ lr_buf = lwb->lwb_buf + lwb->lwb_nfilled;
+ memcpy(lr_buf, lr, reclen);
+ lrb = (lr_t *)lr_buf; /* Like lr, but inside lwb. */
+ lrwb = (lr_write_t *)lrb; /* Like lrw, but inside lwb. */
ZIL_STAT_BUMP(zilog, zil_itx_count);
/*
* If it's a write, fetch the data or get its blkptr as appropriate.
*/
- if (lrc->lrc_txtype == TX_WRITE) {
- if (txg > spa_freeze_txg(zilog->zl_spa))
- txg_wait_synced(zilog->zl_dmu_pool, txg);
+ if (lr->lrc_txtype == TX_WRITE) {
if (itx->itx_wr_state == WR_COPIED) {
ZIL_STAT_BUMP(zilog, zil_itx_copied_count);
ZIL_STAT_INCR(zilog, zil_itx_copied_bytes,
@@ -2010,14 +2120,10 @@ cont:
if (itx->itx_wr_state == WR_NEED_COPY) {
dbuf = lr_buf + reclen;
- lrcb->lrc_reclen += dnow;
- if (lrwb->lr_length > dnow)
- lrwb->lr_length = dnow;
- lrw->lr_offset += dnow;
- lrw->lr_length -= dnow;
+ lrb->lrc_reclen += dlen;
ZIL_STAT_BUMP(zilog, zil_itx_needcopy_count);
ZIL_STAT_INCR(zilog, zil_itx_needcopy_bytes,
- dnow);
+ dlen);
} else {
ASSERT3S(itx->itx_wr_state, ==, WR_INDIRECT);
dbuf = NULL;
@@ -2044,9 +2150,11 @@ cont:
error = zilog->zl_get_data(itx->itx_private,
itx->itx_gen, lrwb, dbuf, lwb,
lwb->lwb_write_zio);
- if (dbuf != NULL && error == 0 && dnow == dlen)
+ if (dbuf != NULL && error == 0) {
/* Zero any padding bytes in the last block. */
- memset((char *)dbuf + lrwb->lr_length, 0, dpad);
+ memset((char *)dbuf + lrwb->lr_length, 0,
+ dlen - lrwb->lr_length);
+ }
/*
* Typically, the only return values we should see from
@@ -2074,39 +2182,26 @@ cont:
error);
zfs_fallthrough;
case EIO:
- txg_wait_synced(zilog->zl_dmu_pool, txg);
+ if (lwb->lwb_indirect) {
+ txg_wait_synced(zilog->zl_dmu_pool,
+ lr->lrc_txg);
+ } else {
+ lwb->lwb_write_zio->io_error = error;
+ }
zfs_fallthrough;
case ENOENT:
zfs_fallthrough;
case EEXIST:
zfs_fallthrough;
case EALREADY:
- return (lwb);
+ return;
}
}
}
- /*
- * We're actually making an entry, so update lrc_seq to be the
- * log record sequence number. Note that this is generally not
- * equal to the itx sequence number because not all transactions
- * are synchronous, and sometimes spa_sync() gets there first.
- */
- lrcb->lrc_seq = ++zilog->zl_lr_seq;
- lwb->lwb_nused += reclen + dnow;
-
- zil_lwb_add_txg(lwb, txg);
-
- ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
- ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
-
- dlen -= dnow;
- if (dlen > 0) {
- zilog->zl_cur_used += reclen;
- goto cont;
- }
-
- return (lwb);
+ lwb->lwb_nfilled += reclen + dlen;
+ ASSERT3S(lwb->lwb_nfilled, <=, lwb->lwb_nused);
+ ASSERT0(P2PHASE(lwb->lwb_nfilled, sizeof (uint64_t)));
}
itx_t *
@@ -2131,6 +2226,16 @@ zil_itx_create(uint64_t txtype, size_t olrsize)
return (itx);
}
+static itx_t *
+zil_itx_clone(itx_t *oitx)
+{
+ itx_t *itx = zio_data_buf_alloc(oitx->itx_size);
+ memcpy(itx, oitx, oitx->itx_size);
+ itx->itx_callback = NULL;
+ itx->itx_callback_data = NULL;
+ return (itx);
+}
+
void
zil_itx_destroy(itx_t *itx)
{
@@ -2162,7 +2267,7 @@ zil_itxg_clean(void *arg)
/*
* In the general case, commit itxs will not be found
* here, as they'll be committed to an lwb via
- * zil_lwb_commit(), and free'd in that function. Having
+ * zil_lwb_assign(), and free'd in that function. Having
* said that, it is still possible for commit itxs to be
* found here, due to the following race:
*
@@ -2561,7 +2666,7 @@ zil_commit_writer_stall(zilog_t *zilog)
* lwb will be issued to the zio layer to be written to disk.
*/
static void
-zil_process_commit_list(zilog_t *zilog)
+zil_process_commit_list(zilog_t *zilog, zil_commit_waiter_t *zcw, list_t *ilwbs)
{
spa_t *spa = zilog->zl_spa;
list_t nolwb_itxs;
@@ -2663,18 +2768,23 @@ zil_process_commit_list(zilog_t *zilog)
*/
if (frozen || !synced || lrc->lrc_txtype == TX_COMMIT) {
if (lwb != NULL) {
- lwb = zil_lwb_commit(zilog, itx, lwb);
-
- if (lwb == NULL)
+ lwb = zil_lwb_assign(zilog, lwb, itx, ilwbs);
+ if (lwb == NULL) {
list_insert_tail(&nolwb_itxs, itx);
- else
- list_insert_tail(&lwb->lwb_itxs, itx);
+ } else if ((zcw->zcw_lwb != NULL &&
+ zcw->zcw_lwb != lwb) || zcw->zcw_done) {
+ /*
+ * Our lwb is done, leave the rest of
+ * itx list to somebody else who care.
+ */
+ first = B_FALSE;
+ break;
+ }
} else {
if (lrc->lrc_txtype == TX_COMMIT) {
zil_commit_waiter_link_nolwb(
itx->itx_private, &nolwb_waiters);
}
-
list_insert_tail(&nolwb_itxs, itx);
}
} else {
@@ -2690,6 +2800,8 @@ zil_process_commit_list(zilog_t *zilog)
* the ZIL write pipeline; see the comment within
* zil_commit_writer_stall() for more details.
*/
+ while ((lwb = list_remove_head(ilwbs)) != NULL)
+ zil_lwb_write_issue(zilog, lwb);
zil_commit_writer_stall(zilog);
/*
@@ -2735,13 +2847,13 @@ zil_process_commit_list(zilog_t *zilog)
* on the system, such that this function will be
* immediately called again (not necessarily by the same
* thread) and this lwb's zio will be issued via
- * zil_lwb_commit(). This way, the lwb is guaranteed to
+ * zil_lwb_assign(). This way, the lwb is guaranteed to
* be "full" when it is issued to disk, and we'll make
* use of the lwb's size the best we can.
*
* 2. If there isn't sufficient ZIL activity occurring on
* the system, such that this lwb's zio isn't issued via
- * zil_lwb_commit(), zil_commit_waiter() will issue the
+ * zil_lwb_assign(), zil_commit_waiter() will issue the
* lwb's zio. If this occurs, the lwb is not guaranteed
* to be "full" by the time its zio is issued, and means
* the size of the lwb was "too large" given the amount
@@ -2773,10 +2885,15 @@ zil_process_commit_list(zilog_t *zilog)
zfs_commit_timeout_pct / 100;
if (sleep < zil_min_commit_timeout ||
lwb->lwb_sz - lwb->lwb_nused < lwb->lwb_sz / 8) {
- lwb = zil_lwb_write_issue(zilog, lwb);
+ list_insert_tail(ilwbs, lwb);
+ lwb = zil_lwb_write_close(zilog, lwb);
zilog->zl_cur_used = 0;
- if (lwb == NULL)
+ if (lwb == NULL) {
+ while ((lwb = list_remove_head(ilwbs))
+ != NULL)
+ zil_lwb_write_issue(zilog, lwb);
zil_commit_writer_stall(zilog);
+ }
}
}
}
@@ -2799,9 +2916,13 @@ zil_process_commit_list(zilog_t *zilog)
static void
zil_commit_writer(zilog_t *zilog, zil_commit_waiter_t *zcw)
{
+ list_t ilwbs;
+ lwb_t *lwb;
+
ASSERT(!MUTEX_HELD(&zilog->zl_lock));
ASSERT(spa_writeable(zilog->zl_spa));
+ list_create(&ilwbs, sizeof (lwb_t), offsetof(lwb_t, lwb_issue_node));
mutex_enter(&zilog->zl_issuer_lock);
if (zcw->zcw_lwb != NULL || zcw->zcw_done) {
@@ -2828,10 +2949,13 @@ zil_commit_writer(zilog_t *zilog, zil_commit_waiter_t *zcw)
zil_get_commit_list(zilog);
zil_prune_commit_list(zilog);
- zil_process_commit_list(zilog);
+ zil_process_commit_list(zilog, zcw, &ilwbs);
out:
mutex_exit(&zilog->zl_issuer_lock);
+ while ((lwb = list_remove_head(&ilwbs)) != NULL)
+ zil_lwb_write_issue(zilog, lwb);
+ list_destroy(&ilwbs);
}
static void
@@ -2858,7 +2982,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
return;
/*
- * In order to call zil_lwb_write_issue() we must hold the
+ * In order to call zil_lwb_write_close() we must hold the
* zilog's "zl_issuer_lock". We can't simply acquire that lock,
* since we're already holding the commit waiter's "zcw_lock",
* and those two locks are acquired in the opposite order
@@ -2876,8 +3000,10 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
* the waiter is marked "done"), so without this check we could
* wind up with a use-after-free error below.
*/
- if (zcw->zcw_done)
+ if (zcw->zcw_done) {
+ lwb = NULL;
goto out;
+ }
ASSERT3P(lwb, ==, zcw->zcw_lwb);
@@ -2896,15 +3022,17 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
* if it's ISSUED or OPENED, and block any other threads that might
* attempt to issue this lwb. For that reason we hold the
* zl_issuer_lock when checking the lwb_state; we must not call
- * zil_lwb_write_issue() if the lwb had already been issued.
+ * zil_lwb_write_close() if the lwb had already been issued.
*
* See the comment above the lwb_state_t structure definition for
* more details on the lwb states, and locking requirements.
*/
if (lwb->lwb_state == LWB_STATE_ISSUED ||
lwb->lwb_state == LWB_STATE_WRITE_DONE ||
- lwb->lwb_state == LWB_STATE_FLUSH_DONE)
+ lwb->lwb_state == LWB_STATE_FLUSH_DONE) {
+ lwb = NULL;
goto out;
+ }
ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED);
@@ -2914,7 +3042,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
* since we've reached the commit waiter's timeout and it still
* hasn't been issued.
*/
- lwb_t *nlwb = zil_lwb_write_issue(zilog, lwb);
+ lwb_t *nlwb = zil_lwb_write_close(zilog, lwb);
ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED);
@@ -2934,7 +3062,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
if (nlwb == NULL) {
/*
- * When zil_lwb_write_issue() returns NULL, this
+ * When zil_lwb_write_close() returns NULL, this
* indicates zio_alloc_zil() failed to allocate the
* "next" lwb on-disk. When this occurs, the ZIL write
* pipeline must be stalled; see the comment within the
@@ -2956,12 +3084,16 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw)
* lock, which occurs prior to calling dmu_tx_commit()
*/
mutex_exit(&zcw->zcw_lock);
+ zil_lwb_write_issue(zilog, lwb);
+ lwb = NULL;
zil_commit_writer_stall(zilog);
mutex_enter(&zcw->zcw_lock);
}
out:
mutex_exit(&zilog->zl_issuer_lock);
+ if (lwb)
+ zil_lwb_write_issue(zilog, lwb);
ASSERT(MUTEX_HELD(&zcw->zcw_lock));
}
@@ -2976,7 +3108,7 @@ out:
* waited "long enough" and the lwb is still in the "open" state.
*
* Given a sufficient amount of itxs being generated and written using
- * the ZIL, the lwb's zio will be issued via the zil_lwb_commit()
+ * the ZIL, the lwb's zio will be issued via the zil_lwb_assign()
* function. If this does not occur, this secondary responsibility will
* ensure the lwb is issued even if there is not other synchronous
* activity on the system.
@@ -3656,7 +3788,7 @@ zil_close(zilog_t *zilog)
/*
* zl_lwb_max_issued_txg may be larger than lwb_max_txg. It depends
* on the time when the dmu_tx transaction is assigned in
- * zil_lwb_write_issue().
+ * zil_lwb_write_close().
*/
mutex_enter(&zilog->zl_lwb_io_lock);
txg = MAX(zilog->zl_lwb_max_issued_txg, txg);