diff --git a/attic/archive.py b/attic/archive.py
index d78ce4b8..d91f9adb 100644
--- a/attic/archive.py
+++ b/attic/archive.py
@@ -1,6 +1,7 @@
 from datetime import datetime, timedelta, timezone
 from getpass import getuser
 from itertools import groupby
+import bintrees
 import errno
 import shutil
 import tempfile
@@ -12,6 +13,7 @@
 import stat
 import sys
 import time
+from functools import total_ordering
 from io import BytesIO
 from attic import xattr
 from attic.platform import acl_get, acl_set
@@ -63,6 +65,42 @@ def fetch_many(self, ids, is_preloaded=False):
             yield self.key.decrypt(id_, data)
 
 
+@total_ordering
+class BFSPath:
+    """Container for segments of file paths to make them comparable in breadth-first order.
+    """
+
+    def __init__(self, path=None):
+        self.path = path if path is not None else []
+
+    def __lt__(self, other):
+        if len(self.path) != len(other.path):
+            return len(self.path) < len(other.path)
+        return list.__lt__(self.path, other.path)
+
+    def __eq__(self, other):
+        return self.path == other.path
+
+
+class MetadataIndex:
+
+    def __init__(self):
+        self.tree = bintrees.RBTree()
+
+    def add(self, path, id, offset, length):
+        self.tree[BFSPath(path)] = (id, offset, length)
+
+    def lookup(self, path):
+        try:
+            return self.tree.floor_item(BFSPath(path))
+        except KeyError:
+            return self.tree.min_item()
+
+    def lookup_many(self, path):
+        startkey = self.lookup(path)[0]
+        return self.tree.value_slice(startkey, None)
+
+
 class ChunkBuffer:
     BUFFER_SIZE = 1 * 1024 * 1024
 
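Worth making the BFSPath ordering concrete: shorter paths sort first, and paths of equal depth compare segment by segment, so a sorted stream of keys walks the tree level by level. A standalone sketch (it duplicates the class above so it runs on its own; the sample paths are made up):

    from functools import total_ordering

    @total_ordering
    class BFSPath:
        # Mirror of the class added above: compare by depth first,
        # then lexicographically by segment.
        def __init__(self, path=None):
            self.path = path if path is not None else []

        def __lt__(self, other):
            if len(self.path) != len(other.path):
                return len(self.path) < len(other.path)
            return self.path < other.path

        def __eq__(self, other):
            return self.path == other.path

    paths = [[b'etc', b'passwd'], [b'usr'], [b'etc'], [b'usr', b'lib', b'libc.so']]
    print(sorted(paths, key=BFSPath))
    # [[b'etc'], [b'usr'], [b'etc', b'passwd'], [b'usr', b'lib', b'libc.so']]
    # i.e. all depth-1 entries before any depth-2 entry: breadth-first order.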
@@ -72,8 +110,15 @@ def __init__(self, key):
         self.chunks = []
         self.key = key
         self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed)
+        # Keep track of the first item in each series of ChunkBuffer fill-ups
+        self.firstitem = None
+        self.partialchunk = 0
+        self.chunks_index = []
 
     def add(self, item):
+        if self.firstitem is None:
+            self.firstitem = item
+            self.partialchunk = self.buffer.tell()
         self.buffer.write(self.packer.pack(StableDict(item)))
         if self.is_full():
             self.flush()
@@ -86,10 +131,15 @@ def flush(self, flush=False):
             return
         self.buffer.seek(0)
         chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
+        length = self.buffer.tell()
         self.buffer.seek(0)
         self.buffer.truncate(0)
         # Leave the last partial chunk in the buffer unless flush is True
         end = None if flush or len(chunks) == 1 else -1
+        # Record the index and offset of the next item in the chunks index
+        if self.firstitem is not None:
+            self.chunks_index.append((self.firstitem[b'path'], (len(self.chunks), self.partialchunk, length)))
+        self.firstitem = None
         for chunk in chunks[:end]:
             self.chunks.append(self.write_chunk(chunk))
         if end == -1:
@@ -154,11 +203,16 @@ def load(self, id):
         self.id = id
         data = self.key.decrypt(self.id, self.repository.get(self.id))
         self.metadata = msgpack.unpackb(data)
-        if self.metadata[b'version'] != 1:
-            raise Exception('Unknown archive metadata version')
+        if self.metadata[b'version'] not in [1, 2]:
+            raise Exception('Unknown archive metadata version: {}'.format(self.metadata[b'version']))
         decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
         self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
         self.name = self.metadata[b'name']
+        self.metadata_index = None
+        if self.metadata[b'version'] >= 2:
+            self.metadata_index = MetadataIndex()
+            for (path, (idx, offset, length)) in self.metadata[b'metadata_index']:
+                self.metadata_index.add(path.split(b'/'), idx, offset, length)
 
     @property
     def ts(self):
@@ -190,9 +244,10 @@ def save(self, name=None):
             raise self.AlreadyExists(name)
         self.items_buffer.flush(flush=True)
         metadata = StableDict({
-            'version': 1,
+            'version': 2,
             'name': name,
             'items': self.items_buffer.chunks,
+            'metadata_index': self.items_buffer.chunks_index,
             'cmdline': sys.argv,
             'hostname': socket.gethostname(),
             'username': getuser(),
@@ -583,7 +638,7 @@ def rebuild_manifest(self):
             # Some basic sanity checks of the payload before feeding it into msgpack
             if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                 continue
-            if not b'cmdline' in data or not b'\xa7version\x01' in data:
+            if not b'cmdline' in data or not any(v in data for v in [b'\xa7version\x01', b'\xa7version\x02']):
                 continue
             try:
                 archive = msgpack.unpackb(data)
@@ -683,8 +738,8 @@ def missing_chunk_detector(chunk_id):
         cdata = self.repository.get(archive_id)
         data = self.key.decrypt(archive_id, cdata)
         archive = StableDict(msgpack.unpackb(data))
-        if archive[b'version'] != 1:
-            raise Exception('Unknown archive metadata version')
+        if archive[b'version'] not in [1, 2]:
+            raise Exception('Unknown archive metadata version: {}'.format(archive[b'version']))
         decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
         items_buffer = ChunkBuffer(self.key)
         items_buffer.write_chunk = add_callback
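A note on what flush() records: each chunks_index entry is (path of the first item added since the previous flush, (index of the next item chunk, byte offset of that item within the pending data, total byte length of this fill-up)). Storing an offset works because msgpack framing is self-delimiting, so a reader can resume decoding at any recorded item boundary. A standalone sketch of that property (illustrative values, not attic's exact stream):

    import msgpack

    items = [{b'path': b'etc'}, {b'path': b'etc/passwd'}, {b'path': b'usr'}]
    buf = b''
    offsets = []  # byte offset of each packed item within the stream
    for item in items:
        offsets.append(len(buf))
        buf += msgpack.packb(item)

    # Resume decoding mid-stream at a recorded offset:
    unpacker = msgpack.Unpacker()
    unpacker.feed(buf[offsets[1]:])
    print(next(unpacker))  # -> {b'path': b'etc/passwd'}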
diff --git a/attic/archiver.py b/attic/archiver.py
index 47650c2d..45f5780c 100644
--- a/attic/archiver.py
+++ b/attic/archiver.py
@@ -2,6 +2,7 @@
 from binascii import hexlify
 from datetime import datetime
 from operator import attrgetter
+from collections import deque
 import functools
 import io
 import os
@@ -142,47 +143,49 @@ def do_create(self, args):
         return self.exit_code
 
     def _process(self, archive, cache, excludes, exclude_caches, skip_inodes, path, restrict_dev):
-        if exclude_path(path, excludes):
-            return
-        try:
-            st = os.lstat(path)
-        except OSError as e:
-            self.print_error('%s: %s', path, e)
-            return
-        if (st.st_ino, st.st_dev) in skip_inodes:
-            return
-        # Entering a new filesystem?
-        if restrict_dev and st.st_dev != restrict_dev:
-            return
-        # Ignore unix sockets
-        if stat.S_ISSOCK(st.st_mode):
-            return
-        self.print_verbose(remove_surrogates(path))
-        if stat.S_ISREG(st.st_mode):
-            try:
-                archive.process_file(path, st, cache)
-            except IOError as e:
-                self.print_error('%s: %s', path, e)
-        elif stat.S_ISDIR(st.st_mode):
-            if exclude_caches and is_cachedir(path):
-                return
-            archive.process_item(path, st)
+        queue = deque([path])
+        while queue:
+            path = queue.popleft()
+            if exclude_path(path, excludes):
+                continue
             try:
-                entries = os.listdir(path)
+                st = os.lstat(path)
             except OSError as e:
                 self.print_error('%s: %s', path, e)
+                continue
+            if (st.st_ino, st.st_dev) in skip_inodes:
+                continue
+            # Entering a new filesystem?
+            if restrict_dev and st.st_dev != restrict_dev:
+                continue
+            # Ignore unix sockets
+            if stat.S_ISSOCK(st.st_mode):
+                continue
+            self.print_verbose(remove_surrogates(path))
+            if stat.S_ISREG(st.st_mode):
+                try:
+                    archive.process_file(path, st, cache)
+                except IOError as e:
+                    self.print_error('%s: %s', path, e)
+            elif stat.S_ISDIR(st.st_mode):
+                if exclude_caches and is_cachedir(path):
+                    continue
+                archive.process_item(path, st)
+                try:
+                    entries = os.listdir(path)
+                except OSError as e:
+                    self.print_error('%s: %s', path, e)
+                else:
+                    for filename in sorted(entries):
+                        queue.append(os.path.join(path, filename))
+            elif stat.S_ISLNK(st.st_mode):
+                archive.process_symlink(path, st)
+            elif stat.S_ISFIFO(st.st_mode):
+                archive.process_item(path, st)
+            elif stat.S_ISCHR(st.st_mode) or stat.S_ISBLK(st.st_mode):
+                archive.process_dev(path, st)
             else:
-                for filename in sorted(entries):
-                    self._process(archive, cache, excludes, exclude_caches, skip_inodes,
-                                  os.path.join(path, filename), restrict_dev)
-        elif stat.S_ISLNK(st.st_mode):
-            archive.process_symlink(path, st)
-        elif stat.S_ISFIFO(st.st_mode):
-            archive.process_item(path, st)
-        elif stat.S_ISCHR(st.st_mode) or stat.S_ISBLK(st.st_mode):
-            archive.process_dev(path, st)
-        else:
-            self.print_error('Unknown file type: %s', path)
+                self.print_error('Unknown file type: %s', path)
 
     def do_extract(self, args):
         """Extract archive contents"""
@@ -195,7 +198,7 @@ def do_extract(self, args):
         patterns = adjust_patterns(args.paths, args.excludes)
         dry_run = args.dry_run
         strip_components = args.strip_components
-        dirs = []
+        dirs = deque()
         for item in archive.iter_items(lambda item: not exclude_path(item[b'path'], patterns), preload=True):
             orig_path = item[b'path']
             if strip_components:
@@ -203,8 +206,21 @@ def do_extract(self, args):
                 if not item[b'path']:
                     continue
             if not args.dry_run:
-                while dirs and not item[b'path'].startswith(dirs[-1][b'path']):
-                    archive.extract_item(dirs.pop(-1))
+                if archive.metadata[b'version'] >= 2:
+                    # Items are in breadth-first order
+                    current = item[b'path'].split(os.sep)
+                    while dirs:
+                        nextpending = dirs[0][b'path'].split(os.sep)
+                        # Directory attributes can be applied if we're one
+                        # level further and "right" of the pending subtree
+                        if len(current) > len(nextpending) and current[:-1] > nextpending:
+                            archive.extract_item(dirs.popleft())
+                        else:
+                            break
+                else:
+                    # Items are in depth-first order
+                    while dirs and not item[b'path'].startswith(dirs[-1][b'path']):
+                        archive.extract_item(dirs.pop())
             self.print_verbose(remove_surrogates(orig_path))
             try:
                 if dry_run:
@@ -220,7 +236,7 @@ def do_extract(self, args):
 
         if not args.dry_run:
             while dirs:
-                archive.extract_item(dirs.pop(-1))
+                archive.extract_item(dirs.popleft())
         return self.exit_code
 
     def do_delete(self, args):
@@ -256,7 +272,7 @@ def do_mount(self, args):
             archive = Archive(repository, key, manifest, args.src.archive)
         else:
             archive = None
-        operations = AtticOperations(key, repository, manifest, archive)
+        operations = AtticOperations(key, repository, manifest, archive, args.verbose and args.foreground)
         self.print_verbose("Mounting filesystem")
         try:
             operations.mount(args.mountpoint, args.options, args.foreground)
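The extraction change deserves a worked example. With depth-first items, a pending directory's attributes could be applied as soon as a later path no longer had it as a prefix. In breadth-first order a directory's direct children keep arriving until the stream is one level deeper and lexicographically to the right of it; only then is it safe to restore its attributes. A toy run of exactly the comparison used in do_extract above (plain segment lists, nothing attic-specific):

    # Pending directory a/ waits for its attributes while items arrive
    # in breadth-first order.
    pending = ['a']
    for current in (['b'], ['a', 'x'], ['b', 'x'], ['b', 'x', 'y']):
        ok = len(current) > len(pending) and current[:-1] > pending
        print(current, '->', 'apply a/' if ok else 'keep waiting')
    # ['b']           -> keep waiting  (a/'s children are still to come)
    # ['a', 'x']      -> keep waiting  (entries inside a/ still being created)
    # ['b', 'x']      -> apply a/      (one level deeper, right of a/'s subtree)
    # ['b', 'x', 'y'] -> apply a/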
diff --git a/attic/cache.py b/attic/cache.py
index acbc7665..f70b4d6a 100644
--- a/attic/cache.py
+++ b/attic/cache.py
@@ -156,7 +156,7 @@ def add(id, size, csize):
             data = self.key.decrypt(archive_id, cdata)
             add(archive_id, len(data), len(cdata))
             archive = msgpack.unpackb(data)
-            if archive[b'version'] != 1:
+            if archive[b'version'] not in [1, 2]:
                 raise Exception('Unknown archive metadata version')
             decode_dict(archive, (b'name',))
             print('Analyzing archive:', archive[b'name'])
diff --git a/attic/fuse.py b/attic/fuse.py
index bc102173..ed8639dd 100644
--- a/attic/fuse.py
+++ b/attic/fuse.py
@@ -7,8 +7,8 @@
 import stat
 import tempfile
 import time
-from attic.archive import Archive
-from attic.helpers import daemonize
+from attic.archive import Archive, BFSPath
+from attic.helpers import daemonize, format_file_size
 from attic.remote import cache_if_remote
 
 # Does this version of llfuse support ns precision?
@@ -33,49 +33,83 @@ def get(self, inode):
 
 class AtticOperations(llfuse.Operations):
     """Export Attic archive as a fuse filesystem
    """
-    def __init__(self, key, repository, manifest, archive):
+    def __init__(self, key, repository, manifest, archive, verbose=False):
         super(AtticOperations, self).__init__()
         self._inode_count = 0
         self.key = key
         self.repository = cache_if_remote(repository)
         self.items = {}
         self.parent = {}
+        self.itemnames = {}
         self.contents = defaultdict(dict)
         self.default_dir = {b'mode': 0o40755, b'mtime': int(time.time() * 1e9), b'uid': os.getuid(), b'gid': os.getgid()}
-        self.pending_archives = {}
+        self.archives = {}
+        self.processed_items_subsets = {}  # per archive
         self.cache = ItemCache()
+        self.verbose = verbose
+
+        # Create root inode
+        self.parent[1] = self.allocate_inode()
+        self.items[1] = self.default_dir
         if archive:
-            self.process_archive(archive)
+            self.archives[1] = archive
         else:
-            # Create root inode
-            self.parent[1] = self.allocate_inode()
-            self.items[1] = self.default_dir
             for archive_name in manifest.archives:
                 # Create archive placeholder inode
                 archive_inode = self.allocate_inode()
                 self.items[archive_inode] = self.default_dir
                 self.parent[archive_inode] = 1
                 self.contents[1][os.fsencode(archive_name)] = archive_inode
-                self.pending_archives[archive_inode] = Archive(repository, key, manifest, archive_name)
+                self.itemnames[archive_inode] = os.fsencode(archive_name)
+                self.archives[archive_inode] = Archive(repository, key, manifest, archive_name)
+                self.processed_items_subsets[archive_inode] = {}
+
+    def _load_items_subset(self, items, processed_items, index, skipbytes, length, prefix):
+        stats = {'bytes': 0, 'chunks': 0, 'inodes': 0}
+
+        # Skip subset if already loaded:
+        if index in processed_items:
+            return processed_items[index]
 
-    def process_archive(self, archive, prefix=[]):
-        """Build fuse inode hierarchy from archive metadata
-        """
         unpacker = msgpack.Unpacker()
-        for key, chunk in zip(archive.metadata[b'items'], self.repository.get_many(archive.metadata[b'items'])):
+        # Keep track of current segments to return afterwards
+        final_segments = None
+        for id in range(index, len(items)):
+            key = items[id]
+
+            if length == 0:
+                break
+
+            chunk = self.repository.get(key)
+            stats['bytes'] += len(chunk)
+            stats['chunks'] += 1
             data = self.key.decrypt(key, chunk)
+            datalength = len(data)
+
+            if length:
+                if length >= datalength:
+                    length -= datalength
+                else:
+                    data = data[:length]
+                    length = 0
+
+            if skipbytes > datalength:
+                skipbytes -= datalength
+                continue
+            elif skipbytes > 0:
+                data = data[skipbytes:]
+                skipbytes = 0
+
             unpacker.feed(data)
             for item in unpacker:
+                stats['inodes'] += 1
                 segments = prefix + os.fsencode(os.path.normpath(item[b'path'])).split(b'/')
                 del item[b'path']
                 num_segments = len(segments)
+                final_segments = segments
+
                 parent = 1
                 for i, segment in enumerate(segments, 1):
-                    # Insert a default root inode if needed
-                    if self._inode_count == 0 and segment:
-                        archive_inode = self.allocate_inode()
-                        self.items[archive_inode] = self.default_dir
-                        self.parent[archive_inode] = parent
                     # Leaf segment?
                     if i == num_segments:
                         if b'source' in item and stat.S_ISREG(item[b'mode']):
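The only subtle part of _load_items_subset above is the byte accounting: `length` is counted from the very start of chunk `index` (so it includes the `skipbytes` prefix), and `length=None` means "read to the end of the item stream". A minimal standalone sketch, with plain byte strings standing in for decrypted chunks:

    def read_subset(chunks, index, skipbytes, length):
        # Same slicing rules as _load_items_subset, minus the repository,
        # decryption and msgpack parts.
        out = b''
        for data in chunks[index:]:
            if length == 0:
                break
            datalength = len(data)
            if length:
                if length >= datalength:
                    length -= datalength
                else:
                    data = data[:length]
                    length = 0
            if skipbytes > datalength:
                skipbytes -= datalength
                continue
            elif skipbytes > 0:
                data = data[skipbytes:]
                skipbytes = 0
            out += data
        return out

    # A subset starting 2 bytes into chunk 0, 10 bytes long counted from
    # the start of that chunk:
    assert read_subset([b'aaaa', b'bbbb', b'cccc'], 0, 2, 10) == b'aabbbbcc'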
@@ -88,6 +122,7 @@
                         self.parent[inode] = parent
                         if segment:
                             self.contents[parent][segment] = inode
+                            self.itemnames[inode] = segment
                 elif segment in self.contents[parent]:
                     parent = self.contents[parent][segment]
                 else:
@@ -96,7 +131,56 @@
                     self.parent[inode] = parent
                     if segment:
                         self.contents[parent][segment] = inode
+                        self.itemnames[inode] = segment
                     parent = inode
+        if self.verbose:
+            print('Fetched {} chunks ({}), unpacked {} inodes'.format(stats['chunks'], format_file_size(stats['bytes']), stats['inodes']))
+        processed_items[index] = final_segments
+        return final_segments
+
+    def _load_pending_item(self, inode, name=None):
+        # Ignore root inode unless it is an archive
+        if inode == 1 and inode not in self.archives:
+            return
+
+        # Follow inode upwards to find the archive and obtain the full path
+        # of the requested item:
+        full_segments = [name if name else b'']
+        archive_inode = inode
+        while archive_inode not in self.archives:
+            full_segments.append(self.itemnames[archive_inode])
+            archive_inode = self.parent[archive_inode]
+        full_segments.reverse()
+
+        archive = self.archives[archive_inode]
+
+        if archive_inode != 1:
+            prefix = [os.fsencode(archive.name)]
+        else:
+            prefix = []
+
+        items = archive.metadata[b'items']
+        processed_items = self.processed_items_subsets[archive_inode]
+
+        if archive.metadata[b'version'] >= 2:
+            if name:
+                # Fetch the subset containing the requested name
+                index, skipbytes, length = archive.metadata_index.lookup(full_segments)[1]
+                self._load_items_subset(items, processed_items, index, skipbytes, length, prefix)
+            else:
+                # If no specific name is queried, load the entire directory by
+                # unpacking subsets for as long as the most recently unpacked
+                # path is 'less than' (in breadth-first ordering) the requested
+                # path.
+                for index, skipbytes, length in archive.metadata_index.lookup_many(full_segments):
+                    final_segments = self._load_items_subset(items, processed_items, index, skipbytes, length, prefix)
+                    if final_segments:
+                        if prefix:
+                            final_segments = final_segments[1:]
+                        if BFSPath(final_segments) > BFSPath(full_segments):
+                            break
+        else:
+            # Fetch everything
+            self._load_items_subset(items, processed_items, index=0, skipbytes=0, length=None, prefix=prefix)
 
     def allocate_inode(self):
         self._inode_count += 1
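_load_pending_item leans on MetadataIndex.lookup(), i.e. RBTree.floor_item: the rightmost index entry whose key is <= the requested path in breadth-first order, which is exactly the subset whose first item precedes the wanted path. If the bintrees API is unfamiliar, the same lookup can be pictured with bisect over a sorted list (a hypothetical stand-in, not code from this patch):

    import bisect

    # (depth, segments) tuples order the same way BFSPath does; each key
    # maps to an illustrative (chunk index, skipbytes, length) triple.
    keys = [(1, [b'bin']), (1, [b'etc']), (2, [b'etc', b'passwd'])]
    values = [(0, 0, 4096), (3, 128, 8192), (7, 0, 2048)]

    def lookup(path):
        # floor_item: rightmost entry with key <= the requested path
        i = bisect.bisect_right(keys, (len(path), path)) - 1
        return values[max(i, 0)]

    print(lookup([b'etc', b'fstab']))  # -> (3, 128, 8192), the subset led by b'etc'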
@@ -168,14 +252,8 @@ def getxattr(self, inode, name):
         except KeyError:
             raise llfuse.FUSEError(errno.ENODATA)
 
-    def _load_pending_archive(self, inode):
-        # Check if this is an archive we need to load
-        archive = self.pending_archives.pop(inode, None)
-        if archive:
-            self.process_archive(archive, [os.fsencode(archive.name)])
-
     def lookup(self, parent_inode, name):
-        self._load_pending_archive(parent_inode)
+        self._load_pending_item(parent_inode, name)
         if name == b'.':
             inode = parent_inode
         elif name == b'..':
@@ -190,7 +268,7 @@ def open(self, inode, flags):
         return inode
 
     def opendir(self, inode):
-        self._load_pending_archive(inode)
+        self._load_pending_item(inode)
         return inode
 
     def read(self, fh, offset, size):
diff --git a/attic/testsuite/archive.py b/attic/testsuite/archive.py
index 8d478f5f..3a9bd3f9 100644
--- a/attic/testsuite/archive.py
+++ b/attic/testsuite/archive.py
@@ -17,7 +17,7 @@ def add_chunk(self, id, data, stats=None):
 class ChunkBufferTestCase(AtticTestCase):
 
     def test(self):
-        data = [{b'foo': 1}, {b'bar': 2}]
+        data = [{b'path': 1}, {b'path': 2}]
         cache = MockCache()
         key = PlaintextKey()
         chunks = CacheChunkBuffer(cache, key, None)
diff --git a/setup.py b/setup.py
index 2c1432b1..aed1be5e 100644
--- a/setup.py
+++ b/setup.py
@@ -122,5 +122,5 @@ def detect_openssl(prefixes):
     scripts=['scripts/attic'],
     cmdclass=cmdclass,
     ext_modules=ext_modules,
-    install_requires=['msgpack-python']
+    install_requires=['msgpack-python', 'bintrees']
 )
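For the record, the testsuite tweak above is forced by the new code path: ChunkBuffer.flush() now reads each first item's b'path' key, so the mock items need one. An extra assertion against the new chunks_index could look like this (illustrative only, not part of the patch; it reuses the MockCache/PlaintextKey fixtures visible in the test):

    chunks = CacheChunkBuffer(MockCache(), PlaintextKey(), None)
    for d in [{b'path': 1}, {b'path': 2}]:
        chunks.add(d)
    chunks.flush(flush=True)
    # One fill-up -> one entry: (first item's path, (chunk index, offset, length))
    path, (index, offset, length) = chunks.chunks_index[0]
    assert (path, index, offset) == (1, 0, 0)
    assert length > 0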