-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xarray.backends refactor #2261
xarray.backends refactor #2261
Changes from 7 commits
4faaf3a
c82a38c
7a55a30
51463dd
23e132f
8fc8183
422944b
aea0a1a
4366c0b
f35b7e7
057cad2
0f3e656
1a0cc10
8784e6b
83d9b10
a0074ff
062ba96
2d41b29
2adf486
76f151c
769f079
3e97264
5e67efe
8dc77c4
1d38335
6350ca6
4aa0df7
67377c7
8c00f44
2a5d1f0
a6c170b
009e30d
14118ea
c778488
fe14ebf
f1026ce
e13406b
465dfae
55d35c8
c8fbadc
ede8ef0
220c302
36f1156
c6f43dd
8916bc7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import contextlib | ||
import threading | ||
|
||
from .lru_cache import LRUCache | ||
|
||
|
||
# Global cache for storing open files. It is created with a capacity of 512 | ||
# entries; the ``on_evict`` callback closes each file that the cache drops. | ||
FILE_CACHE = LRUCache(512, on_evict=lambda k, v: v.close()) | ||
|
||
# TODO(shoyer): add an option (xarray.set_options) for resizing the cache. | ||
# Note: the cache has a minimum size of one. | ||
|
||
|
||
class FileManager(object):
    """Wrapper for automatically opening and closing file objects.

    Unlike files, FileManager objects can be safely pickled and passed between
    processes. They should be explicitly closed to release resources, but
    a per-process least-recently-used cache for open files ensures that you can
    safely create arbitrarily large numbers of FileManager objects.

    Example usage:

        manager = FileManager(open, 'example.txt', mode='w')
        with manager.acquire() as f:
            f.write(...)
        manager.close()
    """

    def __init__(self, opener, *args, **kwargs):
        """Initialize a FileManager.

        Parameters
        ----------
        opener : callable
            Function that when called like ``opener(*args, **kwargs)`` returns
            an open file object. The file object must implement a ``close()``
            method.
        *args
            Positional arguments for opener. A ``mode`` argument should be
            provided as a keyword argument (see below).
        **kwargs
            Keyword arguments for opener. The keyword argument ``mode`` has
            special handling if it is provided with a value of 'w': on all
            calls after the first, it is changed to 'a' instead to avoid
            overwriting the newly created file. All argument values must be
            hashable.
        """
        self._opener = opener
        self._args = args
        self._kwargs = kwargs
        self._key = self._make_key()
        # Reentrant, so the thread that is inside ``acquire()`` may call
        # ``acquire()`` or ``close()`` again without deadlocking.
        self._lock = threading.RLock()

    def _make_key(self):
        """Build the cache key from the opener and its current arguments."""
        return _make_key(self._opener, self._args, self._kwargs)

    @contextlib.contextmanager
    def acquire(self):
        """Context manager for acquiring a file object.

        A new file is only opened if it has expired from the
        least-recently-used cache.

        This method uses a reentrant lock, which ensures that it is
        thread-safe. You can safely acquire a file in multiple threads at the
        same time, as long as the underlying file object is thread-safe.

        Yields
        ------
        Open file object, as returned by ``opener(*args, **kwargs)``.
        """
        with self._lock:
            try:
                file_obj = FILE_CACHE[self._key]
            except KeyError:
                file_obj = self._opener(*self._args, **self._kwargs)
                if self._kwargs.get('mode') == 'w':
                    # ensure the file doesn't get overwritten when it is
                    # opened again; the key changes along with the mode
                    self._kwargs['mode'] = 'a'
                    self._key = self._make_key()
                FILE_CACHE[self._key] = file_obj
            # The lock stays held while the caller uses the file.
            yield file_obj

    def close(self):
        """Explicitly close any associated file object (if necessary).

        The manager's lock is held while the file is removed from the cache
        and closed, so the file cannot be closed underneath another thread
        that is still inside ``acquire()``.
        """
        with self._lock:
            file_obj = FILE_CACHE.pop(self._key, default=None)
            if file_obj is not None:
                file_obj.close()

    def __getstate__(self):
        """State for pickling."""
        # The lock and the cached file object are deliberately left out; they
        # are recreated on demand after unpickling.
        return (self._opener, self._args, self._kwargs)

    def __setstate__(self, state):
        """Restore from a pickle."""
        opener, args, kwargs = state
        self.__init__(opener, *args, **kwargs)
|
||
|
||
class _HashedSequence(list): | ||
"""Speedup repeated look-ups by caching hash values. | ||
|
||
Based on what Python uses internally in functools.lru_cache. | ||
|
||
Python doesn't perform this optimization automatically: | ||
https://bugs.python.org/issue1462796 | ||
""" | ||
|
||
def __init__(self, tuple_value): | ||
self[:] = tuple_value | ||
self.hashvalue = hash(tuple_value) | ||
|
||
def __hash__(self): | ||
return self.hashvalue | ||
|
||
|
||
def _make_key(opener, args, kwargs):
    """Make a key for caching files in the LRU cache.

    Keyword arguments are sorted by name so that the key does not depend on
    the order in which they were passed.
    """
    ordered_kwargs = tuple(sorted(kwargs.items()))
    return _HashedSequence((opener, args, ordered_kwargs))
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
import collections
import threading

try:
    from collections.abc import MutableMapping
except ImportError:  # Python 2
    from collections import MutableMapping
|
||
|
||
class LRUCache(MutableMapping):
    """Thread-safe LRUCache based on an OrderedDict.

    All dict operations (__getitem__, __setitem__, __contains__) update the
    priority of the relevant key and take O(1) time. The dict is iterated over
    in order from the oldest to newest key, which means that a complete pass
    over the dict should not affect the order of any entries.

    When a new item is set and the maximum size of the cache is exceeded, the
    oldest item is dropped and ``on_evict(key, value)`` is called for it.

    The ``maxsize`` property can be used to view or adjust the capacity of
    the cache, e.g., ``cache.maxsize = new_size``.
    """

    def __init__(self, maxsize, on_evict=None):
        """
        Parameters
        ----------
        maxsize : int
            Integer maximum number of items to hold in the cache.
        on_evict: callable, optional
            Function to call like ``on_evict(key, value)`` when items are
            evicted.

        Raises
        ------
        TypeError
            If ``maxsize`` is not an integer.
        ValueError
            If ``maxsize`` is negative.
        """
        self._maxsize = self._validate_maxsize(maxsize)
        self._on_evict = on_evict
        self._cache = collections.OrderedDict()
        # Reentrant so that an ``on_evict`` callback running under the lock
        # may itself use the cache from the same thread.
        self._lock = threading.RLock()

    @staticmethod
    def _validate_maxsize(maxsize):
        """Return ``maxsize`` unchanged if it is a non-negative integer."""
        # Review feedback: enforce an integer. A fractional size such as 0.5
        # is truthy, so __setitem__ would try to shrink an empty cache to a
        # negative capacity and fail inside ``popitem``.
        if not isinstance(maxsize, int):
            raise TypeError('maxsize must be an integer')
        if maxsize < 0:
            raise ValueError('maxsize must be non-negative')
        return maxsize

    def __getitem__(self, key):
        # record recent use of the key by moving it to the newest position
        with self._lock:
            value = self._cache[key]
            # On Python 3, could just use: self._cache.move_to_end(key)
            del self._cache[key]
            self._cache[key] = value
            return value

    def _maybe_shrink(self, capacity):
        """Shrink the cache if necessary, evicting the oldest items."""
        while len(self._cache) > capacity:
            key, value = self._cache.popitem(last=False)
            if self._on_evict is not None:
                self._on_evict(key, value)

    def __setitem__(self, key, value):
        with self._lock:
            if key in self._cache:
                # insert the new value at the end
                del self._cache[key]
                self._cache[key] = value
            elif self._maxsize:
                # make room if necessary
                self._maybe_shrink(self._maxsize - 1)
                self._cache[key] = value
            elif self._on_evict is not None:
                # not saving, immediately evict
                self._on_evict(key, value)

    def __delitem__(self, key):
        # Take the lock so a delete cannot interleave with the
        # remove-and-reinsert sequence in __getitem__ / __setitem__.
        with self._lock:
            del self._cache[key]

    def __iter__(self):
        # create a list, so accessing the cache during iteration cannot change
        # the iteration order; the snapshot is taken under the lock
        with self._lock:
            return iter(list(self._cache))

    def __len__(self):
        with self._lock:
            return len(self._cache)

    @property
    def maxsize(self):
        """Maximum number of items can be held in the cache."""
        return self._maxsize

    @maxsize.setter
    def maxsize(self, size):
        """Resize the cache, evicting the oldest items if necessary."""
        size = self._validate_maxsize(size)
        with self._lock:
            self._maybe_shrink(size)
            self._maxsize = size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I want to use dependency injection for the cache (e.g., `cache=FILE_CACHE`), which unfortunately means that we'll need to change the signature here from using `**kwargs`. Any opinions on what this should look like? I'm thinking maybe: