diff options
Diffstat (limited to 'module/zfs')
-rw-r--r-- | module/zfs/zil.c | 428 |
1 files changed, 280 insertions, 148 deletions
diff --git a/module/zfs/zil.c b/module/zfs/zil.c index d887e4900..f2798270a 100644 --- a/module/zfs/zil.c +++ b/module/zfs/zil.c @@ -146,6 +146,9 @@ static uint64_t zil_slog_bulk = 768 * 1024; static kmem_cache_t *zil_lwb_cache; static kmem_cache_t *zil_zcw_cache; +static void zil_lwb_commit(zilog_t *zilog, lwb_t *lwb, itx_t *itx); +static itx_t *zil_itx_clone(itx_t *oitx); + static int zil_bp_compare(const void *x1, const void *x2) { @@ -747,20 +750,21 @@ zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, boolean_t slog, uint64_t txg, lwb->lwb_blk = *bp; lwb->lwb_fastwrite = fastwrite; lwb->lwb_slog = slog; + lwb->lwb_indirect = B_FALSE; + if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) { + lwb->lwb_nused = lwb->lwb_nfilled = sizeof (zil_chain_t); + lwb->lwb_sz = BP_GET_LSIZE(bp); + } else { + lwb->lwb_nused = lwb->lwb_nfilled = 0; + lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t); + } lwb->lwb_state = LWB_STATE_CLOSED; lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp)); - lwb->lwb_max_txg = txg; lwb->lwb_write_zio = NULL; lwb->lwb_root_zio = NULL; lwb->lwb_issued_timestamp = 0; lwb->lwb_issued_txg = 0; - if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) { - lwb->lwb_nused = sizeof (zil_chain_t); - lwb->lwb_sz = BP_GET_LSIZE(bp); - } else { - lwb->lwb_nused = 0; - lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t); - } + lwb->lwb_max_txg = txg; mutex_enter(&zilog->zl_lock); list_insert_tail(&zilog->zl_lwb_list, lwb); @@ -1397,6 +1401,8 @@ zil_lwb_flush_vdevs_done(zio_t *zio) zilog->zl_commit_lr_seq = zilog->zl_lr_seq; } + mutex_exit(&zilog->zl_lock); + while ((itx = list_remove_head(&lwb->lwb_itxs)) != NULL) zil_itx_destroy(itx); @@ -1429,8 +1435,6 @@ zil_lwb_flush_vdevs_done(zio_t *zio) mutex_exit(&zcw->zcw_lock); } - mutex_exit(&zilog->zl_lock); - mutex_enter(&zilog->zl_lwb_io_lock); txg = lwb->lwb_issued_txg; ASSERT3U(zilog->zl_lwb_inflight[txg & TXG_MASK], >, 0); @@ -1666,46 +1670,41 @@ zil_lwb_write_open(zilog_t *zilog, lwb_t *lwb) EQUIV(lwb->lwb_root_zio == NULL, lwb->lwb_state == LWB_STATE_CLOSED); EQUIV(lwb->lwb_root_zio != NULL, lwb->lwb_state == LWB_STATE_OPENED); + if (lwb->lwb_root_zio != NULL) + return; + + lwb->lwb_root_zio = zio_root(zilog->zl_spa, + zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL); + + abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf, + BP_GET_LSIZE(&lwb->lwb_blk)); + + if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk) + prio = ZIO_PRIORITY_SYNC_WRITE; + else + prio = ZIO_PRIORITY_ASYNC_WRITE; + SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET], ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]); /* Lock so zil_sync() doesn't fastwrite_unmark after zio is created */ mutex_enter(&zilog->zl_lock); - if (lwb->lwb_root_zio == NULL) { - abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf, - BP_GET_LSIZE(&lwb->lwb_blk)); - - if (!lwb->lwb_fastwrite) { - metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk); - lwb->lwb_fastwrite = 1; - } - - if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk) - prio = ZIO_PRIORITY_SYNC_WRITE; - else - prio = ZIO_PRIORITY_ASYNC_WRITE; - - lwb->lwb_root_zio = zio_root(zilog->zl_spa, - zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL); - ASSERT3P(lwb->lwb_root_zio, !=, NULL); + if (!lwb->lwb_fastwrite) { + metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk); + lwb->lwb_fastwrite = 1; + } - lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio, - zilog->zl_spa, 0, &lwb->lwb_blk, lwb_abd, - BP_GET_LSIZE(&lwb->lwb_blk), zil_lwb_write_done, lwb, - prio, ZIO_FLAG_CANFAIL | ZIO_FLAG_FASTWRITE, &zb); - ASSERT3P(lwb->lwb_write_zio, !=, NULL); + lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio, zilog->zl_spa, 0, + &lwb->lwb_blk, lwb_abd, BP_GET_LSIZE(&lwb->lwb_blk), + zil_lwb_write_done, lwb, prio, + ZIO_FLAG_CANFAIL | ZIO_FLAG_FASTWRITE, &zb); - lwb->lwb_state = LWB_STATE_OPENED; + lwb->lwb_state = LWB_STATE_OPENED; - zil_lwb_set_zio_dependency(zilog, lwb); - zilog->zl_last_lwb_opened = lwb; - } + zil_lwb_set_zio_dependency(zilog, lwb); + zilog->zl_last_lwb_opened = lwb; mutex_exit(&zilog->zl_lock); - - ASSERT3P(lwb->lwb_root_zio, !=, NULL); - ASSERT3P(lwb->lwb_write_zio, !=, NULL); - ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED); } /* @@ -1736,11 +1735,11 @@ static const struct { static uint_t zil_maxblocksize = SPA_OLD_MAXBLOCKSIZE; /* - * Start a log block write and advance to the next log block. - * Calls are serialized. + * Close the log block for being issued and allocate the next one. + * Has to be called under zl_issuer_lock to chain more lwbs. */ static lwb_t * -zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) +zil_lwb_write_close(zilog_t *zilog, lwb_t *lwb) { lwb_t *nlwb = NULL; zil_chain_t *zilc; @@ -1748,7 +1747,7 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) blkptr_t *bp; dmu_tx_t *tx; uint64_t txg; - uint64_t zil_blksz, wsz; + uint64_t zil_blksz; int i, error; boolean_t slog; @@ -1757,16 +1756,17 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) ASSERT3P(lwb->lwb_write_zio, !=, NULL); ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED); - if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) { - zilc = (zil_chain_t *)lwb->lwb_buf; - bp = &zilc->zc_next_blk; - } else { - zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz); - bp = &zilc->zc_next_blk; + /* + * If this lwb includes indirect writes, we have to commit before + * creating the transaction, otherwise we may end up in dead lock. + */ + if (lwb->lwb_indirect) { + for (itx_t *itx = list_head(&lwb->lwb_itxs); itx; + itx = list_next(&lwb->lwb_itxs, itx)) + zil_lwb_commit(zilog, lwb, itx); + lwb->lwb_nused = lwb->lwb_nfilled; } - ASSERT(lwb->lwb_nused <= lwb->lwb_sz); - /* * Allocate the next block and save its address in this block * before writing it in order to establish the log chain. @@ -1816,17 +1816,13 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]); zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1); + if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) + zilc = (zil_chain_t *)lwb->lwb_buf; + else + zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz); + bp = &zilc->zc_next_blk; BP_ZERO(bp); error = zio_alloc_zil(spa, zilog->zl_os, txg, bp, zil_blksz, &slog); - if (slog) { - ZIL_STAT_BUMP(zilog, zil_itx_metaslab_slog_count); - ZIL_STAT_INCR(zilog, zil_itx_metaslab_slog_bytes, - lwb->lwb_nused); - } else { - ZIL_STAT_BUMP(zilog, zil_itx_metaslab_normal_count); - ZIL_STAT_INCR(zilog, zil_itx_metaslab_normal_bytes, - lwb->lwb_nused); - } if (error == 0) { ASSERT3U(bp->blk_birth, ==, txg); bp->blk_cksum = lwb->lwb_blk.blk_cksum; @@ -1838,17 +1834,47 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) nlwb = zil_alloc_lwb(zilog, bp, slog, txg, TRUE); } + lwb->lwb_state = LWB_STATE_ISSUED; + + dmu_tx_commit(tx); + + /* + * If there was an allocation failure then nlwb will be null which + * forces a txg_wait_synced(). + */ + return (nlwb); +} + +/* + * Finalize previously closed block and issue the write zio. + * Does not require locking. + */ +static void +zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) +{ + zil_chain_t *zilc; + int wsz; + + /* Actually fill the lwb with the data if not yet. */ + if (!lwb->lwb_indirect) { + for (itx_t *itx = list_head(&lwb->lwb_itxs); itx; + itx = list_next(&lwb->lwb_itxs, itx)) + zil_lwb_commit(zilog, lwb, itx); + lwb->lwb_nused = lwb->lwb_nfilled; + } + if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) { /* For Slim ZIL only write what is used. */ - wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t); - ASSERT3U(wsz, <=, lwb->lwb_sz); + wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, int); + ASSERT3S(wsz, <=, lwb->lwb_sz); zio_shrink(lwb->lwb_write_zio, wsz); wsz = lwb->lwb_write_zio->io_size; + zilc = (zil_chain_t *)lwb->lwb_buf; } else { wsz = lwb->lwb_sz; + zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz); } - zilc->zc_pad = 0; zilc->zc_nused = lwb->lwb_nused; zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum; @@ -1858,22 +1884,20 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) */ memset(lwb->lwb_buf + lwb->lwb_nused, 0, wsz - lwb->lwb_nused); + if (lwb->lwb_slog) { + ZIL_STAT_BUMP(zilog, zil_itx_metaslab_slog_count); + ZIL_STAT_INCR(zilog, zil_itx_metaslab_slog_bytes, + lwb->lwb_nused); + } else { + ZIL_STAT_BUMP(zilog, zil_itx_metaslab_normal_count); + ZIL_STAT_INCR(zilog, zil_itx_metaslab_normal_bytes, + lwb->lwb_nused); + } spa_config_enter(zilog->zl_spa, SCL_STATE, lwb, RW_READER); - zil_lwb_add_block(lwb, &lwb->lwb_blk); lwb->lwb_issued_timestamp = gethrtime(); - lwb->lwb_state = LWB_STATE_ISSUED; - zio_nowait(lwb->lwb_root_zio); zio_nowait(lwb->lwb_write_zio); - - dmu_tx_commit(tx); - - /* - * If there was an allocation failure then nlwb will be null which - * forces a txg_wait_synced(). - */ - return (nlwb); } /* @@ -1909,13 +1933,19 @@ zil_max_copied_data(zilog_t *zilog) sizeof (lr_write_t)); } +/* + * Estimate space needed in the lwb for the itx. Allocate more lwbs or + * split the itx as needed, but don't touch the actual transaction data. + * Has to be called under zl_issuer_lock to call zil_lwb_write_close() + * to chain more lwbs. + */ static lwb_t * -zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) +zil_lwb_assign(zilog_t *zilog, lwb_t *lwb, itx_t *itx, list_t *ilwbs) { - lr_t *lrcb, *lrc; - lr_write_t *lrwb, *lrw; - char *lr_buf; - uint64_t dlen, dnow, dpad, lwb_sp, reclen, txg, max_log_data; + itx_t *citx; + lr_t *lr, *clr; + lr_write_t *lrw; + uint64_t dlen, dnow, lwb_sp, reclen, max_log_data; ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock)); ASSERT3P(lwb, !=, NULL); @@ -1923,8 +1953,8 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) zil_lwb_write_open(zilog, lwb); - lrc = &itx->itx_lr; - lrw = (lr_write_t *)lrc; + lr = &itx->itx_lr; + lrw = (lr_write_t *)lr; /* * A commit itx doesn't represent any on-disk state; instead @@ -1938,24 +1968,23 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) * * For more details, see the comment above zil_commit(). */ - if (lrc->lrc_txtype == TX_COMMIT) { + if (lr->lrc_txtype == TX_COMMIT) { mutex_enter(&zilog->zl_lock); zil_commit_waiter_link_lwb(itx->itx_private, lwb); itx->itx_private = NULL; mutex_exit(&zilog->zl_lock); + list_insert_tail(&lwb->lwb_itxs, itx); return (lwb); } - if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) { + if (lr->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) { dlen = P2ROUNDUP_TYPED( lrw->lr_length, sizeof (uint64_t), uint64_t); - dpad = dlen - lrw->lr_length; } else { - dlen = dpad = 0; + dlen = 0; } - reclen = lrc->lrc_reclen; + reclen = lr->lrc_reclen; zilog->zl_cur_used += (reclen + dlen); - txg = lrc->lrc_txg; cont: /* @@ -1968,7 +1997,8 @@ cont: lwb_sp < zil_max_waste_space(zilog) && (dlen % max_log_data == 0 || lwb_sp < reclen + dlen % max_log_data))) { - lwb = zil_lwb_write_issue(zilog, lwb); + list_insert_tail(ilwbs, lwb); + lwb = zil_lwb_write_close(zilog, lwb); if (lwb == NULL) return (NULL); zil_lwb_write_open(zilog, lwb); @@ -1987,19 +2017,99 @@ cont: } dnow = MIN(dlen, lwb_sp - reclen); - lr_buf = lwb->lwb_buf + lwb->lwb_nused; - memcpy(lr_buf, lrc, reclen); - lrcb = (lr_t *)lr_buf; /* Like lrc, but inside lwb. */ - lrwb = (lr_write_t *)lrcb; /* Like lrw, but inside lwb. */ + if (dlen > dnow) { + ASSERT3U(lr->lrc_txtype, ==, TX_WRITE); + ASSERT3U(itx->itx_wr_state, ==, WR_NEED_COPY); + citx = zil_itx_clone(itx); + clr = &citx->itx_lr; + lr_write_t *clrw = (lr_write_t *)clr; + clrw->lr_length = dnow; + lrw->lr_offset += dnow; + lrw->lr_length -= dnow; + } else { + citx = itx; + clr = lr; + } + + /* + * We're actually making an entry, so update lrc_seq to be the + * log record sequence number. Note that this is generally not + * equal to the itx sequence number because not all transactions + * are synchronous, and sometimes spa_sync() gets there first. + */ + clr->lrc_seq = ++zilog->zl_lr_seq; + + lwb->lwb_nused += reclen + dnow; + ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz); + ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t))); + + zil_lwb_add_txg(lwb, lr->lrc_txg); + list_insert_tail(&lwb->lwb_itxs, citx); + + dlen -= dnow; + if (dlen > 0) { + zilog->zl_cur_used += reclen; + goto cont; + } + + /* + * We have to really issue all queued LWBs before we may have to + * wait for a txg sync. Otherwise we may end up in a dead lock. + */ + if (lr->lrc_txtype == TX_WRITE) { + boolean_t frozen = lr->lrc_txg > spa_freeze_txg(zilog->zl_spa); + if (frozen || itx->itx_wr_state == WR_INDIRECT) { + lwb_t *tlwb; + while ((tlwb = list_remove_head(ilwbs)) != NULL) + zil_lwb_write_issue(zilog, tlwb); + } + if (itx->itx_wr_state == WR_INDIRECT) + lwb->lwb_indirect = B_TRUE; + if (frozen) + txg_wait_synced(zilog->zl_dmu_pool, lr->lrc_txg); + } + + return (lwb); +} + +/* + * Fill the actual transaction data into the lwb, following zil_lwb_assign(). + * Does not require locking. + */ +static void +zil_lwb_commit(zilog_t *zilog, lwb_t *lwb, itx_t *itx) +{ + lr_t *lr, *lrb; + lr_write_t *lrw, *lrwb; + char *lr_buf; + uint64_t dlen, reclen; + + lr = &itx->itx_lr; + lrw = (lr_write_t *)lr; + + if (lr->lrc_txtype == TX_COMMIT) + return; + + if (lr->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) { + dlen = P2ROUNDUP_TYPED( + lrw->lr_length, sizeof (uint64_t), uint64_t); + } else { + dlen = 0; + } + reclen = lr->lrc_reclen; + ASSERT3U(reclen + dlen, <=, lwb->lwb_nused - lwb->lwb_nfilled); + + lr_buf = lwb->lwb_buf + lwb->lwb_nfilled; + memcpy(lr_buf, lr, reclen); + lrb = (lr_t *)lr_buf; /* Like lr, but inside lwb. */ + lrwb = (lr_write_t *)lrb; /* Like lrw, but inside lwb. */ ZIL_STAT_BUMP(zilog, zil_itx_count); /* * If it's a write, fetch the data or get its blkptr as appropriate. */ - if (lrc->lrc_txtype == TX_WRITE) { - if (txg > spa_freeze_txg(zilog->zl_spa)) - txg_wait_synced(zilog->zl_dmu_pool, txg); + if (lr->lrc_txtype == TX_WRITE) { if (itx->itx_wr_state == WR_COPIED) { ZIL_STAT_BUMP(zilog, zil_itx_copied_count); ZIL_STAT_INCR(zilog, zil_itx_copied_bytes, @@ -2010,14 +2120,10 @@ cont: if (itx->itx_wr_state == WR_NEED_COPY) { dbuf = lr_buf + reclen; - lrcb->lrc_reclen += dnow; - if (lrwb->lr_length > dnow) - lrwb->lr_length = dnow; - lrw->lr_offset += dnow; - lrw->lr_length -= dnow; + lrb->lrc_reclen += dlen; ZIL_STAT_BUMP(zilog, zil_itx_needcopy_count); ZIL_STAT_INCR(zilog, zil_itx_needcopy_bytes, - dnow); + dlen); } else { ASSERT3S(itx->itx_wr_state, ==, WR_INDIRECT); dbuf = NULL; @@ -2044,9 +2150,11 @@ cont: error = zilog->zl_get_data(itx->itx_private, itx->itx_gen, lrwb, dbuf, lwb, lwb->lwb_write_zio); - if (dbuf != NULL && error == 0 && dnow == dlen) + if (dbuf != NULL && error == 0) { /* Zero any padding bytes in the last block. */ - memset((char *)dbuf + lrwb->lr_length, 0, dpad); + memset((char *)dbuf + lrwb->lr_length, 0, + dlen - lrwb->lr_length); + } /* * Typically, the only return values we should see from @@ -2074,39 +2182,26 @@ cont: error); zfs_fallthrough; case EIO: - txg_wait_synced(zilog->zl_dmu_pool, txg); + if (lwb->lwb_indirect) { + txg_wait_synced(zilog->zl_dmu_pool, + lr->lrc_txg); + } else { + lwb->lwb_write_zio->io_error = error; + } zfs_fallthrough; case ENOENT: zfs_fallthrough; case EEXIST: zfs_fallthrough; case EALREADY: - return (lwb); + return; } } } - /* - * We're actually making an entry, so update lrc_seq to be the - * log record sequence number. Note that this is generally not - * equal to the itx sequence number because not all transactions - * are synchronous, and sometimes spa_sync() gets there first. - */ - lrcb->lrc_seq = ++zilog->zl_lr_seq; - lwb->lwb_nused += reclen + dnow; - - zil_lwb_add_txg(lwb, txg); - - ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz); - ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t))); - - dlen -= dnow; - if (dlen > 0) { - zilog->zl_cur_used += reclen; - goto cont; - } - - return (lwb); + lwb->lwb_nfilled += reclen + dlen; + ASSERT3S(lwb->lwb_nfilled, <=, lwb->lwb_nused); + ASSERT0(P2PHASE(lwb->lwb_nfilled, sizeof (uint64_t))); } itx_t * @@ -2131,6 +2226,16 @@ zil_itx_create(uint64_t txtype, size_t olrsize) return (itx); } +static itx_t * +zil_itx_clone(itx_t *oitx) +{ + itx_t *itx = zio_data_buf_alloc(oitx->itx_size); + memcpy(itx, oitx, oitx->itx_size); + itx->itx_callback = NULL; + itx->itx_callback_data = NULL; + return (itx); +} + void zil_itx_destroy(itx_t *itx) { @@ -2162,7 +2267,7 @@ zil_itxg_clean(void *arg) /* * In the general case, commit itxs will not be found * here, as they'll be committed to an lwb via - * zil_lwb_commit(), and free'd in that function. Having + * zil_lwb_assign(), and free'd in that function. Having * said that, it is still possible for commit itxs to be * found here, due to the following race: * @@ -2561,7 +2666,7 @@ zil_commit_writer_stall(zilog_t *zilog) * lwb will be issued to the zio layer to be written to disk. */ static void -zil_process_commit_list(zilog_t *zilog) +zil_process_commit_list(zilog_t *zilog, zil_commit_waiter_t *zcw, list_t *ilwbs) { spa_t *spa = zilog->zl_spa; list_t nolwb_itxs; @@ -2663,18 +2768,23 @@ zil_process_commit_list(zilog_t *zilog) */ if (frozen || !synced || lrc->lrc_txtype == TX_COMMIT) { if (lwb != NULL) { - lwb = zil_lwb_commit(zilog, itx, lwb); - - if (lwb == NULL) + lwb = zil_lwb_assign(zilog, lwb, itx, ilwbs); + if (lwb == NULL) { list_insert_tail(&nolwb_itxs, itx); - else - list_insert_tail(&lwb->lwb_itxs, itx); + } else if ((zcw->zcw_lwb != NULL && + zcw->zcw_lwb != lwb) || zcw->zcw_done) { + /* + * Our lwb is done, leave the rest of + * itx list to somebody else who care. + */ + first = B_FALSE; + break; + } } else { if (lrc->lrc_txtype == TX_COMMIT) { zil_commit_waiter_link_nolwb( itx->itx_private, &nolwb_waiters); } - list_insert_tail(&nolwb_itxs, itx); } } else { @@ -2690,6 +2800,8 @@ zil_process_commit_list(zilog_t *zilog) * the ZIL write pipeline; see the comment within * zil_commit_writer_stall() for more details. */ + while ((lwb = list_remove_head(ilwbs)) != NULL) + zil_lwb_write_issue(zilog, lwb); zil_commit_writer_stall(zilog); /* @@ -2735,13 +2847,13 @@ zil_process_commit_list(zilog_t *zilog) * on the system, such that this function will be * immediately called again (not necessarily by the same * thread) and this lwb's zio will be issued via - * zil_lwb_commit(). This way, the lwb is guaranteed to + * zil_lwb_assign(). This way, the lwb is guaranteed to * be "full" when it is issued to disk, and we'll make * use of the lwb's size the best we can. * * 2. If there isn't sufficient ZIL activity occurring on * the system, such that this lwb's zio isn't issued via - * zil_lwb_commit(), zil_commit_waiter() will issue the + * zil_lwb_assign(), zil_commit_waiter() will issue the * lwb's zio. If this occurs, the lwb is not guaranteed * to be "full" by the time its zio is issued, and means * the size of the lwb was "too large" given the amount @@ -2773,10 +2885,15 @@ zil_process_commit_list(zilog_t *zilog) zfs_commit_timeout_pct / 100; if (sleep < zil_min_commit_timeout || lwb->lwb_sz - lwb->lwb_nused < lwb->lwb_sz / 8) { - lwb = zil_lwb_write_issue(zilog, lwb); + list_insert_tail(ilwbs, lwb); + lwb = zil_lwb_write_close(zilog, lwb); zilog->zl_cur_used = 0; - if (lwb == NULL) + if (lwb == NULL) { + while ((lwb = list_remove_head(ilwbs)) + != NULL) + zil_lwb_write_issue(zilog, lwb); zil_commit_writer_stall(zilog); + } } } } @@ -2799,9 +2916,13 @@ zil_process_commit_list(zilog_t *zilog) static void zil_commit_writer(zilog_t *zilog, zil_commit_waiter_t *zcw) { + list_t ilwbs; + lwb_t *lwb; + ASSERT(!MUTEX_HELD(&zilog->zl_lock)); ASSERT(spa_writeable(zilog->zl_spa)); + list_create(&ilwbs, sizeof (lwb_t), offsetof(lwb_t, lwb_issue_node)); mutex_enter(&zilog->zl_issuer_lock); if (zcw->zcw_lwb != NULL || zcw->zcw_done) { @@ -2828,10 +2949,13 @@ zil_commit_writer(zilog_t *zilog, zil_commit_waiter_t *zcw) zil_get_commit_list(zilog); zil_prune_commit_list(zilog); - zil_process_commit_list(zilog); + zil_process_commit_list(zilog, zcw, &ilwbs); out: mutex_exit(&zilog->zl_issuer_lock); + while ((lwb = list_remove_head(&ilwbs)) != NULL) + zil_lwb_write_issue(zilog, lwb); + list_destroy(&ilwbs); } static void @@ -2858,7 +2982,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) return; /* - * In order to call zil_lwb_write_issue() we must hold the + * In order to call zil_lwb_write_close() we must hold the * zilog's "zl_issuer_lock". We can't simply acquire that lock, * since we're already holding the commit waiter's "zcw_lock", * and those two locks are acquired in the opposite order @@ -2876,8 +3000,10 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) * the waiter is marked "done"), so without this check we could * wind up with a use-after-free error below. */ - if (zcw->zcw_done) + if (zcw->zcw_done) { + lwb = NULL; goto out; + } ASSERT3P(lwb, ==, zcw->zcw_lwb); @@ -2896,15 +3022,17 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) * if it's ISSUED or OPENED, and block any other threads that might * attempt to issue this lwb. For that reason we hold the * zl_issuer_lock when checking the lwb_state; we must not call - * zil_lwb_write_issue() if the lwb had already been issued. + * zil_lwb_write_close() if the lwb had already been issued. * * See the comment above the lwb_state_t structure definition for * more details on the lwb states, and locking requirements. */ if (lwb->lwb_state == LWB_STATE_ISSUED || lwb->lwb_state == LWB_STATE_WRITE_DONE || - lwb->lwb_state == LWB_STATE_FLUSH_DONE) + lwb->lwb_state == LWB_STATE_FLUSH_DONE) { + lwb = NULL; goto out; + } ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED); @@ -2914,7 +3042,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) * since we've reached the commit waiter's timeout and it still * hasn't been issued. */ - lwb_t *nlwb = zil_lwb_write_issue(zilog, lwb); + lwb_t *nlwb = zil_lwb_write_close(zilog, lwb); ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED); @@ -2934,7 +3062,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) if (nlwb == NULL) { /* - * When zil_lwb_write_issue() returns NULL, this + * When zil_lwb_write_close() returns NULL, this * indicates zio_alloc_zil() failed to allocate the * "next" lwb on-disk. When this occurs, the ZIL write * pipeline must be stalled; see the comment within the @@ -2956,12 +3084,16 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) * lock, which occurs prior to calling dmu_tx_commit() */ mutex_exit(&zcw->zcw_lock); + zil_lwb_write_issue(zilog, lwb); + lwb = NULL; zil_commit_writer_stall(zilog); mutex_enter(&zcw->zcw_lock); } out: mutex_exit(&zilog->zl_issuer_lock); + if (lwb) + zil_lwb_write_issue(zilog, lwb); ASSERT(MUTEX_HELD(&zcw->zcw_lock)); } @@ -2976,7 +3108,7 @@ out: * waited "long enough" and the lwb is still in the "open" state. * * Given a sufficient amount of itxs being generated and written using - * the ZIL, the lwb's zio will be issued via the zil_lwb_commit() + * the ZIL, the lwb's zio will be issued via the zil_lwb_assign() * function. If this does not occur, this secondary responsibility will * ensure the lwb is issued even if there is not other synchronous * activity on the system. @@ -3656,7 +3788,7 @@ zil_close(zilog_t *zilog) /* * zl_lwb_max_issued_txg may be larger than lwb_max_txg. It depends * on the time when the dmu_tx transaction is assigned in - * zil_lwb_write_issue(). + * zil_lwb_write_close(). */ mutex_enter(&zilog->zl_lwb_io_lock); txg = MAX(zilog->zl_lwb_max_issued_txg, txg); |