cleanup msgpack related str/bytes mess #6668
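
The common thread in the changes below: borg's msgpack wrapper (presumably packing with
use_bin_type=True and unpacking with raw=False, which is what the removed conversions
suggest) round-trips str keys and values as str, so explicit .encode() / safe_decode()
calls and b'...' dict lookups become unnecessary. A minimal sketch of that round trip
using plain msgpack-python, not borg's wrapper:

    import msgpack

    packed = msgpack.packb({'path': 'home/user/file'}, use_bin_type=True)
    unpacked = msgpack.unpackb(packed, raw=False)
    # str keys and values survive the round trip, no .encode()/.decode() needed
    assert unpacked == {'path': 'home/user/file'}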

23 changes: 9 additions & 14 deletions src/borg/archive.py
@@ -34,7 +34,7 @@
from .platform import uid2user, user2uid, gid2group, group2gid
from .helpers import parse_timestamp, to_localtime
from .helpers import OutputTimestamp, format_timedelta, format_file_size, file_status, FileSize
from .helpers import safe_encode, safe_decode, make_path_safe, remove_surrogates
from .helpers import safe_encode, make_path_safe, remove_surrogates
from .helpers import StableDict
from .helpers import bin_to_hex
from .helpers import safe_ns
@@ -492,7 +492,6 @@ def _load_meta(self, id):
def load(self, id):
self.id = id
self.metadata = self._load_meta(self.id)
self.metadata.cmdline = [safe_decode(arg) for arg in self.metadata.cmdline]
self.name = self.metadata.name
self.comment = self.metadata.get('comment', '')

@@ -1550,7 +1549,7 @@ class RobustUnpacker:
"""
def __init__(self, validator, item_keys):
super().__init__()
self.item_keys = [msgpack.packb(name.encode()) for name in item_keys]
self.item_keys = [msgpack.packb(name) for name in item_keys]
self.validator = validator
self._buffered_data = []
self._resync = False
@@ -1754,13 +1753,10 @@ def rebuild_manifest(self):

Iterates through all objects in the repository looking for archive metadata blocks.
"""
required_archive_keys = frozenset(key.encode() for key in REQUIRED_ARCHIVE_KEYS)

def valid_archive(obj):
if not isinstance(obj, dict):
return False
keys = set(obj)
return required_archive_keys.issubset(keys)
return REQUIRED_ARCHIVE_KEYS.issubset(obj)

logger.info('Rebuilding missing manifest, this might take some time...')
# as we have lost the manifest, we do not know any more what valid item keys we had.
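
A short aside on the valid_archive() simplification above: set.issubset() accepts any
iterable, and iterating a dict yields its keys, so neither the encoded key set nor the
intermediate set(obj) is needed once the unpacked keys are plain str. Illustrative check
(the dict contents are made up):

    required = frozenset({'version', 'name', 'items'})  # stands in for REQUIRED_ARCHIVE_KEYS
    obj = {'version': 1, 'name': 'archive-1', 'items': [], 'time': '2022-04-23'}
    assert required.issubset(obj)  # True: all required keys are present in the dict
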
@@ -1769,7 +1765,7 @@ def valid_archive(obj):
# lost manifest on an older borg version than the most recent one that was ever used
# within this repository (assuming that newer borg versions support more item keys).
manifest = Manifest(self.key, self.repository)
archive_keys_serialized = [msgpack.packb(name.encode()) for name in ARCHIVE_KEYS]
archive_keys_serialized = [msgpack.packb(name) for name in ARCHIVE_KEYS]
pi = ProgressIndicatorPercent(total=len(self.chunks), msg="Rebuilding manifest %6.2f%%", step=0.01,
msgid='check.rebuild_manifest')
for chunk_id, _ in self.chunks.iteritems():
@@ -1916,9 +1912,9 @@ def robust_iterator(archive):

Missing item chunks will be skipped and the msgpack stream will be restarted
"""
item_keys = frozenset(key.encode() for key in self.manifest.item_keys)
required_item_keys = frozenset(key.encode() for key in REQUIRED_ITEM_KEYS)
unpacker = RobustUnpacker(lambda item: isinstance(item, StableDict) and b'path' in item,
item_keys = self.manifest.item_keys
required_item_keys = REQUIRED_ITEM_KEYS
unpacker = RobustUnpacker(lambda item: isinstance(item, StableDict) and 'path' in item,
self.manifest.item_keys)
_state = 0

@@ -1940,9 +1936,9 @@ def list_keys_safe(keys):
def valid_item(obj):
if not isinstance(obj, StableDict):
return False, 'not a dictionary'
# A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
# A bug in Attic up to and including release 0.13 added a (meaningless) 'acl' key to every item.
# We ignore it here, should it exist. See test_attic013_acl_bug for details.
obj.pop(b'acl', None)
obj.pop('acl', None)
keys = set(obj)
if not required_item_keys.issubset(keys):
return False, 'missing required keys: ' + list_keys_safe(required_item_keys - keys)
@@ -2029,7 +2025,6 @@ def valid_item(obj):
archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
if archive.version != 1:
raise Exception('Unknown archive metadata version')
archive.cmdline = [safe_decode(arg) for arg in archive.cmdline]
items_buffer = ChunkBuffer(self.key)
items_buffer.write_chunk = add_callback
for item in robust_iterator(archive):
148 changes: 140 additions & 8 deletions src/borg/archiver.py
@@ -44,7 +44,7 @@
from .archive import has_link
from .cache import Cache, assert_secure, SecurityManager
from .constants import * # NOQA
from .compress import CompressionSpec
from .compress import CompressionSpec, ZLIB, ZLIB_legacy
from .crypto.key import key_creator, key_argument_names, tam_required_file, tam_required
from .crypto.key import RepoKey, KeyfileKey, Blake2RepoKey, Blake2KeyfileKey, FlexiKey
from .crypto.keymanager import KeyManager
@@ -54,7 +54,7 @@
from .helpers import PrefixSpec, GlobSpec, CommentSpec, SortBySpec, FilesCacheMode
from .helpers import BaseFormatter, ItemFormatter, ArchiveFormatter
from .helpers import format_timedelta, format_file_size, parse_file_size, format_archive
from .helpers import safe_encode, remove_surrogates, bin_to_hex, prepare_dump_dict, eval_escapes
from .helpers import remove_surrogates, bin_to_hex, prepare_dump_dict, eval_escapes
from .helpers import interval, prune_within, prune_split, PRUNING_PATTERNS
from .helpers import timestamp
from .helpers import get_cache_dir, os_stat
@@ -338,6 +338,101 @@ def do_serve(self, args):
).serve()
return EXIT_SUCCESS

@with_other_repository(manifest=True, key=True, compatibility=(Manifest.Operation.READ,))
@with_repository(exclusive=True, manifest=True, cache=True, compatibility=(Manifest.Operation.WRITE,))
def do_transfer(self, args, *,
repository, manifest, key, cache,
other_repository=None, other_manifest=None, other_key=None):
"""archives transfer from other repository"""

def upgrade_item(item):
"""upgrade item as needed, get rid of legacy crap"""
if item.get('hardlink_master', True) and 'source' not in item and hardlinkable(item.mode):
item._dict['hlid'] = hlid = hashlib.sha256(item._dict['path']).digest()
hardlink_masters[hlid] = (item._dict.get('chunks'), item._dict.get('chunks_healthy'))
elif 'source' in item and hardlinkable(item.mode):
item._dict['hlid'] = hlid = hashlib.sha256(item._dict['source']).digest()
chunks, chunks_healthy = hardlink_masters.get(hlid, (None, None))
if chunks is not None:
item._dict['chunks'] = chunks
for chunk_id, _, _ in chunks:
cache.chunk_incref(chunk_id, archive.stats)
if chunks_healthy is not None:
item._dict['chunks_healthy'] = chunks_healthy
item._dict.pop('source') # not used for hardlinks any more, replaced by hlid
for attr in 'atime', 'ctime', 'mtime', 'birthtime':
if attr in item:
ns = getattr(item, attr) # decode (bigint or Timestamp) --> int ns
setattr(item, attr, ns) # encode int ns --> msgpack.Timestamp only, no bigint any more
item._dict.pop('hardlink_master', None) # not used for hardlinks any more, replaced by hlid
item._dict.pop('acl', None) # remove remnants of bug in attic <= 0.13
item.get_size(memorize=True) # if not already present: compute+remember size for items with chunks
return item

def upgrade_compressed_chunk(chunk):
if ZLIB_legacy.detect(chunk):
chunk = ZLIB.ID + chunk # get rid of the attic legacy: prepend separate type bytes for zlib
return chunk

dry_run = args.dry_run

args.consider_checkpoints = True
archive_names = tuple(x.name for x in other_manifest.archives.list_considering(args))
if not archive_names:
return EXIT_SUCCESS

for name in archive_names:
transfer_size = 0
present_size = 0
if name in manifest.archives and not dry_run:
print(f"{name}: archive is already present in destination repo, skipping.")
else:
if not dry_run:
print(f"{name}: copying archive to destination repo...")
hardlink_masters = {}
other_archive = Archive(other_repository, other_key, other_manifest, name)
archive = Archive(repository, key, manifest, name, cache=cache, create=True) if not dry_run else None
for item in other_archive.iter_items():
if 'chunks' in item:
chunks = []
for chunk_id, size, _ in item.chunks:
refcount = cache.seen_chunk(chunk_id, size)
if refcount == 0: # target repo does not yet have this chunk
if not dry_run:
cdata = other_repository.get(chunk_id)
# keep compressed payload same, avoid decompression / recompression
data = other_key.decrypt(chunk_id, cdata, decompress=False)
data = upgrade_compressed_chunk(data)
chunk_entry = cache.add_chunk(chunk_id, data, archive.stats, wait=False,
compress=False, size=size)
cache.repository.async_response(wait=False)
chunks.append(chunk_entry)
transfer_size += size
else:
if not dry_run:
chunk_entry = cache.chunk_incref(chunk_id, archive.stats)
chunks.append(chunk_entry)
present_size += size
if not dry_run:
item.chunks = chunks # overwrite! IDs and sizes are same, csizes are likely different
archive.stats.nfiles += 1
if not dry_run:
archive.add_item(upgrade_item(item))
if not dry_run:
additional_metadata = {}
# keep all metadata except archive version and stats. also do not keep
# recreate_source_id, recreate_args, recreate_partial_chunks which were used only in 1.1.0b1 .. b2.
for attr in ('cmdline', 'hostname', 'username', 'time', 'time_end', 'comment',
'chunker_params', 'recreate_cmdline'):
if hasattr(other_archive.metadata, attr):
additional_metadata[attr] = getattr(other_archive.metadata, attr)
archive.save(stats=archive.stats, additional_metadata=additional_metadata)
print(f"{name}: finished. transfer_size: {transfer_size} present_size: {present_size}")
else:
print(f"{name}: completed" if transfer_size == 0 else
f"{name}: incomplete, transfer_size: {transfer_size} present_size: {present_size}")
return EXIT_SUCCESS

@with_repository(create=True, exclusive=True, manifest=False)
@with_other_repository(key=True, compatibility=(Manifest.Operation.READ, ))
def do_init(self, args, repository, *, other_repository=None, other_key=None):
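
Two illustrative sketches for the do_transfer() helpers above (not part of the diff;
names and values are made up).

First, the hlid-based hardlink grouping in upgrade_item(): every member of a hardlink
group ends up with the same hlid (a hash of the path that identified the group in the old
hardlink_master/source scheme), and non-master members copy the master's chunk list:

    import hashlib

    hardlink_masters = {}

    def hlid_for(path):
        return hashlib.sha256(path).digest()

    # "master" item: contributes its chunk list under its hlid
    hardlink_masters[hlid_for(b'home/user/file')] = (['chunk-entries-of-master'], None)

    # legacy "slave" item pointed at the master via its 'source' field;
    # it looks up the master's chunks by the same hlid
    chunks, chunks_healthy = hardlink_masters.get(hlid_for(b'home/user/file'), (None, None))
    assert chunks == ['chunk-entries-of-master']

Second, the zlib legacy handling in upgrade_compressed_chunk(): attic stored zlib output
without a compressor type byte, and a raw zlib stream is recognizable by its RFC 1950
header (deflate method in the CMF byte, header checksum divisible by 31). Assuming
ZLIB_legacy.detect() uses roughly this heuristic, a sketch:

    import zlib

    def looks_like_raw_zlib(data):
        # RFC 1950: low nibble of CMF == 8 (deflate) and (CMF*256 + FLG) % 31 == 0
        if len(data) < 2:
            return False
        cmf, flg = data[0], data[1]
        return (cmf & 0x0f) == 8 and ((cmf << 8) + flg) % 31 == 0

    assert looks_like_raw_zlib(zlib.compress(b'some chunk data'))
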
@@ -1844,12 +1939,12 @@ def do_upgrade(self, args, repository, manifest=None, key=None):
print('This repository is not encrypted, cannot enable TAM.')
return EXIT_ERROR

if not manifest.tam_verified or not manifest.config.get(b'tam_required', False):
if not manifest.tam_verified or not manifest.config.get('tam_required', False):
# The standard archive listing doesn't include the archive ID like in borg 1.1.x
print('Manifest contents:')
for archive_info in manifest.archives.list(sort_by=['ts']):
print(format_archive(archive_info), '[%s]' % bin_to_hex(archive_info.id))
manifest.config[b'tam_required'] = True
manifest.config['tam_required'] = True
manifest.write()
repository.commit(compact=False)
if not key.tam_required:
@@ -1872,7 +1967,7 @@ def do_upgrade(self, args, repository, manifest=None, key=None):
print('Key updated')
if hasattr(key, 'find_key'):
print('Key location:', key.find_key())
manifest.config[b'tam_required'] = False
manifest.config['tam_required'] = False
manifest.write()
repository.commit(compact=False)
else:
@@ -2205,7 +2300,7 @@ def do_debug_dump_archive(self, args, repository, manifest, key):
"""dump decoded archive metadata (not: data)"""

try:
archive_meta_orig = manifest.archives.get_raw_dict()[safe_encode(args.location.archive)]
archive_meta_orig = manifest.archives.get_raw_dict()[args.location.archive]
except KeyError:
raise Archive.DoesNotExist(args.location.archive)

@@ -2222,7 +2317,7 @@ def output(fd):
fd.write(do_indent(prepare_dump_dict(archive_meta_orig)))
fd.write(',\n')

data = key.decrypt(archive_meta_orig[b'id'], repository.get(archive_meta_orig[b'id']))
data = key.decrypt(archive_meta_orig['id'], repository.get(archive_meta_orig['id']))
archive_org_dict = msgpack.unpackb(data, object_hook=StableDict)

fd.write(' "_meta":\n')
@@ -2232,7 +2327,7 @@ def output(fd):

unpacker = msgpack.Unpacker(use_list=False, object_hook=StableDict)
first = True
for item_id in archive_org_dict[b'items']:
for item_id in archive_org_dict['items']:
data = key.decrypt(item_id, repository.get(item_id))
unpacker.feed(data)
for item in unpacker:
@@ -4083,6 +4178,43 @@ def define_borg_mount(parser):
help='archives to delete')
define_archive_filters_group(subparser)

# borg transfer
transfer_epilog = process_epilog("""
This command transfers archives from one repository to another repository.

Suggested use:

# initialize DST_REPO reusing key material from SRC_REPO, so that
# chunking and chunk id generation will work in the same way as before.
borg init --other-location=SRC_REPO --encryption=DST_ENC DST_REPO

# transfer archives from SRC_REPO to DST_REPO
borg transfer --dry-run SRC_REPO DST_REPO # check what it would do
borg transfer SRC_REPO DST_REPO # do it!
borg transfer --dry-run SRC_REPO DST_REPO # check! anything left?

The default is to transfer all archives, including checkpoint archives.

You can use the archive filter options to limit which archives will be transferred,
e.g. the --prefix option. For big repositories with multiple data sets, this is
recommended to keep the runtime per invocation lower.
""")
subparser = subparsers.add_parser('transfer', parents=[common_parser], add_help=False,
description=self.do_transfer.__doc__,
epilog=transfer_epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
help='transfer of archives from another repository')
subparser.set_defaults(func=self.do_transfer)
subparser.add_argument('-n', '--dry-run', dest='dry_run', action='store_true',
help='do not change repository, just check')
subparser.add_argument('other_location', metavar='SRC_REPOSITORY',
type=location_validator(archive=False, other=True),
help='source repository')
subparser.add_argument('location', metavar='DST_REPOSITORY',
type=location_validator(archive=False, other=False),
help='destination repository')
define_archive_filters_group(subparser)

# borg diff
diff_epilog = process_epilog("""
This command finds differences (file contents, user/group/mode) between archives.
11 changes: 6 additions & 5 deletions src/borg/cache.py
@@ -19,7 +19,7 @@
from .helpers import Error
from .helpers import Manifest
from .helpers import get_cache_dir, get_security_dir
from .helpers import int_to_bigint, bigint_to_int, bin_to_hex, parse_stringified_list
from .helpers import bin_to_hex, parse_stringified_list
from .helpers import format_file_size
from .helpers import safe_ns
from .helpers import yes
@@ -28,6 +28,7 @@
from .helpers import set_ec, EXIT_WARNING
from .helpers import safe_unlink
from .helpers import msgpack
from .helpers.msgpack import int_to_timestamp, timestamp_to_int
from .item import ArchiveItem, ChunkListEntry
from .crypto.key import PlaintextKey
from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError
@@ -623,7 +624,7 @@ def commit(self):
# this is to avoid issues with filesystem snapshots and cmtime granularity.
# Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
entry = FileCacheEntry(*msgpack.unpackb(item))
if entry.age == 0 and bigint_to_int(entry.cmtime) < self._newest_cmtime or \
if entry.age == 0 and timestamp_to_int(entry.cmtime) < self._newest_cmtime or \
entry.age > 0 and entry.age < ttl:
msgpack.pack((path_hash, entry), fd)
entry_count += 1
@@ -1018,10 +1019,10 @@ def file_known_and_unchanged(self, hashed_path, path_hash, st):
if 'i' in cache_mode and entry.inode != st.st_ino:
files_cache_logger.debug('KNOWN-CHANGED: file inode number has changed: %r', hashed_path)
return True, None
if 'c' in cache_mode and bigint_to_int(entry.cmtime) != st.st_ctime_ns:
if 'c' in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
files_cache_logger.debug('KNOWN-CHANGED: file ctime has changed: %r', hashed_path)
return True, None
elif 'm' in cache_mode and bigint_to_int(entry.cmtime) != st.st_mtime_ns:
elif 'm' in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
files_cache_logger.debug('KNOWN-CHANGED: file mtime has changed: %r', hashed_path)
return True, None
# we ignored the inode number in the comparison above or it is still same.
@@ -1049,7 +1050,7 @@ def memorize_file(self, hashed_path, path_hash, st, ids):
elif 'm' in cache_mode:
cmtime_type = 'mtime'
cmtime_ns = safe_ns(st.st_mtime_ns)
entry = FileCacheEntry(age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_bigint(cmtime_ns), chunk_ids=ids)
entry = FileCacheEntry(age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunk_ids=ids)
self.files[path_hash] = msgpack.packb(entry)
self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
files_cache_logger.debug('FILES-CACHE-UPDATE: put %r [has %s] <- %r',
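
The cmtime change above replaces borg's old bigint encoding of nanosecond timestamps with
native msgpack Timestamp objects. A sketch of what int_to_timestamp / timestamp_to_int
presumably boil down to (msgpack-python ships the nanosecond conversions):

    from msgpack import Timestamp

    def int_to_timestamp(ns):
        # int nanoseconds since epoch -> msgpack.Timestamp, stored natively by msgpack
        return Timestamp.from_unix_nano(ns)

    def timestamp_to_int(ts):
        # msgpack.Timestamp -> int nanoseconds since epoch
        return ts.to_unix_nano()

    assert timestamp_to_int(int_to_timestamp(1650738000123456789)) == 1650738000123456789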