diff --git a/include/sys/zil_impl.h b/include/sys/zil_impl.h index bb85bf6d1eb1..a4b6bdaac79e 100644 --- a/include/sys/zil_impl.h +++ b/include/sys/zil_impl.h @@ -44,7 +44,7 @@ extern "C" { * must be held. * * After the lwb is "opened", it can transition into the "issued" state - * via zil_lwb_write_issue(). Again, the zilog's "zl_issuer_lock" must + * via zil_lwb_write_close(). Again, the zilog's "zl_issuer_lock" must * be held when making this transition. * * After the lwb's write zio completes, it transitions into the "write @@ -94,6 +94,7 @@ typedef struct lwb { boolean_t lwb_fastwrite; /* is blk marked for fastwrite? */ boolean_t lwb_slog; /* lwb_blk is on SLOG device */ int lwb_nused; /* # used bytes in buffer */ + int lwb_nfilled; /* # filled bytes in buffer */ int lwb_sz; /* size of block and buffer */ lwb_state_t lwb_state; /* the state of this lwb */ char *lwb_buf; /* log write buffer */ @@ -107,6 +108,7 @@ typedef struct lwb { avl_tree_t lwb_vdev_tree; /* vdevs to flush after lwb write */ kmutex_t lwb_vdev_lock; /* protects lwb_vdev_tree */ hrtime_t lwb_issued_timestamp; /* when was the lwb issued? */ + list_node_t lwb_issue_node; /* linkage of lwbs ready for issue */ } lwb_t; /* diff --git a/module/zfs/zil.c b/module/zfs/zil.c index ec9da706a806..2f887f364880 100644 --- a/module/zfs/zil.c +++ b/module/zfs/zil.c @@ -146,8 +146,8 @@ static uint64_t zil_slog_bulk = 768 * 1024; static kmem_cache_t *zil_lwb_cache; static kmem_cache_t *zil_zcw_cache; -#define LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \ - sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused)) +static void zil_lwb_commit(zilog_t *zilog, lwb_t *lwb, itx_t *itx); +static itx_t *zil_itx_clone(itx_t *oitx); static int zil_bp_compare(const void *x1, const void *x2) @@ -758,10 +758,10 @@ zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, boolean_t slog, uint64_t txg, lwb->lwb_issued_timestamp = 0; lwb->lwb_issued_txg = 0; if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) { - lwb->lwb_nused = sizeof (zil_chain_t); + lwb->lwb_nused = lwb->lwb_nfilled = sizeof (zil_chain_t); lwb->lwb_sz = BP_GET_LSIZE(bp); } else { - lwb->lwb_nused = 0; + lwb->lwb_nused = lwb->lwb_nfilled = 0; lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t); } @@ -1689,46 +1689,41 @@ zil_lwb_write_open(zilog_t *zilog, lwb_t *lwb) EQUIV(lwb->lwb_root_zio == NULL, lwb->lwb_state == LWB_STATE_CLOSED); EQUIV(lwb->lwb_root_zio != NULL, lwb->lwb_state == LWB_STATE_OPENED); + if (lwb->lwb_root_zio != NULL) + return; + + lwb->lwb_root_zio = zio_root(zilog->zl_spa, + zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL); + + abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf, + BP_GET_LSIZE(&lwb->lwb_blk)); + + if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk) + prio = ZIO_PRIORITY_SYNC_WRITE; + else + prio = ZIO_PRIORITY_ASYNC_WRITE; + SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET], ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]); /* Lock so zil_sync() doesn't fastwrite_unmark after zio is created */ mutex_enter(&zilog->zl_lock); - if (lwb->lwb_root_zio == NULL) { - abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf, - BP_GET_LSIZE(&lwb->lwb_blk)); - - if (!lwb->lwb_fastwrite) { - metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk); - lwb->lwb_fastwrite = 1; - } - - if (!lwb->lwb_slog || zilog->zl_cur_used <= zil_slog_bulk) - prio = ZIO_PRIORITY_SYNC_WRITE; - else - prio = ZIO_PRIORITY_ASYNC_WRITE; - - lwb->lwb_root_zio = zio_root(zilog->zl_spa, - zil_lwb_flush_vdevs_done, lwb, ZIO_FLAG_CANFAIL); - ASSERT3P(lwb->lwb_root_zio, !=, NULL); + if (!lwb->lwb_fastwrite) { + metaslab_fastwrite_mark(zilog->zl_spa, &lwb->lwb_blk); + lwb->lwb_fastwrite = 1; + } - lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio, - zilog->zl_spa, 0, &lwb->lwb_blk, lwb_abd, - BP_GET_LSIZE(&lwb->lwb_blk), zil_lwb_write_done, lwb, - prio, ZIO_FLAG_CANFAIL | ZIO_FLAG_FASTWRITE, &zb); - ASSERT3P(lwb->lwb_write_zio, !=, NULL); + lwb->lwb_write_zio = zio_rewrite(lwb->lwb_root_zio, zilog->zl_spa, 0, + &lwb->lwb_blk, lwb_abd, BP_GET_LSIZE(&lwb->lwb_blk), + zil_lwb_write_done, lwb, prio, + ZIO_FLAG_CANFAIL | ZIO_FLAG_FASTWRITE, &zb); - lwb->lwb_state = LWB_STATE_OPENED; + lwb->lwb_state = LWB_STATE_OPENED; - zil_lwb_set_zio_dependency(zilog, lwb); - zilog->zl_last_lwb_opened = lwb; - } + zil_lwb_set_zio_dependency(zilog, lwb); + zilog->zl_last_lwb_opened = lwb; mutex_exit(&zilog->zl_lock); - - ASSERT3P(lwb->lwb_root_zio, !=, NULL); - ASSERT3P(lwb->lwb_write_zio, !=, NULL); - ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED); } /* @@ -1759,11 +1754,11 @@ static const struct { static uint_t zil_maxblocksize = SPA_OLD_MAXBLOCKSIZE; /* - * Start a log block write and advance to the next log block. + * Finalize a log block for write and advance to the next log block. * Calls are serialized. */ static lwb_t * -zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) +zil_lwb_write_close(zilog_t *zilog, lwb_t *lwb) { lwb_t *nlwb = NULL; zil_chain_t *zilc; @@ -1771,7 +1766,7 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) blkptr_t *bp; dmu_tx_t *tx; uint64_t txg; - uint64_t zil_blksz, wsz; + uint64_t zil_blksz; int i, error; boolean_t slog; @@ -1780,16 +1775,6 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) ASSERT3P(lwb->lwb_write_zio, !=, NULL); ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED); - if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) { - zilc = (zil_chain_t *)lwb->lwb_buf; - bp = &zilc->zc_next_blk; - } else { - zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz); - bp = &zilc->zc_next_blk; - } - - ASSERT(lwb->lwb_nused <= lwb->lwb_sz); - /* * Allocate the next block and save its address in this block * before writing it in order to establish the log chain. @@ -1839,17 +1824,13 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]); zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1); + if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) + zilc = (zil_chain_t *)lwb->lwb_buf; + else + zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz); + bp = &zilc->zc_next_blk; BP_ZERO(bp); error = zio_alloc_zil(spa, zilog->zl_os, txg, bp, zil_blksz, &slog); - if (slog) { - ZIL_STAT_BUMP(zilog, zil_itx_metaslab_slog_count); - ZIL_STAT_INCR(zilog, zil_itx_metaslab_slog_bytes, - lwb->lwb_nused); - } else { - ZIL_STAT_BUMP(zilog, zil_itx_metaslab_normal_count); - ZIL_STAT_INCR(zilog, zil_itx_metaslab_normal_bytes, - lwb->lwb_nused); - } if (error == 0) { ASSERT3U(bp->blk_birth, ==, txg); bp->blk_cksum = lwb->lwb_blk.blk_cksum; @@ -1861,41 +1842,59 @@ zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) nlwb = zil_alloc_lwb(zilog, bp, slog, txg, TRUE); } + lwb->lwb_state = LWB_STATE_ISSUED; + + dmu_tx_commit(tx); + + /* + * If there was an allocation failure then nlwb will be null which + * forces a txg_wait_synced(). + */ + return (nlwb); +} + +static void +zil_lwb_write_issue(zilog_t *zilog, lwb_t *lwb) +{ + zil_chain_t *zilc; + + /* Actually fill the lwb with the data. */ + for (itx_t *itx = list_head(&lwb->lwb_itxs); itx; + itx = list_next(&lwb->lwb_itxs, itx)) + zil_lwb_commit(zilog, lwb, itx); + lwb->lwb_nused = lwb->lwb_nfilled; + if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) { /* For Slim ZIL only write what is used. */ - wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t); - ASSERT3U(wsz, <=, lwb->lwb_sz); + int wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, int); + ASSERT3S(wsz, <=, lwb->lwb_sz); zio_shrink(lwb->lwb_write_zio, wsz); + zilc = (zil_chain_t *)lwb->lwb_buf; } else { - wsz = lwb->lwb_sz; + zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz); } - zilc->zc_pad = 0; zilc->zc_nused = lwb->lwb_nused; zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum; - /* - * clear unused data for security - */ - memset(lwb->lwb_buf + lwb->lwb_nused, 0, wsz - lwb->lwb_nused); + /* Clear unused space for security. */ + memset(lwb->lwb_buf + lwb->lwb_nused, 0, lwb->lwb_sz - lwb->lwb_nused); + if (lwb->lwb_slog) { + ZIL_STAT_BUMP(zilog, zil_itx_metaslab_slog_count); + ZIL_STAT_INCR(zilog, zil_itx_metaslab_slog_bytes, + lwb->lwb_write_zio->io_size); + } else { + ZIL_STAT_BUMP(zilog, zil_itx_metaslab_normal_count); + ZIL_STAT_INCR(zilog, zil_itx_metaslab_normal_bytes, + lwb->lwb_write_zio->io_size); + } spa_config_enter(zilog->zl_spa, SCL_STATE, lwb, RW_READER); - zil_lwb_add_block(lwb, &lwb->lwb_blk); lwb->lwb_issued_timestamp = gethrtime(); - lwb->lwb_state = LWB_STATE_ISSUED; - zio_nowait(lwb->lwb_root_zio); zio_nowait(lwb->lwb_write_zio); - - dmu_tx_commit(tx); - - /* - * If there was an allocation failure then nlwb will be null which - * forces a txg_wait_synced(). - */ - return (nlwb); } /* @@ -1932,12 +1931,12 @@ zil_max_copied_data(zilog_t *zilog) } static lwb_t * -zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) +zil_lwb_assign(zilog_t *zilog, lwb_t *lwb, itx_t *itx, list_t *ilwbs) { - lr_t *lrcb, *lrc; - lr_write_t *lrwb, *lrw; - char *lr_buf; - uint64_t dlen, dnow, dpad, lwb_sp, reclen, txg, max_log_data; + itx_t *citx; + lr_t *lr, *clr; + lr_write_t *lrw; + uint64_t dlen, dnow, lwb_sp, reclen, max_log_data; ASSERT(MUTEX_HELD(&zilog->zl_issuer_lock)); ASSERT3P(lwb, !=, NULL); @@ -1945,8 +1944,8 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) zil_lwb_write_open(zilog, lwb); - lrc = &itx->itx_lr; - lrw = (lr_write_t *)lrc; + lr = &itx->itx_lr; + lrw = (lr_write_t *)lr; /* * A commit itx doesn't represent any on-disk state; instead @@ -1960,26 +1959,23 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) * * For more details, see the comment above zil_commit(). */ - if (lrc->lrc_txtype == TX_COMMIT) { + if (lr->lrc_txtype == TX_COMMIT) { mutex_enter(&zilog->zl_lock); zil_commit_waiter_link_lwb(itx->itx_private, lwb); itx->itx_private = NULL; mutex_exit(&zilog->zl_lock); + list_insert_tail(&lwb->lwb_itxs, itx); return (lwb); } - if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) { + if (lr->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) { dlen = P2ROUNDUP_TYPED( lrw->lr_length, sizeof (uint64_t), uint64_t); - dpad = dlen - lrw->lr_length; } else { - dlen = dpad = 0; + dlen = 0; } - reclen = lrc->lrc_reclen; + reclen = lr->lrc_reclen; zilog->zl_cur_used += (reclen + dlen); - txg = lrc->lrc_txg; - - ASSERT3U(zilog->zl_cur_used, <, UINT64_MAX - (reclen + dlen)); cont: /* @@ -1992,11 +1988,11 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) lwb_sp < zil_max_waste_space(zilog) && (dlen % max_log_data == 0 || lwb_sp < reclen + dlen % max_log_data))) { - lwb = zil_lwb_write_issue(zilog, lwb); + list_insert_tail(ilwbs, lwb); + lwb = zil_lwb_write_close(zilog, lwb); if (lwb == NULL) return (NULL); zil_lwb_write_open(zilog, lwb); - ASSERT(LWB_EMPTY(lwb)); lwb_sp = lwb->lwb_sz - lwb->lwb_nused; /* @@ -2012,17 +2008,79 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) } dnow = MIN(dlen, lwb_sp - reclen); - lr_buf = lwb->lwb_buf + lwb->lwb_nused; - memcpy(lr_buf, lrc, reclen); - lrcb = (lr_t *)lr_buf; /* Like lrc, but inside lwb. */ - lrwb = (lr_write_t *)lrcb; /* Like lrw, but inside lwb. */ + if (dlen > dnow) { + ASSERT3U(lr->lrc_txtype, ==, TX_WRITE); + ASSERT3U(itx->itx_wr_state, ==, WR_NEED_COPY); + citx = zil_itx_clone(itx); + clr = &citx->itx_lr; + lr_write_t *clrw = (lr_write_t *)clr; + clrw->lr_length = dnow; + lrw->lr_offset += dnow; + lrw->lr_length -= dnow; + } else { + citx = itx; + clr = lr; + } + + /* + * We're actually making an entry, so update lrc_seq to be the + * log record sequence number. Note that this is generally not + * equal to the itx sequence number because not all transactions + * are synchronous, and sometimes spa_sync() gets there first. + */ + clr->lrc_seq = ++zilog->zl_lr_seq; + + lwb->lwb_nused += reclen + dnow; + ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz); + ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t))); + + zil_lwb_add_txg(lwb, lr->lrc_txg); + list_insert_tail(&lwb->lwb_itxs, citx); + + dlen -= dnow; + if (dlen > 0) { + zilog->zl_cur_used += reclen; + goto cont; + } + + return (lwb); +} + +static void +zil_lwb_commit(zilog_t *zilog, lwb_t *lwb, itx_t *itx) +{ + lr_t *lr, *lrb; + lr_write_t *lrw, *lrwb; + char *lr_buf; + uint64_t dlen, reclen, txg; + + lr = &itx->itx_lr; + lrw = (lr_write_t *)lr; + + if (lr->lrc_txtype == TX_COMMIT) + return; + + if (lr->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY) { + dlen = P2ROUNDUP_TYPED( + lrw->lr_length, sizeof (uint64_t), uint64_t); + } else { + dlen = 0; + } + reclen = lr->lrc_reclen; + ASSERT3U(reclen + dlen, <=, lwb->lwb_nused - lwb->lwb_nfilled); + + lr_buf = lwb->lwb_buf + lwb->lwb_nfilled; + memcpy(lr_buf, lr, reclen); + lrb = (lr_t *)lr_buf; /* Like lr, but inside lwb. */ + lrwb = (lr_write_t *)lrb; /* Like lrw, but inside lwb. */ ZIL_STAT_BUMP(zilog, zil_itx_count); /* * If it's a write, fetch the data or get its blkptr as appropriate. */ - if (lrc->lrc_txtype == TX_WRITE) { + if (lr->lrc_txtype == TX_WRITE) { + txg = lr->lrc_txg; if (txg > spa_freeze_txg(zilog->zl_spa)) txg_wait_synced(zilog->zl_dmu_pool, txg); if (itx->itx_wr_state == WR_COPIED) { @@ -2035,14 +2093,10 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) if (itx->itx_wr_state == WR_NEED_COPY) { dbuf = lr_buf + reclen; - lrcb->lrc_reclen += dnow; - if (lrwb->lr_length > dnow) - lrwb->lr_length = dnow; - lrw->lr_offset += dnow; - lrw->lr_length -= dnow; + lrb->lrc_reclen += dlen; ZIL_STAT_BUMP(zilog, zil_itx_needcopy_count); ZIL_STAT_INCR(zilog, zil_itx_needcopy_bytes, - dnow); + dlen); } else { ASSERT3S(itx->itx_wr_state, ==, WR_INDIRECT); dbuf = NULL; @@ -2069,9 +2123,11 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) error = zilog->zl_get_data(itx->itx_private, itx->itx_gen, lrwb, dbuf, lwb, lwb->lwb_write_zio); - if (dbuf != NULL && error == 0 && dnow == dlen) + if (dbuf != NULL && error == 0) { /* Zero any padding bytes in the last block. */ - memset((char *)dbuf + lrwb->lr_length, 0, dpad); + memset((char *)dbuf + lrwb->lr_length, 0, + dlen - lrwb->lr_length); + } /* * Typically, the only return values we should see from @@ -2106,32 +2162,14 @@ zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb) case EEXIST: zfs_fallthrough; case EALREADY: - return (lwb); + return; } } } - /* - * We're actually making an entry, so update lrc_seq to be the - * log record sequence number. Note that this is generally not - * equal to the itx sequence number because not all transactions - * are synchronous, and sometimes spa_sync() gets there first. - */ - lrcb->lrc_seq = ++zilog->zl_lr_seq; - lwb->lwb_nused += reclen + dnow; - - zil_lwb_add_txg(lwb, txg); - - ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz); - ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t))); - - dlen -= dnow; - if (dlen > 0) { - zilog->zl_cur_used += reclen; - goto cont; - } - - return (lwb); + lwb->lwb_nfilled += reclen + dlen; + ASSERT3S(lwb->lwb_nfilled, <=, lwb->lwb_nused); + ASSERT0(P2PHASE(lwb->lwb_nfilled, sizeof (uint64_t))); } itx_t * @@ -2156,6 +2194,16 @@ zil_itx_create(uint64_t txtype, size_t olrsize) return (itx); } +static itx_t * +zil_itx_clone(itx_t *oitx) +{ + itx_t *itx = zio_data_buf_alloc(oitx->itx_size); + memcpy(itx, oitx, oitx->itx_size); + itx->itx_callback = NULL; + itx->itx_callback_data = NULL; + return (itx); +} + void zil_itx_destroy(itx_t *itx) { @@ -2187,7 +2235,7 @@ zil_itxg_clean(void *arg) /* * In the general case, commit itxs will not be found * here, as they'll be committed to an lwb via - * zil_lwb_commit(), and free'd in that function. Having + * zil_lwb_assign(), and free'd in that function. Having * said that, it is still possible for commit itxs to be * found here, due to the following race: * @@ -2589,7 +2637,7 @@ zil_commit_writer_stall(zilog_t *zilog) * lwb will be issued to the zio layer to be written to disk. */ static void -zil_process_commit_list(zilog_t *zilog) +zil_process_commit_list(zilog_t *zilog, list_t *ilwbs) { spa_t *spa = zilog->zl_spa; list_t nolwb_itxs; @@ -2693,18 +2741,14 @@ zil_process_commit_list(zilog_t *zilog) */ if (frozen || !synced || lrc->lrc_txtype == TX_COMMIT) { if (lwb != NULL) { - lwb = zil_lwb_commit(zilog, itx, lwb); - + lwb = zil_lwb_assign(zilog, lwb, itx, ilwbs); if (lwb == NULL) list_insert_tail(&nolwb_itxs, itx); - else - list_insert_tail(&lwb->lwb_itxs, itx); } else { if (lrc->lrc_txtype == TX_COMMIT) { zil_commit_waiter_link_nolwb( itx->itx_private, &nolwb_waiters); } - list_insert_tail(&nolwb_itxs, itx); } } else { @@ -2720,6 +2764,8 @@ zil_process_commit_list(zilog_t *zilog) * the ZIL write pipeline; see the comment within * zil_commit_writer_stall() for more details. */ + while ((lwb = list_remove_head(ilwbs)) != NULL) + zil_lwb_write_issue(zilog, lwb); zil_commit_writer_stall(zilog); /* @@ -2769,13 +2815,13 @@ zil_process_commit_list(zilog_t *zilog) * on the system, such that this function will be * immediately called again (not necessarily by the same * thread) and this lwb's zio will be issued via - * zil_lwb_commit(). This way, the lwb is guaranteed to + * zil_lwb_assign(). This way, the lwb is guaranteed to * be "full" when it is issued to disk, and we'll make * use of the lwb's size the best we can. * * 2. If there isn't sufficient ZIL activity occurring on * the system, such that this lwb's zio isn't issued via - * zil_lwb_commit(), zil_commit_waiter() will issue the + * zil_lwb_assign(), zil_commit_waiter() will issue the * lwb's zio. If this occurs, the lwb is not guaranteed * to be "full" by the time its zio is issued, and means * the size of the lwb was "too large" given the amount @@ -2807,10 +2853,15 @@ zil_process_commit_list(zilog_t *zilog) zfs_commit_timeout_pct / 100; if (sleep < zil_min_commit_timeout || lwb->lwb_sz - lwb->lwb_nused < lwb->lwb_sz / 8) { - lwb = zil_lwb_write_issue(zilog, lwb); + list_insert_tail(ilwbs, lwb); + lwb = zil_lwb_write_close(zilog, lwb); zilog->zl_cur_used = 0; - if (lwb == NULL) + if (lwb == NULL) { + while ((lwb = list_remove_head(ilwbs)) + != NULL) + zil_lwb_write_issue(zilog, lwb); zil_commit_writer_stall(zilog); + } } } } @@ -2833,9 +2884,13 @@ zil_process_commit_list(zilog_t *zilog) static void zil_commit_writer(zilog_t *zilog, zil_commit_waiter_t *zcw) { + list_t ilwbs; + lwb_t *lwb; + ASSERT(!MUTEX_HELD(&zilog->zl_lock)); ASSERT(spa_writeable(zilog->zl_spa)); + list_create(&ilwbs, sizeof (lwb_t), offsetof(lwb_t, lwb_issue_node)); mutex_enter(&zilog->zl_issuer_lock); if (zcw->zcw_lwb != NULL || zcw->zcw_done) { @@ -2862,10 +2917,12 @@ zil_commit_writer(zilog_t *zilog, zil_commit_waiter_t *zcw) zil_get_commit_list(zilog); zil_prune_commit_list(zilog); - zil_process_commit_list(zilog); + zil_process_commit_list(zilog, &ilwbs); out: mutex_exit(&zilog->zl_issuer_lock); + while ((lwb = list_remove_head(&ilwbs)) != NULL) + zil_lwb_write_issue(zilog, lwb); } static void @@ -2892,7 +2949,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) return; /* - * In order to call zil_lwb_write_issue() we must hold the + * In order to call zil_lwb_write_close() we must hold the * zilog's "zl_issuer_lock". We can't simply acquire that lock, * since we're already holding the commit waiter's "zcw_lock", * and those two locks are acquired in the opposite order @@ -2910,8 +2967,10 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) * the waiter is marked "done"), so without this check we could * wind up with a use-after-free error below. */ - if (zcw->zcw_done) + if (zcw->zcw_done) { + lwb = NULL; goto out; + } ASSERT3P(lwb, ==, zcw->zcw_lwb); @@ -2930,15 +2989,17 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) * if it's ISSUED or OPENED, and block any other threads that might * attempt to issue this lwb. For that reason we hold the * zl_issuer_lock when checking the lwb_state; we must not call - * zil_lwb_write_issue() if the lwb had already been issued. + * zil_lwb_write_close() if the lwb had already been issued. * * See the comment above the lwb_state_t structure definition for * more details on the lwb states, and locking requirements. */ if (lwb->lwb_state == LWB_STATE_ISSUED || lwb->lwb_state == LWB_STATE_WRITE_DONE || - lwb->lwb_state == LWB_STATE_FLUSH_DONE) + lwb->lwb_state == LWB_STATE_FLUSH_DONE) { + lwb = NULL; goto out; + } ASSERT3S(lwb->lwb_state, ==, LWB_STATE_OPENED); @@ -2948,9 +3009,9 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) * since we've reached the commit waiter's timeout and it still * hasn't been issued. */ - lwb_t *nlwb = zil_lwb_write_issue(zilog, lwb); + lwb_t *nlwb = zil_lwb_write_close(zilog, lwb); - IMPLY(nlwb != NULL, lwb->lwb_state != LWB_STATE_OPENED); + ASSERT3S(lwb->lwb_state, !=, LWB_STATE_OPENED); /* * Since the lwb's zio hadn't been issued by the time this thread @@ -2968,7 +3029,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) if (nlwb == NULL) { /* - * When zil_lwb_write_issue() returns NULL, this + * When zil_lwb_write_close() returns NULL, this * indicates zio_alloc_zil() failed to allocate the * "next" lwb on-disk. When this occurs, the ZIL write * pipeline must be stalled; see the comment within the @@ -2990,12 +3051,16 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) * lock, which occurs prior to calling dmu_tx_commit() */ mutex_exit(&zcw->zcw_lock); + zil_lwb_write_issue(zilog, lwb); + lwb = NULL; zil_commit_writer_stall(zilog); mutex_enter(&zcw->zcw_lock); } out: mutex_exit(&zilog->zl_issuer_lock); + if (lwb) + zil_lwb_write_issue(zilog, lwb); ASSERT(MUTEX_HELD(&zcw->zcw_lock)); } @@ -3010,7 +3075,7 @@ zil_commit_waiter_timeout(zilog_t *zilog, zil_commit_waiter_t *zcw) * waited "long enough" and the lwb is still in the "open" state. * * Given a sufficient amount of itxs being generated and written using - * the ZIL, the lwb's zio will be issued via the zil_lwb_commit() + * the ZIL, the lwb's zio will be issued via the zil_lwb_assign() * function. If this does not occur, this secondary responsibility will * ensure the lwb is issued even if there is not other synchronous * activity on the system. @@ -3689,7 +3754,7 @@ zil_close(zilog_t *zilog) /* * zl_lwb_max_issued_txg may be larger than lwb_max_txg. It depends * on the time when the dmu_tx transaction is assigned in - * zil_lwb_write_issue(). + * zil_lwb_write_close(). */ mutex_enter(&zilog->zl_lwb_io_lock); txg = MAX(zilog->zl_lwb_max_issued_txg, txg); diff --git a/module/zfs/zio.c b/module/zfs/zio.c index 365d34832c3a..c17ca5e1d651 100644 --- a/module/zfs/zio.c +++ b/module/zfs/zio.c @@ -2341,7 +2341,7 @@ zio_nowait(zio_t *zio) ASSERT3P(zio->io_executor, ==, NULL); if (zio->io_child_type == ZIO_CHILD_LOGICAL && - zio_unique_parent(zio) == NULL) { + list_is_empty(&zio->io_parent_list)) { zio_t *pio; /*