Skip to content

Commit

Permalink
Raw send stability fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Caputi <tcaputi@datto.com>
  • Loading branch information
Tom Caputi committed Dec 3, 2017
1 parent 9c885c4 commit 426a446
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 17 deletions.
32 changes: 23 additions & 9 deletions module/zfs/dmu_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -2422,6 +2422,8 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
dmu_object_info_t doi;
dmu_tx_t *tx;
uint64_t object;
uint32_t indblksz;
int nblkptr;
int err;

if (drro->drr_type == DMU_OT_NONE ||
Expand Down Expand Up @@ -2454,6 +2456,10 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
return (SET_ERROR(EINVAL));
}

indblksz = drro->drr_indblkshift ?
1ULL << drro->drr_indblkshift : 0;
nblkptr = deduce_nblkptr(drro->drr_bonustype, drro->drr_bonuslen);

err = dmu_object_info(rwa->os, drro->drr_object, &doi);

if (err != 0 && err != ENOENT)
Expand All @@ -2471,25 +2477,28 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
* dnode hasn't changed.
*/
if (err == 0) {
uint32_t indblksz = drro->drr_indblkshift ?
1ULL << drro->drr_indblkshift : 0;
int nblkptr = deduce_nblkptr(drro->drr_bonustype,
drro->drr_bonuslen);

/* nblkptr will be bounded by the bonus size and type */
if (rwa->raw && nblkptr != drro->drr_nblkptr)
return (SET_ERROR(EINVAL));

if (drro->drr_blksz != doi.doi_data_block_size ||
nblkptr < doi.doi_nblkptr ||
(rwa->raw &&
(indblksz != doi.doi_metadata_block_size ||
(rwa->raw && (indblksz != doi.doi_metadata_block_size ||
drro->drr_nlevels < doi.doi_indirection))) {
err = dmu_free_long_range(rwa->os, drro->drr_object,
0, DMU_OBJECT_END);
if (err != 0)
return (SET_ERROR(EINVAL));
}

if (rwa->raw && drro->drr_nlevels < doi.doi_indirection) {
err = dmu_free_long_object_raw(rwa->os, object);
if (err != 0)
return (SET_ERROR(EINVAL));

txg_wait_synced(dmu_objset_pool(rwa->os), 0);
object = DMU_NEW_OBJECT;
}
}

tx = dmu_tx_create(rwa->os);
Expand All @@ -2510,7 +2519,9 @@ receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
} else if (drro->drr_type != doi.doi_type ||
drro->drr_blksz != doi.doi_data_block_size ||
drro->drr_bonustype != doi.doi_bonus_type ||
drro->drr_bonuslen != doi.doi_bonus_size) {
drro->drr_bonuslen != doi.doi_bonus_size ||
(rwa->raw && (indblksz != doi.doi_metadata_block_size ||
drro->drr_nlevels < doi.doi_indirection))) {
/* currently allocated, but with different properties */
err = dmu_object_reclaim(rwa->os, drro->drr_object,
drro->drr_type, drro->drr_blksz,
Expand Down Expand Up @@ -2664,6 +2675,7 @@ receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
DRR_WRITE_PAYLOAD_SIZE(drrw));
}

IMPLY(rwa->raw, arc_is_encrypted(abuf));
VERIFY0(dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn));
dmu_assign_arcbuf_by_dnode(dn, drrw->drr_offset, abuf, tx);
dnode_rele(dn, FTAG);
Expand Down Expand Up @@ -2844,8 +2856,10 @@ receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
return (err);
}
dmu_buf_will_dirty(db_spill, tx);
if (rwa->raw)
if (rwa->raw) {
VERIFY0(dmu_object_dirty_raw(rwa->os, drrs->drr_object, tx));
dmu_buf_will_change_crypt_params(db_spill, tx);
}

if (db_spill->db_size < drrs->drr_length)
VERIFY(0 == dbuf_spill_set_blksz(db_spill,
Expand Down
43 changes: 35 additions & 8 deletions module/zfs/dnode.c
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,6 @@ dnode_create(objset_t *os, dnode_phys_t *dnp, dmu_buf_impl_t *db,

ASSERT(DMU_OT_IS_VALID(dn->dn_phys->dn_type));
ASSERT(zrl_is_locked(&dnh->dnh_zrlock));
ASSERT(!DN_SLOT_IS_PTR(dnh->dnh_dnode));

mutex_enter(&os->os_lock);

Expand Down Expand Up @@ -497,7 +496,9 @@ dnode_create(objset_t *os, dnode_phys_t *dnp, dmu_buf_impl_t *db,
}

/*
* Caller must be holding the dnode handle, which is released upon return.
* Caller must be holding the dnode handle. If it is held for read, it is
* released upon return. Otherwise, the responsibility remains with the
* caller.
*/
static void
dnode_destroy(dnode_t *dn)
Expand All @@ -517,8 +518,15 @@ dnode_destroy(dnode_t *dn)
}
mutex_exit(&os->os_lock);

/* the dnode can no longer move, so we can release the handle */
zrl_remove(&dn->dn_handle->dnh_zrlock);
/*
* The dnode can no longer move, so we can release the handle.
* If the zrl is locked, it is the responsibility of the caller
* to drop the lock.
*/
if (!zrl_is_locked(&dn->dn_handle->dnh_zrlock)) {
ASSERT(!zrl_is_zero(&dn->dn_handle->dnh_zrlock));
zrl_remove(&dn->dn_handle->dnh_zrlock);
}

dn->dn_allocated_txg = 0;
dn->dn_free_txg = 0;
Expand Down Expand Up @@ -1067,14 +1075,33 @@ dnode_set_slots(dnode_children_t *children, int idx, int slots, void *ptr)
}

static boolean_t
dnode_check_slots(dnode_children_t *children, int idx, int slots, void *ptr)
dnode_check_slots_free(dnode_children_t *children, int idx, int slots)
{
ASSERT3S(idx + slots, <=, DNODES_PER_BLOCK);

for (int i = idx; i < idx + slots; i++) {
dnode_handle_t *dnh = &children->dnc_children[i];
if (dnh->dnh_dnode != ptr)

if (DN_SLOT_IS_PTR(dnh->dnh_dnode)) {
dnode_t *dn = dnh->dnh_dnode;

mutex_enter(&dn->dn_mtx);
if (dn->dn_type == DMU_OT_NONE &&
dn->dn_free_txg == 0) {
mutex_exit(&dn->dn_mtx);

if (!zrl_is_locked(&dnh->dnh_zrlock))
continue;

dnode_destroy(dn);
dnh->dnh_dnode = DN_SLOT_FREE;
} else {
mutex_exit(&dn->dn_mtx);
return (B_FALSE);
}
} else if (dnh->dnh_dnode != DN_SLOT_FREE) {
return (B_FALSE);
}
}

return (B_TRUE);
Expand Down Expand Up @@ -1377,7 +1404,7 @@ dnode_hold_impl(objset_t *os, uint64_t object, int flag, int slots,
while (dn == DN_SLOT_UNINIT) {
dnode_slots_hold(dnc, idx, slots);

if (!dnode_check_slots(dnc, idx, slots, DN_SLOT_FREE)) {
if (!dnode_check_slots_free(dnc, idx, slots)) {
DNODE_STAT_BUMP(dnode_hold_free_misses);
dnode_slots_rele(dnc, idx, slots);
dbuf_rele(db, FTAG);
Expand All @@ -1390,7 +1417,7 @@ dnode_hold_impl(objset_t *os, uint64_t object, int flag, int slots,
continue;
}

if (!dnode_check_slots(dnc, idx, slots, DN_SLOT_FREE)) {
if (!dnode_check_slots_free(dnc, idx, slots)) {
DNODE_STAT_BUMP(dnode_hold_free_lock_misses);
dnode_slots_rele(dnc, idx, slots);
dbuf_rele(db, FTAG);
Expand Down

0 comments on commit 426a446

Please sign in to comment.