Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collection of a number of important efficiency improvements #96

Merged
merged 12 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions disk_objectstore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

It does not require a server running.
"""
from .container import Container, ObjectType
from .container import Container, ObjectType, CompressMode

__all__ = ('Container', 'ObjectType')
__all__ = ('Container', 'ObjectType', 'CompressMode')

__version__ = '0.4.0'
690 changes: 610 additions & 80 deletions disk_objectstore/container.py

Large diffs are not rendered by default.

279 changes: 276 additions & 3 deletions disk_objectstore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
Some might be useful also for end users, like the wrappers to get streams,
like the ``LazyOpener``.
"""
# pylint: disable= too-many-lines
import hashlib
import itertools
import os
import uuid
import zlib

from enum import Enum

try:
import fcntl
except ImportError:
Expand All @@ -25,6 +28,13 @@
_MACOS_ALWAYS_USE_FULLSYNC = False


class Location(Enum):
"""Enum that describes if an element is only on the left or right iterator, or on both."""
LEFTONLY = -1
BOTH = 0
RIGHTONLY = 1


class LazyOpener:
"""A class to return a stream to a given file, that however is opened lazily.

Expand Down Expand Up @@ -413,6 +423,129 @@ def read(self, size=-1):
return stream


class CallbackStreamWrapper:
"""A class to just wrap a read stream, but perform a callback every few bytes.

Should be used only for streams open in read mode.
"""

@property
def mode(self):
return self._stream.mode

@property
def seekable(self):
"""Return whether object supports random access."""
return self._stream.seekable

def seek(self, target, whence=0):
"""Change stream position."""
if target > self.tell():
if self._callback:
self._since_last_update += target - self.tell()
if self._since_last_update >= self._update_every:
self._callback(action='update', value=self._since_last_update)
self._since_last_update = 0
else:
self.close_callback()
if self._callback:
# If we have a callback, compute the total count of objects in this pack
self._callback(
action='init',
value={
'total': self._total_length,
'description': '{} [rewind]'.format(self._description)
}
)
# Counter of how many objects have been since since the last update.
# A new callback will be performed when this value is > update_every.
self._since_last_update = target
self._callback(action='update', value=self._since_last_update)

return self._stream.seek(target, whence)

def tell(self):
"""Return current stream position."""
return self._stream.tell()

def __init__(self, stream, callback, total_length=0, description='Streamed object'):
"""
Initialises the reader to a given stream.

:param stream: an open stream
:param callback: a callback to call to update the status (or None if not needed)
:param total_length: the expected length
"""
self._stream = stream
self._callback = callback
self._total_length = total_length
self._description = description

if self._callback:
# If we have a callback, compute the total count of objects in this pack
self._callback(action='init', value={'total': total_length, 'description': description})
# Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object.
self._update_every = max(int(total_length / 400), 1) if total_length else 1
# Counter of how many objects have been since since the last update.
# A new callback will be performed when this value is > update_every.
self._since_last_update = 0

def read(self, size=-1):
"""
Read and return up to n bytes.

If the argument is omitted, None, or negative, reads and
returns all data until EOF (that corresponds to the length specified
in the __init__ method).

Returns an empty bytes object on EOF.
"""
data = self._stream.read(size)

if self._callback:
self._since_last_update += len(data)
if self._since_last_update >= self._update_every:
self._callback(action='update', value=self._since_last_update)
self._since_last_update = 0

return data

def close_callback(self):
"""
Call the wrap up closing calls for the callback.

.. note:: it DOES NOT close the stream.
"""
if self._callback:
# Final call to complete the bar
if self._since_last_update:
self._callback(action='update', value=self._since_last_update)
# Perform any wrap-up, if needed
self._callback(action='close', value=None)


def rename_callback(callback, new_description):
"""Given a callback, return a new one where the description will be changed to `new_name`.

Works even if `callback` is None (in this case, it returns None).
:param callback: a callback function.
:param new_description: a string with a modified description for the callback.
This will be replaced during the `init` call to the callback.
"""
if callback is None:
return None

def wrapper_callback(action, value):
"""A wrapper callback with changed description."""
if action == 'init':
new_value = value.copy()
new_value['description'] = new_description
return callback(action, new_value)
return callback(action, value)

return wrapper_callback


class StreamDecompresser:
"""A class that gets a stream of compressed zlib bytes, and returns the corresponding
uncompressed bytes when being read via the .read() method.
Expand Down Expand Up @@ -590,9 +723,7 @@ def is_known_hash(hash_type):

def get_hash(hash_type):
"""Return a hash class with an update method and a hexdigest method."""
known_hashes = {
'sha256': hashlib.sha256,
}
known_hashes = {'sha1': hashlib.sha1, 'sha256': hashlib.sha256}

try:
return known_hashes[hash_type]
Expand Down Expand Up @@ -786,3 +917,145 @@ def compute_hash_and_size(stream, hash_type):
size += len(next_chunk)

return hasher.hexdigest(), size


def detect_where_sorted(left_iterator, right_iterator, left_key=None): # pylint: disable=too-many-branches, too-many-statements
"""Generator that loops in alternation (but only once each) the two iterators and yields an element, specifying if
it's only on the left, only on the right, or in both.

.. note:: IMPORTANT! The two iterators MUST return unique and sorted results.

.. note:: if it's on both, the one on the left is returned.

This function will check and raise a ValueError if it detects non-unique or non-sorted elements.
HOWEVER, this exception is raised only at the first occurrence of the issue, that can be very late in the execution,
so if you process results in a streamed way, please ensure that you pass sorted iterators.

:param left_iterator: a left iterator
:param right_iterator: a right iterator
:param left_key: if specified, it's a lambda that determines how to process each element
of the left iterator when comparing with the right iterator. For instance, the left
iterator might be a tuple, whose first element is a hash key, while the right iterator
just a list of hash keys. In this case, left_key could be defined as a lambda returning
the first element of the tuple.
Note that when the element is in both iterators, the left one is returned (i.e. the
full tuple, in this example).
"""
left_exhausted = False
right_exhausted = False

if left_key is None:
left_key = lambda x: x

# Convert first in iterators (in case they are, e.g., lists)
left_iterator = iter(left_iterator)
right_iterator = iter(right_iterator)

try:
last_left = next(left_iterator)
except StopIteration:
left_exhausted = True

try:
last_right = next(right_iterator)
except StopIteration:
right_exhausted = True

if left_exhausted and right_exhausted:
# Nothing to be done, both iterators are empty
return

now_left = True
if left_exhausted or (not right_exhausted and left_key(last_left) > last_right):
now_left = False # I want the 'current' (now) to be behind or at the same position of the other at any time

while not (left_exhausted and right_exhausted):
advance_both = False
if now_left:
if right_exhausted:
yield last_left, Location.LEFTONLY
else:
if left_key(last_left) == last_right:
# They are equal: add to intersection and continue
yield last_left, Location.BOTH
# I need to consume and advance on both iterators at the next iteration
advance_both = True
elif left_key(last_left) < last_right:
# the new entry (last_left) is still smaller: it's on the left only
yield last_left, Location.LEFTONLY
else:
# the new entry (last_left) is now larger: then, last_right is only on the right
# and I switch to now_right
yield last_right, Location.RIGHTONLY
now_left = False
else:
if left_exhausted:
yield last_right, Location.RIGHTONLY
else:
if left_key(last_left) == last_right:
# They are equal: add to intersection and continue
yield last_left, Location.BOTH
# I need to consume and advance on both iterators at the next iteration
advance_both = True
elif left_key(last_left) > last_right:
# the new entry (last_right) is still smaller: it's on the right only
yield last_right, Location.RIGHTONLY
else:
# the new entry (last_right) is now larger: then, last_left is only on the left
# and I switch to now_left
yield last_left, Location.LEFTONLY
now_left = True

# When we are here: if now_left, then last_left has been inserted in one of the lists;
# if not now_left, then last_right has been insterted in one of the lists.
# If advance both, they both can be discarded. So if I exhausted an iterator, I am not losing
# any entry.

# I will need to cache the old value, see comments below in the `except StopIteration` block
new_now_left = now_left
if now_left or advance_both:
try:
new = next(left_iterator)
if left_key(new) <= left_key(last_left):
raise ValueError(
"The left iterator does not return sorted unique entries, I got '{}' after '{}'".format(
left_key(new), left_key(last_left)
)
)
last_left = new
except StopIteration:
left_exhausted = True
# I need to store in a different variable, otherwise in this case
# I would also enter the next iteration even if advance_both is False!
new_now_left = False

if not now_left or advance_both:
try:
new = next(right_iterator)
if new <= last_right:
raise ValueError(
"The right iterator does not return sorted unique entries, I got '{}' after '{}'".format(
new, last_right
)
)
last_right = new
except StopIteration:
right_exhausted = True
# For consistency, also here I set new_now_left
new_now_left = True

# Set the new now_left value
now_left = new_now_left


def yield_first_element(iterator):
"""Given an iterator that returns a tuple, return an iterator that yields only the first element of the tuple."""
for elem in iterator:
yield elem[0]


def merge_sorted(iterator1, iterator2):
"""Given two sorted iterators, return another sorted iterator being the union of the two."""
for item, _ in detect_where_sorted(iterator1, iterator2):
# Whereever it is (only left, only right, on both) I return the object.
yield item
33 changes: 33 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,39 @@ def pytest_generate_tests(metafunc):
metafunc.parametrize('concurrency_repetition_index', range(metafunc.config.option.concurrency_repetitions))


@pytest.fixture(scope='function')
def callback_instance():
"""Return the CallbackClass for the tests."""

class CallbackClass:
"""Class that manages the callback and checks that it is correctly called."""

def __init__(self):
"""Initialise the class."""
self.current_action = None
self.performed_actions = []

def callback(self, action, value):
"""Check how the callback is called."""

if action == 'init':
assert self.current_action is None, "Starting a new action '{}' without closing the old one {}".format(
action, self.current_action
)
self.current_action = {'start_value': value, 'value': 0}
elif action == 'update':
# Track the current position
self.current_action['value'] += value
elif action == 'close':
# Add to list of performed actions
self.performed_actions.append(self.current_action)
self.current_action = None
else:
raise AssertionError("Unknown action '{}'".format(action))

yield CallbackClass()


@pytest.fixture(scope='function')
def temp_container(temp_dir): # pylint: disable=redefined-outer-name
"""Return an object-store container in a given temporary directory.
Expand Down
Loading