Skip to content

Commit

Permalink
Add metadata search index for more responsive FUSE
Browse files Browse the repository at this point in the history
This adds the construction of a metadata index during archive creation,
which can be used to narrow down the location of particular entries
within the items list. The FUSE mount uses this index to fetch only
those chunks that are relevant to the specific operation instead of
fetching the metadata of the entire archive.

As a result, using FUSE mounts of large archives is considerably more
responsive. More importantly, the performance no longer decreases
linearly with the archive size. Bulk operations that require the full
metadata tree anyway (such as running "find" on the entire archive) are
not negatively impacted.

For this to work, the filesystem traversal order had to be changed from
depth-first to breadth-first, which introduces the new metadata version
number 2. Any otherwise unrelated parts of the code and tests that
relied on the previous behavior are adjusted accordingly.
  • Loading branch information
dnnr committed Jan 22, 2015
1 parent 4ab4ecc commit ad3d3f4
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 77 deletions.
67 changes: 61 additions & 6 deletions attic/archive.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, timedelta, timezone
from getpass import getuser
from itertools import groupby
import bintrees
import errno
import shutil
import tempfile
Expand All @@ -12,6 +13,7 @@
import stat
import sys
import time
from functools import total_ordering
from io import BytesIO
from attic import xattr
from attic.platform import acl_get, acl_set
Expand Down Expand Up @@ -63,6 +65,42 @@ def fetch_many(self, ids, is_preloaded=False):
yield self.key.decrypt(id_, data)


@total_ordering
class BFSPath():
    """Container for the segments of a file path, ordered breadth-first.

    Paths compare first by depth (number of segments) and only then
    lexicographically by segment, so sorting a collection of BFSPath
    objects yields breadth-first traversal order.

    :param path: list of path segments (e.g. ``path.split(b'/')``);
        defaults to the empty path, which sorts before everything.
    """

    def __init__(self, path=None):
        # Avoid a mutable default argument: a shared ``[]`` default would
        # be aliased by every instance created without an explicit path.
        self.path = [] if path is None else path

    def __lt__(self, other):
        # Shallower paths always sort first (breadth-first order) ...
        if len(self.path) != len(other.path):
            return len(self.path) < len(other.path)
        # ... paths of equal depth fall back to ordinary lexicographic order.
        return self.path < other.path

    def __eq__(self, other):
        return self.path == other.path


class MetadataIndex:
    """Sorted index mapping item paths to metadata chunk locations.

    Backed by a red-black tree keyed by :class:`BFSPath`, so entries are
    ordered breadth-first and range queries over "everything from this
    path onwards" are cheap.
    """

    def __init__(self):
        # Red-black tree: BFSPath -> (chunk id/index, offset, length).
        self.tree = bintrees.RBTree()

    def add(self, path, id, offset, length):
        """Record that the item at *path* (a list of segments) is located
        at *offset*/*length* within chunk *id*."""
        key = BFSPath(path)
        self.tree[key] = (id, offset, length)

    def lookup(self, path):
        """Return the ``(key, value)`` entry at or immediately before *path*
        in breadth-first order."""
        wanted = BFSPath(path)
        try:
            return self.tree.floor_item(wanted)
        except KeyError:
            # Nothing sorts at or below *path*: fall back to the first entry.
            return self.tree.min_item()

    def lookup_many(self, path):
        """Iterate over the values of all entries from the one covering
        *path* through the end of the index."""
        start_key, _ = self.lookup(path)
        return self.tree.value_slice(start_key, None)


class ChunkBuffer:
BUFFER_SIZE = 1 * 1024 * 1024

Expand All @@ -72,8 +110,15 @@ def __init__(self, key):
self.chunks = []
self.key = key
self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed)
# Keep track of first item in each series of ChunkBuffer fill-ups
self.firstitem = None
self.partialchunk = 0
self.chunks_index = []

def add(self, item):
if self.firstitem is None:
self.firstitem = item
self.partialchunk = self.buffer.tell()
self.buffer.write(self.packer.pack(StableDict(item)))
if self.is_full():
self.flush()
Expand All @@ -86,10 +131,14 @@ def flush(self, flush=False):
return
self.buffer.seek(0)
chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
length = self.buffer.tell()
self.buffer.seek(0)
self.buffer.truncate(0)
# Leave the last partial chunk in the buffer unless flush is True
end = None if flush or len(chunks) == 1 else -1
# Put index and offset of next item into chunks index
self.chunks_index.append((self.firstitem[b'path'], (len(self.chunks), self.partialchunk, length)))
self.firstitem = None
for chunk in chunks[:end]:
self.chunks.append(self.write_chunk(chunk))
if end == -1:
Expand Down Expand Up @@ -154,11 +203,16 @@ def load(self, id):
self.id = id
data = self.key.decrypt(self.id, self.repository.get(self.id))
self.metadata = msgpack.unpackb(data)
if self.metadata[b'version'] != 1:
raise Exception('Unknown archive metadata version')
if self.metadata[b'version'] not in [1, 2]:
raise Exception('Unknown archive metadata version: {}'.format(self.metadata[b'version']))
decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
self.name = self.metadata[b'name']
self.metadata_index = None
if self.metadata[b'version'] >= 2:
self.metadata_index = MetadataIndex()
for (path, (idx, offset, length)) in self.metadata[b'metadata_index']:
self.metadata_index.add(path.split(b'/'), idx, offset, length)

@property
def ts(self):
Expand Down Expand Up @@ -190,9 +244,10 @@ def save(self, name=None):
raise self.AlreadyExists(name)
self.items_buffer.flush(flush=True)
metadata = StableDict({
'version': 1,
'version': 2,
'name': name,
'items': self.items_buffer.chunks,
'metadata_index': self.items_buffer.chunks_index,
'cmdline': sys.argv,
'hostname': socket.gethostname(),
'username': getuser(),
Expand Down Expand Up @@ -583,7 +638,7 @@ def rebuild_manifest(self):
# Some basic sanity checks of the payload before feeding it into msgpack
if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
continue
if not b'cmdline' in data or not b'\xa7version\x01' in data:
if not b'cmdline' in data or not any(v in data for v in [b'\xa7version\x01', b'\xa7version\x02']):
continue
try:
archive = msgpack.unpackb(data)
Expand Down Expand Up @@ -683,8 +738,8 @@ def missing_chunk_detector(chunk_id):
cdata = self.repository.get(archive_id)
data = self.key.decrypt(archive_id, cdata)
archive = StableDict(msgpack.unpackb(data))
if archive[b'version'] != 1:
raise Exception('Unknown archive metadata version')
if archive[b'version'] not in [1, 2]:
raise Exception('Unknown archive metadata version: {}'.format(archive[b'version']))
decode_dict(archive, (b'name', b'hostname', b'username', b'time')) # fixme: argv
items_buffer = ChunkBuffer(self.key)
items_buffer.write_chunk = add_callback
Expand Down
100 changes: 58 additions & 42 deletions attic/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from binascii import hexlify
from datetime import datetime
from operator import attrgetter
from collections import deque
import functools
import io
import os
Expand Down Expand Up @@ -142,47 +143,49 @@ def do_create(self, args):
return self.exit_code

def _process(self, archive, cache, excludes, exclude_caches, skip_inodes, path, restrict_dev):
if exclude_path(path, excludes):
return
try:
st = os.lstat(path)
except OSError as e:
self.print_error('%s: %s', path, e)
return
if (st.st_ino, st.st_dev) in skip_inodes:
return
# Entering a new filesystem?
if restrict_dev and st.st_dev != restrict_dev:
return
# Ignore unix sockets
if stat.S_ISSOCK(st.st_mode):
return
self.print_verbose(remove_surrogates(path))
if stat.S_ISREG(st.st_mode):
try:
archive.process_file(path, st, cache)
except IOError as e:
self.print_error('%s: %s', path, e)
elif stat.S_ISDIR(st.st_mode):
if exclude_caches and is_cachedir(path):
return
archive.process_item(path, st)
queue = deque([path])
while queue:
path = queue.popleft()
if exclude_path(path, excludes):
continue
try:
entries = os.listdir(path)
st = os.lstat(path)
except OSError as e:
self.print_error('%s: %s', path, e)
continue
if (st.st_ino, st.st_dev) in skip_inodes:
continue
# Entering a new filesystem?
if restrict_dev and st.st_dev != restrict_dev:
continue
# Ignore unix sockets
if stat.S_ISSOCK(st.st_mode):
continue
self.print_verbose(remove_surrogates(path))
if stat.S_ISREG(st.st_mode):
try:
archive.process_file(path, st, cache)
except IOError as e:
self.print_error('%s: %s', path, e)
elif stat.S_ISDIR(st.st_mode):
if exclude_caches and is_cachedir(path):
continue
archive.process_item(path, st)
try:
entries = os.listdir(path)
except OSError as e:
self.print_error('%s: %s', path, e)
else:
for filename in sorted(entries):
queue.append(os.path.join(path, filename))
elif stat.S_ISLNK(st.st_mode):
archive.process_symlink(path, st)
elif stat.S_ISFIFO(st.st_mode):
archive.process_item(path, st)
elif stat.S_ISCHR(st.st_mode) or stat.S_ISBLK(st.st_mode):
archive.process_dev(path, st)
else:
for filename in sorted(entries):
self._process(archive, cache, excludes, exclude_caches, skip_inodes,
os.path.join(path, filename), restrict_dev)
elif stat.S_ISLNK(st.st_mode):
archive.process_symlink(path, st)
elif stat.S_ISFIFO(st.st_mode):
archive.process_item(path, st)
elif stat.S_ISCHR(st.st_mode) or stat.S_ISBLK(st.st_mode):
archive.process_dev(path, st)
else:
self.print_error('Unknown file type: %s', path)
self.print_error('Unknown file type: %s', path)

def do_extract(self, args):
"""Extract archive contents"""
Expand All @@ -195,16 +198,29 @@ def do_extract(self, args):
patterns = adjust_patterns(args.paths, args.excludes)
dry_run = args.dry_run
strip_components = args.strip_components
dirs = []
dirs = deque()
for item in archive.iter_items(lambda item: not exclude_path(item[b'path'], patterns), preload=True):
orig_path = item[b'path']
if strip_components:
item[b'path'] = os.sep.join(orig_path.split(os.sep)[strip_components:])
if not item[b'path']:
continue
if not args.dry_run:
while dirs and not item[b'path'].startswith(dirs[-1][b'path']):
archive.extract_item(dirs.pop(-1))
if archive.metadata[b'version'] >= 2:
# Items are in breadth-first order
current = item[b'path'].split(os.sep)
while dirs:
nextpending = dirs[0][b'path'].split(os.sep)
# Directory attributes can be applied if we're one
# level further and "right" of the pending subtree
if len(current) > len(nextpending) and current[:-1] > nextpending:
archive.extract_item(dirs.popleft())
else:
break
else:
# Items are in depth-first order
while dirs and not item[b'path'].startswith(dirs[-1][b'path']):
archive.extract_item(dirs.pop())
self.print_verbose(remove_surrogates(orig_path))
try:
if dry_run:
Expand All @@ -220,7 +236,7 @@ def do_extract(self, args):

if not args.dry_run:
while dirs:
archive.extract_item(dirs.pop(-1))
archive.extract_item(dirs.popleft())
return self.exit_code

def do_delete(self, args):
Expand Down Expand Up @@ -256,7 +272,7 @@ def do_mount(self, args):
archive = Archive(repository, key, manifest, args.src.archive)
else:
archive = None
operations = AtticOperations(key, repository, manifest, archive)
operations = AtticOperations(key, repository, manifest, archive, args.verbose and args.foreground)
self.print_verbose("Mounting filesystem")
try:
operations.mount(args.mountpoint, args.options, args.foreground)
Expand Down
2 changes: 1 addition & 1 deletion attic/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def add(id, size, csize):
data = self.key.decrypt(archive_id, cdata)
add(archive_id, len(data), len(cdata))
archive = msgpack.unpackb(data)
if archive[b'version'] != 1:
if archive[b'version'] not in [1, 2]:
raise Exception('Unknown archive metadata version')
decode_dict(archive, (b'name',))
print('Analyzing archive:', archive[b'name'])
Expand Down
Loading

0 comments on commit ad3d3f4

Please sign in to comment.