Skip to content

Commit

Permalink
Implement a new type of zfs receive: corrective receive (-c)
Browse files Browse the repository at this point in the history
This type of recv is used to heal corrupted data when a replica
of the data already exists (in the form of a send file for example).
With the provided send stream, corrective receive will read from
disk blocks described by the WRITE records. When any of the reads
come back with ECKSUM we use the data from the corresponding WRITE
record to rewrite the corrupted block.

Reviewed-by: Paul Dagnelie <pcd@delphix.com>
Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
Reviewed-by: Paul Zuchowski <pzuchowski@datto.com>
Signed-off-by: Alek Pinchuk <apinchuk@axcient.com>
Closes openzfs#9372
  • Loading branch information
alek-p authored and andrewc12 committed Sep 23, 2022
1 parent 59d679a commit c9114a0
Show file tree
Hide file tree
Showing 28 changed files with 1,233 additions and 74 deletions.
5 changes: 4 additions & 1 deletion cmd/zfs/zfs_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -4746,7 +4746,7 @@ zfs_do_receive(int argc, char **argv)
nomem();

/* check options */
while ((c = getopt(argc, argv, ":o:x:dehMnuvFsA")) != -1) {
while ((c = getopt(argc, argv, ":o:x:dehMnuvFsAc")) != -1) {
switch (c) {
case 'o':
if (!parseprop(props, optarg)) {
Expand Down Expand Up @@ -4802,6 +4802,9 @@ zfs_do_receive(int argc, char **argv)
case 'A':
abort_resumable = B_TRUE;
break;
case 'c':
flags.heal = B_TRUE;
break;
case ':':
(void) fprintf(stderr, gettext("missing argument for "
"'%c' option\n"), optopt);
Expand Down
2 changes: 2 additions & 0 deletions contrib/pyzfs/libzfs_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
lzc_receive_resumable,
lzc_receive_with_cmdprops,
lzc_receive_with_header,
lzc_receive_with_heal,
lzc_release,
lzc_reopen,
lzc_rollback,
Expand Down Expand Up @@ -127,6 +128,7 @@
'lzc_receive_resumable',
'lzc_receive_with_cmdprops',
'lzc_receive_with_header',
'lzc_receive_with_heal',
'lzc_release',
'lzc_reopen',
'lzc_rollback',
Expand Down
2 changes: 2 additions & 0 deletions contrib/pyzfs/libzfs_core/_error_translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ def _map(ret, name):
raise lzc_exc.ReadOnlyPool(_pool_name(snapname))
if ret == errno.EAGAIN:
raise lzc_exc.SuspendedPool(_pool_name(snapname))
if ret == errno.EACCES:
raise lzc_exc.EncryptionKeyNotLoaded()
if ret == ECKSUM:
raise lzc_exc.BadStream()
if ret == ZFS_ERR_WRONG_PARENT:
Expand Down
129 changes: 129 additions & 0 deletions contrib/pyzfs/libzfs_core/_libzfs_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,135 @@ def lzc_receive_with_cmdprops(
return (int(c_read_bytes[0]), action_handle)


@_uncommitted()
def lzc_receive_with_heal(
snapname, fd, begin_record, force=False, corrective=True, resumable=False,
raw=False, origin=None, props=None, cmdprops=None, key=None, cleanup_fd=-1,
action_handle=0
):
'''
Like :func:`lzc_receive_cmdprops`, but allows the caller to pass an
additional 'corrective' argument. The 'corrective' boolean set to true
indicates that a corruption healing receive should be performed.
:param bytes snapname: the name of the snapshot to create.
:param int fd: the file descriptor from which to read the stream.
:param begin_record: the stream's begin record.
:type begin_record: ``cffi`` `CData` representing the dmu_replay_record_t
structure.
:param bool force: whether to roll back or destroy the target filesystem
if that is required to receive the stream.
:param bool corrective: whether this stream should be used to heal data.
:param bool resumable: whether this stream should be treated as resumable.
If the receive fails due to premature stream termination, the
intermediate state will be preserved on disk and may subsequently be
resumed with :func:`lzc_send_resume`.
:param bool raw: whether this is a "raw" stream.
:param origin: the optional origin snapshot name if the stream is for a
clone.
:type origin: bytes or None
:param props: the properties to set on the snapshot as *received*
properties.
:type props: dict of bytes : Any
:param cmdprops: the properties to set on the snapshot as local overrides
to *received* properties. `bool` values are forcefully inherited while
every other value is set locally as if the command "zfs set" was
invoked immediately before the receive.
:type cmdprops: dict of bytes : Any
:param key: raw bytes representing user's wrapping key
:type key: bytes
:param int cleanup_fd: file descriptor used to set a cleanup-on-exit file
descriptor.
:param int action_handle: variable used to pass the handle for guid/ds
mapping: this should be set to zero on first call and will contain an
updated handle on success, it should be passed in subsequent calls.
:return: a tuple with two elements where the first one is the number of
bytes read from the file descriptor and the second one is the
action_handle return value.
:raises IOError: if an input / output error occurs while reading from the
``fd``.
:raises DatasetExists: if the snapshot named ``snapname`` already exists.
:raises DatasetExists: if the stream is a full stream and the destination
filesystem already exists.
:raises DatasetExists: if ``force`` is `True` but the destination
filesystem could not be rolled back to a matching snapshot because a
newer snapshot exists and it is an origin of a cloned filesystem.
:raises StreamMismatch: if an incremental stream is received and the latest
snapshot of the destination filesystem does not match the source
snapshot of the stream.
:raises StreamMismatch: if a full stream is received and the destination
filesystem already exists and it has at least one snapshot, and
``force`` is `False`.
:raises StreamMismatch: if an incremental clone stream is received but the
specified ``origin`` is not the actual received origin.
:raises DestinationModified: if an incremental stream is received and the
destination filesystem has been modified since the last snapshot and
``force`` is `False`.
:raises DestinationModified: if a full stream is received and the
destination filesystem already exists and it does not have any
snapshots, and ``force`` is `False`.
:raises DatasetNotFound: if the destination filesystem and its parent do
not exist.
:raises DatasetNotFound: if the ``origin`` is not `None` and does not
exist.
:raises DatasetBusy: if ``force`` is `True` but the destination filesystem
could not be rolled back to a matching snapshot because a newer
snapshot is held and could not be destroyed.
:raises DatasetBusy: if another receive operation is being performed on the
destination filesystem.
:raises EncryptionKeyNotLoaded: if corrective is set to true indicates the
key must be loaded to do a non-raw corrective recv on an encrypted
dataset.
:raises BadStream: if corrective is set to true indicates that
corrective recv was not able to reconstruct a corrupted block.
:raises BadStream: if the stream is corrupt or it is not recognized or it
is a compound stream or it is a clone stream, but ``origin`` is `None`.
:raises BadStream: if a clone stream is received and the destination
filesystem already exists.
:raises StreamFeatureNotSupported: if corrective is set to true indicates
stream is not compatible with the data in the pool.
:raises StreamFeatureNotSupported: if the stream has a feature that is not
supported on this side.
:raises ReceivePropertyFailure: if one or more of the specified properties
is invalid or has an invalid type or value.
:raises NameInvalid: if the name of either snapshot is invalid.
:raises NameTooLong: if the name of either snapshot is too long.
'''

if origin is not None:
c_origin = origin
else:
c_origin = _ffi.NULL
if action_handle is not None:
c_action_handle = _ffi.new("uint64_t *")
else:
c_action_handle = _ffi.NULL
c_read_bytes = _ffi.new("uint64_t *")
c_errflags = _ffi.new("uint64_t *")
if props is None:
props = {}
if cmdprops is None:
cmdprops = {}
if key is None:
key = b""
else:
key = bytes(key)

nvlist = nvlist_in(props)
cmdnvlist = nvlist_in(cmdprops)
properrs = {}
with nvlist_out(properrs) as c_errors:
ret = _lib.lzc_receive_with_heal(
snapname, nvlist, cmdnvlist, key, len(key), c_origin,
force, corrective, resumable, raw, fd, begin_record, cleanup_fd,
c_read_bytes, c_errflags, c_action_handle, c_errors)
errors.lzc_receive_translate_errors(
ret, snapname, fd, force, raw, False, False, origin, properrs)
return (int(c_read_bytes[0]), action_handle)


@_uncommitted()
def lzc_reopen(poolname, restart=True):
'''
Expand Down
4 changes: 4 additions & 0 deletions contrib/pyzfs/libzfs_core/bindings/libzfs_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@
uint8_t *, uint_t, const char *, boolean_t, boolean_t,
boolean_t, int, const dmu_replay_record_t *, int, uint64_t *,
uint64_t *, uint64_t *, nvlist_t **);
int lzc_receive_with_heal(const char *, nvlist_t *, nvlist_t *,
uint8_t *, uint_t, const char *, boolean_t, boolean_t, boolean_t,
boolean_t, int, const dmu_replay_record_t *, int, uint64_t *,
uint64_t *, uint64_t *, nvlist_t **);
int lzc_receive_with_header(const char *, nvlist_t *, const char *,
boolean_t, boolean_t, boolean_t, int, const dmu_replay_record_t *);
int lzc_release(nvlist_t *, nvlist_t **);
Expand Down
21 changes: 21 additions & 0 deletions contrib/pyzfs/libzfs_core/test/test_libzfs_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2911,6 +2911,27 @@ def test_recv_with_cmdprops(self):
self.assertEqual(fs.getProperty("compression"), b"on")
self.assertEqual(fs.getProperty("ns:prop"), b"val")

def test_recv_with_heal(self):
snap = ZFSTest.pool.makeName(b"fs1@snap1")
fs = ZFSTest.pool.getFilesystem(b"fs1")
props = {}
cmdprops = {
b"compression": 0x01,
b"ns:prop": b"val"
}

lzc.lzc_snapshot([snap])
with tempfile.TemporaryFile(suffix='.zstream') as stream:
lzc.lzc_send(snap, None, stream.fileno())
stream.seek(0)
(header, c_header) = lzc.receive_header(stream.fileno())
lzc.lzc_receive_with_heal(
snap, stream.fileno(), c_header, props=props,
cmdprops=cmdprops)
self.assertExists(snap)
self.assertEqual(fs.getProperty("compression"), b"on")
self.assertEqual(fs.getProperty("ns:prop"), b"val")

def test_recv_with_cmdprops_and_recvprops(self):
fromsnap = ZFSTest.pool.makeName(b"fs1@snap1")
fs = ZFSTest.pool.getFilesystem(b"recv")
Expand Down
3 changes: 3 additions & 0 deletions include/libzfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,9 @@ typedef struct recvflags {

/* force unmount while recv snapshot (private) */
boolean_t forceunmount;

/* use this recv to check (and heal if needed) an existing snapshot */
boolean_t heal;
} recvflags_t;

_LIBZFS_H int zfs_receive(libzfs_handle_t *, const char *, nvlist_t *,
Expand Down
6 changes: 5 additions & 1 deletion include/libzfs_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

/*
* Copyright (c) 2012, 2020 by Delphix. All rights reserved.
* Copyright (c) 2017 Datto Inc.
* Copyright 2017 RackTop Systems.
* Copyright (c) 2017 Open-E, Inc. All Rights Reserved.
* Copyright (c) 2019 Datto Inc.
*/

#ifndef _LIBZFS_CORE_H
Expand Down Expand Up @@ -114,6 +114,10 @@ _LIBZFS_CORE_H int lzc_receive_with_cmdprops(const char *, nvlist_t *,
nvlist_t *, uint8_t *, uint_t, const char *, boolean_t, boolean_t,
boolean_t, int, const struct dmu_replay_record *, int, uint64_t *,
uint64_t *, uint64_t *, nvlist_t **);
_LIBZFS_CORE_H int lzc_receive_with_heal(const char *, nvlist_t *, nvlist_t *,
uint8_t *, uint_t, const char *, boolean_t, boolean_t, boolean_t, boolean_t,
int, const struct dmu_replay_record *, int, uint64_t *, uint64_t *,
uint64_t *, nvlist_t **);
_LIBZFS_CORE_H int lzc_send_space(const char *, const char *,
enum lzc_send_flags, uint64_t *);
_LIBZFS_CORE_H int lzc_send_space_resume_redacted(const char *, const char *,
Expand Down
4 changes: 3 additions & 1 deletion include/sys/dmu_recv.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* Copyright (c) 2012, 2020 by Delphix. All rights reserved.
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
* Copyright (c) 2013, Joyent, Inc. All rights reserved.
* Copyright (c) 2019 Datto Inc.
*/

#ifndef _DMU_RECV_H
Expand All @@ -47,6 +48,7 @@ typedef struct dmu_recv_cookie {
boolean_t drc_byteswap;
uint64_t drc_featureflags;
boolean_t drc_force;
boolean_t drc_heal;
boolean_t drc_resumable;
boolean_t drc_should_save;
boolean_t drc_raw;
Expand Down Expand Up @@ -78,7 +80,7 @@ typedef struct dmu_recv_cookie {
} dmu_recv_cookie_t;

int dmu_recv_begin(char *, char *, dmu_replay_record_t *,
boolean_t, boolean_t, nvlist_t *, nvlist_t *, char *,
boolean_t, boolean_t, boolean_t, nvlist_t *, nvlist_t *, char *,
dmu_recv_cookie_t *, zfs_file_t *, offset_t *);
int dmu_recv_stream(dmu_recv_cookie_t *, offset_t *);
int dmu_recv_end(dmu_recv_cookie_t *, void *);
Expand Down
3 changes: 2 additions & 1 deletion include/sys/spa.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
* Copyright 2013 Saso Kiselkov. All rights reserved.
* Copyright (c) 2014 Integros [integros.com]
* Copyright 2017 Joyent, Inc.
* Copyright (c) 2017, 2019, Datto Inc. All rights reserved.
* Copyright (c) 2017, Intel Corporation.
* Copyright (c) 2019, Allan Jude
* Copyright (c) 2019, Klara Inc.
* Copyright (c) 2019, Datto Inc.
*/

#ifndef _SYS_SPA_H
Expand Down Expand Up @@ -1134,6 +1134,7 @@ extern const char *spa_state_to_name(spa_t *spa);
/* error handling */
struct zbookmark_phys;
extern void spa_log_error(spa_t *spa, const zbookmark_phys_t *zb);
extern void spa_remove_error(spa_t *spa, zbookmark_phys_t *zb);
extern int zfs_ereport_post(const char *clazz, spa_t *spa, vdev_t *vd,
const zbookmark_phys_t *zb, zio_t *zio, uint64_t state);
extern boolean_t zfs_ereport_is_valid(const char *clazz, spa_t *spa, vdev_t *vd,
Expand Down
3 changes: 2 additions & 1 deletion include/sys/spa_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
* Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
* Copyright 2013 Saso Kiselkov. All rights reserved.
* Copyright (c) 2016 Actifio, Inc. All rights reserved.
* Copyright (c) 2017 Datto Inc.
* Copyright (c) 2017, Intel Corporation.
* Copyright (c) 2019 Datto Inc.
*/

#ifndef _SYS_SPA_IMPL_H
Expand Down Expand Up @@ -349,6 +349,7 @@ struct spa {
kmutex_t spa_errlist_lock; /* error list/ereport lock */
avl_tree_t spa_errlist_last; /* last error list */
avl_tree_t spa_errlist_scrub; /* scrub error list */
avl_tree_t spa_errlist_healed; /* list of healed blocks */
uint64_t spa_deflate; /* should we deflate? */
uint64_t spa_history; /* history object */
kmutex_t spa_history_lock; /* history lock */
Expand Down
2 changes: 2 additions & 0 deletions include/sys/zio.h
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ extern zio_t *zio_null(zio_t *pio, spa_t *spa, vdev_t *vd,
extern zio_t *zio_root(spa_t *spa,
zio_done_func_t *done, void *priv, enum zio_flag flags);

extern void zio_destroy(zio_t *zio);

extern zio_t *zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
struct abd *data, uint64_t lsize, zio_done_func_t *done, void *priv,
zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb);
Expand Down
Loading

0 comments on commit c9114a0

Please sign in to comment.