Skip to content

Commit

Permalink
🔀👌 Efficiency improvements (#96)
Browse files Browse the repository at this point in the history
This merge collects a number of important efficiency improvements, and a few features that were tightly bound to these efficiency changes, so they are shipped together.

In particular:

- objects are now sorted and returned in the order in which they are on disk, with an important performance benefit. Fixes #92 
- When there are many objects to list (currently set to 9500 objects, 10x the ones we could fit in a single IN SQL statement), performing many queries is slow, so we just resort to listing all objects and doing an efficient intersection (if the hash keys are sorted, both iterators can be looped over only once - fixes #93)
- Since VACUUMing the DB is very important for efficiency, when the DB does not fit fully in the disk cache, `clean_storage` now provides an option to VACUUM the DB. VACUUM is also called after repacking. Fixes #94 
- We implement now a function to perform a full repack of the repository (fixes #12). This is important and needed to reclaim space after deleting an object
- For efficiency, we have moved the logic from an `export` function (still existing but deprecated) to a `import_objects` function
- Still for efficiency, now functions like `pack_all_loose` and `import_objects` provide an option to perform a fsync to disk or not (see also #54 - there are still however calls that always use - or don't use - fsync and full_fsync on Mac). Also, `add_objects_to_pack` allows now to choose if you want to commit the changes to the SQLite DB, or not (delegating the responsibility to the caller, but this is important e.g. in `import_objects`: calling `commit` only once at the very end gives a factor of 2 speedup for very big repos).
- A number of functions, including (but not exclusively) `import_objects` provide a callback to e.g. show a progress bar.
- a `CallbackStreamWrapper` has been implemented, allowing to provide a callback (e.g. for a progress bar) when streaming a big file.
- a new hash algorithm is now supported (`sha1`) in addition to the default `sha256` (fixes #82). This is faster even if a bit less robust. This was also needed to test completely some feature in `import_objects`, where the logic is optimised if both containers use the same algorithm. By default is still better to use everywhere sha256, also because then all export files that will be generated will use this algorithm and importing will be more efficient.
- tests have been added for all new functionality, achieving again 100% coverage

As a reference, with these changes, exporting the full large SDB database (6.8M nodes) takes ~ 50 minutes:
```
6714808it [00:24, 274813.02it/s]
All hashkeys listed in 24.444787740707397s.
Listing objects: 100%|████████| 6714808/6714808 [00:06<00:00, 978896.65it/s]
Copy objects: 100%|███████████| 6714808/6714808 [48:15<00:00, 2319.08it/s]
Final flush: 100%|████████████| 63236/63236 [00:07<00:00, 8582.35it/s]
Everything re-exported in 2960.980943918228s.
```

This can be compared to:

- ~10 minutes to copy the whole 90GB, or ~15 minutes to read all and validate the packs. We will never be able to be faster than just copying the pack files, and we are only 3x slower.
- ~2 days to just list all files in the old legacy AiiDA repo (or all objects if they are loose), and this does not take into account the time to rewrite everything, probably comparable. So we are almost 2 orders of magnitude faster than before.
  • Loading branch information
chrisjsewell authored Oct 2, 2020
2 parents e7e3627 + 34d6671 commit 64326c7
Show file tree
Hide file tree
Showing 6 changed files with 1,581 additions and 135 deletions.
4 changes: 2 additions & 2 deletions disk_objectstore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
It does not require a server running.
"""
from .container import Container, ObjectType
from .container import Container, ObjectType, CompressMode

__all__ = ('Container', 'ObjectType')
__all__ = ('Container', 'ObjectType', 'CompressMode')

__version__ = '0.4.0'
690 changes: 610 additions & 80 deletions disk_objectstore/container.py

Large diffs are not rendered by default.

279 changes: 276 additions & 3 deletions disk_objectstore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
Some might be useful also for end users, like the wrappers to get streams,
like the ``LazyOpener``.
"""
# pylint: disable= too-many-lines
import hashlib
import itertools
import os
import uuid
import zlib

from enum import Enum

try:
import fcntl
except ImportError:
Expand All @@ -25,6 +28,13 @@
_MACOS_ALWAYS_USE_FULLSYNC = False


class Location(Enum):
"""Enum that describes if an element is only on the left or right iterator, or on both."""
LEFTONLY = -1
BOTH = 0
RIGHTONLY = 1


class LazyOpener:
"""A class to return a stream to a given file, that however is opened lazily.
Expand Down Expand Up @@ -413,6 +423,129 @@ def read(self, size=-1):
return stream


class CallbackStreamWrapper:
"""A class to just wrap a read stream, but perform a callback every few bytes.
Should be used only for streams open in read mode.
"""

@property
def mode(self):
return self._stream.mode

@property
def seekable(self):
"""Return whether object supports random access."""
return self._stream.seekable

def seek(self, target, whence=0):
"""Change stream position."""
if target > self.tell():
if self._callback:
self._since_last_update += target - self.tell()
if self._since_last_update >= self._update_every:
self._callback(action='update', value=self._since_last_update)
self._since_last_update = 0
else:
self.close_callback()
if self._callback:
# If we have a callback, compute the total count of objects in this pack
self._callback(
action='init',
value={
'total': self._total_length,
'description': '{} [rewind]'.format(self._description)
}
)
# Counter of how many objects have been since since the last update.
# A new callback will be performed when this value is > update_every.
self._since_last_update = target
self._callback(action='update', value=self._since_last_update)

return self._stream.seek(target, whence)

def tell(self):
"""Return current stream position."""
return self._stream.tell()

def __init__(self, stream, callback, total_length=0, description='Streamed object'):
"""
Initialises the reader to a given stream.
:param stream: an open stream
:param callback: a callback to call to update the status (or None if not needed)
:param total_length: the expected length
"""
self._stream = stream
self._callback = callback
self._total_length = total_length
self._description = description

if self._callback:
# If we have a callback, compute the total count of objects in this pack
self._callback(action='init', value={'total': total_length, 'description': description})
# Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object.
self._update_every = max(int(total_length / 400), 1) if total_length else 1
# Counter of how many objects have been since since the last update.
# A new callback will be performed when this value is > update_every.
self._since_last_update = 0

def read(self, size=-1):
"""
Read and return up to n bytes.
If the argument is omitted, None, or negative, reads and
returns all data until EOF (that corresponds to the length specified
in the __init__ method).
Returns an empty bytes object on EOF.
"""
data = self._stream.read(size)

if self._callback:
self._since_last_update += len(data)
if self._since_last_update >= self._update_every:
self._callback(action='update', value=self._since_last_update)
self._since_last_update = 0

return data

def close_callback(self):
"""
Call the wrap up closing calls for the callback.
.. note:: it DOES NOT close the stream.
"""
if self._callback:
# Final call to complete the bar
if self._since_last_update:
self._callback(action='update', value=self._since_last_update)
# Perform any wrap-up, if needed
self._callback(action='close', value=None)


def rename_callback(callback, new_description):
"""Given a callback, return a new one where the description will be changed to `new_name`.
Works even if `callback` is None (in this case, it returns None).
:param callback: a callback function.
:param new_description: a string with a modified description for the callback.
This will be replaced during the `init` call to the callback.
"""
if callback is None:
return None

def wrapper_callback(action, value):
"""A wrapper callback with changed description."""
if action == 'init':
new_value = value.copy()
new_value['description'] = new_description
return callback(action, new_value)
return callback(action, value)

return wrapper_callback


class StreamDecompresser:
"""A class that gets a stream of compressed zlib bytes, and returns the corresponding
uncompressed bytes when being read via the .read() method.
Expand Down Expand Up @@ -590,9 +723,7 @@ def is_known_hash(hash_type):

def get_hash(hash_type):
"""Return a hash class with an update method and a hexdigest method."""
known_hashes = {
'sha256': hashlib.sha256,
}
known_hashes = {'sha1': hashlib.sha1, 'sha256': hashlib.sha256}

try:
return known_hashes[hash_type]
Expand Down Expand Up @@ -786,3 +917,145 @@ def compute_hash_and_size(stream, hash_type):
size += len(next_chunk)

return hasher.hexdigest(), size


def detect_where_sorted(left_iterator, right_iterator, left_key=None): # pylint: disable=too-many-branches, too-many-statements
"""Generator that loops in alternation (but only once each) the two iterators and yields an element, specifying if
it's only on the left, only on the right, or in both.
.. note:: IMPORTANT! The two iterators MUST return unique and sorted results.
.. note:: if it's on both, the one on the left is returned.
This function will check and raise a ValueError if it detects non-unique or non-sorted elements.
HOWEVER, this exception is raised only at the first occurrence of the issue, that can be very late in the execution,
so if you process results in a streamed way, please ensure that you pass sorted iterators.
:param left_iterator: a left iterator
:param right_iterator: a right iterator
:param left_key: if specified, it's a lambda that determines how to process each element
of the left iterator when comparing with the right iterator. For instance, the left
iterator might be a tuple, whose first element is a hash key, while the right iterator
just a list of hash keys. In this case, left_key could be defined as a lambda returning
the first element of the tuple.
Note that when the element is in both iterators, the left one is returned (i.e. the
full tuple, in this example).
"""
left_exhausted = False
right_exhausted = False

if left_key is None:
left_key = lambda x: x

# Convert first in iterators (in case they are, e.g., lists)
left_iterator = iter(left_iterator)
right_iterator = iter(right_iterator)

try:
last_left = next(left_iterator)
except StopIteration:
left_exhausted = True

try:
last_right = next(right_iterator)
except StopIteration:
right_exhausted = True

if left_exhausted and right_exhausted:
# Nothing to be done, both iterators are empty
return

now_left = True
if left_exhausted or (not right_exhausted and left_key(last_left) > last_right):
now_left = False # I want the 'current' (now) to be behind or at the same position of the other at any time

while not (left_exhausted and right_exhausted):
advance_both = False
if now_left:
if right_exhausted:
yield last_left, Location.LEFTONLY
else:
if left_key(last_left) == last_right:
# They are equal: add to intersection and continue
yield last_left, Location.BOTH
# I need to consume and advance on both iterators at the next iteration
advance_both = True
elif left_key(last_left) < last_right:
# the new entry (last_left) is still smaller: it's on the left only
yield last_left, Location.LEFTONLY
else:
# the new entry (last_left) is now larger: then, last_right is only on the right
# and I switch to now_right
yield last_right, Location.RIGHTONLY
now_left = False
else:
if left_exhausted:
yield last_right, Location.RIGHTONLY
else:
if left_key(last_left) == last_right:
# They are equal: add to intersection and continue
yield last_left, Location.BOTH
# I need to consume and advance on both iterators at the next iteration
advance_both = True
elif left_key(last_left) > last_right:
# the new entry (last_right) is still smaller: it's on the right only
yield last_right, Location.RIGHTONLY
else:
# the new entry (last_right) is now larger: then, last_left is only on the left
# and I switch to now_left
yield last_left, Location.LEFTONLY
now_left = True

# When we are here: if now_left, then last_left has been inserted in one of the lists;
# if not now_left, then last_right has been insterted in one of the lists.
# If advance both, they both can be discarded. So if I exhausted an iterator, I am not losing
# any entry.

# I will need to cache the old value, see comments below in the `except StopIteration` block
new_now_left = now_left
if now_left or advance_both:
try:
new = next(left_iterator)
if left_key(new) <= left_key(last_left):
raise ValueError(
"The left iterator does not return sorted unique entries, I got '{}' after '{}'".format(
left_key(new), left_key(last_left)
)
)
last_left = new
except StopIteration:
left_exhausted = True
# I need to store in a different variable, otherwise in this case
# I would also enter the next iteration even if advance_both is False!
new_now_left = False

if not now_left or advance_both:
try:
new = next(right_iterator)
if new <= last_right:
raise ValueError(
"The right iterator does not return sorted unique entries, I got '{}' after '{}'".format(
new, last_right
)
)
last_right = new
except StopIteration:
right_exhausted = True
# For consistency, also here I set new_now_left
new_now_left = True

# Set the new now_left value
now_left = new_now_left


def yield_first_element(iterator):
"""Given an iterator that returns a tuple, return an iterator that yields only the first element of the tuple."""
for elem in iterator:
yield elem[0]


def merge_sorted(iterator1, iterator2):
"""Given two sorted iterators, return another sorted iterator being the union of the two."""
for item, _ in detect_where_sorted(iterator1, iterator2):
# Whereever it is (only left, only right, on both) I return the object.
yield item
33 changes: 33 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,39 @@ def pytest_generate_tests(metafunc):
metafunc.parametrize('concurrency_repetition_index', range(metafunc.config.option.concurrency_repetitions))


@pytest.fixture(scope='function')
def callback_instance():
"""Return the CallbackClass for the tests."""

class CallbackClass:
"""Class that manages the callback and checks that it is correctly called."""

def __init__(self):
"""Initialise the class."""
self.current_action = None
self.performed_actions = []

def callback(self, action, value):
"""Check how the callback is called."""

if action == 'init':
assert self.current_action is None, "Starting a new action '{}' without closing the old one {}".format(
action, self.current_action
)
self.current_action = {'start_value': value, 'value': 0}
elif action == 'update':
# Track the current position
self.current_action['value'] += value
elif action == 'close':
# Add to list of performed actions
self.performed_actions.append(self.current_action)
self.current_action = None
else:
raise AssertionError("Unknown action '{}'".format(action))

yield CallbackClass()


@pytest.fixture(scope='function')
def temp_container(temp_dir): # pylint: disable=redefined-outer-name
"""Return an object-store container in a given temporary directory.
Expand Down
Loading

0 comments on commit 64326c7

Please sign in to comment.