Add metadata search index for more responsive FUSE #182

Open · wants to merge 1 commit into master
attic/archive.py (67 changes: 61 additions & 6 deletions)
@@ -1,6 +1,7 @@
 from datetime import datetime, timedelta, timezone
 from getpass import getuser
 from itertools import groupby
+import bintrees
 import errno
 import shutil
 import tempfile
@@ -12,6 +13,7 @@
 import stat
 import sys
 import time
+from functools import total_ordering
 from io import BytesIO
 from attic import xattr
 from attic.platform import acl_get, acl_set
@@ -63,6 +65,42 @@ def fetch_many(self, ids, is_preloaded=False):
         yield self.key.decrypt(id_, data)
 
 
+@total_ordering
+class BFSPath():
+    """ Container for segments of file paths to make them comparable in breadth-first order.
+    """
+
+    def __init__(self, path=[]):
+        self.path = path
+
+    def __lt__(self, other):
+        if len(self.path) != len(other.path):
+            return len(self.path) < len(other.path)
+        return list.__lt__(self.path, other.path)
+
+    def __eq__(self, other):
+        return self.path == other.path
+
+
+class MetadataIndex:
+
+    def __init__(self):
+        self.tree = bintrees.RBTree()
+
+    def add(self, path, id, offset, length):
+        self.tree[BFSPath(path)] = (id, offset, length)
+
+    def lookup(self, path):
+        try:
+            return self.tree.floor_item(BFSPath(path))
+        except KeyError:
+            return self.tree.min_item()
+
+    def lookup_many(self, path):
+        startkey = self.lookup(path)[0]
+        return self.tree.value_slice(startkey, None)
+
+
 class ChunkBuffer:
     BUFFER_SIZE = 1 * 1024 * 1024
 
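To make the ordering concrete: `BFSPath` compares by path depth first and by contents second, which is exactly breadth-first order over a directory tree. A small sketch, assuming the classes above (illustration, not part of the diff):

```python
# Illustrative only: BFSPath sorts shallower paths before deeper ones,
# and same-depth paths lexicographically.
a = BFSPath([b'etc'])
b = BFSPath([b'usr', b'bin'])
c = BFSPath([b'usr', b'lib'])
assert a < b            # depth 1 sorts before depth 2
assert b < c            # equal depth: plain list comparison decides
# MetadataIndex.lookup() builds on this: floor_item() returns the entry
# whose indexed path is the greatest one <= the requested path.
```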
@@ -72,8 +110,15 @@ def __init__(self, key):
         self.chunks = []
         self.key = key
         self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed)
+        # Keep track of first item in each series of ChunkBuffer fill-ups
+        self.firstitem = None
+        self.partialchunk = 0
+        self.chunks_index = []
 
     def add(self, item):
+        if self.firstitem is None:
+            self.firstitem = item
+            self.partialchunk = self.buffer.tell()
         self.buffer.write(self.packer.pack(StableDict(item)))
         if self.is_full():
             self.flush()
@@ -86,10 +131,14 @@ def flush(self, flush=False):
             return
         self.buffer.seek(0)
         chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
+        length = self.buffer.tell()
         self.buffer.seek(0)
         self.buffer.truncate(0)
         # Leave the last partial chunk in the buffer unless flush is True
         end = None if flush or len(chunks) == 1 else -1
+        # Put index and offset of next item into chunks index
+        self.chunks_index.append((self.firstitem[b'path'], (len(self.chunks), self.partialchunk, length)))
+        self.firstitem = None
         for chunk in chunks[:end]:
             self.chunks.append(self.write_chunk(chunk))
         if end == -1:
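What the new bookkeeping records: each flush appends one index entry mapping the path of the first item packed since the previous flush to the index of the chunk it starts in, its byte offset within the buffered stream (which still holds the partial chunk carried over from the last flush), and the buffered length. Schematically, with invented values:

```python
# One chunks_index entry, schematically (all values made up):
entry = (b'home/user/file', (3, 17408, 1048576))
#         first item's path  |   |      total bytes buffered this flush
#                            |   byte offset of the item in the buffer
#                            index into self.chunks where the item starts
```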
@@ -154,11 +203,16 @@ def load(self, id):
         self.id = id
         data = self.key.decrypt(self.id, self.repository.get(self.id))
         self.metadata = msgpack.unpackb(data)
-        if self.metadata[b'version'] != 1:
-            raise Exception('Unknown archive metadata version')
+        if self.metadata[b'version'] not in [1, 2]:
+            raise Exception('Unknown archive metadata version: {}'.format(self.metadata[b'version']))
         decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
         self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
         self.name = self.metadata[b'name']
+        self.metadata_index = None
+        if self.metadata[b'version'] >= 2:
+            self.metadata_index = MetadataIndex()
+            for (path, (idx, offset, length)) in self.metadata[b'metadata_index']:
+                self.metadata_index.add(path.split(b'/'), idx, offset, length)
 
     @property
     def ts(self):
@@ -190,9 +244,10 @@ def save(self, name=None):
             raise self.AlreadyExists(name)
         self.items_buffer.flush(flush=True)
         metadata = StableDict({
-            'version': 1,
+            'version': 2,
             'name': name,
             'items': self.items_buffer.chunks,
+            'metadata_index': self.items_buffer.chunks_index,
             'cmdline': sys.argv,
             'hostname': socket.gethostname(),
             'username': getuser(),
@@ -583,7 +638,7 @@ def rebuild_manifest(self):
             # Some basic sanity checks of the payload before feeding it into msgpack
             if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                 continue
-            if not b'cmdline' in data or not b'\xa7version\x01' in data:
+            if not b'cmdline' in data or not any(v in data for v in [b'\xa7version\x01', b'\xa7version\x02']):
                 continue
             try:
                 archive = msgpack.unpackb(data)
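The magic bytes scanned for here are the msgpack encoding of the version field: `\xa7` is the fixstr header for the 7-byte string "version", followed by the version number as a positive fixint, so version-2 archives serialize as `\xa7version\x02`. A quick verification sketch using msgpack-python:

```python
import msgpack

# b'\xa7' == fixstr of length 7 ('version'); small ints pack as one byte
assert b'\xa7version\x01' in msgpack.packb({'version': 1})
assert b'\xa7version\x02' in msgpack.packb({'version': 2})
```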
@@ -683,8 +738,8 @@ def missing_chunk_detector(chunk_id):
             cdata = self.repository.get(archive_id)
             data = self.key.decrypt(archive_id, cdata)
             archive = StableDict(msgpack.unpackb(data))
-            if archive[b'version'] != 1:
-                raise Exception('Unknown archive metadata version')
+            if archive[b'version'] not in [1, 2]:
+                raise Exception('Unknown archive metadata version: {}'.format(archive[b'version']))
             decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
             items_buffer = ChunkBuffer(self.key)
             items_buffer.write_chunk = add_callback
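Taken together, these archive.py changes mean a version-2 archive carries a per-path index into its items metadata, so a reader (the FUSE layer this PR targets) can decrypt only the chunks that can contain a given path instead of all of them. A hypothetical consumer, purely for illustration (`fetch_items_for` and its body are assumptions, not code from this PR):

```python
def fetch_items_for(archive, path):
    """Sketch: locate the metadata chunk that may contain `path`."""
    # lookup() returns (BFSPath key, (chunk index, byte offset, length))
    _, (chunk_no, offset, length) = archive.metadata_index.lookup(path.split(b'/'))
    chunk_ref = archive.metadata[b'items'][chunk_no]
    # ...decrypt that chunk (and successors as needed) and start
    # unpacking item dicts at `offset` instead of from the beginning.
```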
attic/archiver.py (100 changes: 58 additions & 42 deletions)
@@ -2,6 +2,7 @@
 from binascii import hexlify
 from datetime import datetime
 from operator import attrgetter
+from collections import deque
 import functools
 import io
 import os
@@ -142,47 +143,49 @@ def do_create(self, args):
         return self.exit_code
 
     def _process(self, archive, cache, excludes, exclude_caches, skip_inodes, path, restrict_dev):
-        if exclude_path(path, excludes):
-            return
-        try:
-            st = os.lstat(path)
-        except OSError as e:
-            self.print_error('%s: %s', path, e)
-            return
-        if (st.st_ino, st.st_dev) in skip_inodes:
-            return
-        # Entering a new filesystem?
-        if restrict_dev and st.st_dev != restrict_dev:
-            return
-        # Ignore unix sockets
-        if stat.S_ISSOCK(st.st_mode):
-            return
-        self.print_verbose(remove_surrogates(path))
-        if stat.S_ISREG(st.st_mode):
-            try:
-                archive.process_file(path, st, cache)
-            except IOError as e:
-                self.print_error('%s: %s', path, e)
-        elif stat.S_ISDIR(st.st_mode):
-            if exclude_caches and is_cachedir(path):
-                return
-            archive.process_item(path, st)
-            try:
-                entries = os.listdir(path)
-            except OSError as e:
-                self.print_error('%s: %s', path, e)
-            else:
-                for filename in sorted(entries):
-                    self._process(archive, cache, excludes, exclude_caches, skip_inodes,
-                                  os.path.join(path, filename), restrict_dev)
-        elif stat.S_ISLNK(st.st_mode):
-            archive.process_symlink(path, st)
-        elif stat.S_ISFIFO(st.st_mode):
-            archive.process_item(path, st)
-        elif stat.S_ISCHR(st.st_mode) or stat.S_ISBLK(st.st_mode):
-            archive.process_dev(path, st)
-        else:
-            self.print_error('Unknown file type: %s', path)
+        queue = deque([path])
+        while queue:
+            path = queue.popleft()
+            if exclude_path(path, excludes):
+                continue
+            try:
+                st = os.lstat(path)
+            except OSError as e:
+                self.print_error('%s: %s', path, e)
+                continue
+            if (st.st_ino, st.st_dev) in skip_inodes:
+                continue
+            # Entering a new filesystem?
+            if restrict_dev and st.st_dev != restrict_dev:
+                continue
+            # Ignore unix sockets
+            if stat.S_ISSOCK(st.st_mode):
+                continue
+            self.print_verbose(remove_surrogates(path))
+            if stat.S_ISREG(st.st_mode):
+                try:
+                    archive.process_file(path, st, cache)
+                except IOError as e:
+                    self.print_error('%s: %s', path, e)
+            elif stat.S_ISDIR(st.st_mode):
+                if exclude_caches and is_cachedir(path):
+                    continue
+                archive.process_item(path, st)
+                try:
+                    entries = os.listdir(path)
+                except OSError as e:
+                    self.print_error('%s: %s', path, e)
+                else:
+                    for filename in sorted(entries):
+                        queue.append(os.path.join(path, filename))
+            elif stat.S_ISLNK(st.st_mode):
+                archive.process_symlink(path, st)
+            elif stat.S_ISFIFO(st.st_mode):
+                archive.process_item(path, st)
+            elif stat.S_ISCHR(st.st_mode) or stat.S_ISBLK(st.st_mode):
+                archive.process_dev(path, st)
+            else:
+                self.print_error('Unknown file type: %s', path)
 
     def do_extract(self, args):
         """Extract archive contents"""
@@ -195,16 +198,29 @@ def do_extract(self, args):
         patterns = adjust_patterns(args.paths, args.excludes)
         dry_run = args.dry_run
         strip_components = args.strip_components
-        dirs = []
+        dirs = deque()
         for item in archive.iter_items(lambda item: not exclude_path(item[b'path'], patterns), preload=True):
             orig_path = item[b'path']
             if strip_components:
                 item[b'path'] = os.sep.join(orig_path.split(os.sep)[strip_components:])
                 if not item[b'path']:
                     continue
             if not args.dry_run:
-                while dirs and not item[b'path'].startswith(dirs[-1][b'path']):
-                    archive.extract_item(dirs.pop(-1))
+                if archive.metadata[b'version'] >= 2:
+                    # Items are in breadth-first order
+                    current = item[b'path'].split(os.sep)
+                    while dirs:
+                        nextpending = dirs[0][b'path'].split(os.sep)
+                        # Directory attributes can be applied if we're one
+                        # level further and "right" of the pending subtree
+                        if len(current) > len(nextpending) and current[:-1] > nextpending:
+                            archive.extract_item(dirs.popleft())
+                        else:
+                            break
+                else:
+                    # Items are in depth-first order
+                    while dirs and not item[b'path'].startswith(dirs[-1][b'path']):
+                        archive.extract_item(dirs.pop())
             self.print_verbose(remove_surrogates(orig_path))
             try:
                 if dry_run:
@@ -220,7 +236,7 @@
 
         if not args.dry_run:
             while dirs:
-                archive.extract_item(dirs.pop(-1))
+                archive.extract_item(dirs.popleft())
         return self.exit_code
 
     def do_delete(self, args):
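Because breadth-first order emits a directory before its contents, directory attributes (permissions, timestamps) are deferred in `dirs` until extraction has moved past the whole subtree. The test in do_extract above fires once the current item is exactly one level deeper and sorts after the pending directory. A worked illustration (paths invented, split on '/' for brevity):

```python
# BFS extraction order:  a   b   a/f   a/g   b/h
# While extracting b/h, directory a's subtree is complete:
current = 'b/h'.split('/')        # ['b', 'h']
nextpending = 'a'.split('/')      # ['a']
# one level deeper and "right" of a's subtree -> apply a's attributes
assert len(current) > len(nextpending) and current[:-1] > nextpending
# a/f would not release a: ['a'] > ['a'] is False, so a stays pending
```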
@@ -256,7 +272,7 @@ def do_mount(self, args):
             archive = Archive(repository, key, manifest, args.src.archive)
         else:
             archive = None
-        operations = AtticOperations(key, repository, manifest, archive)
+        operations = AtticOperations(key, repository, manifest, archive, args.verbose and args.foreground)
         self.print_verbose("Mounting filesystem")
         try:
             operations.mount(args.mountpoint, args.options, args.foreground)
attic/cache.py (2 changes: 1 addition & 1 deletion)
@@ -156,7 +156,7 @@ def add(id, size, csize):
             data = self.key.decrypt(archive_id, cdata)
             add(archive_id, len(data), len(cdata))
             archive = msgpack.unpackb(data)
-            if archive[b'version'] != 1:
+            if archive[b'version'] not in [1, 2]:
                 raise Exception('Unknown archive metadata version')
             decode_dict(archive, (b'name',))
             print('Analyzing archive:', archive[b'name'])